Skip to content

Instantly share code, notes, and snippets.

@timwis
Last active March 19, 2023 19:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timwis/2e6aa51b803839a77229beea3342c777 to your computer and use it in GitHub Desktop.
Save timwis/2e6aa51b803839a77229beea3342c777 to your computer and use it in GitHub Desktop.
###
### A complete description of a Prefect Deployment for flow 'sync'
###
name: sync
description: null
version: 18bceefebb44d365369778a74b2b7187
# The work queue that will handle this deployment's runs
work_queue_name: test
work_pool_name: null
tags: []
parameters: {}
# Cron schedule: minute 0 of hour 6 every day, evaluated in UTC.
schedule:
  cron: "0 6 * * *"
  timezone: UTC
is_schedule_active: null
infra_overrides: {}
# Flow runs execute as a local process (block type "process").
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command: null
  stream_output: true
  working_dir: null
  block_type_slug: process
  _block_type_slug: process

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: sync
manifest_path: null
storage: null
# path: /Users/timwis/sites/spend-sync/data-pipelines
path: /home/prefect/spend-sync/data-pipelines # <-- note I edited this
entrypoint: sync.py:sync
# The flow takes no parameters, so the schema has no properties.
parameter_openapi_schema:
  title: Parameters
  type: object
  properties: {}
  required: null
  definitions: null
timestamp: '2023-03-17T10:11:08.484934+00:00'
import datetime
import os
from pydantic import SecretStr, BaseModel
import httpx
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector
from sqlalchemy import or_
from sqlalchemy.orm import sessionmaker, joinedload
from prefect_true_layer import TrueLayerCredentials
import prefect_true_layer.tasks
from prefect_monzo import MonzoCredentials
import prefect_monzo.tasks
from models import JobDefinition, Account, Connection
# Any non-empty DEBUG environment variable turns on engine echo (SQL logging).
DEBUG = bool(os.getenv("DEBUG"))

# Prefect blocks are loaded once, at import time, and shared by every task below.
database_block = SqlAlchemyConnector.load("db")

# expire_on_commit=False keeps attribute values readable on instances after
# a commit, instead of expiring them.
Session = sessionmaker(
    bind=database_block.get_engine(echo=DEBUG),
    expire_on_commit=False,
)

true_layer_credentials = TrueLayerCredentials.load("truelayer")
monzo_credentials = MonzoCredentials.load("monzo")
class Token(BaseModel):
    # Token set consumed by save_renewed_token after a token renewal.
    # Both token strings are SecretStr, so callers must use
    # get_secret_value() to read the underlying text.
    access_token: SecretStr
    refresh_token: SecretStr
    # Token lifetime; save_renewed_token treats it as a number of seconds
    # when computing the connection's expires_at.
    expires_in: int
class Money:
    """An amount of money held as a whole number of minor units.

    ``value`` is the amount expressed in hundredths of the major unit.
    """

    def __init__(self, major: float):
        """Convert *major*, an amount in major units, to minor units.

        The product is rounded to the nearest integer rather than truncated.
        Most decimal fractions have no exact binary float representation, so
        ``19.99 * 100`` evaluates to ``1998.9999999999998``; truncating with
        ``int()`` would silently lose one minor unit.
        """
        self.value = round(major * 100)

    def as_minor(self):
        """Return the amount as an integer number of minor units."""
        return self.value
def now_utc():
    """Return the current moment as a timezone-aware datetime in UTC."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
@task()
def get_job_definitions():
    """Return every job definition that is due for a sync.

    A definition is due when it has never been synced (``last_synced_at`` is
    NULL) or when its last sync is more than one day old. The card, cash and
    reserve accounts are joined-loaded together with each account's
    connection, presumably so the flow can read them after the session has
    been closed.
    """
    cutoff = now_utc() - datetime.timedelta(days=1)

    # One eager-load path per account relationship: account -> connection.
    account_relationships = (
        JobDefinition.card_account,
        JobDefinition.cash_account,
        JobDefinition.reserve_account,
    )
    eager_loads = [
        joinedload(relationship).joinedload(Account.connection)
        for relationship in account_relationships
    ]

    is_due = or_(
        JobDefinition.last_synced_at < cutoff,
        JobDefinition.last_synced_at.is_(None),
    )

    with Session() as db:
        return db.query(JobDefinition).options(*eager_loads).filter(is_due).all()
@task
def save_renewed_token(connection: Connection, renewed_token: Token):
    """Store a renewed token on *connection* and commit it.

    The access and refresh token values are unwrapped from their SecretStr
    fields, ``expires_at`` is set to the current UTC time plus
    ``expires_in`` seconds, and *connection* is refreshed from the database
    after the commit. *connection* is mutated in place.
    """
    lifetime = datetime.timedelta(seconds=renewed_token.expires_in)
    with Session() as db:
        connection.access_token = renewed_token.access_token.get_secret_value()
        connection.refresh_token = renewed_token.refresh_token.get_secret_value()
        connection.expires_at = now_utc() + lifetime
        db.add(connection)
        db.commit()
        db.refresh(connection)
@task
def update_last_synced(job_definition: JobDefinition):
    """Stamp *job_definition* with the current UTC time and commit it.

    *job_definition* is mutated in place before being added to a new session.
    """
    with Session() as db:
        job_definition.last_synced_at = now_utc()
        db.add(job_definition)
        db.commit()
@flow()
def sync():
    """Move each due job's card spending into its reserve pot.

    For every job definition returned by ``get_job_definitions``:

    1. renew the card connection's TrueLayer token if it has expired;
    2. fetch the card transactions since the job's last sync and total them;
    3. renew the cash connection's Monzo token if it has expired;
    4. deposit the total from the cash account into the reserve pot;
    5. record the sync time on the job definition.

    Returns:
        The list of deposit results, one per processed job definition.

    Raises:
        httpx.HTTPStatusError: re-raised after printing the response body;
            this aborts the remaining job definitions.
    """
    deposits = []
    for job in get_job_definitions():
        card_connection = job.card_account.connection
        try:
            if card_connection.expires_at <= now_utc():
                save_renewed_token(
                    card_connection,
                    prefect_true_layer.tasks.renew_true_layer_token(
                        credentials=true_layer_credentials,
                        refresh_token=card_connection.decrypted_refresh_token,
                    ),
                )

            # NOTE(review): last_synced_at is stamped after this fetch, so
            # transactions posted between the fetch and the stamp may be
            # skipped by the next run -- confirm against the API semantics.
            transactions = prefect_true_layer.tasks.get_card_transactions(
                credentials=true_layer_credentials,
                account_id=job.card_account.external_account_id,
                access_token=card_connection.decrypted_access_token,
                since=job.last_synced_at,
            )
            total = sum(txn["amount"] for txn in transactions)
            print(f"Spent {total}")

            cash_connection = job.cash_account.connection
            if cash_connection.expires_at <= now_utc():
                save_renewed_token(
                    cash_connection,
                    prefect_monzo.tasks.renew_monzo_token(
                        credentials=monzo_credentials,
                        refresh_token=cash_connection.decrypted_refresh_token,
                    ),
                )

            deposits.append(
                prefect_monzo.tasks.deposit_into_pot(
                    credentials=monzo_credentials,
                    access_token=cash_connection.decrypted_access_token,
                    source_account_id=job.cash_account.external_account_id,
                    destination_pot_id=job.reserve_account.external_account_id,
                    amount=Money(major=total),
                )
            )
            update_last_synced(job)
        except httpx.HTTPStatusError as err:
            # Surface the API's error body before propagating the failure.
            print(err.response.text)
            raise
    return deposits
if __name__ == "__main__":
    # Running the module directly executes the flow once and prints whatever
    # it returns.
    flow_results = sync()
    print(flow_results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment