Skip to content

Instantly share code, notes, and snippets.

@zzstoatzz
Created March 30, 2024 15:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zzstoatzz/e4316401cb128b27db65f51ce93d620c to your computer and use it in GitHub Desktop.
Save zzstoatzz/e4316401cb128b27db65f51ce93d620c to your computer and use it in GitHub Desktop.
import asyncio
import functools
import asyncpg as apg
from prefect import flow, task
async def create_db_pool() -> apg.Pool:
    """Build a brand-new asyncpg connection pool for the local ``mydb`` database.

    Nothing is cached: every call creates another pool (sized between 1 and 5
    connections), so the caller owns the returned pool and must close it.

    Returns:
        The pool obtained by awaiting ``asyncpg.create_pool``.
    """
    # Where and as whom to connect; kept apart from the pool-sizing knobs.
    connection_settings = {
        "host": "localhost",
        "port": 5432,
        "user": "myuser",
        "password": "mypassword",
        "database": "mydb",
    }
    new_pool = await apg.create_pool(
        **connection_settings,
        min_size=1,
        max_size=5,
    )
    print("Created new connection pool")
    return new_pool
async def log_insert_event(task, task_run, state, name: str, status: str):
    """Insert a ``(name, status)`` row into ``log_table``.

    In this file the function is attached as an ``on_completion`` hook, with
    ``name`` and ``status`` pre-bound through ``functools.partial``. The three
    leading positional parameters are accepted but not used by the body.

    Args:
        task: Unused; first positional argument passed to the hook.
        task_run: Unused; second positional argument passed to the hook.
        state: Unused; third positional argument passed to the hook.
        name: Value written to the ``name`` column.
        status: Value written to the ``status`` column.

    Raises:
        Whatever asyncpg raises while connecting or executing the INSERT; the
        pool is closed before the exception propagates.
    """
    pool = await create_db_pool()
    try:
        async with pool.acquire() as con:
            await con.execute(
                """
                INSERT INTO log_table (name, status) VALUES ($1, $2)
                """,
                name,
                status,
            )
        print(f"Logged insert event for {name} with status {status}")
    finally:
        # Close the pool on failure too, otherwise a failed INSERT would leave
        # the freshly created pool's connections open.
        await pool.close()
@task
async def create_tables():
    """Create ``test_table`` and ``log_table`` if they do not already exist.

    A dedicated pool is opened for the duration of the call and closed again
    before returning, whether or not the DDL statements succeed.

    Raises:
        Whatever asyncpg raises while connecting or executing the statements;
        the pool is closed before the exception propagates.
    """
    pool = await create_db_pool()
    try:
        async with pool.acquire() as con:
            await con.execute(
                """
                CREATE TABLE IF NOT EXISTS test_table (
                id SERIAL PRIMARY KEY,
                name VARCHAR(255)
                )
                """
            )
            await con.execute(
                """
                CREATE TABLE IF NOT EXISTS log_table (
                id SERIAL PRIMARY KEY,
                name VARCHAR(255),
                status VARCHAR(255)
                )
                """
            )
    finally:
        # Without this, a failing CREATE TABLE would skip close() and leak the
        # pool's connections.
        await pool.close()
@task
async def insert_data(name: str):
    """Insert a single row carrying ``name`` into ``test_table``.

    A dedicated pool is opened for the duration of the call and closed again
    before returning, whether or not the INSERT succeeds.

    Args:
        name: Value written to the ``name`` column.

    Raises:
        Whatever asyncpg raises while connecting or executing the INSERT; the
        pool is closed before the exception propagates.
    """
    pool = await create_db_pool()
    try:
        async with pool.acquire() as con:
            await con.execute(
                """
                INSERT INTO test_table (name) VALUES ($1)
                """,
                name,
            )
    finally:
        # Guarantee the pool is released even when the INSERT fails.
        await pool.close()
@flow(log_prints=True)
async def main_flow(name: str = "Alice", status: str = "success"):
    """Create the tables, then insert ``name`` with a completion hook attached.

    Args:
        name: Row value handed to ``insert_data`` and to the logging hook.
        status: Status string handed to the logging hook.
    """
    await create_tables()

    # Pre-bind the extra keyword arguments the hook needs beyond the
    # positional ones it is invoked with.
    completion_hook = functools.partial(log_insert_event, name=name, status=status)
    insert_with_logging = insert_data.with_options(on_completion=[completion_hook])
    await insert_with_logging(name)
if __name__ == "__main__":
    # Script entry point: run the flow with its default arguments.
    flow_coroutine = main_flow()
    asyncio.run(flow_coroutine)
@zzstoatzz
Copy link
Author

zzstoatzz commented Mar 30, 2024

» python flows/proofs/hook_db_pool_pg.py
10:34:21.547 | INFO    | prefect.engine - Created flow run 'overjoyed-gopher' for flow 'main-flow'
10:34:21.859 | INFO    | Flow run 'overjoyed-gopher' - Created task run 'create_tables-0' for task 'create_tables'
10:34:21.861 | INFO    | Flow run 'overjoyed-gopher' - Executing 'create_tables-0' immediately...
10:34:22.173 | INFO    | Task run 'create_tables-0' - Created new connection pool
10:34:22.279 | INFO    | Task run 'create_tables-0' - Finished in state Completed()
10:34:22.394 | INFO    | Flow run 'overjoyed-gopher' - Created task run 'insert_data-0' for task 'insert_data'
10:34:22.395 | INFO    | Flow run 'overjoyed-gopher' - Executing 'insert_data-0' immediately...
10:34:22.640 | INFO    | Task run 'insert_data-0' - Created new connection pool
10:34:22.747 | INFO    | Task run 'insert_data-0' - Running hook 'log_insert_event' in response to entering state 'Completed'
10:34:22.789 | INFO    | Task run 'insert_data-0' - Created new connection pool
10:34:22.795 | INFO    | Task run 'insert_data-0' - Logged insert event for Alice with status success
10:34:22.802 | INFO    | Task run 'insert_data-0' - Hook 'log_insert_event' finished running successfully
10:34:22.802 | INFO    | Task run 'insert_data-0' - Finished in state Completed()
10:34:22.960 | INFO    | Flow run 'overjoyed-gopher' - Finished in state Completed('All states completed.')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment