Skip to content

Instantly share code, notes, and snippets.

@petermolnar
Last active March 4, 2018 21:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save petermolnar/78f748231306632d548b19f22d158471 to your computer and use it in GitHub Desktop.
Save petermolnar/78f748231306632d548b19f22d158471 to your computer and use it in GitHub Desktop.
Python script to convert various logs (MSN Plus!, Skype v2, Trillian v3, etc.) to Pidgin format
import os
import sqlite3
import logging
import re
import glob
import sys
import hashlib
import arrow
import argparse
from bs4 import BeautifulSoup
import csv
from pprint import pprint
def logfilename(dt, nulltime=False):
    """Build a pidgin-style log file name from an arrow-like datetime.

    The name is "<YYYY-MM-DD>.<HHmmss><utcoffset><tzname>.txt"; when
    nulltime is True the time part is forced to '000000' so that all
    messages of a day land in one file.
    """
    timepart = '000000' if nulltime else dt.format('HHmmss')
    return "%s.%s%s%s.txt" % (
        dt.format("YYYY-MM-DD"),
        timepart,
        # %z / %Z give the numeric offset and the timezone abbreviation
        dt.datetime.strftime("%z"),
        dt.datetime.strftime("%Z"),
    )
def logappend(fpath, dt, sender, msg):
    """Append one "(timestamp) sender: message" line to a pidgin log.

    Also pushes the mtime of the file and of its directory back to the
    message time, so the converted logs sort naturally by date.
    """
    logging.debug('appending log: %s' % (fpath))
    entry = "(%s) %s: %s\n" % (
        dt.format('YYYY-MM-DD HH:mm:ss'),
        sender,
        msg,
    )
    with open(fpath, 'at') as handle:
        handle.write(entry)
    stamp = (dt.timestamp, dt.timestamp)
    os.utime(fpath, stamp)
    os.utime(os.path.dirname(fpath), stamp)
def logcreate(fpath, contact, dt, account, plugin):
    """Write the pidgin conversation header line to a new log file.

    Does nothing when fpath already exists, so it is safe to call once
    per message: only the first call per file writes the header.
    """
    logging.debug('creating converted log: %s' % (fpath))
    if os.path.exists(fpath):
        return
    header = "Conversation with %s at %s on %s (%s)\n" % (
        contact,
        dt.format('ddd dd MMM YYYY hh:mm:ss A ZZZ'),
        account,
        plugin,
    )
    with open(fpath, 'wt') as handle:
        handle.write(header)
def do_facebook(account, logpathbase):
    """Convert a Facebook chat export (fbcap CSV) into pidgin log files.

    account     -- facebook account name, used as the pidgin account dir
    logpathbase -- root of the pidgin log tree to write into
    """
    plugin = 'facebook'
    # the source data is from a facebook export and pidgin buddy list xml
    # after the alias was set for every facebook user by hand
    # the file contains lines constructed:
    # UID\tDisplay Nice Name
    lookupf = os.path.expanduser('~/tmp/facebook_lookup.csv')
    lookup = {}
    with open(lookupf, newline='') as csvfile:
        reader = csv.reader(csvfile, delimiter='\t')
        for row in reader:
            lookup.update({row[1]: row[0]})
    # the csv file for the messages is from the Facebook Data export
    # converted with https://pypi.python.org/pypi/fbchat_archive_parser
    # as: fbcap messages.htm -f csv > ~/tmp/facebook-messages.csv
    dataf = os.path.expanduser('~/tmp/facebook-messages.csv')
    # FIX: close the messages file when done (was opened without a `with`)
    with open(dataf) as datafile:
        reader = csv.DictReader(datafile, skipinitialspace=True)
        for row in reader:
            # skip conversations for now because I don't have any way of
            # getting the conversation id
            if ', ' in row['thread']:
                continue
            # the seconds are sometimes missing from the timestamps, so
            # try both formats
            dt = None
            for tsformat in ('YYYY-MM-DDTHH:mmZZ', 'YYYY-MM-DDTHH:mm:ssZZ'):
                try:
                    dt = arrow.get(row.get('date'), tsformat)
                    break
                except Exception:
                    continue
            if dt is None:
                # FIX: the original logged the failure but fell through
                # with an undefined (or stale, previous-row) dt
                logging.error('failed to parse entry: %s', row)
                continue
            dt = dt.to('UTC')
            contact = lookup.get(row.get('thread'))
            if not contact:
                continue
            msg = row.get('message')
            sender = row.get('sender')
            fpath = os.path.join(
                logpathbase,
                plugin,
                account,
                contact,
                logfilename(dt, nulltime=True)
            )
            if not os.path.isdir(os.path.dirname(fpath)):
                os.makedirs(os.path.dirname(fpath))
            logcreate(fpath, contact, dt, account, plugin)
            logappend(fpath, dt, sender, msg)
def do_zncfixed(znclogs, logpathbase, znctz):
    """Convert ZNC IRC logs into pidgin log files.

    Expects the logs already arranged into a pidgin-like
    plugin/account/contact/*.log tree (done by hand beforehand).
    znctz is the timezone name the ZNC timestamps were taken in.
    """
    # one message line: "[HH:MM:SS] <sender> message"
    LINESPLIT = re.compile(
        r'^\[(?P<hour>[0-9]+):(?P<minute>[0-9]+):(?P<second>[0-9]+)\]\s+'
        r'<(?P<sender>.*?)>\s+(?P<msg>.*)$'
    )
    searchin = os.path.join(znclogs, '**', '*.log')
    for log in glob.glob(searchin, recursive=True):
        logdir = os.path.dirname(log)
        contact = os.path.basename(logdir)
        accountdir = os.path.dirname(logdir)
        account = os.path.basename(accountdir)
        plugin = os.path.basename(os.path.dirname(accountdir))
        logging.info('converting log file: %s' % (log))
        # the file name carries the day: YYYY-MM-DD.log
        dt = arrow.get(os.path.basename(log).replace('.log', ''), 'YYYY-MM-DD')
        dt = dt.replace(tzinfo=znctz)
        # channel logs get a ".chat" suffix like pidgin chat logs do
        fname = "%s.chat" % (contact) if contact.startswith("#") else contact
        fpath = os.path.join(
            logpathbase,
            plugin,
            account,
            fname,
            logfilename(dt)
        )
        if not os.path.isdir(os.path.dirname(fpath)):
            os.makedirs(os.path.dirname(fpath))
        with open(log, 'rb') as handle:
            for raw in handle:
                line = raw.decode('utf8', 'ignore')
                match = LINESPLIT.match(line)
                if not match:
                    continue
                # combine the per-file date with the per-line time
                dt = dt.replace(
                    hour=int(match.group('hour')),
                    minute=int(match.group('minute')),
                    second=int(match.group('second'))
                )
                logcreate(fpath, contact, dt, account, plugin)
                logappend(fpath, dt, match.group('sender'), match.group('msg'))
def do_msnplus(msgpluslogs, logpathbase, msgplustz):
    """Convert MSN Plus! HTML chat logs into pidgin 'msn' log files.

    msgpluslogs -- directory tree searched recursively for *.html logs
    logpathbase -- root of the pidgin log tree to write into
    msgplustz   -- timezone name the MSN Plus! timestamps were taken in
    """
    # strips one surrounding "( ... )" pair, keeping the inside
    NOPAR = re.compile(r'\((.*)\)')
    searchin = os.path.join(msgpluslogs, '**', '*.html')
    logs = glob.glob(searchin, recursive=True)
    plugin = 'msn'
    for log in logs:
        logging.info('converting log file: %s' % (log))
        contact = os.path.basename(os.path.dirname(log))
        # MSN Plus! saves its HTML logs as UTF-16
        with open(log, 'rt', encoding='UTF-16') as f:
            html = BeautifulSoup(f.read(), "html.parser")
        # the first 'in' <li> carries "(account)" in its <span>
        account = html.find_all('li', attrs={'class': 'in'}, limit=1)[0]
        # FIX: use a raw string for the backreference; '\g<1>' in a plain
        # string literal is an invalid escape sequence
        account = NOPAR.sub(r'\g<1>', account.span.string)
        for session in html.findAll(attrs={'class': 'mplsession'}):
            # the session id embeds its start time: Session_<timestamp>
            dt = arrow.get(
                session.get('id').replace('Session_', ''),
                'YYYY-MM-DDTHH-mm-ss'
            )
            dt = dt.replace(tzinfo=msgplustz)
            seconds = int(dt.format('s'))
            fpath = os.path.join(
                logpathbase,
                plugin,
                account,
                contact,
                logfilename(dt)
            )
            if not os.path.isdir(os.path.dirname(fpath)):
                os.makedirs(os.path.dirname(fpath))
            for line in session.findAll('tr'):
                # message timestamps only have HH:mm; fake monotonically
                # increasing seconds so the lines keep their order
                if seconds == 59:
                    seconds = 0
                else:
                    seconds = seconds + 1
                tspan = line.find(attrs={'class': 'time'}).extract()
                time = tspan.string.replace('(', '').replace(')', '').strip().split(':')
                sender = line.find('th').string
                if not sender:
                    continue
                sender = sender.strip().split(':')[0]
                msg = line.find('td').get_text()
                mindt = dt.replace(
                    hour=int(time[0]),
                    minute=int(time[1]),
                    second=seconds
                )
                logcreate(fpath, contact, dt, account, plugin)
                logappend(fpath, mindt, sender, msg)
def do_trillian(trillianlogs, logpathbase, trilliantz):
    """Convert Trillian v3 plain-text logs into pidgin log files.

    trillianlogs -- directory tree searched recursively for *.log files
    logpathbase  -- root of the pidgin log tree to write into
    trilliantz   -- timezone name the Trillian timestamps were taken in
    """
    # one session: "Session Start (account:contact): <timestamp>" plus
    # its body, up to the next "Session" marker
    SPLIT_SESSIONS = re.compile(
        r'^Session Start\s+\((?P<participants>.*)?\):\s+(?P<timestamp>[^\n]+)'
        r'\n(?P<session>(?:.|\n)*?)(?=Session)',
        re.MULTILINE
    )
    # one message: "[time] sender: text", possibly spanning lines
    SPLIT_MESSAGES = re.compile(
        r'\[(?P<time>[^\]]+)\]\s+(?P<sender>.*?):\s+'
        r'(?P<msg>(?:.|\n)*?)(?=\n\[|$)'
    )
    searchin = os.path.join(trillianlogs, '**', '*.log')
    logs = glob.glob(searchin, recursive=True)
    for log in logs:
        if 'Channel' in log:
            # FIX: logging.warn is a deprecated alias of logging.warning
            logging.warning(
                "Group conversations are not supported yet, skipping %s" % log
            )
            continue
        logging.info('converting log file: %s' % (log))
        contact = os.path.basename(log).replace('.log', '')
        plugin = os.path.basename(os.path.dirname(os.path.dirname(log))).lower()
        with open(log, 'rb') as f:
            c = f.read().decode('utf8', 'ignore')
        for session in SPLIT_SESSIONS.findall(c):
            participants, timestamp, session = session
            logging.debug('converting session starting at: %s' % (timestamp))
            participants = participants.split(':')
            account = participants[0]
            dt = arrow.get(timestamp, 'ddd MMM DD HH:mm:ss YYYY')
            dt = dt.replace(tzinfo=trilliantz)
            fpath = os.path.join(
                logpathbase,
                plugin,
                participants[0],
                contact,
                logfilename(dt)
            )
            if not os.path.isdir(os.path.dirname(fpath)):
                os.makedirs(os.path.dirname(fpath))
            seconds = int(dt.format('s'))
            for line in SPLIT_MESSAGES.findall(session):
                # this is a fix for ancient trillian logs where seconds
                # were missing: fake monotonically increasing seconds
                if seconds == 59:
                    seconds = 0
                else:
                    seconds = seconds + 1
                time, sender, msg = line
                try:
                    mindt = arrow.get(time, 'YYYY.MM.DD HH:mm:ss')
                except Exception:
                    # old format carried only HH:mm in the brackets
                    time = time.split(':')
                    mindt = dt.replace(
                        hour=int(time[0]),
                        minute=int(time[1]),
                        second=seconds
                    )
                # creating the file with the header has to be here to
                # avoid empty or status-messages only files
                logcreate(fpath, participants[1], dt, account, plugin)
                logappend(fpath, mindt, sender, msg)
        # FIX: guard the module-global 'params' lookup so this function
        # does not raise NameError when imported instead of run as script
        if globals().get('params', {}).get('cleanup'):
            print('deleting old log: %s' % (log))
            os.unlink(log)
def do_skype(skypedbpath, logpathbase):
    """Convert a Skype v2 main.db SQLite database into pidgin log files.

    skypedbpath -- absolute path to skype's main.db
    logpathbase -- root of the pidgin log tree to write into
    """
    db = sqlite3.connect(skypedbpath)
    # FIX: close the database connection when done (was leaked before)
    try:
        cursor = db.cursor()
        cursor.execute('''SELECT `skypename` from Accounts''')
        accounts = cursor.fetchall()
        for account in accounts:
            account = account[0]
            cursor.execute('''
            SELECT
            `timestamp`,
            `dialog_partner`,
            `author`,
            `from_dispname`,
            `body_xml`
            FROM
            `Messages`
            WHERE
            `chatname` LIKE ?
            ORDER BY
            `timestamp` ASC
            ''', ('%' + account + '%',))
            messages = cursor.fetchall()
            for timestamp, partner, author, dispname, body in messages:
                # FIX: dialog_partner can be NULL in main.db (e.g. status
                # rows); os.path.join would raise TypeError on None
                if not partner:
                    continue
                dt = arrow.get(timestamp)
                dt = dt.replace(tzinfo='UTC')
                fpath = os.path.join(
                    logpathbase,
                    account,
                    partner,
                    logfilename(dt, nulltime=True)
                )
                if not os.path.isdir(os.path.dirname(fpath)):
                    os.makedirs(os.path.dirname(fpath))
                logcreate(fpath, partner, dt, account, 'skype')
                logappend(fpath, dt, dispname, body)
    finally:
        db.close()
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Parameters for Skype v2 logs to Pidgin logs converter'
    )
    parser.add_argument(
        '--skype_db',
        default=os.path.expanduser('~/.skype/main.db'),
        help='absolute path to skype main.db'
    )
    parser.add_argument(
        '--pidgin_logs',
        default=os.path.expanduser('~/.purple/logs/skype'),
        help='absolute path to Pidgin skype logs'
    )
    parser.add_argument(
        '--facebook_account',
        default='',
        help='facebook account name'
    )
    parser.add_argument(
        '--loglevel',
        default='warning',
        help='change loglevel'
    )
    # FIX: expose --cleanup, which do_trillian already reads from params
    # but which was never registered with argparse
    parser.add_argument(
        '--cleanup',
        action='store_true',
        default=False,
        help='delete the original log files after conversion'
    )
    for allowed in ['skype', 'trillian', 'msnplus', 'znc', 'facebook']:
        parser.add_argument(
            '--%s' % allowed,
            action='store_true',
            default=False,
            help='convert %s logs' % allowed
        )
        # FIX: the original condition "allowed != 'skype' or allowed !=
        # 'facebook'" is always true; skype and facebook take no
        # *_logs / *_timezone options (skype uses --skype_db, facebook
        # uses fixed ~/tmp paths)
        if allowed not in ('skype', 'facebook'):
            parser.add_argument(
                '--%s_logs' % allowed,
                default=os.path.expanduser('~/.%s/logs' % allowed),
                help='absolute path to %s logs' % allowed
            )
            parser.add_argument(
                '--%s_timezone' % allowed,
                default='UTC',
                help='timezone name for %s logs (eg. US/Pacific)' % allowed
            )
    params = vars(parser.parse_args())
    # remove the rest of the potential loggers
    while len(logging.root.handlers) > 0:
        logging.root.removeHandler(logging.root.handlers[-1])
    LLEVEL = {
        'critical': 50,
        'error': 40,
        'warning': 30,
        'info': 20,
        'debug': 10
    }
    logging.basicConfig(
        # FIX: fall back to warning instead of KeyError on unknown names
        level=LLEVEL.get(params.get('loglevel'), 30),
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    if params.get('facebook'):
        logging.info('facebook enabled')
        do_facebook(
            params.get('facebook_account'),
            params.get('pidgin_logs')
        )
    if params.get('skype'):
        logging.info('Skype enabled; parsing skype logs')
        do_skype(
            params.get('skype_db'),
            params.get('pidgin_logs')
        )
    if params.get('trillian'):
        logging.info('Trillian enabled; parsing trillian logs')
        do_trillian(
            params.get('trillian_logs'),
            params.get('pidgin_logs'),
            params.get('trillian_timezone'),
        )
    if params.get('msnplus'):
        logging.info('MSN Plus! enabled; parsing logs')
        do_msnplus(
            params.get('msnplus_logs'),
            params.get('pidgin_logs'),
            params.get('msnplus_timezone'),
        )
    if params.get('znc'):
        logging.info('ZNC enabled; parsing znc logs')
        do_zncfixed(
            params.get('znc_logs'),
            params.get('pidgin_logs'),
            params.get('znc_timezone'),
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment