Skip to content

Instantly share code, notes, and snippets.

@skrawcz
Created January 21, 2024 05:23
Show Gist options
  • Save skrawcz/2f30866f9506d910b2d8a05a34bff6b9 to your computer and use it in GitHub Desktop.
Save skrawcz/2f30866f9506d910b2d8a05a34bff6b9 to your computer and use it in GitHub Desktop.
Example using parallelism with Hamilton
# functions.py - declare your transformations as functions and link them via parameter names.
import pandas as pd
from hamilton.htypes import Parallelizable, Collect
def motor(motor_list: list[int]) -> Parallelizable[int]:
    """Yield each motor id from ``motor_list``, one at a time.

    The ``Parallelizable`` return annotation marks this node as the start of
    a Hamilton fan-out: nodes depending on ``motor`` receive a single id.

    Args:
        motor_list: The motor ids to iterate over.

    Yields:
        Every element of ``motor_list``, in order.
    """
    yield from motor_list
def _is_motor_on(motor: int) -> bool:
    """Return True when ``motor`` is an even id.

    Stand-in for a real status check: in this example, even-numbered motors
    are treated as "on". The leading underscore marks it as a private helper.
    """
    remainder = motor % 2
    return remainder == 0
def motor_status(motor: int) -> dict:
    """Build the status record for a single motor.

    Args:
        motor: The id of the motor to check.

    Returns:
        A dict with ``"motor_id"`` (the id passed in) and ``"is_on"``
        (the result of ``_is_motor_on`` for that id).
    """
    # Placeholder for the actual status-check logic.
    running = _is_motor_on(motor)
    return {"motor_id": motor, "is_on": running}
def aggregate_statuses(motor_status: Collect[dict]) -> list[dict]:
    """Gather the per-motor status records into a single list.

    Args:
        motor_status: The collected ``motor_status`` results (an iterable of
            status dicts, as annotated by ``Collect[dict]``).

    Returns:
        A new list holding every collected status dict, in iteration order.
    """
    return [*motor_status]
def on_motor(aggregate_statuses: list[dict]) -> Parallelizable[int]:
    """Yield the id of every motor whose status record says it is on.

    Args:
        aggregate_statuses: Status dicts, each carrying ``"motor_id"`` and
            ``"is_on"`` keys.

    Yields:
        The ``"motor_id"`` of each record whose ``"is_on"`` value is truthy,
        in input order.
    """
    # NOTE: an alternative signature is to take the collected statuses
    # directly (`motor_status: Collect[dict]`) and iterate over those instead.
    yield from (
        status["motor_id"]
        for status in aggregate_statuses
        if status["is_on"]
    )
def status_check_1(on_motor: int) -> float:
    """Run the first (placeholder) status check for a motor that is on.

    Args:
        on_motor: The id of a motor that is on.

    Returns:
        The motor id multiplied by 2.3.
    """
    # Arbitrary factor standing in for a real status computation.
    factor = 2.3
    return factor * on_motor
def status_check_2(on_motor: int, status_check_1: float) -> str:
    """Run the second (placeholder) status check for a motor that is on.

    Args:
        on_motor: The id of a motor that is on.
        status_check_1: The result of the first status check for that motor.

    Returns:
        A message string that embeds both inputs.
    """
    summary = f"some result based on {on_motor} and {status_check_1}"
    return summary
def status_result(on_motor: int, status_check_1: float, status_check_2: str) -> dict:
    """Bundle one motor's id and its check results into a single record.

    The record is built explicitly rather than via ``locals()`` so that the
    output keys are fixed by this function and cannot change by accident if
    a local variable is ever introduced in the body.

    Args:
        on_motor: The id of a motor that is on.
        status_check_1: The result of the first status check.
        status_check_2: The result of the second status check.

    Returns:
        A dict keyed by parameter name (``"on_motor"``, ``"status_check_1"``,
        ``"status_check_2"``), in that order.
    """
    return {
        "on_motor": on_motor,
        "status_check_1": status_check_1,
        "status_check_2": status_check_2,
    }
def on_motor_statuses(status_result: Collect[dict]) -> pd.DataFrame:
    """Combine the per-motor result records into one DataFrame.

    Args:
        status_result: The collected ``status_result`` dicts, one per motor
            that is on.

    Returns:
        A DataFrame constructed directly from the collected records.
    """
    frame = pd.DataFrame(status_result)
    return frame
# And run them!
import functions
from hamilton import base
from hamilton import driver
from hamilton.execution import executors
# Assemble the driver one step at a time. Dynamic execution is switched on
# because functions.py declares Parallelizable/Collect nodes.
builder = driver.Builder()
builder = builder.enable_dynamic_execution(allow_experimental_mode=True)
builder = builder.with_modules(functions)
# Optional: uncomment to set the remote executor to the synchronous local one.
# builder = builder.with_remote_executor(executors.SynchronousLocalTaskExecutor())
builder = builder.with_adapters(base.PandasDataFrameResult())
dr = builder.build()

# Request the final DataFrame node, supplying the motor ids as input.
result = dr.execute(
    ["on_motor_statuses"],
    inputs={"motor_list": [1, 2, 3, 4, 5]},
)
print(result)

# Render the DAG to "graph.dot" as a PNG, laid out top-to-bottom, no legend.
dr.display_all_functions(
    "graph.dot",
    {"format": "png"},
    orient="TB",
    show_legend=False,
)
@skrawcz
Copy link
Author

skrawcz commented Jan 21, 2024

graph dot

This is what the graph would look like.

@skrawcz
Copy link
Author

skrawcz commented Jan 21, 2024

And, given the code above, only the even-numbered motors get through:

   on_motor  status_check_1                  status_check_2
0         2             4.6  some result based on 2 and 4.6
1         4             9.2  some result based on 4 and 9.2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment