Skip to content

Instantly share code, notes, and snippets.

@shellexy
Created November 8, 2018 17:25
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shellexy/8d3e8a49bcd46915da48b403580b74ca to your computer and use it in GitHub Desktop.
Save shellexy/8d3e8a49bcd46915da48b403580b74ca to your computer and use it in GitHub Desktop.
导出微信聊天记录为 txt
#!/usr/bin/python2.7
# -*- coding: UTF-8 -*-
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79:
'''导出微信聊天记录为 txt
@author: Shellexy Wang <shellexyone@gmail.com>
@license: LGPLv3+
@see:
'''
import os
import sys
import re
import hashlib
import csv
import time
import locale
import getopt
# User-supplied SQLCipher key. When non-empty, get_key() returns it as-is
# instead of deriving a key from the device; main() sets it from the -k option.
WECHAT_DB_KEY = ''
def get_db_path():
    '''Return the on-device path of WeChat's EnMicroMsg.db.

    The path looks like
    /data/data/com.tencent.mm/MicroMsg/********************************/EnMicroMsg.db

    Runs ``adb root`` first, then lists the matching files through ``su``.
    When several paths are printed the last line is used; when nothing is
    printed an empty string is returned.
    '''
    os.popen('adb root').close()
    list_cmd = "adb shell su -c 'ls /data/data/com.tencent.mm/MicroMsg/*/EnMicroMsg.db' "
    listing = os.popen(list_cmd).read()
    if not listing:
        return ''
    return listing.splitlines()[-1]
def get_uin():
    '''Return the WeChat ``default_uin`` value read from the device.

    Runs ``adb root`` and then cats WeChat's system_config_prefs.xml through
    ``su``.

    Returns the uin as a string of digits (with a leading ``-`` when the
    stored value is negative), or the integer 0 when no value is found.
    '''
    os.popen('adb root').close()
    prefs = os.popen("adb shell su -c 'cat /data/data/com.tencent.mm/shared_prefs/system_config_prefs.xml' ").read()
    # NOTE(review): the uin appears to be stored as a signed integer, so the
    # value can be negative; a digits-only pattern would miss it and no key
    # could be derived. Confirm against a device with a negative uin.
    found = re.search(r'name="default_uin" value="(-?[0-9]+)"', prefs)
    return found.group(1) if found else 0
def get_imei():
    '''Return the device IMEI reported by ``dumpsys iphonesubinfo``.

    The first ``Device ID = <digits>`` entry in the output is returned as a
    string; the integer 0 is returned when no such entry is present.
    '''
    dump = os.popen('adb shell dumpsys iphonesubinfo').read()
    found = re.search('Device ID = ([0-9]+)', dump)
    if found is None:
        return 0
    return found.group(1)
def get_key():
    '''Return the SQLCipher key for the WeChat database.

    A key preset in WECHAT_DB_KEY wins. Otherwise the key is the first seven
    hex characters of md5(imei + uin), with both values read from the
    device. An empty string is returned when either value is unavailable.
    '''
    if WECHAT_DB_KEY:
        return WECHAT_DB_KEY
    uin = get_uin()
    imei = get_imei()
    if not (uin and imei):
        return ''
    digest = hashlib.md5(imei + uin).hexdigest()
    return digest[:7]
def messagecsv2chat(msgcsv):
    '''Convert the CSV dump of the ``message`` table into chat-log rows.

    Expected header:
    msgId,msgSvrId,type,status,isSend,isShowTimer,createTime,talker,content,imgPath,reserved,lvbuffer

    msgcsv may be the whole CSV text as a string or any iterable of CSV
    lines. The first row is treated as the header and skipped.

    Yields ``[talker, sendat, sender, content, imgPath]`` lists ordered by
    createTime, where ``sender`` is ``'me'`` for rows with isSend == '1' and
    the talker otherwise, and ``sendat`` is the local time formatted with the
    environment's locale. createTime is treated as milliseconds.

    Rows with fewer than 12 fields or a non-numeric createTime are skipped.
    '''
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        # Unsupported environment locale: keep the current one rather than
        # aborting the whole export over the weekday name.
        pass
    if hasattr(msgcsv, 'splitlines'):
        msgcsv = [line + '\n' for line in msgcsv.splitlines()]
    reader = csv.reader(msgcsv)
    # Skip the header; the default keeps empty input from raising.
    next(reader, None)
    rows = []
    for row in reader:
        # Validate before sorting so that short or blank rows cannot break
        # the sort key.
        if len(row) < 12:
            continue
        try:
            created = int(row[6])
        except ValueError:
            continue
        rows.append((created, row))
    # Re-order by createTime because the ids in the database may be out of
    # order. Compare numerically: as strings, timestamps of different
    # lengths would sort incorrectly. The sort is stable.
    rows.sort(key=lambda item: item[0])
    for created, row in rows:
        (msgId, msgSvrId, type_, status, isSend, isShowTimer, createTime,
         talker, content, imgPath, reserved, lvbuffer) = row[:12]
        sender = 'me' if (isSend == '1') else talker
        sendtime = time.localtime(created // 1000)
        sendat = time.strftime("%Y-%m-%d 周%a %H:%M:%S", sendtime)
        yield [talker, sendat, sender, content, imgPath]
def chat2talkers(chat):
    '''Return the distinct talkers found in chat.

    chat is an iterable of rows whose first item is the talker. The result
    is a list in order of first appearance, so the outcome does not depend
    on dict ordering.
    '''
    seen = set()
    talkers = []
    for row in chat:
        talker = row[0]
        if talker not in seen:
            seen.add(talker)
            talkers.append(talker)
    return talkers
def chat2txt(chat, name = ''):
    '''Render chat rows as plain text.

    chat is an iterable of ``[talker, sendat, sender, content, imgPath]``
    rows. With an empty name every row is rendered and prefixed with its
    talker; otherwise only rows whose talker equals name (case-insensitive)
    are rendered, without the talker prefix.

    Returns the rendered lines joined by newlines, followed by one trailing
    newline (so ``'\\n'`` when nothing matched).
    '''
    name = name.lower()
    rendered = []
    for talker, sendat, sender, content, imgPath in chat:
        talker_key = talker.lower()
        if name and talker_key != name:
            continue
        # In chat-room records, drop the extra line break after the leading
        # "name:" on the first line of content. Decide per talker (as
        # chat2txts does) so it also applies when exporting every talker.
        if '@chatroom' in talker_key:
            content = content.replace(':\n', ': ', 1)
        # Indent the remaining continuation lines with one space.
        content = content.replace('\n', '\n ')
        imgPath = ('\t' + imgPath) if imgPath else ''
        if name:
            rendered.append('%s %s: %s %s' % (sendat, sender, content, imgPath))
        else:
            rendered.append('%s: %s %s: %s %s' % (talker, sendat, sender, content, imgPath))
    return '\n'.join(rendered) + '\n'
def chat2txts(chat, names = []):
    '''Render one plain-text log per requested talker.

    chat is an iterable of ``[talker, sendat, sender, content, imgPath]``
    rows; names lists the talkers to export (matched case-insensitively).

    Returns a dict mapping each lower-cased talker that has at least one
    message to its text, which ends with a trailing newline. Talkers without
    messages are absent from the result.
    '''
    # A set keeps the per-message membership test constant-time.
    wanted = set(name.lower() for name in names)
    collected = {}
    for talker, sendat, sender, content, imgPath in chat:
        key = talker.lower()
        if key not in wanted:
            continue
        # In chat-room records, drop the extra line break after the leading
        # "name:" on the first line of content.
        if '@chatroom' in key:
            content = content.replace(':\n', ': ', 1)
        # Indent the remaining continuation lines with one space.
        content = content.replace('\n', '\n ')
        imgPath = ('\t' + imgPath) if imgPath else ''
        collected.setdefault(key, []).append(
            '%s %s: %s %s' % (sendat, sender, content, imgPath))
    return {key: '\n'.join(lines) + '\n' for key, lines in collected.items()}
def get_sqlc_tables(dbn, key = ''):
    '''Return the table names of database file dbn.

    Feeds a short script to the ``sqlcipher`` command-line shell. When key
    is given the database is unlocked with ``PRAGMA key`` followed by
    ``PRAGMA cipher_migrate``.

    Raises OSError if the ``sqlcipher`` executable cannot be started.
    '''
    import subprocess

    commands = []
    if key:
        commands.append('PRAGMA key=%s;\n' % repr(key))
        commands.append('PRAGMA cipher_migrate;\n')
    commands.append('.tables\n')
    proc = subprocess.Popen(['sqlcipher', dbn],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # communicate() sends the script, closes stdin, reads all output and
    # reaps the child process.
    output = proc.communicate(''.join(commands))[0]
    names = output.split()
    if key:
        # Per the original author's note, PRAGMA cipher_migrate prepends
        # "cipher_migrate\r\n0\r\n" to the output, so drop those two tokens.
        # Only do so when the PRAGMA was actually issued, otherwise the
        # first two table names would be lost.
        names = names[2:]
    return names
def sqlc2csv(dbn, key = '', table = 'message'):
    '''Return the contents of one table of database file dbn as CSV text.

    Feeds a script to the ``sqlcipher`` command-line shell with headers on
    and CSV mode selected. When key is given the database is unlocked with
    ``PRAGMA key`` followed by ``PRAGMA cipher_migrate``. An empty table
    name falls back to ``'message'``.

    Raises OSError if the ``sqlcipher`` executable cannot be started.
    '''
    import subprocess

    table = table or 'message'
    commands = ['.header on\n', '.mode csv\n']
    if key:
        commands.append('PRAGMA key=%s;\n' % repr(key))
        commands.append('PRAGMA cipher_migrate;\n')
    commands.append('select * from %s;\n' % repr(table))
    proc = subprocess.Popen(['sqlcipher', dbn],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # communicate() sends the script, closes stdin, reads all output and
    # reaps the child process.
    output = proc.communicate(''.join(commands))[0]
    if key:
        # Per the original author's note, PRAGMA cipher_migrate prepends
        # "cipher_migrate\r\n0\r\n" to the output, so drop the first two
        # lines. Only do so when the PRAGMA was actually issued, otherwise
        # the CSV header and first data row would be lost.
        parts = output.split('\n', 2)
        output = parts[2] if len(parts) > 2 else ''
    return output
def wechat2csv(tables = []):
    '''Pull the WeChat database from the device and write tables as CSV.

    The database is copied to /storage/sdcard1 on the device, pulled to
    ``EnMicroMsg.db`` in the current directory, and the device-side copy is
    removed. Each table named in tables (every table when the list is
    empty) is then written to ``<table>.csv``; tables whose dump contains
    no line break are skipped.

    Returns None.
    '''
    ldbn = 'EnMicroMsg.db'
    dbn = get_db_path()
    key = get_key()
    # close() blocks until adb reports the device, instead of relying on
    # garbage collection of the pipe object to do the waiting.
    os.popen('adb wait-for-device').close()
    pull_cmd = (
        'adb shell su -c "cp %s /storage/sdcard1/EnMicroMsg.db" ; '
        'adb pull /storage/sdcard1/EnMicroMsg.db %s && '
        'adb shell rm /storage/sdcard1/EnMicroMsg.db '
    ) % (repr(dbn), repr(ldbn))
    os.popen(pull_cmd).close()
    if not tables:
        tables = get_sqlc_tables(ldbn, key)
    for table in tables:
        csvtxt = sqlc2csv(ldbn, key, table)
        if csvtxt.find('\n') > 0:
            # Export one table at a time and close each file promptly.
            with open('%s.csv' % table, 'w') as csvfile:
                csvfile.write(csvtxt)
    return
def wechat2csvtxt(tables = []):
    '''Pull the WeChat database from the device and return tables as CSV.

    The database is copied to /storage/sdcard1 on the device, pulled to
    ``EnMicroMsg.db`` in the current directory, and the device-side copy is
    removed. Each table named in tables (every table when the list is
    empty) is then dumped.

    Returns a dict mapping table name to CSV text; tables whose dump
    contains no line break are left out.
    '''
    ldbn = 'EnMicroMsg.db'
    dbn = get_db_path()
    key = get_key()
    # close() blocks until adb reports the device, instead of relying on
    # garbage collection of the pipe object to do the waiting.
    os.popen('adb wait-for-device').close()
    pull_cmd = (
        'adb shell su -c "cp %s /storage/sdcard1/EnMicroMsg.db" ; '
        'adb pull /storage/sdcard1/EnMicroMsg.db %s && '
        'adb shell rm /storage/sdcard1/EnMicroMsg.db '
    ) % (repr(dbn), repr(ldbn))
    os.popen(pull_cmd).close()
    if not tables:
        tables = get_sqlc_tables(ldbn, key)
    csvtxts = {}
    for table in tables:
        csvtxt = sqlc2csv(ldbn, key, table)
        if csvtxt.find('\n') > 0:
            csvtxts[table] = csvtxt
    return csvtxts
def wechat2txt(names = []):
    '''Export the chat log of each talker to ``message.<name>.txt``.

    The raw dump of the ``message`` table is also saved as ``message.csv``.
    When names is empty every talker found in the log is exported. A
    talker's file is only written when its text is longer than 4
    characters.

    Returns 1 when the message table could not be dumped or holds no
    messages, and None on success.
    '''
    csvtxt = wechat2csvtxt(tables=['message']).get('message')
    if not csvtxt:
        # The table is missing from the result; there is nothing to write.
        return 1
    with open('message.csv', 'w') as csvfile:
        csvfile.write(csvtxt)
    chat = list(messagecsv2chat(csvtxt))
    if not chat:
        return 1
    if not names:
        names = chat2talkers(chat)
    txts = chat2txts(chat, names)
    for name in names:
        # A requested name without any message has no entry in txts.
        txt = txts.get(name.lower(), '')
        if len(txt) > 4:
            with open('message.%s.txt' % name, 'w') as txtfile:
                txtfile.write(txt)
# Help text printed for -h and for unrecognised options.
USAGE = '''Usage: wechat2txt.py [OPTIONS] [NAME]...
OPTIONS:
-h display this help and exit
-t export csv of database tables
-k <KEY> set the wechat db sqlcipher key
'''
def main():
    '''Command-line entry point.

    Options are read from sys.argv:
    -h        print the usage text and stop
    -t        export database tables as csv; the remaining arguments name
              the tables, and none means every table
    -k KEY    store KEY in WECHAT_DB_KEY for use as the database key
    Without -t the remaining arguments are talker names to export as txt.

    Returns the process exit status: 0 on success, 1 on a usage error or
    when the txt export reports failure.
    '''
    global WECHAT_DB_KEY
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'htk:')
    except getopt.error:
        print(USAGE)
        return 1
    export_tables = False
    for opt, arg in opts:
        if opt == '-h':
            print(USAGE)
            return 0
        elif opt == '-t':
            # Track the option itself: with no table names given the
            # argument list is empty, which must still select csv export.
            export_tables = True
        elif opt == '-k':
            WECHAT_DB_KEY = arg
    if export_tables:
        wechat2csv(args)
        return 0
    # wechat2txt() returns 1 on failure and None on success.
    return 1 if wechat2txt(args) else 0
# When run as a script, use main()'s return value as the process exit status.
if __name__=="__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment