Disco map job to transform ledger inputs to mapreduce-friendly flat file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Transform raw ledger input into a denormalized flat file with one line per entry leg.
Usage: disco run ledger.Ledger tag://ledger
""" | |
from disco.job import Job | |
from disco.worker.classic.func import nop_map | |
import io, re, uuid | |
# Matches the YYYY/MM/DD date that opens a ledger entry header line.
# Raw string so the \d escapes reach the regex engine untouched.
datepattern = re.compile(r'\d\d\d\d/\d\d/\d\d')
# Zero-based index of the input line most recently read by Ledger.emit_entry;
# stays at -1 until the first line has been read.
lineno = -1
class Ledger(Job):
    """Disco job that flattens raw ledger text into one record per entry leg.

    ``map`` is the classic worker's ``nop_map``; parsing and denormalizing
    are done by ``map_reader``, a generator of ``(entryid, record)`` pairs.
    """

    map = staticmethod(nop_map)

    @staticmethod
    def map_reader(fd, size, url, params):
        """Yield one ``(entryid, record)`` pair per leg of every entry in ``fd``.

        ``record`` is ``(date, description, account, qty, currency)``. A leg
        whose quantity was omitted is expanded into one record per currency
        whose explicit quantities do not sum to zero, carrying the negated sum
        so that the entry balances. If every currency already sums to zero,
        the omitted-quantity leg produces no record.

        Args:
            fd: iterable of ledger lines, consumed through ``emit_entry``.
            size: unused.
            url: unused.
            params: unused.
                NOTE(review): these three are presumably required by Disco's
                input-stream signature -- confirm.

        Raises:
            Exception: if an entry has more than one leg without a quantity,
                or if ``emit_entry`` meets a line it cannot classify.
        """
        # NOTE(review): helpers are reached through the ``ledger`` module name
        # rather than a bare ``Ledger``; presumably because Disco runs this
        # function on workers where only the module is in scope -- confirm.
        while True:
            entry = ledger.Ledger.emit_entry(fd)
            if not entry:
                break

            # Hmm, this gives consistent hashing, but rules out multiple
            # same-description same-date entries
            entryid = uuid.uuid5(
                uuid.NAMESPACE_URL,
                "{0}-{1}".format(entry['date'], entry['description']))

            # Sum the explicitly stated quantities per currency.
            explicit_qtys = {}
            for leg in entry['legs']:
                if 'qty' not in leg:
                    continue
                currency = ledger.Ledger.get_currency(leg)
                explicit_qtys[currency] = explicit_qtys.get(currency, 0.0) + leg['qty']

            # There should only be one leg with an empty qty, but it can have
            # multiple currencies (see Opening Balances).
            # TODO: parser could handle multiple explicit currencies per leg,
            # we just don't have any
            implicit_qty_assigned = False
            for leg in entry['legs']:
                if 'qty' in leg:
                    currency = ledger.Ledger.get_currency(leg)
                    yield entryid, (entry['date'], entry['description'],
                                    leg['account'], leg['qty'], currency)
                    continue

                if implicit_qty_assigned:
                    raise Exception('An entry had multiple omitted quantities')
                implicit_qty_assigned = True
                # The omitted quantity absorbs whatever is left unbalanced in
                # each currency.
                for currency in explicit_qtys:
                    if explicit_qtys[currency] != 0.0:
                        yield entryid, (entry['date'], entry['description'],
                                        leg['account'],
                                        -explicit_qtys[currency], currency)

    @staticmethod
    def get_currency(leg):
        """Return the leg's ``'currency'`` value, or ``'defcur'`` if it has none."""
        if 'currency' in leg:
            return leg['currency']
        return 'defcur'  # Magic value, assume not in use

    @staticmethod
    def emit_entry(f):
        """Read lines from ``f`` through the end of the next entry and return it.

        An entry starts at a line matching ``datepattern`` and ends at the
        next blank line or at the end of the input. Every line read advances
        the module-level ``lineno`` counter.

        Args:
            f: iterable of lines; ``bytes`` lines are decoded with the default
                codec, ``str`` lines are used as they are.

        Returns:
            A dict with keys ``'date'`` (first 10 characters of the header
            line), ``'description'`` (rest of the header line, stripped) and
            ``'legs'`` (list of dicts with ``'account'`` and, when present on
            the line, ``'qty'`` as a float and ``'currency'``); or ``None``
            when the input is exhausted before any entry starts.

        Raises:
            Exception: for a non-blank line that is neither a skipped line,
                an entry header, nor a leg of the current entry.
        """
        global lineno
        in_entry = False
        entry = {}
        for line in f:
            lineno = lineno + 1
            if isinstance(line, bytes):
                line = line.decode()
            # Empty line
            if not line.strip():
                # Return entry if we can
                if in_entry:
                    return entry
                # Otherwise eat leading newline
                continue
            # Skip price entries and emacs format line
            elif line.startswith(('P ', '; -*-ledger-*-')):
                continue
            # entry start
            elif datepattern.match(line) and not in_entry:
                in_entry = True
                entry['date'] = line[0:10]
                entry['description'] = line[11:].strip()
                entry['legs'] = []
            # in an entry leg
            elif in_entry and line[0] == ' ':
                # Account and amount are separated by two or more whitespace
                # characters; a single space may occur inside the account name.
                mainparts = re.split(r'\s{2,}', line.strip(), maxsplit=1)
                leg = {'account': mainparts[0]}
                if len(mainparts) > 1:
                    amountparts = mainparts[1].split()
                    leg['qty'] = float(amountparts[0])
                    if len(amountparts) > 1:
                        leg['currency'] = amountparts[1]
                entry['legs'].append(leg)
            else:
                raise Exception(
                    "Not sure what to do with line {0}: {1}".format(lineno, line))
        # Input exhausted: hand back an entry that was still open so the last
        # entry is not lost when the input lacks a trailing blank line.
        return entry if in_entry else None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment