Last active
January 23, 2020 18:25
-
-
Save quad/3853668c5287d3db3bdae9a792142f56 to your computer and use it in GitHub Desktop.
Convert USAA Bank Statements to Ledger Transactions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import enum | |
import re | |
import sys | |
import types | |
from collections import namedtuple | |
from decimal import Decimal | |
from operator import attrgetter | |
class Tokens(enum.Enum):
    """Line patterns recognised while scanning USAA bank statement text.

    Every member's value is a compiled regular expression.  The patterns are
    applied with ``Pattern.match``, i.e. anchored at the start of a line.
    """

    # Optional "MM/DD/YY -" period start followed by the "MM/DD/YY" period end
    # at the end of the line.  The two digits of each field may be separated
    # by whitespace.
    STATEMENT_DATE = re.compile(r"""
        .*?
        (?:
            (?P<start_month>\d \s* \d)
            \s* \/ \s*
            (?P<start_day>\d \s* \d)
            \s* \/ \s*
            (?P<start_year>\d \s* \d)
            \s* - \s*
        )?
        (?P<end_month>\d \s* \d)
        \s* / \s*
        (?P<end_day>\d \s* \d)
        \s* / \s*
        (?P<end_year>\d \s* \d)
        $
        """, re.VERBOSE)

    # Seven whitespace-separated columns: starting balance, debit count and
    # total, deposit count and total, service charges, final balance.
    SUMMARY = re.compile(r"""
        \s+
        (?P<start_balance>[,\d]*\.\d\d)
        \s+
        (?P<num_debits>\d+)
        \s+
        (?P<debits>[,\d]*\.\d\d)
        \s+
        (?P<num_deposits>\d+)
        \s+
        (?P<deposits>[,\d]*\.\d\d)
        \s+
        (?P<service_charges>[,\d]*\.\d\d)
        \s+
        (?P<final_balance>[,\d]*\.\d\d)
        """, re.VERBOSE)

    # Section headings and the column header that follows them.
    DEPOSITS = re.compile(r"\s+DEPOSITS AND OTHER CREDITS")
    DEBITS = re.compile(r"\s+OTHER DEBITS")
    DATE_AMOUNT = re.compile(r"\s+DATE\.+AMOUNT")

    # "MM/DD  amount  description [MMDDYY]".  The ``indent`` group spans
    # everything up to the description column; its length is used to
    # recognise continuation lines.
    TRANSACTION = re.compile(r"""
        (?P<indent>\s+
            (?P<post_month>\d\d) /
            (?P<post_day>\d\d)
            \s+
            (?P<amount>[,\d]*\.\d\d)
            \s*
        )
        (?P<description>.*?)
        \s*
        (?:
            (?P<run_month>\d\d)
            (?P<run_day>\d\d)
            (?P<run_year>\d\d)
        )?$
        """, re.VERBOSE)

    # Any indented line; whether it continues a transaction is decided by
    # comparing the indent length with the transaction's own.
    EXTRA_DESCRIPTION = re.compile(r"(?P<indent>\s+)(?P<description>.*)")

    # Empty line, form feed, or an indented "NNNNN-NNNN_NN" style code
    # (presumably a page footer -- confirm against real statements).
    END_OF_TRANSACTIONS = re.compile(r"(?:$|\x0c|\s+\d{5,6}-\d{4}_\d\d)")

    # Balance section heading, its column header, and one "MM/DD balance"
    # entry (several entries may share a line).
    BALANCE_SUMMARY = re.compile(r"\s+ACCOUNT BALANCE SUMMARY")
    DATE_BALANCE = re.compile(r"\s+DATE\.+BALANCE")
    BALANCE = re.compile(r"""
        \s+
        (?P<month>\d\d) /
        (?P<day>\d\d)
        \s+
        (?P<balance>[,\d]*\.\d\d)
        """, re.VERBOSE)
class Scanner:
    """Line-oriented cursor over an iterable of text lines.

    The scanner always holds one "current" line.  Tokens are objects with a
    ``name`` and a compiled pattern in ``value``; they are matched against the
    current line, and a successful ``accept`` moves on to the next line.
    """

    def __init__(self, generator):
        """Start scanning *generator* and load its first line.

        :param generator: iterable of lines, e.g. an open text file.  Its
            ``name`` attribute, when present, is used in error messages.
        """
        self.input = enumerate(generator, start=1)
        # Plain iterables (lists, generators) have no ``name``; use a
        # placeholder rather than failing with AttributeError.
        self._input_name = getattr(generator, 'name', '<input>')
        self._next()

    def _next(self):
        """Advance one line; line and line number become None when exhausted."""
        self._current_lineno, self._current_line = next(self.input, (None, None))

    @property
    def done(self):
        """True once the input has been exhausted."""
        return self._current_line is None

    def _where(self, lineno):
        """Format ``name:line`` for error messages, tolerating end of input."""
        return "%s:%s" % (self._input_name, 'EOF' if lineno is None else lineno)

    def match(self, token):
        """Return the match of *token* on the current line without advancing.

        Returns None when the token does not match or the input is exhausted
        (there is no line to hand to the pattern in that case).
        """
        if self.done:
            return None
        return token.value.match(self._current_line)

    def accept(self, token):
        """Like :meth:`match`, but advance to the next line on success."""
        match = self.match(token)
        if match:
            self._next()
        return match

    def expect(self, token):
        """Accept *token* or raise SyntaxError describing the current line."""
        match = self.accept(token)
        if match:
            return match
        # The line number is None at end of input, so it is formatted via
        # _where() instead of a numeric conversion that would raise TypeError.
        line = '<end of input>' if self.done else self._current_line
        raise SyntaxError("expected %s at %s\ninstead got: %s" %
                          (token.name, self._where(self._current_lineno), line))

    def until(self, *tokens):
        """Skip lines until one of *tokens* matches, consuming that line.

        :returns: the match object when a single token was given, otherwise a
            ``(token, match)`` pair identifying which token matched.
        :raises SyntaxError: if the input ends before any token matches.
        """
        start_ln = self._current_lineno
        while not self.done:
            for t in tokens:
                match = self.accept(t)
                if match:
                    return match if len(tokens) == 1 else (t, match)
            self._next()
        raise SyntaxError("found no %s from %s" %
                          (', '.join(t.name for t in tokens), self._where(start_ln)))
def parse(scanner):
    """Yield the statement's nodes in the order they appear in the input.

    Produces a Period, a Summary, one Deposits or Debits node per transaction
    section encountered, and finally a Balances node once the balance summary
    heading is reached.
    """
    headings = (Tokens.DEPOSITS, Tokens.DEBITS, Tokens.BALANCE_SUMMARY)
    section_types = {Tokens.DEPOSITS: Deposits, Tokens.DEBITS: Debits}

    yield Period.parse(scanner.until(Tokens.STATEMENT_DATE))
    yield Summary.parse(scanner.until(Tokens.SUMMARY))

    while True:
        heading, _ = scanner.until(*headings)
        if heading == Tokens.BALANCE_SUMMARY:
            break
        # Each transaction section starts with a DATE....AMOUNT column header.
        scanner.expect(Tokens.DATE_AMOUNT)
        make_section = section_types[heading]
        yield make_section(list(transactions(scanner)))

    scanner.expect(Tokens.DATE_BALANCE)
    yield Balances(list(assertions(scanner)))
def convert(match, items):
    """Lazily yield each named group of *match* passed through its converter.

    :param match: regex match object providing ``group(name)``.
    :param items: iterable of ``(group_name, converter)`` pairs.
    """
    yield from (caster(match.group(field)) for field, caster in items)
def calendar(text):
    """Parse a calendar field such as ``"0 1"`` into an integer.

    The statement-date pattern allows arbitrary whitespace (``\\s*``) between
    the digits of a field, so every whitespace character is dropped before
    conversion, not just the plain space.

    :param text: digits, possibly interleaved with whitespace.
    :returns: the integer value of the digits.
    :raises ValueError: if no valid integer remains.
    """
    return int(''.join(text.split()))
def year(text):
    """Parse a year field, mapping values below 100 into the 2000s.

    Returns None when *text* is empty or None.
    """
    if not text:
        return None
    value = calendar(text)
    return value + 2000 if value < 100 else value
def mm_dd(match, prefix=''):
    """Build a ``month``/``day`` namespace from a match's groups.

    With a non-empty *prefix* the groups read are ``<prefix>_month`` and
    ``<prefix>_day``; otherwise plain ``month`` and ``day``.
    """
    stem = prefix + '_' if prefix else ''
    month, day = (int(match.group(stem + part)) for part in ('month', 'day'))
    return types.SimpleNamespace(month=month, day=day)
def mm_dd_yy(match, prefix=''):
    """Build a ``datetime.date`` from a match's year, month and day groups.

    With a non-empty *prefix* the groups read are ``<prefix>_year``,
    ``<prefix>_month`` and ``<prefix>_day``.
    """
    stem = prefix + '_' if prefix else ''
    fields = [
        (stem + 'year', year),
        (stem + 'month', int),
        (stem + 'day', int),
    ]
    yy, mm, dd = convert(match, fields)
    return datetime.date(yy, mm, dd)
class Period(namedtuple('Period', 'start end')):
    """Statement period: ``start`` may be None, ``end`` is a date."""

    @classmethod
    def parse(cls, match):
        """Build a Period from a STATEMENT_DATE match.

        :param match: match exposing ``start_*`` and ``end_*`` year, month
            and day groups; the ``start_*`` groups may be unset.
        :returns: ``Period(start, end)`` with ``start`` None when the line
            carried only an end date.
        """
        # The pattern allows whitespace between the two digits of every
        # field, so month and day go through calendar() (which strips it)
        # rather than int(), exactly like the start fields below.
        end = datetime.date(*convert(match, [
            ('end_year', year),
            ('end_month', calendar),
            ('end_day', calendar),
        ]))
        if not match.group('start_year'):
            return cls(None, end)
        start = datetime.date(*convert(match, [
            ('start_year', year),
            ('start_month', calendar),
            ('start_day', calendar),
        ]))
        return cls(start, end)
def financial(text):
    """Convert an amount such as ``"1,234.56"`` to a Decimal, ignoring commas."""
    digits = text.replace(',', '')
    return Decimal(digits)
class Summary(namedtuple('Summary', list(Tokens.SUMMARY.value.groupindex))):
    """Statement summary row; fields mirror the SUMMARY pattern's groups."""

    @classmethod
    def parse(cls, match):
        """Build a Summary from a SUMMARY match, converting every column."""
        # Listed in the same order as the pattern's groups, which is also the
        # order of the namedtuple fields.
        columns = [
            ('start_balance', financial),
            ('num_debits', int),
            ('debits', financial),
            ('num_deposits', int),
            ('deposits', financial),
            ('service_charges', financial),
            ('final_balance', financial),
        ]
        values = convert(match, columns)
        return cls(*values)
# One node per statement section; each wraps that section's transactions.
Deposits = namedtuple('Deposits', ['transactions'])
Debits = namedtuple('Debits', ['transactions'])
class Transaction(namedtuple('Transaction', 'post_date amount description run_date')):
    """A single statement transaction.

    ``description`` is a list of lines: the first is the main description,
    the rest are continuation lines.
    """

    @classmethod
    def parse(cls, match, extra_descriptions):
        """Build a Transaction from a TRANSACTION match.

        :param match: match exposing the ``post_*``, ``amount``,
            ``description`` and optional ``run_*`` groups.
        :param extra_descriptions: iterable of continuation lines appended
            after the matched description.
        """
        posted = mm_dd(match, 'post')
        amount, first_line = convert(match, [
            ('amount', financial),
            ('description', str),
        ])
        if match.group('run_month'):
            ran = mm_dd_yy(match, 'run')
        else:
            ran = None
        return cls(posted, amount, [first_line] + list(extra_descriptions), ran)

    def ledger(self, posting_from, posting_to):
        """Render the transaction as a Ledger entry.

        The amount is subtracted from *posting_from* and added to
        *posting_to*; continuation lines become ``;`` comment lines.
        ``post_date`` must provide ``isoformat()``.
        """
        lines = ["%s %s" % (self.post_date.isoformat(), self.description[0])]
        for note in self.description[1:]:
            lines.append("\t; %s" % (note, ))
        lines.append("\t%s\t\t-$%s" % (posting_from, self.amount))
        lines.append("\t%s\t\t$%s" % (posting_to, self.amount))
        return "\n".join(lines)
def transactions(scanner):
    """Yield Transaction objects until an END_OF_TRANSACTIONS line is consumed.

    Every line that is not an end marker must be a TRANSACTION line; the
    indented lines following it are collected as extra description lines.
    """
    while True:
        if scanner.accept(Tokens.END_OF_TRANSACTIONS):
            return
        row = scanner.expect(Tokens.TRANSACTION)
        width = len(row.group('indent'))
        # Continuation lines may start up to four columns left of the
        # description column, but never to its right.
        continuation_indents = range(width - 4, width + 1)
        notes = list(extra_descriptions(scanner, continuation_indents))
        yield Transaction.parse(row, notes)
def extra_descriptions(scanner, indent_length_range):
    """Yield consecutive continuation lines of the current transaction.

    A line qualifies while it matches EXTRA_DESCRIPTION and its indent length
    lies in *indent_length_range*; the first non-qualifying line is left
    unconsumed.
    """
    candidate = scanner.match(Tokens.EXTRA_DESCRIPTION)
    while candidate and len(candidate.group('indent')) in indent_length_range:
        # Consume the line only after the indent check has passed.
        scanner.accept(Tokens.EXTRA_DESCRIPTION)
        yield candidate.group('description')
        candidate = scanner.match(Tokens.EXTRA_DESCRIPTION)
# Node wrapping the balance assertions read from the balance summary section.
Balances = namedtuple('Balances', ['assertions'])
class Assertion(namedtuple('Assertion', 'date balance')):
    """Account balance recorded for a given date."""

    @classmethod
    def parse(cls, match):
        """Build an Assertion from a BALANCE match."""
        when = mm_dd(match)
        amount = financial(match.group('balance'))
        return cls(when, amount)

    def ledger(self, account_name):
        """Render a zero-amount Ledger posting asserting the balance."""
        header = "%s Balance Assertion" % (self.date, )
        posting = "\t%s\t\t$0 = $%s" % (account_name, self.balance)
        return header + "\n" + posting
def assertions(scanner):
    """Yield an Assertion for every balance entry on consecutive BALANCE lines.

    Stops at the first line that does not match BALANCE.
    """
    row = scanner.accept(Tokens.BALANCE)
    while row:
        # A single line may hold several date/balance pairs, so rescan the
        # whole line rather than using only the first match.
        for entry in Tokens.BALANCE.value.finditer(row.string):
            yield Assertion.parse(entry)
        row = scanner.accept(Tokens.BALANCE)
def fix_period_start_date(tree):
    """Fill in a missing period start from the earliest balance assertion.

    :param tree: iterable of exactly five nodes: period, summary, deposits,
        debits, balances.  Assertion dates only need ``month`` and ``day``.
    :returns: generator over the same five nodes, with ``period.start`` set.
    :raises AssertionError: if the resulting start is not before the end.
    """
    period, summary, deposits, debits, balances = tree
    if not period.start:
        end = period.end
        # Entries in a later calendar month than the period end can only
        # belong to the previous year; if there are any, the start is among
        # them.
        wrapped = [a for a in balances.assertions if a.date.month > end.month]
        earliest = min(
            wrapped or balances.assertions,
            key=lambda a: (a.date.month, a.date.day))
        # Compare month AND day: an earliest entry in the same month as the
        # period end (on or before the end day) is in the end's year.
        # Comparing months alone pushed such a start back a whole year.
        if (earliest.date.month, earliest.date.day) <= (end.month, end.day):
            start_year = end.year
        else:
            start_year = end.year - 1
        start = datetime.date(
            year=start_year,
            month=earliest.date.month,
            day=earliest.date.day)
        period = period._replace(start=start)
    assert period.start < period.end
    yield from (period, summary, deposits, debits, balances)
def merge_nodes(tree):
    """Collapse repeated section nodes into one node per kind.

    Nodes that are not Deposits, Debits or Balances are passed through
    immediately in their original order.  After the input is exhausted a
    single Deposits, Debits and Balances node is yielded (in that order),
    each holding the concatenation of all nodes of its kind; a kind that
    never occurred yields an empty list.
    """
    deposit_txs = []
    debit_txs = []
    balance_rows = []
    for node in tree:
        # Extend in place: linear in the number of items, unlike repeatedly
        # adding lists together with sum(..., []).
        if isinstance(node, Deposits):
            deposit_txs.extend(node.transactions)
        elif isinstance(node, Debits):
            debit_txs.extend(node.transactions)
        elif isinstance(node, Balances):
            balance_rows.extend(node.assertions)
        else:
            yield node
    yield Deposits(deposit_txs)
    yield Debits(debit_txs)
    yield Balances(balance_rows)
def fix_date(period, item, key):
    """Return *item* with its month/day field *key* replaced by a full date.

    The year is taken from ``period.start`` when the month is not earlier
    than the start month, otherwise from ``period.end``.
    """
    partial = getattr(item, key)
    in_start_year = period.start.month <= partial.month
    resolved_year = period.start.year if in_start_year else period.end.year
    resolved = datetime.date(resolved_year, partial.month, partial.day)
    return item._replace(**{key: resolved})
def fix_tx_dates(tree):
    """Replace month/day dates on transactions and assertions with full dates.

    :param tree: iterable of exactly five nodes: period, summary, deposits,
        debits, balances.
    :returns: generator over the same five nodes with dates resolved against
        the period.
    """
    period, summary, deposits, debits, balances = tree
    yield period
    yield summary
    for section in (deposits, debits):
        dated = [fix_date(period, tx, 'post_date') for tx in section.transactions]
        yield section._replace(transactions=dated)
    dated_balances = [fix_date(period, entry, 'date') for entry in balances.assertions]
    yield balances._replace(assertions=dated_balances)
def validate_summary(tree):
    """Check the parsed sections against the statement summary.

    Asserts that deposit and debit totals and counts agree with the summary
    (debits include service charges when any were reported) and that the
    earliest and latest balance assertions equal the summary's start and
    final balances.  Yields the five nodes unchanged.

    :raises AssertionError: on any mismatch.
    """
    period, summary, deposits, debits, balances = tree

    credited = sum(tx.amount for tx in deposits.transactions)
    assert credited == summary.deposits
    assert len(deposits.transactions) == summary.num_deposits

    debited = sum(tx.amount for tx in debits.transactions)
    debit_count = len(debits.transactions)
    if summary.service_charges:
        # Service charges are listed among the debits, so the total includes
        # them and the count may exceed the summary's debit count.
        assert debited == summary.debits + summary.service_charges
        assert debit_count >= summary.num_debits
    else:
        assert debited == summary.debits
        assert debit_count == summary.num_debits

    by_date = attrgetter('date')
    opening = min(balances.assertions, key=by_date)
    closing = max(balances.assertions, key=by_date)
    assert opening.balance == summary.start_balance
    assert closing.balance == summary.final_balance

    yield from (period, summary, deposits, debits, balances)
def main():
    """Convert each statement file named on the command line to Ledger text.

    For every file, prints the deposit entries, then the debit entries, then
    the balance assertions, each followed by a blank line.
    """
    account_name = 'USAA SECURE CHECKING'
    stages = (parse, merge_nodes, fix_period_start_date, fix_tx_dates,
              validate_summary)
    for fn in sys.argv[1:]:
        with open(fn, 'r') as fin:
            tree = Scanner(fin)
            for stage in stages:
                tree = stage(tree)
            # Every stage is a lazy generator, so the tree has to be
            # unpacked while the file is still open.
            _, _, deposits, debits, balances = tree
        sections = (
            (deposits.transactions, ('Income:Unsorted', account_name)),
            (debits.transactions, (account_name, 'Expenses:Unsorted')),
            (balances.assertions, (account_name, )),
        )
        for entries, ledger_args in sections:
            for entry in entries:
                print(entry.ledger(*ledger_args))
                print()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment