Created
March 30, 2024 15:38
-
-
Save zzstoatzz/e4316401cb128b27db65f51ce93d620c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import functools | |
import asyncpg as apg | |
from prefect import flow, task | |
async def create_db_pool() -> apg.Pool:
    """Create and return a new asyncpg connection pool.

    The connection settings are fixed in this function: a database named
    ``mydb`` on ``localhost:5432``, with the pool sized via ``min_size=1``
    and ``max_size=5``.

    Returns:
        The pool produced by ``apg.create_pool``. Nothing here closes it,
        so the caller is responsible for doing so.
    """
    # Gather the settings in one place so the call itself stays short.
    pool_settings = {
        "host": "localhost",
        "port": 5432,
        "user": "myuser",
        "password": "mypassword",
        "database": "mydb",
        "min_size": 1,
        "max_size": 5,
    }
    new_pool = await apg.create_pool(**pool_settings)
    print("Created new connection pool")
    return new_pool
async def log_insert_event(task, task_run, state, name: str, status: str):
    """Insert a ``(name, status)`` row into ``log_table``.

    The leading ``task``, ``task_run`` and ``state`` parameters are accepted
    but not used by the body; they let this coroutine be registered as a
    task ``on_completion`` hook, with ``name`` and ``status`` pre-bound by
    the caller (e.g. through ``functools.partial``).

    Args:
        task: Unused; supplied by the hook caller.
        task_run: Unused; supplied by the hook caller.
        state: Unused; supplied by the hook caller.
        name: Value stored in the ``name`` column.
        status: Value stored in the ``status`` column.

    Raises:
        Whatever ``create_db_pool``, ``pool.acquire`` or ``con.execute``
        raise; the pool is still closed in that case.
    """
    pool = await create_db_pool()
    try:
        async with pool.acquire() as con:
            await con.execute(
                """
                INSERT INTO log_table (name, status) VALUES ($1, $2)
                """,
                name,
                status,
            )
        print(f"Logged insert event for {name} with status {status}")
    finally:
        # Close the pool even when the insert fails; otherwise every failed
        # hook invocation would leave a pool (and its connections) open.
        await pool.close()
@task
async def create_tables():
    """Create ``test_table`` and ``log_table`` if they do not already exist.

    A fresh pool is opened through ``create_db_pool`` for this call and is
    closed again before returning, including when one of the statements
    fails.

    Raises:
        Whatever ``create_db_pool``, ``pool.acquire`` or ``con.execute``
        raise.
    """
    pool = await create_db_pool()
    try:
        async with pool.acquire() as con:
            await con.execute(
                """
                CREATE TABLE IF NOT EXISTS test_table (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(255)
                )
                """
            )
            await con.execute(
                """
                CREATE TABLE IF NOT EXISTS log_table (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(255),
                    status VARCHAR(255)
                )
                """
            )
    finally:
        # Without this, a failing CREATE TABLE would skip the close call and
        # leak the pool.
        await pool.close()
@task
async def insert_data(name: str):
    """Insert one row with the given ``name`` into ``test_table``.

    A fresh pool is opened through ``create_db_pool`` for this call and is
    closed again before returning, including when the insert fails.

    Args:
        name: Value stored in the ``name`` column; passed as a query
            parameter (``$1``), not interpolated into the SQL text.

    Raises:
        Whatever ``create_db_pool``, ``pool.acquire`` or ``con.execute``
        raise.
    """
    pool = await create_db_pool()
    try:
        async with pool.acquire() as con:
            await con.execute(
                """
                INSERT INTO test_table (name) VALUES ($1)
                """,
                name,
            )
    finally:
        # Guarantee the pool is released even if the INSERT raises.
        await pool.close()
@flow(log_prints=True)
async def main_flow(name: str = "Alice", status: str = "success"):
    """Create the tables, then insert ``name`` with a completion hook attached.

    Args:
        name: Passed to ``insert_data`` and bound into the completion hook.
        status: Bound into the completion hook as the status to log.
    """
    await create_tables()

    # Bind the extra arguments up front; the hook caller only supplies the
    # leading positional parameters of ``log_insert_event``.
    completion_hook = functools.partial(
        log_insert_event, name=name, status=status
    )
    insert_with_logging = insert_data.with_options(
        on_completion=[completion_hook]
    )
    await insert_with_logging(name)
if __name__ == "__main__":
    # Script entry point: run the flow with its default arguments.
    flow_coroutine = main_flow()
    asyncio.run(flow_coroutine)
Author
zzstoatzz
commented
Mar 30, 2024
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment