Skip to content

Instantly share code, notes, and snippets.

@brianv0
Last active December 17, 2021 17:01
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save brianv0/35f36a32366a2c34be8d to your computer and use it in GitHub Desktop.
Save brianv0/35f36a32366a2c34be8d to your computer and use it in GitHub Desktop.
iOS messages html and json dump with file copying
#!/usr/bin/python
import codecs
import datetime
import hashlib
import json
import os
import re
import sha
import shutil
import sqlite3
import struct
import sys
from xml.sax.saxutils import escape, quoteattr

from dateutil import parser
from dateutil.tz import tzlocal, tzutc
# Command-line arguments: the backup directory to read and the directory that
# receives the generated HTML/JSON files and copied attachments.
if len(sys.argv) < 3:
    sys.exit("usage: python %s [backup dir] [target dir]" % sys.argv[0])
dirname = sys.argv[1]
targetdir = sys.argv[2]

# Files inside the backup are stored under the SHA-1 hex digest of
# "<domain>-<path>" (see Reader.readRecord). These two digests name the SQLite
# databases holding the messages (chat/message/handle tables) and the address
# book (ABPerson/ABMultiValue tables) queried further down.
messages_db = '3d0d7e5fb2ce288813306e4d4636395e047a3d28'
contacts_db = '31bb7ba8914766d4ba40d6dfb6113c8b614be442'
mbdb = "Manifest.mbdb"
# When True, attachments are copied from the backup into the target directory.
COPY_FILES = True

_messages_path = os.path.join(dirname, messages_db)
# sqlite3.connect() silently creates an empty database when the file is
# missing, which would pollute the backup and fail later with "no such table".
if not os.path.isfile(_messages_path):
    sys.exit("Error: messages database not found: %s" % _messages_path)
if not os.path.isdir(targetdir):
    os.makedirs(targetdir)
conn = sqlite3.connect(_messages_path)
def dict_factory(cursor, row):
    """sqlite3 row factory returning each row as a ``{column name: value}`` dict.

    The column name is the first item of each ``cursor.description`` entry;
    values are taken from ``row`` at the matching position.
    """
    return {
        column[0]: row[position]
        for position, column in enumerate(cursor.description)
    }
# Make every row fetched from the messages database a dict keyed by column
# name, and create the module-level cursor shared by open_chats() and
# dump_messages().
conn.row_factory = dict_factory
curs = conn.cursor()
class Record:
    """Plain attribute bag describing one Manifest.mbdb entry.

    ``Reader.readRecord`` sets: domain, path, link, sha, hash, encKey, mode,
    inode, uid, gid, mtime, atime, ctime, fsize, prot, propCount and, only
    when ``propCount > 0``, prop (a dict of the record's properties).
    """

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return repr(self.__dict__)


class Reader:
    """Iterator over the records stored in a backup's Manifest.mbdb file.

    Strings in the file are a 2-byte big-endian length followed by that many
    bytes; a length of 0 or 0xFFFF denotes an empty string. All string fields
    are returned as byte strings.
    """

    # Fixed-size tail of a record: mode, inode, uid, gid, mtime, atime, ctime,
    # fsize, prot, propCount (40 bytes, big-endian).
    _FIXED = struct.Struct(">HQIIIIIQBB")

    def __init__(self, dirname):
        """Open ``Manifest.mbdb`` inside *dirname* and skip its 6-byte header."""
        self.dir = dirname
        # Opened read-only: the manifest is never written, and "r+b" would
        # needlessly require write permission on the backup.
        self.dat = open(os.path.join(dirname, mbdb), "rb")
        self.dat.read(6)

    def __iter__(self):
        # Rewind to just past the header so the reader can be iterated again.
        self.dat.seek(6)
        return self

    def next(self):
        """Return the next Record; raise StopIteration at end of file."""
        n = self.readRecord()
        if n is None:
            raise StopIteration
        return n

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def close(self):
        """Close the underlying manifest file."""
        self.dat.close()

    def readRecord(self):
        """Parse one record at the current position.

        Returns a Record, or None when no (non-empty) domain can be read,
        which is how the end of the file is detected. Raises ``struct.error``
        if the fixed-size part of the record is truncated.
        """
        rec = Record()
        rec.domain = self._recString()
        if len(rec.domain) == 0:
            return None
        rec.path = self._recString()
        rec.link = self._recString()
        # Backup files are named after the SHA-1 of "<domain>-<path>".
        # hashlib replaces the deprecated ``sha`` module; the digest is the same.
        rec.sha = hashlib.sha1(rec.domain + b"-" + rec.path).hexdigest()
        rec.hash = self._recString()
        rec.encKey = self._recString()
        (rec.mode, rec.inode, rec.uid, rec.gid,
         rec.mtime, rec.atime, rec.ctime,
         rec.fsize, rec.prot, rec.propCount) = self._FIXED.unpack(
            self.dat.read(self._FIXED.size))
        if rec.propCount > 0:
            rec.prop = {}
            for _ in range(rec.propCount):
                key = self._recString()
                rec.prop[key] = self._recString()
        return rec

    def _recString(self):
        """Read one length-prefixed string; return ``b''`` for empty or EOF."""
        rd = self.dat.read(2)
        if len(rd) < 2:
            return b''
        dsize = struct.unpack(">H", rd)[0]
        if dsize == 65535 or dsize == 0:
            # Always bytes, so callers can concatenate/decode uniformly.
            return b''
        return self.dat.read(dsize)

    def verifyFileOnDisk(self, rec):
        """Check that the backed-up file for *rec* has the recorded size.

        Prints diagnostics on mismatch. Returns True when the sizes agree,
        False otherwise. Raises OSError if the file does not exist.
        """
        spath = os.path.join(self.dir, rec.sha)
        stat = os.stat(spath)
        if stat.st_size != rec.fsize:
            print(stat)
            print(rec.fsize)
            print("File mismatch: " + rec.sha)
            print(rec.hash)
            return False
        return True
# chat identifier -> {'html': file name, 'json': file name, 'meta': {...}};
# filled by open_chats() and read by dump_messages().
files = {}
# normalized contact id -> {'first': ..., 'last': ...}; filled by
# build_contacts().
contacts = {}
# Anything that is not a decimal digit; compiled once instead of per value.
_NON_DIGITS = re.compile(r'\D+')


def _normalize_contact_id(value):
    """Turn one address-book value into the key used to look up a handle id.

    E-mail addresses and ``http`` URLs are returned unchanged, ``itunes``
    values and values without digits yield None. Everything else is treated
    as a phone number and reduced to ``+<digits>``:

    * a number written with an explicit ``+`` keeps its own country code;
    * any other number gets a leading ``1`` when it does not already start
      with one (NOTE(review): this assumes country code 1 for numbers stored
      without a prefix, as the original code did).
    """
    if value.find("@") > 0:
        return value
    if value.startswith("http"):
        return value
    if value.startswith("itunes"):
        return None
    # Strip every separator, including dots ("555.123.4567"), so the result
    # can match a digits-only handle id.
    digits = _NON_DIGITS.sub('', value)
    if not digits:
        return None
    if value.lstrip().startswith("+"):
        # Already international: do not force a "1" in front of e.g. "+44...".
        return "+" + digits
    if not digits.startswith("1"):
        digits = "1" + digits
    return "+" + digits


def build_contacts():
    """Fill the module-level ``contacts`` dict from the address-book database.

    Reads every non-null ABMultiValue value joined to its ABPerson row and
    stores ``{'first': ..., 'last': ...}`` under the normalized value (see
    ``_normalize_contact_id``). Later rows overwrite earlier ones that
    normalize to the same key.
    """
    sql = """SELECT first first, last last, value FROM ABMultiValue, ABPerson WHERE record_id = ROWID AND value is not null"""
    contacts_conn = sqlite3.connect(os.path.join(dirname, contacts_db))
    try:
        contacts_conn.row_factory = dict_factory
        cursor = contacts_conn.cursor()
        cursor.execute(sql)
        for row in cursor.fetchall():
            key = _normalize_contact_id(row['value'])
            if key is not None:
                contacts[key] = {'first': row['first'], 'last': row['last']}
    finally:
        # Release the connection even if the query or normalization fails.
        contacts_conn.close()
def full_name(contact):
    """Return the display name of *contact*, or None when it has no alias.

    The name is "first last"; a missing part is left out together with the
    separating space.
    """
    alias = contact['alias']
    if not alias:
        return None
    first = alias['first'] or ''
    last = alias['last'] or ''
    separator = ' ' if first and last else ''
    return first + separator + last
def full_id(contact):
    """Return "name (id)" for *contact*, or just the id when it has no name."""
    name = full_name(contact)
    if name:
        return "%s (%s)" % (name, contact['id'])
    return contact['id']
def open_chats():
    """Create the HTML and JSON file of every chat and write their headers.

    For each chat identifier this collects the participating handles,
    registers an entry in the module-level ``files`` dict
    (``{'html', 'json', 'meta': {'chat', 'contacts', 'group'}, 'init'}``) and
    writes the opening part of ``<chat>.json`` and ``<chat>.html`` into
    ``targetdir``. ``dump_messages`` appends the messages and the closing
    part afterwards.
    """
    chat_meta = """
SELECT distinct chat.chat_identifier, h.id
from chat chat
JOIN chat_handle_join chj on (chat.rowid = chj.chat_id)
join handle h on (chj.handle_id = h.rowid)
order by chat.rowid
"""
    curs.execute(chat_meta)

    # Group the participants per chat first. Grouping (instead of relying on
    # equal identifiers being adjacent in the result) keeps every contact even
    # if the same identifier shows up again later in the row order.
    order = []
    participants = {}
    for row in curs.fetchall():
        cname = row["chat_identifier"]
        contact = {'id': row['id'], 'alias': contacts.get(row['id'], None)}
        if cname not in participants:
            participants[cname] = []
            order.append(cname)
        participants[cname].append(contact)

    for cname in order:
        meta = {'chat': cname, 'contacts': participants[cname]}
        meta['group'] = len(meta['contacts']) > 1
        entry = {'html': cname + ".html", 'json': cname + ".json", 'meta': meta}
        files[cname] = entry

        # "with" closes each file even when a write fails.
        with codecs.open(os.path.join(targetdir, entry['json']), "w+b", 'utf-8') as joutput:
            joutput.write(u'{"meta":')
            joutput.write(json.dumps(meta))
            joutput.write(u',\n"messages":[')

        # Names and ids come from the device; escape them so characters such
        # as "<" or "&" cannot break the page markup.
        contactlist = u", ".join([escape(full_id(contact)) for contact in meta['contacts']])
        with codecs.open(os.path.join(targetdir, entry['html']), "w+b", 'utf-8') as houtput:
            houtput.write(
                u"""
<html>
<head>
<meta charset="utf-8" />
<link rel="stylesheet" href="./theme.css">
</head>
<body>
<div class="info">%s</div><br>
""" % (contactlist))
        entry['init'] = True
def rewrite_path(path):
    """Convert an attachment path into the form used as a manifest path.

    * ``None`` is returned unchanged.
    * A path starting with ``~`` loses its first two characters
      (``~/Library/...`` becomes ``Library/...``).
    * Otherwise everything before the first ``Library`` is dropped.
    * A path without ``Library`` (including the empty string) is returned
      unchanged.
    """
    if path is None:
        return None
    # startswith() is safe on an empty string, where path[0] raised IndexError.
    if path.startswith(u'~'):
        return path[2:]
    start = path.find(u"Library")
    if start > 0:
        return path[start:]
    return path
def _archive_attachment(records, dname, filename):
    """Copy one attachment into the chat's folder and return a link to it.

    Looks *filename* up in *records* (also without its extension), follows the
    record's link if it has one, and, when COPY_FILES is set, copies the
    backup file to ``<targetdir>/<dname>/<sha>-<basename>``.

    Returns the attachment location relative to ``targetdir`` (so links keep
    working from the HTML files regardless of how ``targetdir`` was given),
    or None when the attachment is not available in the target directory.
    """
    print("Processing attachment: %s" % (filename))
    record = records.get(filename, None)
    if record is None:
        # Try without the file extension
        record = records.get(u'.'.join(filename.split(u'.')[0:-1]))
    if record is None:
        print("Error: Unable to find attachment for %s with %s" % (dname, filename))
        print("repr:" + repr(filename))
        return None

    sha_file = record.sha
    link = record.link
    if isinstance(link, bytes):
        # ``records`` is keyed by decoded paths; decode the raw link to match.
        link = link.decode('utf-8')
    if len(link) > 0:
        new_record = records.get(link, None)
        if new_record is not None:
            sha_file = new_record.sha

    mms_dir = os.path.join(targetdir, dname)
    oldname = os.path.join(dirname, sha_file)
    newfile = u"%s-%s" % (sha_file, os.path.split(filename)[1])
    newname = os.path.join(mms_dir, newfile)
    if COPY_FILES:
        if not os.path.exists(mms_dir):
            os.mkdir(mms_dir)
        if os.path.exists(newname):
            print("Found file %s, skipping" % newname)
        elif os.path.exists(oldname):
            print("Archiving attachment: %s to %s" % (filename, newname))
            shutil.copy2(oldname, newname)
        else:
            print("Error: Unable to find attachment for %s with %s" % (dname, filename))
            print("repr:" + repr(filename))
    if os.path.exists(newname):
        return u"%s/%s" % (dname, newfile)
    return None


def _write_message_html(houtput, records, dname, dat, is_group):
    """Append the HTML for one message row to *houtput*.

    All values taken from the databases are escaped before being written.
    """
    clz = u"me" if dat["from_me"] == 1 else u"them"
    if dat['sms'] is not None:
        clz += u' sms'  # extra class if sms
    if is_group and dat['user_id'] is not None:
        user = contacts.get(dat['user_id'], {'first': dat['user_id']})
        # Fall back to the last name / raw id instead of printing "None".
        label = user.get('first') or user.get('last') or dat['user_id']
        houtput.write(u'<span class="group">%s</span>' % (escape(label)))
    houtput.write(u'<div class="' + clz + u'">')
    if dat['filename'] is not None:
        href = _archive_attachment(records, dname, dat['filename'])
        if href is None:
            houtput.write(u'<span>Attachment Error:%s</span>' % (escape(dat['filename'])))
        else:
            # quoteattr() returns the value already wrapped in quotes.
            houtput.write(u'<a href=%s>Attachment </a>' % (quoteattr(href)))
    if dat['text'] is not None:
        houtput.write(escape(dat['text']))
    houtput.write(u'</div>')
    # The query yields a NULL timestamp when the stored date is not positive.
    if dat['timestamp'] is not None:
        when = parser.parse(dat['timestamp'])
        houtput.write(u'<time datetime="%s">%s</time>' % (
            dat['timestamp'],
            when.astimezone(tzlocal()).strftime("%a %b %d %y %I:%M %p")))


def _finish_chat(houtput, joutput):
    """Write the closing part of a chat's HTML and JSON files and close them."""
    houtput.write(u'\n </body>\n</html>')
    joutput.write(u']}')
    houtput.close()
    joutput.close()


def dump_messages(records):
    """Append every message to the chat files prepared by ``open_chats``.

    *records* maps manifest paths to manifest records and is used to locate
    attachments. Messages are read ordered by chat identifier; each chat's
    files (taken from the module-level ``files`` dict) are opened in append
    mode, filled, and closed with their footers. Chats that have no messages
    still get their footers so that every output file is complete. Messages
    of a chat that has no entry in ``files`` are skipped with a warning.
    """
    message_dump = """
SELECT
chat.chat_identifier as conversation,
h.id AS user_id,
case when m.service = 'SMS' then 1 else null end sms,
m.cache_roomnames as chatroom,
is_from_me from_me,
CASE
WHEN date > 0 THEN strftime('%Y-%m-%dT%H:%M:%SZ', date + 978307200, 'unixepoch')
ELSE NULL
END as timestamp,
text as text,
atch.filename filename
FROM chat chat
join chat_message_join cmj on chat.rowid = cmj.chat_id
join message m on cmj.message_id = m.rowid
LEFT JOIN handle h ON h.rowid = m.handle_id
LEFT JOIN message_attachment_join maj
ON maj.message_id = m.rowid
LEFT JOIN attachment atch on maj.attachment_id = atch.rowid
ORDER BY chat.chat_identifier, m.rowid asc, timestamp
"""
    curs.execute(message_dump)

    houtput = None
    joutput = None
    last = None
    first_in_chat = True
    dumped = set()
    skipped = set()
    try:
        while True:
            rows = curs.fetchmany(1000)
            if not rows:
                break
            for row in rows:
                dname = row['conversation']
                chat = files.get(dname)
                if chat is None:
                    if dname not in skipped:
                        skipped.add(dname)
                        print("Error: no chat files were opened for %s, skipping its messages" % (dname))
                    continue
                dat = dict(row)
                del dat['conversation']  # redundant in json dump
                dat['filename'] = rewrite_path(dat['filename'])
                if last != dname:
                    if houtput is not None:
                        _finish_chat(houtput, joutput)
                        houtput = joutput = None
                    houtput = codecs.open(os.path.join(targetdir, chat['html']), "a+b", 'utf-8')
                    joutput = codecs.open(os.path.join(targetdir, chat['json']), "a+b", 'utf-8')
                    first_in_chat = True
                    dumped.add(dname)
                    last = dname
                # Separate entries with a comma *between* them; a comma after
                # the last entry would make the JSON array invalid.
                if not first_in_chat:
                    joutput.write(u',')
                first_in_chat = False
                joutput.write(json.dumps(dat))
                _write_message_html(houtput, records, dname, dat, chat['meta']['group'])

        if houtput is not None:
            _finish_chat(houtput, joutput)
            houtput = joutput = None

        # Chats without any message only have their header so far.
        for cname, chat in files.items():
            if cname not in dumped:
                houtput = codecs.open(os.path.join(targetdir, chat['html']), "a+b", 'utf-8')
                joutput = codecs.open(os.path.join(targetdir, chat['json']), "a+b", 'utf-8')
                _finish_chat(houtput, joutput)
                houtput = joutput = None
    finally:
        # Only reached with open handles when something above failed.
        for handle in (houtput, joutput):
            if handle is not None:
                handle.close()
# Index the backup manifest by decoded file path; dump_messages() uses this to
# locate attachment files inside the backup.
manifest_reader = Reader(dirname)
try:
    records = {i.path.decode('utf-8'): i for i in manifest_reader}
finally:
    # Everything needed is now in `records`; release the manifest file.
    manifest_reader.dat.close()

# Export steps, in dependency order: contacts are needed to label chats, and
# the chat files must exist before messages are appended to them.
try:
    build_contacts()
    open_chats()
    dump_messages(records)
finally:
    conn.close()
# Stylesheet for the generated pages. It styles the elements written by
# open_chats() and dump_messages(): the "info" header, the "me"/"them"
# message bubbles (plus the extra "sms" class), the <time> stamp and the
# "group" sender label.
theme_css = u"""
.info {
font-size: 20px;
}
.them {
clear: both;
margin: 4px;
padding: 5px;
background-color: #cef;
border: 1px solid #bde;
border-radius: 8px 8px 8px 0px;
}
.sms {
background-color: #cfc;
border-color: #9e9;
}
.me {
clear: both;
margin: 4px;
padding: 5px;
background-color: #eee;
text-align: right;
border: 1px solid #ddd;
border-radius: 8px 8px 0px 8px;
}
time {
margin: 0px 4px 0px 0px;
float: right;
text-align: right;
font-size: 12px;
}
.group {
margin: 6px;
float: left;
}
"""
# Write the stylesheet that every generated HTML page links to
# ("./theme.css"). The context manager expression comes first and the target
# name follows "as"; the reversed form is a SyntaxError.
with codecs.open(os.path.join(targetdir, "theme.css"), "w+b", 'utf-8') as f:
    f.write(theme_css)
@brianv0
Copy link
Author

brianv0 commented Oct 21, 2014

You might need to install dateutil first (`pip install python-dateutil`). The script is written for Python 2.

Usage:
python message_backup.py [backup dir] [target dir]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment