Skip to content

Instantly share code, notes, and snippets.

@williballenthin
Last active June 24, 2022 15:32
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save williballenthin/600a3898f43b7ad3f8aa4a5f4156941d to your computer and use it in GitHub Desktop.
Save williballenthin/600a3898f43b7ad3f8aa4a5f4156941d to your computer and use it in GitHub Desktop.
extract entries from the osx sticky database
'''
parse osx sticky databases.
author: Willi Ballenthin <william.ballenthin@fireeye.com>
license: Apache 2.0
usage:
$ python extract_stickies.py /path/to/input.bin /path/to/output/directory/
'''
import re
import sys
import struct
import hashlib
import logging
import os.path
import datetime
logger = logging.getLogger('osx.stickydatabase')
logging.basicConfig(level=logging.INFO)
# recovered empirically
STREAMTYPED_HEADER = b'\x04\x0Bstreamtyped'
def md5(buf):
    '''Return the hex MD5 digest of the given bytes.'''
    return hashlib.md5(buf).hexdigest()
def carve_databases(buf):
    '''
    Carve sticky databases out of the given binary data.

    Databases are delimited by STREAMTYPED_HEADER; each carved database
    runs from one header to the next header (or to the end of input).
    Yields each database with its header re-attached.
    '''
    if buf.startswith(STREAMTYPED_HEADER):
        # input begins at a database: just strip the leading header.
        remainder = buf[len(STREAMTYPED_HEADER):]
    else:
        # skip any garbage before the first header.
        # if no header is present, partition leaves remainder empty.
        _, _, remainder = buf.partition(STREAMTYPED_HEADER)

    while remainder:
        chunk, _, remainder = remainder.partition(STREAMTYPED_HEADER)
        yield STREAMTYPED_HEADER + chunk
def read_u32(buf, offset):
    '''Read a little-endian u32 at `offset`; return (value, next offset).'''
    return struct.unpack_from('<I', buf, offset)[0], offset + 4
def read_bytes(buf, offset, size):
    '''Slice `size` bytes at `offset`; return (bytes, next offset).'''
    end = offset + size
    return buf[offset:end], end
def read_str(buf, offset):
    '''
    Read a length-prefixed UTF-8 string; return (str, next offset).

    layout:

      length  u32
      s       [char]
    '''
    length, offset = read_u32(buf, offset)
    raw, offset = read_bytes(buf, offset, length)
    return raw.decode('utf-8'), offset
def read_dict(buf, offset):
    '''
    Read a serialized dict; return (dict, next offset).

    layout:

      count          u32
      keys           [str]
      value lengths  [u32]
      values         [value]
    '''
    count, offset = read_u32(buf, offset)
    logger.debug('dict: found %d items', count)

    keys = []
    for _ in range(count):
        key, offset = read_str(buf, offset)
        logger.debug('dict: read key: %s', key)
        keys.append(key)

    sizes = []
    for _ in range(count):
        size, offset = read_u32(buf, offset)
        logger.debug('dict: read size: 0x%x', size)
        sizes.append(size)

    values = []
    for size in sizes:
        value, offset = read_bytes(buf, offset, size)
        logger.debug('dict: read value, 0x%x bytes', len(value))
        values.append(value)

    return dict(zip(keys, values)), offset
def read_buffer(buf, offset):
    '''
    Read a serialized buffer; return (bytes, next offset).

    Two on-disk forms exist, distinguished by the first u32.

    inline form:

      size  u32
      b     [u8]

    padded form (first u32 == 0x80000000):

      flags         u32 == 0x80000000
      size          u32
      padding-size  u32
      padding       [u8]
      b             [u8]
    '''
    first, offset = read_u32(buf, offset)

    if first != 0x80000000:
        # inline form: `first` is the size, data follows immediately.
        return read_bytes(buf, offset, first)

    # padded form: real size and padding length follow the flags word.
    size, offset = read_u32(buf, offset)
    padsize, offset = read_u32(buf, offset)
    # skip over the padding bytes without keeping them.
    offset += padsize
    data = buf[offset:offset + size]
    offset += size
    return data, offset
def read_value(buf, offset):
    '''
    Read a tagged value; return (value, next offset).

    A value is a u32 tag followed by a payload:
    tag 0x01 is a buffer, tag 0x03 is a dict.

    NOTE: these interpretations were recovered empirically and are
    not 100% confirmed, but they seem to hold.
    '''
    tag, offset = read_u32(buf, offset)
    if tag == 0x01:
        logger.debug('value: found buffer')
        return read_buffer(buf, offset)
    if tag == 0x03:
        logger.debug('value: found dict')
        return read_dict(buf, offset)
    raise NotImplementedError('value type: %08x' % tag)
def parse_sticky(buf):
    '''
    Parse a single sticky record found at the start of `buf`.

    returns a dict of the form::

      {
        'name': str,
        'stickies': {
          filename: {'buf': bytes, 'ts': datetime.datetime},
          ...
        }
      }

    raises ValueError when the header does not look like a sticky.

    example of header::

      08 5B 31 30 37 30 31 63 5D 72 74 66 64
       .  [  1  0  7  0  1  c  ]  r  t  f  d
       | ------ name ---------- -- magic -
       |  1  2  3  4  5  6  7  8
       +-----------------------^
    '''
    namelen = buf[0]
    if sys.version_info[0] < 3:
        # on py2, indexing bytes yields a one-char str, not an int.
        namelen = ord(namelen)

    header_descriptor = '<b%ds4sI' % (namelen)
    header_size = struct.calcsize(header_descriptor)
    header = buf[:header_size]
    body = buf[header_size:]

    namelen, name, magic, zero = struct.unpack(header_descriptor, header)
    name = name.decode('ascii')

    # explicit validation instead of `assert`, which is stripped under -O.
    if magic != b'rtfd':
        raise ValueError('unexpected magic: %r' % (magic,))
    if zero != 0x0:
        raise ValueError('unexpected non-zero header field: 0x%x' % (zero,))
    logger.info('found sticky: %s', name)

    # the top level object is a value (specifically, a dict).
    sticky, _ = read_value(body, 0x0)
    if not isinstance(sticky, dict):
        raise ValueError('expected top-level dict')

    # all the value objects are values.
    # in the case of files (name != '.'), this is file content (buffer).
    # for the metadata file (name == '.'), this is a dict serialized into a buffer.
    for key in sticky.keys():
        sticky[key], _ = read_value(sticky[key], 0x0)

    # unwrap the metadata.
    sticky['.'], _ = read_dict(sticky['.'], 0x0)

    # the metadata file contains a mapping from filename to timestamp (time_t):
    #
    #   0D F0 29 54 B6 01 00 00 00 00 00 00 00 00 00 00
    #   ^^ ^^ ^^ ^^             ?? ??
    #      time_t               unk
    #
    # note: use a distinct loop variable (`meta`) so we do not rebind
    # the `buf` parameter, as the original code did.
    for filename, meta in sticky['.'].items():
        q = struct.unpack_from('<I', meta, 0x0)[0]
        try:
            ts = datetime.datetime.fromtimestamp(q)
        except (OSError, ValueError):
            # out-of-range or invalid time_t: fall back to a sentinel.
            ts = datetime.datetime.min
        sticky['.'][filename] = ts

    # only the keys are needed here, so don't iterate .items().
    for filename in sticky:
        if filename == '.':
            continue
        logger.info('found file: %s timestamp: %s',
                    filename, sticky['.'][filename].isoformat('T'))

    return {
        'name': name,
        'stickies': {
            filename: {
                'buf': sticky[filename],
                'ts': sticky['.'][filename],
            }
            for filename in sticky
            if filename != '.'
        }
    }
def carve_stickies(buf):
    '''
    Scan `buf` for sticky headers and yield each parsed sticky dict.

    records that fail to parse are logged (with traceback) and skipped.
    '''
    # raw bytes literal (`br`): `\[` is a regex escape, and in a non-raw
    # literal it triggers an invalid-escape-sequence warning on modern
    # Python.  `br'...'` works on both py2.7 and py3.
    for match in re.finditer(br'(.)\[([0-9a-f]+)\]rtfd', buf):
        start = match.start()
        try:
            yield parse_sticky(buf[start:])
        except Exception:
            logger.warning('failed to parse sticky', exc_info=True)
# script entry: extract databases from sys.argv[1] into sys.argv[2].
# NOTE(review): original indentation was lost in transit; this nesting
# (metadata file written inside each database/sticky directory, sticky
# contents written while the sticky metadata file is open) is reconstructed
# from the statement order — confirm against the original gist.
outdir = sys.argv[2]

with open(sys.argv[1], 'rb') as f:
    buf = f.read()

for i, db in enumerate(carve_databases(buf)):
    logger.debug('found database, size: 0x%08x bytes, hash: %s', len(db), md5(db))

    dbdir = os.path.join(outdir, 'database-%d' % i)
    logger.info('creating database directory: %s', dbdir)
    try:
        os.makedirs(dbdir)
    except OSError:
        # directory already exists; any other failure will surface
        # when we try to open files within it.  (was a bare `except:`,
        # which also swallowed KeyboardInterrupt/SystemExit.)
        pass

    with open(os.path.join(dbdir, 'metadata.txt'), 'wb') as f:
        f.write(('input file: %s\n' % (sys.argv[1])).encode('utf-8'))
        f.write(('input md5: %s\n' % (md5(buf))).encode('ascii'))
        f.write(('recovered database index: %d\n' % (i)).encode('ascii'))
        f.write(('recovered database md5: %s\n' % (md5(db))).encode('ascii'))

    for sticky in carve_stickies(db):
        stickydir = os.path.join(dbdir, 'sticky-' + sticky['name'].strip('[]'))
        logger.info('creating sticky directory: %s', stickydir)
        try:
            os.makedirs(stickydir)
        except OSError:
            # same best-effort mkdir as above.
            pass

        with open(os.path.join(stickydir, 'metadata.txt'), 'wb') as f:
            f.write(('input file: %s\n' % (sys.argv[1])).encode('ascii'))
            f.write(('input md5: %s\n' % (md5(buf))).encode('ascii'))
            f.write(('recovered database index: %d\n' % (i)).encode('ascii'))
            f.write(('recovered database md5: %s\n' % (md5(db))).encode('ascii'))

            for filename, s in sticky['stickies'].items():
                f.write(('recovered filename: %s\n' % (filename)).encode('utf-8'))
                f.write(('recovered timestamp for %s: %s\n' % (filename, s['ts'].isoformat('T'))).encode('utf-8'))
                f.write(('recovered md5 for %s: %s\n' % (filename, md5(s['buf']))).encode('utf-8'))

                # write the recovered file content alongside the metadata.
                with open(os.path.join(stickydir, filename), 'wb') as g:
                    g.write(s['buf'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment