Skip to content

Instantly share code, notes, and snippets.

@Pablo1107
Created April 9, 2022 14:09
Show Gist options
  • Save Pablo1107/eecb056599d7a11a2cd9d071be68f801 to your computer and use it in GitHub Desktop.
Save Pablo1107/eecb056599d7a11a2cd9d071be68f801 to your computer and use it in GitHub Desktop.
Automatically fetch and import Coinbase and Coinbase Pro transactions as CSV, with matching hledger rules
#!/usr/bin/env python3
# Export Coinbase transactions for every account into per-year CSV files.
import sys
import requests
import datetime as dt
import pathlib
import csv
import requests_cache
# All HTTP calls in this script go through `session`; responses are kept in a
# SQLite-backed cache named 'cache' and expire after three days.
expire_after = dt.timedelta(days=3)
session = requests_cache.CachedSession(cache_name='cache', backend='sqlite', expire_after=expire_after)
import json, hmac, hashlib, time, requests, base64
from requests.auth import AuthBase
# Absolute path of the directory that contains this script.
CB_PATH = pathlib.Path(__file__).parent.resolve()
# Custom requests authentication for the Coinbase API.
class CoinbaseAuth(AuthBase):
    """Attach Coinbase API-key signature headers to an outgoing request.

    The signature is the hex HMAC-SHA256 (keyed with the API secret) of
    ``timestamp + method + path_url + body``.

    Args:
        api_key: API key, sent verbatim in the ``CB-ACCESS-KEY`` header.
        secret_key: API secret as a ``str``; it is UTF-8 encoded and used as
            the HMAC key.
    """

    def __init__(self, api_key, secret_key):
        self.api_key = api_key
        self.secret_key = bytes(secret_key, encoding="utf-8")

    def __call__(self, request):
        """Sign ``request`` in place and return it."""
        timestamp = str(int(time.time()))
        # A prepared request body may be None, str or bytes (e.g. when the
        # caller used ``json=``); normalise to str so the concatenation below
        # cannot raise TypeError.
        body = request.body or ''
        if isinstance(body, bytes):
            body = body.decode('utf-8')
        message = timestamp + request.method + request.path_url + body
        signature = hmac.new(
            self.secret_key,
            message.encode('utf-8'),
            hashlib.sha256,
        ).hexdigest()
        request.headers.update({
            'CB-ACCESS-SIGN': signature,
            'CB-ACCESS-TIMESTAMP': timestamp,
            'CB-ACCESS-KEY': self.api_key,
        })
        return request
# Base URL that every endpoint path in this script is appended to.
api_url = 'https://api.coinbase.com/v2'
# NOTE: <api_key> and <secret_key> are redacted placeholders, not valid Python.
# Replace them with your own credentials (as strings) before running.
auth = CoinbaseAuth(<api_key>, <secret_key>)
def get_accounts():
    """Return the ``data`` entries of every page of the accounts endpoint.

    Pages are followed through ``pagination.next_starting_after`` until the
    API reports ``None`` for the next cursor.
    """
    # https://api.coinbase.com/v2/accounts
    url = f'{api_url}/accounts'
    page = session.get(url, auth=auth).json()
    accounts = list(page['data'])
    cursor = page['pagination']['next_starting_after']
    while cursor is not None:
        page = session.get(url, auth=auth, params={'starting_after': cursor}).json()
        cursor = page['pagination']['next_starting_after']
        accounts.extend(page['data'])
    return accounts
def get_account_transactions(account_id):
    """Return the ``data`` entries of every transactions page for one account.

    The first request carries no query parameters; each later request passes
    the previous page's ``next_starting_after`` cursor as ``starting_after``.
    """
    # https://api.coinbase.com/v2/accounts/:account_id/transactions
    endpoint = f'{api_url}/accounts/{account_id}/transactions'
    collected = []
    query = None
    while True:
        if query is None:
            payload = session.get(endpoint, auth=auth).json()
        else:
            payload = session.get(endpoint, auth=auth, params=query).json()
        collected.extend(payload['data'])
        next_cursor = payload['pagination']['next_starting_after']
        if next_cursor is None:
            return collected
        query = {'starting_after': next_cursor}
def get_accounts_transactions(accounts):
    """
    Collect the transactions of every account in ``accounts`` (looked up by
    each account's ``id``) into one list sorted by ``created_at``.
    """
    merged = [
        tx
        for account in accounts
        for tx in get_account_transactions(account['id'])
    ]
    # order by created_at (stable, so ties keep their fetch order)
    merged.sort(key=lambda tx: tx['created_at'])
    return merged
def group_transactions_by_year(transactions):
    """
    Bucket transactions by the calendar year of their ``created_at`` field.

    ``created_at`` must look like ``2022-04-09T14:09:00Z``; any other shape
    makes ``strptime`` raise ``ValueError``. The result maps ``int`` year to
    the list of transactions for that year, in input order.
    """
    by_year = {}
    for tx in transactions:
        stamp = dt.datetime.strptime(tx['created_at'], '%Y-%m-%dT%H:%M:%SZ')
        by_year.setdefault(stamp.year, []).append(tx)
    return by_year
def save_transactions(transactions, year):
    """
    Write ``transactions`` to ``in/transactions-{year}.csv`` with the headers
    timestamp,type,asset,quantity,fees,description

    A ``send`` with a positive amount is written as ``receive``; any other
    ``send`` is written with its amount made non-negative. All other types
    are written unchanged. The ``fees`` column is always left empty.

    The ``in`` directory is resolved against the current working directory
    and must already exist. The input dicts are not modified.
    """
    # Local import keeps this block self-contained.
    from decimal import Decimal

    fieldnames = ['timestamp', 'type', 'asset', 'quantity', 'fees', 'description']
    with open(f'in/transactions-{year}.csv', 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for tx in transactions:
            tx_type = tx['type']
            quantity = tx['amount']['amount']
            if tx_type == 'send':
                # Decimal instead of float: tiny amounts must not be written
                # in scientific notation (e.g. 1e-05) or lose digits.
                amount = Decimal(str(quantity))
                if amount > 0:
                    tx_type = 'receive'
                else:
                    quantity = format(amount.copy_abs(), 'f')
            details = tx['details']
            # Treat a null title/subtitle as empty rather than crashing.
            description = f"{details['title'] or ''} {details['subtitle'] or ''}"
            writer.writerow({
                'timestamp': tx['created_at'],
                'type': tx_type,
                'asset': tx['amount']['currency'],
                'quantity': quantity,
                'fees': '',
                'description': description,
            })
def main():
    """Fetch all transactions of all accounts and write one CSV per year."""
    all_transactions = get_accounts_transactions(get_accounts())
    by_year = group_transactions_by_year(all_transactions)
    for year, yearly in by_year.items():
        save_transactions(yearly, year)
# Run the export only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
#!/usr/bin/env python3
# Request a Coinbase Exchange account report, wait until it is ready and save
# the CSV under pro/in/ next to this script.
import sys
import requests
import datetime as dt
import pathlib
import os
import requests_cache
# All HTTP calls in this script go through `session`; responses are kept in a
# SQLite-backed cache named 'cache' and expire after one day.
expire_after = dt.timedelta(days=1)
session = requests_cache.CachedSession(cache_name='cache', backend='sqlite', expire_after=expire_after)
import json, hmac, hashlib, time, requests, base64
from requests.auth import AuthBase
# Absolute path of the directory that contains this script.
CB_PATH = pathlib.Path(__file__).parent.resolve()
# Create custom authentication for Exchange
class CoinbaseExchangeAuth(AuthBase):
    """Attach Coinbase Exchange signature headers to an outgoing request.

    The signature is the base64-encoded HMAC-SHA256 of
    ``timestamp + method + path_url + body``, keyed with the base64-decoded
    API secret.

    Args:
        api_key: API key, sent in the ``CB-ACCESS-KEY`` header.
        secret_key: base64-encoded API secret.
        passphrase: API passphrase, sent in ``CB-ACCESS-PASSPHRASE``.
    """

    def __init__(self, api_key, secret_key, passphrase):
        self.api_key = api_key
        self.secret_key = secret_key
        self.passphrase = passphrase

    def __call__(self, request):
        """Sign ``request`` in place and return it."""
        timestamp = str(time.time())
        # A prepared request body may be None, str or bytes; normalise to str
        # (calling .decode() unconditionally fails on a str body).
        body = request.body or ''
        if isinstance(body, bytes):
            body = body.decode('utf-8')
        message = timestamp + request.method + request.path_url + body
        # The message is deliberately not printed: doing so on every request
        # was leftover debugging output.
        hmac_key = base64.b64decode(self.secret_key)
        signature = hmac.new(hmac_key, message.encode('utf-8'), hashlib.sha256)
        # Decode so every header value is a str rather than a str/bytes mix.
        signature_b64 = base64.b64encode(signature.digest()).decode('ascii')
        request.headers.update({
            'CB-ACCESS-SIGN': signature_b64,
            'CB-ACCESS-TIMESTAMP': timestamp,
            'CB-ACCESS-KEY': self.api_key,
            'CB-ACCESS-PASSPHRASE': self.passphrase,
        })
        return request
# Base URL (with trailing slash) that endpoint paths are appended to.
api_url = 'https://api.exchange.coinbase.com/'
# NOTE: <api_key>, <secret_key> and <passphrase> are redacted placeholders, not
# valid Python. Replace them with your own credentials before running.
auth = CoinbaseExchangeAuth(<api_key>, <secret_key>, <passphrase>)
def get_accounts():
    """Return the decoded JSON body of a GET on the ``accounts`` endpoint.

    Previously the response was fetched and discarded, so the function
    always returned ``None``.
    """
    response = session.get(api_url + 'accounts', auth=auth)
    return response.json()
def create_report():
    """Request a CSV account report from 1 January of this year until now.

    Returns:
        The decoded JSON body of the POST to the ``reports`` endpoint.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"
    # Read the clock once so both bounds come from the same instant.
    now = dt.datetime.now()
    # Reset the time of day too: replacing only month/day kept the current
    # clock time and so excluded activity from the early hours of 1 January.
    year_start = now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
    # NOTE(review): these are naive local times formatted with a 'Z' suffix;
    # confirm whether the API expects UTC here.
    payload = {
        "start_date": year_start.strftime(timestamp_format),
        "end_date": now.strftime(timestamp_format),
        "type": "account",
        "format": "csv",
        "product_id": "ALL",
        "account_id": "ALL",
        # presumably the address the report is delivered to; verify against
        # the API documentation
        "email": "dealberapablo07@gmail.com"
    }
    response = session.post(api_url + 'reports', json=payload, auth=auth)
    return response.json()
def get_report(report_id):
    """Return the decoded JSON description of report ``report_id``.

    The request bypasses the response cache. Callers poll this function
    until the report's ``status`` changes, and a cached first answer would
    be returned unchanged for the whole cache lifetime.
    """
    url = f"{api_url}reports/{report_id}"
    headers = {"Accept": "application/json"}
    with session.cache_disabled():
        response = session.request("GET", url, headers=headers, auth=auth)
    return response.json()
def download_report(file_url):
    """GET ``file_url`` through the shared session and return the body as bytes."""
    return session.get(file_url).content
def save_report(report):
    """Download ``report['file_url']`` and store it as today's account CSV.

    The file is written to ``pro/in/account-YYYY-MM-DD.csv`` under the script
    directory, which must already exist.
    """
    today = dt.datetime.now().strftime("%Y-%m-%d")
    # Download before opening the target so a failed download does not leave
    # an empty (or truncated) CSV behind.
    content = download_report(report['file_url'])
    with open(f'{CB_PATH}/pro/in/account-{today}.csv', 'wb') as f:
        f.write(content)
def erase_year_reports():
    """Delete every ``account-<current year>*.csv`` file in ``pro/in``."""
    current_year = dt.datetime.now().year
    stale = [
        name
        for name in os.listdir(f'{CB_PATH}/pro/in')
        if name.startswith(f'account-{current_year}') and name.endswith('.csv')
    ]
    for name in stale:
        os.remove(f'{CB_PATH}/pro/in/{name}')
def main(report_id=None):
    """Fetch a report (creating one when no id is given) and save it.

    Polls the report every five seconds until its status is ``ready``, then
    removes this year's previously saved reports and writes the new one.
    """
    if not report_id:
        report_id = create_report()['id']
    while True:
        report = get_report(report_id)
        if report['status'] == 'ready':
            break
        time.sleep(5)
    erase_year_reports()
    save_report(report)
# Script entry point; skipped when the interpreter was started with -i.
# An optional first command-line argument is the id of an existing report.
if __name__ == '__main__' and not sys.flags.inspect:
    # Pick the argument up front instead of catching IndexError around
    # main(): that also caught an IndexError raised inside main() and then
    # silently started a second run that created a brand-new report.
    main(sys.argv[1] if len(sys.argv) > 1 else None)
# hledger CSV rules for the per-year Coinbase transactions CSV.
# Assignments that belong to an `if` block must be indented; flush-left they
# are not read as conditional.

# skip the headings line:
skip 1

# column names, in the order the CSV provides them
fields date,type,coin,amount,fees,description
date-format %Y-%m-%dT%H:%M:%SZ
# no currency symbol; the commodity is appended from %coin below
currency
amount %amount %coin
account1 assets:crypto:coinbase:%coin
# fallback counter-account when no rule below matches
account2 expenses:coinbase:unknown

if %type receive
 account2 income:xoor

if %type send
 amount -%amount %coin
 account2 equity:transfers

if %description bitpay
 amount -%amount %coin
 account2 expenses:crypto

# either matcher triggers the block
if
%type pro_deposit
%type pro_withdrawal
 account2 assets:crypto:coinbase:pro:%coin
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment