-
-
Save apauley/398fa031c202733959af76b3b8ce8197 to your computer and use it in GitHub Desktop.
Maintain a .pricedb file for ledger3 with GBP/USD to ZAR exchange rates
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# A script by Keegan Carruthers-Smith, with some small modifications
# https://gist.github.com/keegancsmith/0ac3542481b90cab3d6d421be14fe33e
import datetime
import glob
import itertools
import os
import os.path
import urllib.request
from html.parser import HTMLParser
# ISO currency code -> name slug. fetch() interpolates these slugs into the
# URL of the exchange-rate history page.
currencies = {
    "CHF": "swiss-franc",
    "EUR": "euro",
    "GBP": "british-pound",
    "USD": "us-dollar",
    "ZAR": "south-african-rand",
}
# ISO currency code -> the commodity symbol written into the price files.
symbols = {
    "CHF": "CHF",
    "EUR": "€",
    "GBP": "£",
    "USD": "$",
    "ZAR": "R",
}
# Currency pairs whose history is downloaded, in processing order.
fetch_currencies = [
    ("EUR", "ZAR"),
    ("GBP", "ZAR"),
    ("USD", "ZAR"),
    ("ZAR", "CHF"),
]

# Every year from 2014 up to and including the current calendar year.
fetch_years = range(2014, datetime.date.today().year + 1)
def fetch(currency_pair, year):
    """Download one year of exchange-rate history for a currency pair.

    Args:
        currency_pair: Two currency codes; both must be keys of the
            module-level ``currencies`` mapping.
        year: Year whose history page is requested.

    Returns:
        The page body decoded as UTF-8 text.

    Raises:
        KeyError: If either code is missing from ``currencies``.
        urllib.error.URLError: If the HTTP request fails.
    """
    a, b = currencies[currency_pair[0]], currencies[currency_pair[1]]
    url = f"https://www.poundsterlinglive.com/best-exchange-rates/best-{a}-to-{b}-history-{year}"
    print(f"Fetching {currency_pair} {year} from {url}")
    # Close the HTTP response deterministically instead of leaving the
    # connection open until the object is garbage collected.
    with urllib.request.urlopen(url) as response:
        return response.read().decode("utf-8")
class ExchangeHistoryHTMLParser(HTMLParser):
    """Collect the table rows of an exchange-rate history page.

    The text of every ``<td>`` that carries a ``data-title`` attribute is
    stored in the current row under that title; every non-empty ``<tr>``
    becomes one dict in ``self.rows``.
    """

    @staticmethod
    def parse(data):
        """Convert a history page into price-db entries.

        Args:
            data: HTML text of the page.

        Returns:
            Dict mapping ``(date, symbol)`` to the price string
            ``price_symbol + price``. Rows whose mid rate is zero are
            left out.

        Raises:
            AssertionError: If two rows produce the same ``(date, symbol)``.
        """
        p = ExchangeHistoryHTMLParser()
        p.feed(data)
        pricedb = {}
        for row in p.rows:
            item = ExchangeHistoryHTMLParser._row_to_item(row)
            if item is None:
                continue
            k, price = item
            assert k not in pricedb, k
            pricedb[k] = price
        return pricedb

    @staticmethod
    def _row_to_item(row):
        """Translate one table row into a ``((date, symbol), price)`` item.

        Reads the "Main Currency", "Mid Rate" and "Date" cells of ``row``.
        The second word of "Main Currency" and the second word of "Mid Rate"
        are currency codes looked up in the module-level ``symbols`` mapping.

        Returns:
            ``((date, symbol), price_symbol + price)``, or ``None`` when the
            mid rate parses to zero (checked before any symbol or date
            lookup, so such rows never raise).
        """
        currency = row["Main Currency"].split()[1]
        price, price_currency = row["Mid Rate"].split()
        if float(price) == 0:
            return None
        symbol, price_symbol = symbols[currency], symbols[price_currency]
        date = datetime.datetime.strptime(row["Date"], "%A %d %B %Y").date()
        return (date, symbol), price_symbol + price

    def __init__(self):
        super().__init__()
        self.rows = []  # completed rows, one dict per non-empty <tr>
        self.row = {}  # cells of the <tr> currently being read
        self.title = ""  # data-title of the open <td>; falsy outside one

    def handle_starttag(self, tag, attrs):
        """Start a fresh row on ``<tr>``; remember the title of a ``<td>``."""
        if tag == "tr":
            self.row = {}
        if tag == "td":
            # None when the cell has no data-title, so its text is ignored.
            self.title = dict(attrs).get("data-title")

    def handle_endtag(self, tag):
        """Store the finished row on ``</tr>``; forget the title on ``</td>``."""
        if tag == "tr" and self.row:
            self.rows.append(self.row)
        if tag == "td":
            self.title = None

    def handle_data(self, data):
        """Append text to the current cell when inside a titled ``<td>``."""
        if self.title:
            # Text may arrive in several chunks, so concatenate.
            self.row[self.title] = self.row.get(self.title, "") + data
def pricedb_unmarshal(data):
    """Parse price-db text into a dict.

    Each line must have the form ``P YYYY/MM/DD SYMBOL PRICE``.

    Args:
        data: Full text of a price file.

    Returns:
        Dict mapping ``(datetime.date, symbol)`` to the price string.

    Raises:
        AssertionError: If a line does not start with ``"P "`` or a
            ``(date, symbol)`` pair occurs twice.
    """
    entries = {}
    for record in data.splitlines():
        assert record.startswith("P "), record
        _marker, raw_date, commodity, amount = record.split()
        day = datetime.date(*[int(part) for part in raw_date.split("/")])
        key = (day, commodity)
        assert key not in entries, key
        entries[key] = amount
    return entries
def pricedb_marshal(pricedb):
    """Render a price dict as price-db text.

    Args:
        pricedb: Dict mapping ``(date, symbol)`` to a price string.

    Returns:
        One ``P YYYY/MM/DD SYMBOL PRICE`` line per entry, sorted, each
        terminated by a newline; the empty string for an empty dict.
    """
    entries = sorted(pricedb.items())
    if not entries:
        return ""
    body = "\n".join(
        f"P {day:%Y/%m/%d} {commodity} {amount}"
        for (day, commodity), amount in entries
    )
    return body + "\n"
def pricedb_read(currency_pair, year):
    """Load the stored prices of one currency pair for one year.

    Args:
        currency_pair: Two currency codes used to build the file name.
        year: Year used to build the directory name.

    Returns:
        The dict produced by ``pricedb_unmarshal`` for the file, or an empty
        dict when the file does not exist.
    """
    journal = f"./prices/{year}/prices-{currency_pair[0]}-{currency_pair[1]}.journal"
    if os.path.exists(journal):
        with open(journal) as handle:
            text = handle.read()
        entries = pricedb_unmarshal(text)
        print(f"Read {len(entries)} entries from {journal}")
        return entries
    print(f"File not found: {journal}")
    return {}
def pricedb_write(pricedb, currency_pair, year):
    """Write the prices of one currency pair for one year to its file.

    Args:
        pricedb: Dict mapping ``(date, symbol)`` to a price string.
        currency_pair: Two currency codes used to build the file name.
        year: Year used to build the directory name.
    """
    pricedb_path = f"./prices/{year}/prices-{currency_pair[0]}-{currency_pair[1]}.journal"
    print(f"Writing {len(pricedb)} entries to {pricedb_path}")
    data = pricedb_marshal(pricedb)
    # The per-year directory does not exist yet the first time a year is
    # processed; create it so open() does not fail with FileNotFoundError.
    os.makedirs(os.path.dirname(pricedb_path), exist_ok=True)
    with open(pricedb_path, "w") as f:
        f.write(data)
def include_contents(pricefiles):
    """Build the text of an include file.

    Args:
        pricefiles: Iterable of file names to include.

    Returns:
        One ``!include NAME`` directive per file, sorted by name and joined
        by newlines, followed by a final newline (a lone newline when
        ``pricefiles`` is empty).
    """
    directives = [f"!include {name}" for name in sorted(pricefiles)]
    return "\n".join(directives) + "\n"
def write_yearly_prices():
    """Write ``./prices/<year>/prices.journal`` for every year in ``fetch_years``.

    Each file includes, by base name, every ``prices-*.journal`` file found
    in the same directory.
    """
    for year in fetch_years:
        matches = glob.glob(f"./prices/{year}/prices-*.journal")
        names = list(map(os.path.basename, matches))
        contents = include_contents(names)
        with open(f"./prices/{year}/prices.journal", "w") as out:
            out.write(contents)
def write_combined_prices():
    """Write ``./prices/prices.journal``, including every yearly include file."""
    yearly_files = []
    for year in fetch_years:
        yearly_files.append(f"{year}/prices.journal")
    contents = include_contents(yearly_files)
    with open("./prices/prices.journal", "w") as out:
        out.write(contents)
def main():
    """Update the per-year price files and regenerate the include files.

    For every (currency pair, year) combination the stored prices are read,
    the history page is fetched and parsed, the new entries are merged in,
    and the file is rewritten. A year is skipped when its file already holds
    a 31 December entry for the symbol of the pair's first currency.

    Raises:
        AssertionError: If a fetched price differs from the stored price for
            the same ``(date, symbol)``.
    """
    for currency_pair, year in itertools.product(fetch_currencies, fetch_years):
        pricedb = pricedb_read(currency_pair, year)
        # A 31 December entry is used as the marker of a fully stored year.
        if (datetime.date(year, 12, 31), symbols[currency_pair[0]]) in pricedb:
            print(f"Skipping {currency_pair} {year}")
            continue
        page = fetch(currency_pair, year)
        page_pricedb = ExchangeHistoryHTMLParser.parse(page)
        for k, v in page_pricedb.items():
            if k in pricedb:
                # Stored prices must never be silently overwritten with a
                # different value.
                assert pricedb[k] == v, (k, pricedb[k], v)
            pricedb[k] = v
        pricedb_write(pricedb, currency_pair, year)
    write_yearly_prices()
    write_combined_prices()
# Run the update only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I made a few small modifications to the original script. Some should be generally useful to anybody; others may be specific to my needs.
- Currency symbols: `€` (Euro) and `£` (Pound). I like the symbols in my reports. YMMV.
- `update-pricedb.py` writes out currency pair prices per year, and then generates include files per year. An additional top-level `prices/prices.journal` has the same data as the previous `~/.pricedb`, just achieved with include directives.

Example all-inclusive pricedb:
Example prices for 2020:
And at the lowest level there will be actual prices per currency pair per year: