Skip to content

Instantly share code, notes, and snippets.

@lizan
Created June 24, 2020 03:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lizan/6001f991cc0e5526d9bb46a0868e5b56 to your computer and use it in GitHub Desktop.
Save lizan/6001f991cc0e5526d9bb46a0868e5b56 to your computer and use it in GitHub Desktop.
import hashlib
import base64
class Hasher:
def __init__(self, salt = ""):
self.salt = salt
def hash(self, data):
d = hashlib.sha256(data.encode('utf-8')).digest()
return base64.b64encode(d).decode('utf-8')[:8]
#!/usr/bin/env python3
from beancount.core.number import D
from beancount.ingest import importer
from beancount.core.amount import Amount
from beancount.core import flags
from beancount.core import data
from beancount.core.position import Cost
from dateutil.parser import parse
import csv
import os
import re
import json
class MintImporter(importer.ImporterProtocol):
def __init__(self,
hasher,
account_mapping,
category_mapping,
category_flag_map,
transaction_filter=lambda x: True):
self.hasher = hasher
self.account_mapping = account_mapping
self.category_mapping = category_mapping
self.category_flag_map = category_flag_map
self.unknown_categories = set()
self.unknown_accounts = set()
self.transaction_filter = transaction_filter
def identify(self, f):
return re.match('mint_transactions.*\.csv', os.path.basename(f.name))
def extract(self, f):
entries = []
with open(f.name) as f:
for index, row in enumerate(csv.reader(f)):
date, desc, original_desc, amount, transaction_type, category, account, _, notes = row
if date == "Date":
continue
trans_date = parse(date).date()
meta = data.new_metadata(f.name, index)
txn = data.Transaction(
meta=meta,
date=trans_date,
flag=self.category_flag_map.get(category, flags.FLAG_OKAY),
payee=desc,
narration="{} :{}:".format(original_desc, self.hasher.hash(json.dumps(row))),
tags=set(),
links=set(),
postings=[],
)
if not self.transaction_filter(txn):
continue
if account not in self.account_mapping:
self.unknown_accounts.add(account)
if category not in self.category_mapping:
self.unknown_categories.add(category)
try:
account = self.account_mapping[account]
to_account = self.category_mapping[category]
except KeyError:
continue
if transaction_type == "debit":
amount = "-" + amount
txn.postings.append(
data.Posting(account, Amount(D(amount), 'USD'), None, None,
None, None))
txn.postings.append(
data.Posting(to_account, None, None, None, None, None))
entries.append(txn)
if len(self.unknown_categories) + len(self.unknown_accounts) > 0:
raise LookupError('Unknown categories: {}, accounts: {}'.format(
self.unknown_categories, self.unknown_accounts))
return entries
#!/usr/bin/env python3
from beancount.core.number import D
from beancount.ingest import importer
from beancount.core.amount import Amount
from beancount.core import flags
from beancount.core import data
from beancount.core.position import CostSpec, Cost
from dateutil.parser import parse
import csv
import os
import re
import json
class Vanguard401kImporter(importer.ImporterProtocol):
def __init__(self, hasher, file_prefix, account_mapping, income_mapping,
fund_mapping):
self.hasher = hasher
self.file_prefix = file_prefix
self.account_mapping = account_mapping
self.income_mapping = income_mapping
self.fund_mapping = fund_mapping
def identify(self, f):
return re.match('vanguard_{}.*\.tsv'.format(self.file_prefix),
os.path.basename(f.name))
def extract(self, f):
entries = []
with open(f.name) as f:
for index, row in enumerate(csv.reader(f, delimiter='\t')):
date, kind, source, fund, unit, unit_price, total = row
if date == "Date":
continue
trans_date = parse(date).date()
meta = data.new_metadata(f.name, index)
if kind in ("Fee", "Withdrawal"):
multiplier = -1
else:
multiplier = 1
txn = data.Transaction(
meta=meta,
date=trans_date,
flag=flags.FLAG_OKAY,
payee=kind,
narration="{} :{}:".format(fund, self.hasher.hash(json.dumps(row))),
tags=set(),
links=set(),
postings=[],
)
account = self.account_mapping[source]
income = self.income_mapping[source]
fund_symbol = self.fund_mapping[fund]
txn.postings.append(
data.Posting(account,
Amount(D(unit) * multiplier, fund_symbol),
Cost(D(unit_price[1:]), 'USD', None, None),
None, None, None))
if kind == "Fee":
income = "Expenses:Fees:Account"
if kind == "Withdrawal":
income = "Equity:RothConversion"
txn.postings.append(
data.Posting(income,
Amount(-D(total[1:]) * multiplier, 'USD'),
None, None, None, None))
if D(total[1:]) != D(unit) * D(unit_price[1:]):
assert (abs(D(total[1:]) - D(unit) * D(unit_price[1:])) <
D("0.02"))
txn.postings.append(
data.Posting("Expenses:Rounding", None, None, None,
None, None))
entries.append(txn)
return entries
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment