Skip to content

Instantly share code, notes, and snippets.

@julian-klode
Created November 5, 2019 20:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save julian-klode/be46a2e9964f2ad8878c68c1e6ce04d7 to your computer and use it in GitHub Desktop.
Save julian-klode/be46a2e9964f2ad8878c68c1e6ce04d7 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
import json
import requests
import os
import sys
import time
from collections import OrderedDict
from typing import Any, Dict, Iterator
user = "FIXME"
password = "FIXME"
LOGIN_CACHE = os.path.expanduser("~/.cache/n26-login.json")
# When syncing, reject cached transactions older than this
USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/59.0.3071.86 Safari/537.36"
)
BASIC_HEADERS = {
"Authorization": "Basic bXktdHJ1c3RlZC13ZHBDbGllbnQ6c2VjcmV0",
"User-Agent": USER_AGENT,
}
def authorize_mfa_request_token() -> str:
"""Request a new MFA token."""
data = [("username", user), ("password", password), ("grant_type", "password")]
response = requests.post(
"https://api.tech26.de/oauth/token", data=data, headers=BASIC_HEADERS
)
try:
mfa_token = response.json()["mfaToken"]
assert isinstance(mfa_token, str)
return mfa_token
except KeyError as e:
response.raise_for_status()
raise e
def authorize_mfa_start(mfa_token: str) -> None:
"""Start the multi-factor authentication on the associated device."""
response = requests.post(
"https://api.tech26.de/api/mfa/challenge",
json={"challengeType": "oob", "mfaToken": mfa_token},
headers=BASIC_HEADERS,
)
response.raise_for_status()
def authorize_mfa_end(mfa_token: str) -> requests.Response:
"""Complete the multi-factor authentication.
This tries 6 times, waiting 10 seconds each.
"""
print("Waiting to approve login request", end="", file=sys.stderr, flush=True)
for _ in range(6):
time.sleep(10)
print(".", end="", file=sys.stderr, flush=True)
response = requests.post(
"https://api.tech26.de/oauth/token",
data={"grant_type": "mfa_oob", "mfaToken": mfa_token},
headers=BASIC_HEADERS,
)
if response.ok:
break
print(file=sys.stderr)
response.raise_for_status()
return response
def authorize_refresh() -> requests.Response:
"""Re-authorize from refresh_token"""
with open(LOGIN_CACHE, "r") as cachef:
cache = json.load(cachef)
response = requests.post(
"https://api.tech26.de/oauth/token",
data={"grant_type": "refresh_token", "refresh_token": cache["refresh_token"]},
headers=BASIC_HEADERS,
)
response.raise_for_status()
return response
def authorize() -> str:
"Return the bearer."
try:
response = authorize_refresh()
except Exception as e:
print(e, file=sys.stderr)
mfa_token = authorize_mfa_request_token()
authorize_mfa_start(mfa_token)
response = authorize_mfa_end(mfa_token)
with open(LOGIN_CACHE, "w") as cachef:
result = response.json()
result["expires_at"] = result["expires_in"] + int(time.time())
json.dump(result, cachef)
access_token = response.json()["access_token"]
assert isinstance(access_token, str)
return access_token
def get_transactions(bearer: str) -> Iterator[Dict[str, Any]]:
last_id = ""
new_last_id = ""
while True:
print("Requesting transactions after %s" % last_id, file=sys.stderr)
response = requests.get(
"https://api.tech26.de/api/smrt/transactions?limit=50" + last_id,
headers={"Authorization": "bearer %s" % bearer, "User-Agent": USER_AGENT},
)
response.raise_for_status()
body = response.json()
for transaction in body:
yield transaction
new_last_id = "&lastId=%s" % transaction["id"]
if new_last_id == last_id:
return
last_id = new_last_id
def new_transactions(bearer: str, old: Dict[str, Dict[str, Any]]) -> None:
new_transactions = []
found_overlap = 0
for transaction in get_transactions(bearer):
if transaction["id"] not in old or transaction["type"] == "AA":
found_overlap = 0
elif transaction["id"] in old and transaction["type"] != "AA":
found_overlap += 1
last_overlap = transaction
new_transactions.append(transaction)
# We found 120 completed transactions, let's assume we are done
if found_overlap == 100:
found = False
for k in old:
old_transaction = old[k]
if old_transaction["id"] == last_overlap["id"]:
found = True
elif found:
new_transactions.append(old_transaction)
break
json.dump(new_transactions, fp=sys.stdout, indent=4)
return
with open("n26.json") as fobj:
old = OrderedDict((t["id"], t) for t in json.load(fobj))
bearer = authorize()
new_transactions(bearer, old)
@julian-klode
Copy link
Author

Beware that the sync algorithm is not entirely accurate, it may occassionally produce wrong results, and you need to resync or increase the overlap value.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment