Last active
March 4, 2018 21:09
-
-
Save petermolnar/78f748231306632d548b19f22d158471 to your computer and use it in GitHub Desktop.
Python script to convert various logs (MSN Plus!, Skype v2, Trillian v3, etc) to Pidgin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sqlite3 | |
import logging | |
import re | |
import glob | |
import sys | |
import hashlib | |
import arrow | |
import argparse | |
from bs4 import BeautifulSoup | |
import csv | |
from pprint import pprint | |
def logfilename(dt, nulltime=False):
    """Build a pidgin-style log file name for the arrow timestamp `dt`.

    The name is `YYYY-MM-DD.HHmmss<utcoffset><tzname>.txt`; with
    `nulltime=True` the time portion is zeroed, which is used for logs
    that only carry day resolution.
    """
    timepart = '000000' if nulltime else dt.format('HHmmss')
    datepart = dt.format("YYYY-MM-DD")
    # arrow has no tokens for %z/%Z, so fall back to the wrapped datetime
    utcoffset = dt.datetime.strftime("%z")
    tzname = dt.datetime.strftime("%Z")
    return "%s.%s%s%s.txt" % (datepart, timepart, utcoffset, tzname)
def logappend(fpath, dt, sender, msg):
    """Append one '(timestamp) sender: message' line to a pidgin log file.

    Also back-dates the file's and its parent directory's atime/mtime to
    the message time so converted logs sort naturally by date.
    """
    logging.debug('appending log: %s' % (fpath))
    # write UTF-8 explicitly: the readers in this script decode log data
    # as utf8, and the default append encoding is locale-dependent
    with open(fpath, 'at', encoding='utf-8') as f:
        f.write("(%s) %s: %s\n" % (
            dt.format('YYYY-MM-DD HH:mm:ss'),
            sender,
            msg
        ))
    # arrow < 1.0 exposed .timestamp as a property, >= 1.0 as a method;
    # support both so os.utime gets a number, not a bound method
    ts = dt.timestamp() if callable(dt.timestamp) else dt.timestamp
    os.utime(fpath, (ts, ts))
    os.utime(os.path.dirname(fpath), (ts, ts))
def logcreate(fpath, contact, dt, account, plugin):
    """Create a pidgin log file with its 'Conversation with ...' header.

    No-op when fpath already exists, so it is safe to call once per
    message; the header timestamp is the conversation start `dt`.
    """
    logging.debug('creating converted log: %s' % (fpath))
    if not os.path.exists(fpath):
        # write UTF-8 explicitly, matching logappend and the utf8
        # decoding done by the log readers in this script
        with open(fpath, 'wt', encoding='utf-8') as f:
            f.write("Conversation with %s at %s on %s (%s)\n" % (
                contact,
                dt.format('ddd dd MMM YYYY hh:mm:ss A ZZZ'),
                account,
                plugin
            ))
def do_facebook(account, logpathbase):
    """Convert a Facebook data-export CSV into pidgin logs.

    account: facebook account name used as the pidgin account directory
    logpathbase: pidgin logs root directory to write into
    """
    plugin = 'facebook'
    # the source data is from a facebook export and pidgin buddy list xml
    # after the alias was set for every facebook user by hand
    # the file contains lines constructed:
    # UID\tDisplay Nice Name
    lookupf = os.path.expanduser('~/tmp/facebook_lookup.csv')
    lookup = {}
    with open(lookupf, newline='') as csvfile:
        reader = csv.reader(csvfile, delimiter='\t')
        for row in reader:
            lookup.update({row[1]: row[0]})
    # the csv file for the messages is from the Facebook Data export
    # converted with https://pypi.python.org/pypi/fbchat_archive_parser
    # as: fbcap messages.htm -f csv > ~/tmp/facebook-messages.csv
    dataf = os.path.expanduser('~/tmp/facebook-messages.csv')
    # `with` instead of a bare open() so the handle is not leaked
    with open(dataf, newline='') as msgfile:
        reader = csv.DictReader(msgfile, skipinitialspace=True)
        for row in reader:
            # skip conversations for now because I don't have any way of
            # getting the conversation id
            if ', ' in row['thread']:
                continue
            # the seconds are sometimes missing from the timestamps, so
            # try both formats; skip the row when neither parses (the
            # original fell through here and reused the previous row's dt)
            dt = None
            for fmt in ('YYYY-MM-DDTHH:mmZZ', 'YYYY-MM-DDTHH:mm:ssZZ'):
                try:
                    dt = arrow.get(row.get('date'), fmt)
                    break
                except Exception:
                    pass
            if dt is None:
                logging.error('failed to parse entry: %s', row)
                continue
            dt = dt.to('UTC')
            contact = lookup.get(row.get('thread'))
            if not contact:
                continue
            msg = row.get('message')
            sender = row.get('sender')
            fpath = os.path.join(
                logpathbase,
                plugin,
                account,
                contact,
                logfilename(dt, nulltime=True)
            )
            if not os.path.isdir(os.path.dirname(fpath)):
                os.makedirs(os.path.dirname(fpath))
            logcreate(fpath, contact, dt, account, plugin)
            logappend(fpath, dt, sender, msg)
def do_zncfixed(znclogs, logpathbase, znctz):
    """Convert ZNC IRC logs into pidgin logs.

    znclogs: root of the ZNC logs, manually pre-arranged into a
    pidgin-like plugin/account/contact/YYYY-MM-DD.log layout
    logpathbase: pidgin logs root directory to write into
    znctz: timezone name the ZNC timestamps were recorded in
    """
    line_re = re.compile(
        r'^\[(?P<hour>[0-9]+):(?P<minute>[0-9]+):(?P<second>[0-9]+)\]\s+'
        r'<(?P<sender>.*?)>\s+(?P<msg>.*)$'
    )
    pattern = os.path.join(znclogs, '**', '*.log')
    for log in glob.glob(pattern, recursive=True):
        contactdir = os.path.dirname(log)
        accountdir = os.path.dirname(contactdir)
        contact = os.path.basename(contactdir)
        account = os.path.basename(accountdir)
        plugin = os.path.basename(os.path.dirname(accountdir))
        logging.info('converting log file: %s' % (log))
        # the date comes from the file name itself
        stamp = arrow.get(os.path.basename(log).replace('.log', ''), 'YYYY-MM-DD')
        stamp = stamp.replace(tzinfo=znctz)
        # IRC channels get a .chat suffix, the way pidgin names them
        target = "%s.chat" % (contact) if contact.startswith("#") else contact
        fpath = os.path.join(
            logpathbase,
            plugin,
            account,
            target,
            logfilename(stamp)
        )
        targetdir = os.path.dirname(fpath)
        if not os.path.isdir(targetdir):
            os.makedirs(targetdir)
        with open(log, 'rb') as fh:
            for raw in fh:
                match = line_re.match(raw.decode('utf8', 'ignore'))
                if not match:
                    continue
                # stamp carries the file's date; each matched line
                # supplies the time of day
                stamp = stamp.replace(
                    hour=int(match.group('hour')),
                    minute=int(match.group('minute')),
                    second=int(match.group('second'))
                )
                logcreate(fpath, contact, stamp, account, plugin)
                logappend(fpath, stamp, match.group('sender'), match.group('msg'))
def do_msnplus(msgpluslogs, logpathbase, msgplustz):
    """Convert MSN Plus! HTML logs into pidgin logs.

    msgpluslogs: root directory of the MSN Plus! HTML logs
    logpathbase: pidgin logs root directory to write into
    msgplustz: timezone name the MSN Plus! timestamps were recorded in
    """
    # strips the surrounding parentheses from the account line
    NOPAR = re.compile(r'\((.*)\)')
    searchin = os.path.join(
        msgpluslogs,
        '**',
        '*.html'
    )
    logs = glob.glob(searchin, recursive=True)
    plugin = 'msn'
    for log in logs:
        logging.info('converting log file: %s' % (log))
        contact = os.path.basename(os.path.dirname(log))
        # MSN Plus! saved its HTML logs as UTF-16
        with open(log, 'rt', encoding='UTF-16') as f:
            html = BeautifulSoup(f.read(), "html.parser")
            account = html.find_all('li', attrs={'class': 'in'}, limit=1)[0]
            # raw string for the backreference: a plain '\g<1>' literal is
            # an invalid escape sequence (deprecated in Python 3.6+)
            account = NOPAR.sub(r'\g<1>', account.span.string)
            # find_all throughout: findAll is the legacy bs4 alias
            for session in html.find_all(attrs={'class': 'mplsession'}):
                dt = arrow.get(
                    session.get('id').replace('Session_', ''),
                    'YYYY-MM-DDTHH-mm-ss'
                )
                dt = dt.replace(tzinfo=msgplustz)
                seconds = int(dt.format('s'))
                fpath = os.path.join(
                    logpathbase,
                    plugin,
                    account,
                    contact,
                    logfilename(dt)
                )
                if not os.path.isdir(os.path.dirname(fpath)):
                    os.makedirs(os.path.dirname(fpath))
                for line in session.find_all('tr'):
                    # message timestamps have minute resolution only, so
                    # fake increasing seconds to preserve ordering
                    if seconds == 59:
                        seconds = 0
                    else:
                        seconds = seconds + 1
                    tspan = line.find(attrs={'class': 'time'}).extract()
                    time = tspan.string.replace('(', '').replace(')', '').strip().split(':')
                    sender = line.find('th').string
                    if not sender:
                        continue
                    sender = sender.strip().split(':')[0]
                    msg = line.find('td').get_text()
                    mindt = dt.replace(
                        hour=int(time[0]),
                        minute=int(time[1]),
                        second=int(seconds)
                    )
                    # header uses the session start dt, lines use mindt
                    logcreate(fpath, contact, dt, account, plugin)
                    logappend(fpath, mindt, sender, msg)
def do_trillian(trillianlogs, logpathbase, trilliantz):
    """Convert Trillian v3 plain-text logs into pidgin logs.

    trillianlogs: root directory of the Trillian logs
    logpathbase: pidgin logs root directory to write into
    trilliantz: timezone name the Trillian timestamps were recorded in
    """
    SPLIT_SESSIONS = re.compile(
        r'^Session Start\s+\((?P<participants>.*)?\):\s+(?P<timestamp>[^\n]+)'
        r'\n(?P<session>(?:.|\n)*?)(?=Session)',
        re.MULTILINE
    )
    SPLIT_MESSAGES = re.compile(
        r'\[(?P<time>[^\]]+)\]\s+(?P<sender>.*?):\s+'
        r'(?P<msg>(?:.|\n)*?)(?=\n\[|$)'
    )
    searchin = os.path.join(
        trillianlogs,
        '**',
        '*.log'
    )
    logs = glob.glob(searchin, recursive=True)
    for log in logs:
        if 'Channel' in log:
            # logging.warning: logging.warn is a deprecated alias
            logging.warning(
                "Group conversations are not supported yet, skipping %s" % log
            )
            continue
        logging.info('converting log file: %s' % (log))
        contact = os.path.basename(log).replace('.log', '')
        plugin = os.path.basename(os.path.dirname(os.path.dirname(log))).lower()
        with open(log, 'rb') as f:
            c = f.read().decode('utf8', 'ignore')
        for session in SPLIT_SESSIONS.findall(c):
            participants, timestamp, session = session
            logging.debug('converting session starting at: %s' % (timestamp))
            participants = participants.split(':')
            account = participants[0]
            dt = arrow.get(timestamp, 'ddd MMM DD HH:mm:ss YYYY')
            dt = dt.replace(tzinfo=trilliantz)
            fpath = os.path.join(
                logpathbase,
                plugin,
                participants[0],
                contact,
                logfilename(dt)
            )
            if not os.path.isdir(os.path.dirname(fpath)):
                os.makedirs(os.path.dirname(fpath))
            seconds = int(dt.format('s'))
            curr_mindt = dt
            for line in SPLIT_MESSAGES.findall(session):
                # this is a fix for ancient trillian logs where seconds
                # were missing
                if seconds == 59:
                    seconds = 0
                else:
                    seconds = seconds + 1
                time, sender, msg = line
                try:
                    mindt = arrow.get(time,
                                      'YYYY.MM.DD HH:mm:ss')
                except Exception:
                    # old format: HH:mm only, fake the seconds
                    time = time.split(':')
                    mindt = dt.replace(
                        hour=int(time[0]),
                        minute=int(time[1]),
                        second=int(seconds)
                    )
                # creating the file with the header has to be here to
                # avoid empty or status-messages only files
                logcreate(fpath, participants[1], dt, account, plugin)
                logappend(fpath, mindt, sender, msg)
        # NOTE(review): `params` is the module-level argparse dict, and no
        # '--cleanup' option is ever defined, so this branch is currently
        # dead code — confirm before relying on it
        if params.get('cleanup'):
            print('deleting old log: %s' % (log))
            os.unlink(log)
def do_skype(skypedbpath, logpathbase):
    """Convert a Skype v2 main.db sqlite database into pidgin logs.

    skypedbpath: absolute path to skype's main.db
    logpathbase: pidgin skype logs root directory to write into
    """
    db = sqlite3.connect(skypedbpath)
    # try/finally so the connection is closed even on a conversion error
    # (the original leaked it)
    try:
        cursor = db.cursor()
        cursor.execute('''SELECT `skypename` from Accounts''')
        accounts = cursor.fetchall()
        for account in accounts:
            account = account[0]
            cursor.execute('''
                SELECT
                    `timestamp`,
                    `dialog_partner`,
                    `author`,
                    `from_dispname`,
                    `body_xml`
                FROM
                    `Messages`
                WHERE
                    `chatname` LIKE ?
                ORDER BY
                    `timestamp` ASC
            ''', ('%' + account + '%',))
            messages = cursor.fetchall()
            for r in messages:
                # r: (timestamp, dialog_partner, author, from_dispname, body_xml)
                # dialog_partner can be NULL for system rows; skip those
                # instead of crashing in os.path.join
                if r[1] is None:
                    continue
                dt = arrow.get(r[0])
                dt = dt.replace(tzinfo='UTC')
                fpath = os.path.join(
                    logpathbase,
                    account,
                    r[1],
                    logfilename(dt, nulltime=True)
                )
                if not os.path.isdir(os.path.dirname(fpath)):
                    os.makedirs(os.path.dirname(fpath))
                logcreate(fpath, r[1], dt, account, 'skype')
                logappend(fpath, dt, r[3], r[4])
    finally:
        db.close()
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Parameters for Skype v2 logs to Pidgin logs converter'
    )
    parser.add_argument(
        '--skype_db',
        default=os.path.expanduser('~/.skype/main.db'),
        help='absolute path to skype main.db'
    )
    parser.add_argument(
        '--pidgin_logs',
        default=os.path.expanduser('~/.purple/logs/skype'),
        help='absolute path to Pidgin skype logs'
    )
    parser.add_argument(
        '--facebook_account',
        default='',
        help='facebook account name'
    )
    parser.add_argument(
        '--loglevel',
        default='warning',
        help='change loglevel'
    )
    for allowed in ['skype', 'trillian', 'msnplus', 'znc', 'facebook']:
        parser.add_argument(
            '--%s' % allowed,
            action='store_true',
            default=False,
            help='convert %s logs' % allowed
        )
        # skype reads main.db and facebook reads the export CSVs, so
        # neither needs a logs-directory/timezone option; the original
        # condition `allowed != 'skype' or allowed != 'facebook'` was a
        # tautology (always true) and added these options for every plugin
        if allowed not in ('skype', 'facebook'):
            parser.add_argument(
                '--%s_logs' % allowed,
                default=os.path.expanduser('~/.%s/logs' % allowed),
                help='absolute path to %s logs' % allowed
            )
            parser.add_argument(
                '--%s_timezone' % allowed,
                default='UTC',
                help='timezone name for %s logs (eg. US/Pacific)' % allowed
            )
    params = vars(parser.parse_args())
    # remove the rest of the potential loggers
    while len(logging.root.handlers) > 0:
        logging.root.removeHandler(logging.root.handlers[-1])
    LLEVEL = {
        'critical': 50,
        'error': 40,
        'warning': 30,
        'info': 20,
        'debug': 10
    }
    logging.basicConfig(
        # fall back to 'warning' for an unknown --loglevel value instead
        # of crashing with a KeyError
        level=LLEVEL.get(params.get('loglevel'), 30),
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    if params.get('facebook'):
        logging.info('facebook enabled')
        do_facebook(
            params.get('facebook_account'),
            params.get('pidgin_logs')
        )
    if params.get('skype'):
        logging.info('Skype enabled; parsing skype logs')
        do_skype(
            params.get('skype_db'),
            params.get('pidgin_logs')
        )
    if params.get('trillian'):
        logging.info('Trillian enabled; parsing trillian logs')
        do_trillian(
            params.get('trillian_logs'),
            params.get('pidgin_logs'),
            params.get('trillian_timezone'),
        )
    if params.get('msnplus'):
        logging.info('MSN Plus! enabled; parsing logs')
        do_msnplus(
            params.get('msnplus_logs'),
            params.get('pidgin_logs'),
            params.get('msnplus_timezone'),
        )
    if params.get('znc'):
        logging.info('ZNC enabled; parsing znc logs')
        do_zncfixed(
            params.get('znc_logs'),
            params.get('pidgin_logs'),
            params.get('znc_timezone'),
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment