Skip to content

Instantly share code, notes, and snippets.

Created November 18, 2013 04:22
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save erikmack/7522440 to your computer and use it in GitHub Desktop.
Disco map job to transform ledger inputs to mapreduce-friendly flat file
Transforms raw ledger input into a denormalized line-per-entry-leg flat file
disco run ledger.Ledger tag://ledger
from disco.job import Job
from disco.worker.classic.func import nop_map
import io, re, uuid
datepattern = re.compile('\d\d\d\d/\d\d/\d\d')
lineno = -1
class Ledger(Job):
map = staticmethod(nop_map)
def map_reader(fd, size, url, params):
while True:
entry = ledger.Ledger.emit_entry(fd)
if entry:
# Hmm, this gives consistent hashing, but rules out multiple same-description same-date entries
entryid = uuid.uuid5(uuid.NAMESPACE_URL,"{0}-{1}".format(entry['date'],entry['description']))
explicit_qtys = {}
for leg in entry['legs']:
currency = ledger.Ledger.get_currency(leg)
if not 'qty' in leg: continue
if currency in explicit_qtys:
explicit_qtys[currency] = explicit_qtys[currency] + leg['qty']
explicit_qtys[currency] = leg['qty']
# There should only be one leg with an empty qty, but it can have multiple currencies
# (see Opening Balances).
# TODO: parser could handle multiple explicity currencies per leg, we just don't have any
implicit_qtys = []
implicit_qty_assigned = False
for leg in entry['legs']:
if not 'qty' in leg:
if implicit_qty_assigned:
raise Exception('An entry had multiple omitted quantities')
implicit_qty_assigned = True
for currency in explicit_qtys:
if explicit_qtys[currency] != 0.0:
yield entryid, (entry['date'],entry['description'],leg['account'],-explicit_qtys[currency],currency)
currency = ledger.Ledger.get_currency(leg)
yield entryid, (entry['date'],entry['description'],leg['account'],leg['qty'],currency)
else: break
def get_currency(leg):
if 'currency' in leg:
return leg['currency']
return 'defcur' # Magic value, assume not in use
def emit_entry(f):
global lineno
in_entry = False
entry = {}
for line in f:
lineno = lineno + 1
line = line.decode()
# Empty line
if len(line.strip()) == 0:
# Return entry if we can
if in_entry:
return entry
# Otherwise eat leading newline
else: continue
# Skip price entries and emacs format line
elif line.startswith('P ') or line.startswith('; -*-ledger-*-'):
# entry start
elif datepattern.match(line) and not in_entry:
in_entry = True
datepart = line[0:10]
description = line[11:].strip()
entry['date'] = datepart
entry['description'] = description
entry['legs'] = []
# in an entry leg
elif in_entry and line[0] == ' ':
line = line.strip()
mainparts = re.split('\s{2,}',line,maxsplit=1)
leg = {}
leg['account'] = mainparts[0]
if len(mainparts) > 1:
amountparts = mainparts[1].split()
leg['qty'] = float(amountparts[0])
if len(amountparts) > 1:
leg['currency'] = amountparts[1]
raise Exception("Not sure what to do with line {0}: {1}",lineno,line)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment