Skip to content

Instantly share code, notes, and snippets.

@quad
Last active Jan 23, 2020
Embed
What would you like to do?
Convert USAA Bank Statements to Ledger Transactions
import datetime
import enum
import re
import sys
import types
from collections import namedtuple
from decimal import Decimal
from operator import attrgetter
class Tokens(enum.Enum):
STATEMENT_DATE = re.compile(r"""
.*?
(?:
(?P<start_month>\d \s* \d)
\s* \/ \s*
(?P<start_day>\d \s* \d)
\s* \/ \s*
(?P<start_year>\d \s* \d)
\s* - \s*
)?
(?P<end_month>\d \s* \d)
\s* / \s*
(?P<end_day>\d \s* \d)
\s* / \s*
(?P<end_year>\d \s* \d)
$
""", re.VERBOSE)
SUMMARY = re.compile(r"""
\s+
(?P<start_balance>[,\d]*\.\d\d)
\s+
(?P<num_debits>\d+)
\s+
(?P<debits>[,\d]*\.\d\d)
\s+
(?P<num_deposits>\d+)
\s+
(?P<deposits>[,\d]*\.\d\d)
\s+
(?P<service_charges>[,\d]*\.\d\d)
\s+
(?P<final_balance>[,\d]*\.\d\d)
""", re.VERBOSE)
DEPOSITS = re.compile(r"\s+DEPOSITS AND OTHER CREDITS")
DEBITS = re.compile(r"\s+OTHER DEBITS")
DATE_AMOUNT = re.compile(r"\s+DATE\.+AMOUNT")
TRANSACTION = re.compile(r"""
(?P<indent>\s+
(?P<post_month>\d\d) /
(?P<post_day>\d\d)
\s+
(?P<amount>[,\d]*\.\d\d)
\s*
)
(?P<description>.*?)
\s*
(?:
(?P<run_month>\d\d)
(?P<run_day>\d\d)
(?P<run_year>\d\d)
)?$
""", re.VERBOSE)
EXTRA_DESCRIPTION = re.compile(r"(?P<indent>\s+)(?P<description>.*)")
END_OF_TRANSACTIONS = re.compile(r"(?:$|\x0c|\s+\d{5,6}-\d{4}_\d\d)")
BALANCE_SUMMARY = re.compile(r"\s+ACCOUNT BALANCE SUMMARY")
DATE_BALANCE = re.compile(r"\s+DATE\.+BALANCE")
BALANCE = re.compile(r"""
\s+
(?P<month>\d\d) /
(?P<day>\d\d)
\s+
(?P<balance>[,\d]*\.\d\d)
""", re.VERBOSE)
class Scanner:
def __init__(self, generator):
self.input = enumerate(generator, start=1)
self._input_name = generator.name
self._next()
def _next(self):
self._current_lineno, self._current_line = next(self.input, (None, None))
done = property(lambda self: self._current_line is None)
def match(self, token):
return token.value.match(self._current_line)
def accept(self, token):
match = self.match(token)
if match:
self._next()
return match
def expect(self, token):
match = self.accept(token)
if match:
return match
else:
raise SyntaxError("expected %s at %s:%u\ninstead got: %s" % \
(token.name, \
self._input_name, \
self._current_lineno, \
self._current_line))
def until(self, *tokens):
start_ln = self._current_lineno
while not self.done:
for t in tokens:
match = self.accept(t)
if match and len(tokens) == 1:
return match
elif match and len(tokens) > 1:
return t, match
self._next()
raise SyntaxError("found no %s from %s:%u" % \
(', '.join(t.name for t in tokens), self._input_name, start_ln))
def parse(scanner):
yield Period.parse(scanner.until(Tokens.STATEMENT_DATE))
yield Summary.parse(scanner.until(Tokens.SUMMARY))
while True:
token, _ = scanner.until(
Tokens.DEPOSITS,
Tokens.DEBITS,
Tokens.BALANCE_SUMMARY
)
if token == Tokens.BALANCE_SUMMARY:
break
scanner.expect(Tokens.DATE_AMOUNT)
dispatch = {Tokens.DEPOSITS: Deposits, Tokens.DEBITS: Debits}
yield dispatch[token]([*transactions(scanner)])
scanner.expect(Tokens.DATE_BALANCE)
yield Balances([*assertions(scanner)])
def convert(match, items):
for name, type_ in items:
yield type_(match.group(name))
def calendar(text):
return int(text.translate({ord(' '): None}))
def year(text):
if text:
rv = calendar(text)
if rv < 100:
return 2000 + rv
else:
return rv
def mm_dd(match, prefix=''):
if prefix:
prefix += '_'
return types.SimpleNamespace(
month=int(match.group(prefix + 'month')),
day=int(match.group(prefix + 'day')),
)
def mm_dd_yy(match, prefix=''):
if prefix:
prefix += '_'
return datetime.date(*convert(match, [
(prefix + 'year', year),
(prefix + 'month', int),
(prefix + 'day', int),
]))
class Period(namedtuple('Period', 'start end')):
@classmethod
def parse(cls, match):
end = datetime.date(*convert(match, [
('end_year', year),
('end_month', int),
('end_day', int),
]))
if match.group('start_year'):
start = datetime.date(*convert(match, [
('start_year', year),
('start_month', calendar),
('start_day', calendar),
]))
return cls(start, end)
return cls(None, end)
def financial(text):
return Decimal(text.translate({ord(','): None}))
class Summary(namedtuple('Summary', Tokens.SUMMARY.value.groupindex.keys())):
@classmethod
def parse(cls, match):
return cls(*convert(match, [
('start_balance', financial),
('num_debits', int),
('debits', financial),
('num_deposits', int),
('deposits', financial),
('service_charges', financial),
('final_balance', financial),
]))
Deposits = namedtuple('Deposits', 'transactions')
Debits = namedtuple('Debits', 'transactions')
class Transaction(namedtuple('Transaction', 'post_date amount description run_date')):
@classmethod
def parse(cls, match, extra_descriptions):
post_date = mm_dd(match, 'post')
amt, description = convert(match, [
('amount', financial),
('description', str),
])
run_date = mm_dd_yy(match, 'run') if match.group('run_month') else None
return cls(post_date, amt, [description, *extra_descriptions], run_date)
def ledger(self, posting_from, posting_to):
return "\n".join([
"%s %s" % (self.post_date.isoformat(), self.description[0]),
*["\t; %s" % (d, ) for d in self.description[1:]],
"\t%s\t\t-$%s" % (posting_from, self.amount),
"\t%s\t\t$%s" % (posting_to, self.amount),
])
def transactions(scanner):
while not scanner.accept(Tokens.END_OF_TRANSACTIONS):
match = scanner.expect(Tokens.TRANSACTION)
indent_length = len(match.group('indent'))
indent_range = range(indent_length - 4, indent_length + 1)
yield Transaction.parse(match, [*extra_descriptions(scanner, indent_range)])
def extra_descriptions(scanner, indent_length_range):
while True:
match = scanner.match(Tokens.EXTRA_DESCRIPTION)
if not match or \
len(match.group('indent')) not in indent_length_range:
break
scanner.accept(Tokens.EXTRA_DESCRIPTION)
yield match.group('description')
Balances = namedtuple('Balances', 'assertions')
class Assertion(namedtuple('Assertion', 'date balance')):
@classmethod
def parse(cls, match):
return cls(
mm_dd(match),
financial(match.group('balance')))
def ledger(self, account_name):
return "\n".join([
"%s Balance Assertion" % (self.date, ),
"\t%s\t\t$0 = $%s" % (account_name, self.balance),
])
def assertions(scanner):
while True:
match = scanner.accept(Tokens.BALANCE)
if not match:
break
for m in Tokens.BALANCE.value.finditer(match.string):
yield Assertion.parse(m)
def fix_period_start_date(tree):
period, summary, deposits, debits, balances = tree
if not period.start:
candidate_dates = [*filter(
lambda d: d.date.month > period.end.month,
balances.assertions)] or balances.assertions
earliest = min(
candidate_dates,
key=lambda k: (k.date.month, k.date.day))
if earliest.date.month < period.end.month:
year = period.end.year
else:
year = period.end.year - 1
start = datetime.date(
year=year,
month=earliest.date.month,
day=earliest.date.day)
period = period._replace(start=start)
assert period.start < period.end
yield from (period, summary, deposits, debits, balances)
def merge_nodes(tree):
deposits = [([], )]
debits = [([], )]
balances = [([], )]
for node in tree:
if isinstance(node, Deposits):
deposits.append(node)
elif isinstance(node, Debits):
debits.append(node)
elif isinstance(node, Balances):
balances.append(node)
else:
yield node
yield Deposits(*(sum(xs, []) for xs in zip(*deposits)))
yield Debits(*(sum(xs, []) for xs in zip(*debits)))
yield Balances(*(sum(xs, []) for xs in zip(*balances)))
def fix_date(period, item, key):
date = getattr(item, key)
month, day = date.month, date.day
if period.start.month <= month:
year = period.start.year
else:
year = period.end.year
return item._replace(**{key: datetime.date(year, month, day)})
def fix_tx_dates(tree):
period, summary, deposits, debits, balances = tree
yield period
yield summary
deposit_txs = [fix_date(period, tx, 'post_date') for tx in deposits.transactions]
yield deposits._replace(transactions=deposit_txs)
debit_txs = [fix_date(period, tx, 'post_date') for tx in debits.transactions]
yield debits._replace(transactions=debit_txs)
balance_txs = [fix_date(period, tx, 'date') for tx in balances.assertions]
yield balances._replace(assertions=balance_txs)
def validate_summary(tree):
period, summary, deposits, debits, balances = tree
num, total = len(deposits.transactions), sum([t.amount for t in deposits.transactions])
assert total == summary.deposits
assert num == summary.num_deposits
num, total = len(debits.transactions), sum([t.amount for t in debits.transactions])
if summary.service_charges:
assert total == summary.debits + summary.service_charges
assert num >= summary.num_debits
else:
assert total == summary.debits
assert num == summary.num_debits
start = min(balances.assertions, key=attrgetter('date'))
final = max(balances.assertions, key=attrgetter('date'))
assert start.balance == summary.start_balance
assert final.balance == summary.final_balance
yield from (period, summary, deposits, debits, balances)
def main():
account_name = 'USAA SECURE CHECKING'
for fn in sys.argv[1:]:
with open(fn, 'r') as fin:
scanner = Scanner(fin)
tree = parse(scanner)
tree = merge_nodes(tree)
tree = fix_period_start_date(tree)
tree = fix_tx_dates(tree)
tree = validate_summary(tree)
_, _, deposits, debits, balances = tree
for tx in deposits.transactions:
print(tx.ledger('Income:Unsorted', account_name))
print()
for tx in debits.transactions:
print(tx.ledger(account_name, 'Expenses:Unsorted'))
print()
for tx in balances.assertions:
print(tx.ledger(account_name))
print()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment