Skip to content

Instantly share code, notes, and snippets.

@pedramamini
Created May 20, 2015 05:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pedramamini/51f450fe8ec24a8705d3 to your computer and use it in GitHub Desktop.
Save pedramamini/51f450fe8ec24a8705d3 to your computer and use it in GitHub Desktop.
Extract URLs and related contact information from your OSX Messages.app database.
#!/usr/bin/env python
# Extract URLs and related contact information from your OSX Messages.app database.
#
# TODO
# - automatically resolve username and discover contacts database (by largest item count if there is more than one).
# - make a machine parseable format.
# - keep track of last found URL (by hash?), allow for periodic run of script and addition to output.
# - update to latest gruber regex.
import re
import sys
import time
import sqlite3
# change these:
MESSAGES_DB = "/Users/pedram/Library/Messages/chat.db"
CONTACTS_DB = "/Users/pedram/Library/Application Support/AddressBook/Sources/CADDFCF3-FE1E-40E4-A1D0-5F3E011A9AB9/AddressBook-v22.abcddb"
# no need to edit below this line.
DEBUG = False
# https://gist.github.com/gruber/8891611
URL_EXTRACT = re.compile(ur'(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?\xab\xbb\u201c\u201d\u2018\u2019]))')
# apple epoch is 1/1/2001, whereas unix is 1/1/1971.
UNIX_TS_MOD = 978307200
# SQL queries.
QUERY_MESSAGES = """
SELECT
message.date AS date,
handle.id AS recipient,
message.text AS text
FROM
message, handle
WHERE
message.handle_id = handle.ROWID AND
(
LOWER(text) LIKE '%www.%' OR
LOWER(text) LIKE '%.com%' OR
LOWER(text) LIKE '%.net%' OR
LOWER(text) LIKE '%.org%' OR
LOWER(text) LIKE '%http://%' OR
LOWER(text) LIKE '%https://%'
)
ORDER BY
message.date DESC
"""
QUERY_CONTACTS = """
SELECT
ZABCDRECORD.Z_PK AS record_id,
ZABCDRECORD.ZFIRSTNAME AS first,
ZABCDRECORD.ZLASTNAME AS last,
ZABCDPHONENUMBER.ZOWNER AS number_id,
ZABCDPHONENUMBER.ZFULLNUMBER AS number
FROM
ZABCDRECORD,
ZABCDPHONENUMBER
WHERE
ZABCDRECORD.Z_PK = ZABCDPHONENUMBER.ZOWNER AND
ZABCDPHONENUMBER.ZFULLNUMBER LIKE ?
"""
########################################################################################################################
class db_wrapper:
def __init__ (self, db_path, mode="rw"):
"""
"""
self.db_path = db_path
self.mode = mode
self.conn_str = "file:" + self.db_path + "?mode=" + self.mode
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
self.conn.text_factory = str
self.cursor = self.conn.cursor()
####################################################################################################################
def execute (self, query, params=(), MAX_ATTEMPTS=5):
"""
Execute the supplied query.
@type query: str
@param query: SQL query to execute.
@type params: Tuple
@param params: Optional SQL parameters to bind.
@rtype: self
@return: self.
"""
query = query.lstrip().rstrip()
succeeded = False
attempts = 0
if type(params) != tuple:
params = (params, )
while not succeeded and attempts < MAX_ATTEMPTS:
try:
self.cursor.execute(query, params)
self.conn.commit()
succeeded = True
except sqlite3.Error, e:
finding = "\n[FAIL] sqlite3 #%d: %s\n\n" % (attempts, e)
finding += "query: %s\n" % query
finding += "params: %s\n\n" % str(params)
sys.stderr.write(finding)
attempts += 1
time.sleep(.5)
# if execute() failed, now is the time to whine about it.
if not succeeded:
raise Exception("sqlite failure count exceeded max of %d" % MAX_ATTEMPTS)
return self
####################################################################################################################
def get (self):
"""
Retrieve the first item from the underlying cursor.
@rtype: str
@return: First row.
"""
return self.cursor.fetchone()
####################################################################################################################
def iterate (self):
"""
Iterate over the cursor.
@rtype: Iterative row
@return: Yield row dictionary.
"""
for row in self.cursor.fetchall():
yield row
########################################################################################################################
if __name__ == "__main__":
messages = db_wrapper(MESSAGES_DB, mode="memory")
contacts = db_wrapper(CONTACTS_DB, mode="memory")
# keep a set of URLs.
urls = set()
for record in messages.execute(QUERY_MESSAGES).iterate():
timestamp = time.ctime(UNIX_TS_MOD + record['date'])
if DEBUG:
print
print timestamp
print "\t", record['recipient'], record['text']
# this ugly shit below will splice and wrap the generic number with sql wild card '%'.
recipient_normalized = "%" + "".join([a + b for a, b in zip(record['recipient'][2:], "%" * 20)])
if DEBUG:
print "\t", recipient_normalized
# query the contact db using the normalized recipient we made above.
for contact in contacts.execute(QUERY_CONTACTS, recipient_normalized).iterate():
name = ""
if contact['first']:
name += contact['first']
if contact['last']:
name += " " + contact['last']
if not name:
name = "unknown"
if DEBUG:
print "\t", name
# apply the gruber regex to detect and extract URLs from the SMS body.
for url in URL_EXTRACT.findall(record['text']):
url = url[0]
urls.add(url)
if DEBUG:
print "\t\t", url
# output the gleaned URL synopsis.
if not DEBUG:
print timestamp, name, url
# show the unique set of discovered URLs.
# for url in urls:
# print url
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment