Skip to content

Instantly share code, notes, and snippets.

@monokrome
Last active December 19, 2015 17:39
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save monokrome/5992936 to your computer and use it in GitHub Desktop.
Save monokrome/5992936 to your computer and use it in GitHub Desktop.
Search within a mailbox for iTunes receipts, and write a JSON representation of all iTunes receipts as 'receipts.json'.
#!/usr/bin/env python
import mailbox
# Location handed to ``mailbox_type`` when the mailbox is opened in build_cache().
mailbox_root = 'your_mailbox'
# Mailbox class used to open ``mailbox_root``; any callable taking the root works.
mailbox_type = mailbox.Maildir
###########################################
###########################################
## Configuration section finished. ##
###########################################
###########################################
import json
import os
import re
import sys
import hashlib
def dictize(root, key, value):
    """Store ``value`` in the nested dict ``root`` under a whitespace-split key.

    ``key`` is split on runs of whitespace.  Every word except the last names
    a nested dictionary (created when missing) and the last word names the
    leaf, so ``dictize(d, 'Order Number', 'X')`` leaves
    ``d == {'Order': {'Number': 'X'}}``.

    A leaf that already exists is left untouched (first write wins).
    ``root`` is modified in place; nothing is returned.
    """
    parts = re.split(r'\s+', key)
    node = root

    # Walk by position instead of asking parts.index() where each word is:
    # index() reports the first occurrence, so a key that repeats a word
    # (e.g. 'A A') never recognised its last word as the leaf and the value
    # was silently dropped.
    for part in parts[:-1]:
        if part not in node:
            node[part] = {}
        node = node[part]

    leaf = parts[-1]
    if leaf not in node:
        node[leaf] = value
def build_cache():
    """Dump every iTunes receipt message in the mailbox to ``.cache/<id>.json``.

    The mailbox is opened with ``mailbox_type(mailbox_root)``.  A message is
    kept when its ``from`` header contains ``itunes.com`` and its ``subject``
    header contains ``receipt``.  Each kept message is written as a JSON
    object with the keys ``body``, ``from`` and ``subject``.
    """
    box = mailbox_type(mailbox_root)

    def is_itunes_message(message):
        """Return True when the message has a sender mentioning itunes.com."""
        if 'from' not in message:
            return False
        return 'itunes.com' in (message['from'] or '')

    def is_receipt(message):
        """Return True when the message has a subject mentioning a receipt."""
        # A message without a subject is simply not a receipt; previously the
        # membership test blew up on the missing header.
        if 'subject' not in message:
            return False
        return 'receipt' in (message['subject'] or '')

    def related_messages(box):
        """Yield ``(identifier, message)`` for each iTunes receipt in ``box``."""
        for identifier, message in box.iteritems():
            if is_itunes_message(message) and is_receipt(message):
                yield identifier, message

    # The cache files are written below this directory, so make sure it exists.
    if not os.path.isdir('.cache'):
        os.makedirs('.cache')

    for identifier, message in related_messages(box):
        result = {
            # NOTE(review): assumes the mailbox factory yields messages that
            # expose a readable ``fp`` attribute -- confirm for the configured
            # mailbox type.
            # json.dumps() already escapes quotes; escaping them by hand first
            # left stray backslashes in the body once the JSON was loaded back.
            'body': message.fp.read(),
            'from': message['from'],
            'subject': message['subject'],
        }
        # ``with`` closes the file even if serialisation or the write fails.
        with open('.cache/{0}.json'.format(identifier), 'w') as cache_file:
            cache_file.write(json.dumps(result))
class Receipt(object):
    """Structured view of one plain-text receipt.

    Instances are built by :meth:`factory`, which assigns the ``billing``,
    ``order``, ``items`` and ``totals`` attributes.  The ``totals`` attribute
    set by ``factory`` shadows the ``totals`` classmethod on that instance.

    Every section parser takes the remaining receipt lines and returns a
    ``(remaining_lines, parsed_section)`` tuple so the parsers can be chained.
    """

    def to_dict(self):
        """Return the parsed sections as a plain dictionary."""
        return {
            'billing': self.billing,
            'order': self.order,
            'items': self.items,
            'totals': self.totals,
        }

    @classmethod
    def make_address(cls, data):
        """Split the billing address off the front of ``data``.

        The address starts at ``data[3]``.  The street part runs up to and
        including the first empty line, the area part runs from there up to
        (not including) the next empty line.

        Returns ``(remaining_lines, {'street': ..., 'area': ...})``.
        Raises ``ValueError`` when the expected empty lines are missing.
        """
        first_break = data[3:].index('') + 4
        second_break = data[first_break:].index('') + first_break
        address = {
            'street': ' '.join(data[3:first_break]),
            'area': ' '.join(data[first_break:second_break]),
        }
        return data[second_break:], address

    @classmethod
    def order_information(cls, data):
        """Parse the ``Key: value`` lines that precede the item table.

        Parsing stops at the first line containing ``Item``; that line and
        everything after it are returned as the remaining lines.  Keys are
        nested through ``dictize`` and values are cut at the first ``=``.

        Returns ``(remaining_lines, results)``.  When no ``Item`` line exists
        the remaining lines are empty.
        Raises ``ValueError`` for a non-blank line without a ``': '``
        separator.
        """
        results = {}
        for index, datum in enumerate(data):
            if 'Item' in datum:
                return data[index:], results

            datum = datum.strip()
            if not datum:
                continue

            # Split once so a value that itself contains ': ' is kept whole.
            key, value = datum.split(': ', 1)
            value = value.partition('=')[0]
            dictize(results, key, value)

        # Always hand back a tuple: callers unpack two values, and the old
        # bare ``results`` return broke that unpacking.
        return [], results

    @classmethod
    def line_items(cls, data):
        """Parse the item table into a list of dictionaries.

        Columns are separated by three or more whitespace characters.  The row
        whose first column is ``Item`` supplies the (lower-cased) keys, rows
        starting with ``----`` are skipped, and a row with an empty first
        column ends the table.  A line ending in ``=`` is joined with the
        line(s) that follow it.  A value in the ``type`` column that starts
        with ``$`` is stored under ``unit price`` instead.  Values beyond the
        number of header columns are ignored.

        Returns ``(remaining_lines, items)``; the remaining lines start one
        line before the row that ended the table, or are empty when the table
        runs to the end of ``data``.
        Raises ``ValueError`` when an item row appears before the header row.
        """
        results = []
        headers = None
        pending = ''

        for index, line in enumerate(data):
            if line == '':
                continue
            if line[-1] == '=':
                pending += line[:-1]
                continue

            line = pending + line
            pending = ''
            parts = re.split(r'\s\s\s+', line)

            if parts[0] == 'Item':
                headers = [header.lower() for header in parts]
                continue
            if parts[0][0:4] == '----':
                continue
            if parts[0] == '':
                # max() keeps index 0 from wrapping around to data[-1:].
                return data[max(index - 1, 0):], results
            if headers is None:
                raise ValueError('item row found before the Item header row')

            result = {}
            # A separate loop variable is essential here: reusing ``index``
            # corrupted the slice returned for the remaining lines.
            for category, part in zip(headers, parts):
                if category == 'type' and part.strip().startswith('$'):
                    category = 'unit price'
                result[category] = part
            results.append(result)

        return [], results

    @classmethod
    def totals(cls, data):
        """Parse the trailing ``Key: value`` total lines.

        Empty lines and lines starting with ``-`` are skipped.  A line ending
        in ``=`` is joined with the line(s) that follow it.  Keys are nested
        through ``dictize``.

        Returns ``(data, results)`` with ``data`` unchanged.
        Raises ``ValueError`` when a line does not split into exactly one key
        and one value around ``':'`` followed by whitespace.
        """
        results = {}
        pending = ''

        for line in data:
            if line == '' or line[0] == '-':
                continue
            if line[-1] == '=':
                pending += line[:-1]
                continue

            line = (pending + line).lstrip()
            pending = ''
            key, value = re.split(r':\s+', line)
            dictize(results, key, value)

        return data, results

    @classmethod
    def factory(cls, report):
        """Build a ``Receipt`` from the plain-text ``report``.

        The first two lines are taken as the billing e-mail and name, followed
        by the address, the order information, the item table and the totals.
        """
        data = report.split('\n')

        receipt = cls()
        receipt.billing = {
            'email': data[0],
            'name': data[1],
        }

        data, address = cls.make_address(data)
        receipt.billing['address'] = address

        # Guard against running off the end when only blank lines remain.
        while data and data[0] == '':
            data = data[1:]

        data, receipt.order = cls.order_information(data)
        data, receipt.items = cls.line_items(data)
        data, receipt.totals = cls.totals(data)

        return receipt
class ReceiptParser(object):
    """Cuts the receipt text out of a cached message and parses it."""

    def parse(self, receipt):
        """Return a ``Receipt`` built from ``receipt['body']``.

        The body is trimmed to start at ``Apple Receipt`` and to end before
        the first ``<!`` and before ``Please retain for your records.``; the
        first five and last three lines of what is left are then dropped.
        ``str.index`` raises ``ValueError`` when any marker is missing.
        """
        text = receipt['body']
        text = text[text.index('Apple Receipt'):]

        # Each end marker truncates whatever survived the previous cut.
        for end_marker in ('<!', 'Please retain for your records.'):
            cut = text.index(end_marker)
            text = text[:cut]

        lines = text.split('\n')
        return Receipt.factory('\n'.join(lines[5:-3]))


# Shared parser instance used by receipts().
parser = ReceiptParser()
def receipts():
    """Yield ``(receipt, subject, from)`` for each distinct file in ``.cache``.

    Every file is parsed as JSON; files that are not valid JSON are reported
    on stdout and skipped.  Files whose bytes are identical to an earlier
    file (compared by MD5 digest) are skipped as duplicates.  The remaining
    entries are parsed with the module-level ``parser``.
    """
    seen = set()

    for filename in os.listdir('.cache'):
        # Read bytes: hashing needs bytes, and ``with`` closes the handle.
        with open('.cache/{0}'.format(filename), 'rb') as cache_file:
            contents = cache_file.read()

        try:
            data = json.loads(contents.decode('utf-8'))
        except ValueError:
            print('Invalid JSON in {0}'.format(filename))
            # Without this the loop carried on with the previous file's
            # ``data`` (or with no ``data`` at all on the first file).
            continue

        current_hash = hashlib.md5(contents).digest()
        if current_hash in seen:
            continue
        seen.add(current_hash)

        yield parser.parse(data), data['subject'], data['from']
def main():
    """Write all parsed receipts to ``receipts.json``.

    When the first command-line argument is ``reparse`` the ``.cache``
    directory is emptied and rebuilt from the mailbox first.  The output is a
    JSON object with a single ``receipts`` list; each entry is the receipt's
    ``to_dict()`` plus its ``subject`` and ``from`` values.
    """
    import shutil

    data = []

    if len(sys.argv) > 1 and sys.argv[1] == 'reparse':
        print('Reparsing maildir. This could take quite a while.')

        # The cache is a directory (files are written as ``.cache/<id>.json``),
        # which os.remove() cannot delete.
        if os.path.isdir('.cache'):
            shutil.rmtree('.cache')
        elif os.path.exists('.cache'):
            os.remove('.cache')
        os.makedirs('.cache')

        build_cache()

    for receipt, subject, from_address in receipts():
        result = receipt.to_dict()
        result['subject'] = subject
        result['from'] = from_address
        data.append(result)

    # ``with`` guarantees the output is flushed and closed.
    with open('receipts.json', 'w') as output:
        output.write(json.dumps({
            "receipts": data
        }, indent=2))


if __name__ == '__main__':
    main()
#!/usr/bin/env python
import json
import csv
# Input: the ``receipts`` list stored in receipts.json.
receipt_data = open('receipts.json', 'r')
receipts = json.load(receipt_data)['receipts']
# Output: the CSV file the rows are written to further down.
output_file = open('orders.csv', 'w')
def normalize(receipt):
    """Flatten one receipt into a list of per-item row dictionaries.

    Each row carries the order number, receipt date and billing method taken
    from ``receipt['order']`` together with the item's name.  ``price``,
    ``artist`` and ``type`` are copied from the item's ``unit price``,
    ``artist`` and ``type`` entries and default to an empty string.
    """
    def as_row(item):
        # The order lookups stay inside the per-item path so a receipt
        # without items never touches ``receipt['order']``.
        return {
            'order number': receipt['order']['Order']['Number'],
            'item': item['item'],
            'price': item.get('unit price', ''),
            'type': item.get('type', ''),
            'artist': item.get('artist', ''),
            'ordered date': receipt['order']['Receipt']['Date'],
            'ordered via': receipt['order']['Billed']['To'],
        }

    return [as_row(item) for item in receipt['items']]
# Column titles of the CSV file; the lower-cased title is the key used to
# look each value up in the dictionaries returned by normalize().
headers = [
    'Order Number',
    'Item',
    'Price',
    'Type',
    'Artist',
    'Ordered Date',
    'Ordered Via',
]

writer = csv.writer(output_file, dialect='excel')
writer.writerow(headers)
print('')

# One CSV row per purchased item, across all receipts.  The loop variable is
# named ``entry`` so it no longer shadows an unused module-level ``data``.
rows = []
for receipt in receipts:
    for entry in normalize(receipt):
        rows.append([entry[header.lower()] for header in headers])

writer.writerows(rows)

# Close both files so the CSV is fully flushed before success is reported.
output_file.close()
receipt_data.close()

print('{0} created with {1} items.'.format('orders.csv', len(rows)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment