Skip to content

Instantly share code, notes, and snippets.

@skrawcz
Created June 13, 2023 18:42
Show Gist options
  • Save skrawcz/677daa5e72cba8b9c26d91728468f9e0 to your computer and use it in GitHub Desktop.
Save skrawcz/677daa5e72cba8b9c26d91728468f9e0 to your computer and use it in GitHub Desktop.
Shows how to write async-based functions in Hamilton, followed by a suggestion for another way to parallelize.
from hamilton.function_modifiers import extract_columns
import pandas as pd
async def _run_query(col_name: str, query_string: str) -> pd.DataFrame:
    """Return a one-row DataFrame standing in for a database query result.

    This is a placeholder: no database is contacted. The query text is simply
    echoed back as the single value of the requested column.

    Args:
        col_name: Name of the only column in the returned DataFrame.
        query_string: Query text; stored as the single cell value.

    Returns:
        A DataFrame with one column named ``col_name`` and one row holding
        ``query_string``.
    """
    # A real implementation would go to the database here -- ideally with the
    # client passed in as a parameter. The assumption is that the database
    # driver is asyncio based; otherwise there is no value in making this a
    # coroutine.
    return pd.DataFrame({col_name: [query_string]})
# async Hamilton func for query #1
async def sql_1(param1: str) -> pd.DataFrame:
    """Build query #1 from ``param1`` and await its result.

    Args:
        param1: Value spliced into the (placeholder) query text.

    Returns:
        The DataFrame produced by ``_run_query`` for column ``"a"``.
    """
    query_string = f"...{param1}..."
    return await _run_query("a", query_string)
# query #1 dataframe -> columns
@extract_columns("a")
def sql_1_extract(sql_1: pd.DataFrame) -> pd.DataFrame:
    """Pass the query #1 DataFrame through unchanged.

    The function body does no work of its own; it exists so that the
    ``extract_columns("a")`` decorator can be applied to the ``sql_1`` result.

    Args:
        sql_1: DataFrame returned by the ``sql_1`` function.

    Returns:
        The same DataFrame object that was passed in.
    """
    return sql_1
# async Hamilton func for query #n
async def sql_n(param2: str) -> pd.DataFrame:
    """Build query #n from ``param2`` and await its result.

    Args:
        param2: Value spliced into the (placeholder) query text.

    Returns:
        The DataFrame produced by ``_run_query`` for column ``"b"``.
    """
    query_string = f"...{param2}..."
    return await _run_query("b", query_string)
# query #n dataframe -> columns
@extract_columns("b")
def sql_n_extract(sql_n: pd.DataFrame) -> pd.DataFrame:
    """Pass the query #n DataFrame through unchanged.

    The function body does no work of its own; it exists so that the
    ``extract_columns("b")`` decorator can be applied to the ``sql_n`` result.

    Args:
        sql_n: DataFrame returned by the ``sql_n`` function.

    Returns:
        The same DataFrame object that was passed in.
    """
    return sql_n
import asyncio
from hamilton import base
from hamilton import driver
from hamilton.experimental import h_async
from hamilton.experimental import h_dask
import async_funcs
# Build an async driver over the functions defined in the async_funcs module.
dr = h_async.AsyncDriver({}, async_funcs)
requested_outputs = ["a", "b"]
run_inputs = {"param1": "1", "param2": "2"}
# Render the execution graph before creating the coroutine, so that a failure
# while rendering cannot leave a coroutine that was created but never awaited.
dr.visualize_execution(requested_outputs, "./example", {"format": "png"}, inputs=run_inputs)
# asyncio.run creates a fresh event loop, runs the coroutine to completion and
# closes the loop; asyncio.get_event_loop() without a running loop is deprecated.
result = asyncio.run(dr.execute(requested_outputs, inputs=run_inputs))
print(result)
# suggested way to do parallelization:
import loading_functions
import transforming_functions
# parallelized hamilton RUN -- get data in parallel
# suggestion - split loading functions into their own module
# DictResult is requested so that `results` is a dict that can be fed straight
# into the second driver as `inputs`.
# NOTE(review): confirm that h_dask exposes SimplePythonGraphAdapter -- the
# Dask-backed adapter is presumably h_dask.DaskGraphAdapter, which also needs a
# dask client as its first argument; verify against the installed version.
adapter = h_dask.SimplePythonGraphAdapter(base.DictResult())
# Driver accepts the graph adapter through the `adapter` keyword.
dr1 = driver.Driver({}, loading_functions, adapter=adapter)
results = dr1.execute(["table1", "table2"])
# now create next driver to run things sequentially
# suggestion - split transforming functions into their own module(s)
dr2 = driver.Driver({}, transforming_functions)
# The loaded tables from the first run become the inputs of the second run.
results = dr2.execute(["col1", "col2"], inputs=results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment