Skip to content

Instantly share code, notes, and snippets.

View skrawcz's full-sized avatar

Stefan Krawczyk skrawcz

View GitHub Profile
"""
This module demonstrates a telephone application
using Burr that:
- captions an image
- creates caption embeddings (for analysis)
- creates a new image based on the created caption
"""
import os
import uuid
@skrawcz
skrawcz / functions.py
Created January 21, 2024 05:23
Example using parallelism with Hamilton
# functions.py - declare and link your transformations as functions....
import pandas as pd
from hamilton.htypes import Parallelizable, Collect
def motor(motor_list: list[int]) -> Parallelizable[int]:
    """Yield each motor id from ``motor_list``, one at a time.

    The return type is declared as ``Parallelizable[int]`` so that each
    yielded value can be processed as its own branch (presumably fanned out
    by Hamilton's parallel execution -- confirm against the driver setup).

    :param motor_list: the motor ids to iterate over.
    :return: a generator producing the ids in their original order.
    """
    # Must remain a generator function; ``yield from`` keeps it one while
    # delegating the iteration to the list itself.
    yield from motor_list
def _is_motor_on(motor: int ) -> bool:
@skrawcz
skrawcz / hamilton_airflow_node.py
Created June 29, 2023 18:02
Usage pattern for Hamilton within an Airflow DAG task
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
# set the Airflow DAG parameters. This will appear in the Airflow UI.
DEFAULT_DAG_PARAMS = dict(
label="absenteeism_time_in_hours",
feature_set=[
"age_zero_mean_unit_variance",
"has_children",
"has_pet",
@skrawcz
skrawcz / async_funcs.py
Created June 13, 2023 18:42
Shows how to do async based functions in hamilton -- and then a suggestion for another way to parallelize
from hamilton.function_modifiers import extract_columns
import pandas as pd
async def _run_query(col_name: str, query_string: str) -> pd.DataFrame:
    """Placeholder for an asynchronous database query.

    No database is contacted here: the query string is echoed back so the
    surrounding example has something to work with.

    :param col_name: name of the single column in the returned frame.
    :param query_string: the query text; stored as the only value in that column.
    :return: a one-row DataFrame ``{col_name: [query_string]}``.
    """
    # A real implementation would go to the database -- ideally the client is
    # passed in as a parameter. The assumption here is that the database
    # driver is asyncio based, else there's no value in doing this :)
    return pd.DataFrame({col_name: [query_string]})
# async Hamilton func for query #1
@skrawcz
skrawcz / embed_this_code.py
Created February 7, 2023 21:50
Hamilton streaming pseudo code example
from hamilton import driver
import transforms
# load the client
kafka_client = KafkaClient() # or whatever
config = {...}
dr = driver.Driver(config, transforms, adapter=...)
@skrawcz
skrawcz / indicators.py
Last active January 19, 2023 18:22
Constant passing in Hamilton
import pandas as pd
def total_distance_travelled_by_train( ... ) -> pd.Series:
return # placeholder -- fill with actual logic, etc.
def distance_travelled_till_intermediate_station( ... ) -> pd.Series:
return # placeholder -- fill with actual logic, etc.
def journey_distance(total_distance_travelled_by_train: pd.Series, distance_travelled_till_intermediate_station: pd.Series) -> pd.Series:
@skrawcz
skrawcz / my_builder.py
Last active October 26, 2022 22:44
Custom Result Builder Example
from typing import Tuple
from hamilton import base
class PandasDFWithDebugResultBuilder(base.ResultMixin):
"""This class is an example to show how you can extend the result building functionality of Hamilton.
This result builder returns a dataframe, and dictionary of outputs that we don't want to make into a dataframe,
but are useful for debugging (for example). Caveat: this won't work for ray, dask, or spark usage without some tweaks.
Example Usage::
@skrawcz
skrawcz / my_functions.py
Last active August 6, 2022 22:22
Code to get Hamilton to run asynchronously -- no parallelization
from typing import Any
import pandas as pd
"""
Notes:
1. This file is used for all the [ray|dask|spark]/hello_world examples.
2. It therefore showcases how you can write something once and not only scale it, but also port it
to different frameworks with ease!
"""
@skrawcz
skrawcz / example_function.py
Created July 24, 2022 01:04
Function for Tidy Pandas in Production post
@tag(owner='Data-Science', pii='False')
@check_output(data_type=np.float64, range=(-5.0, 5.0), allow_nans=False)
def height_zero_mean_unit_variance(height_zero_mean: pd.Series,
                                   height_std_dev: pd.Series) -> pd.Series:
    """Zero mean unit variance value of height.

    Divides the already mean-centred height by its standard deviation.

    :param height_zero_mean: height with its mean removed.
    :param height_std_dev: standard deviation of height, used as the divisor.
    :return: the element-wise quotient of the two inputs.
    """
    # The @check_output decorator above declares the expected result:
    # float64 values within [-5.0, 5.0] and no NaNs.
    return height_zero_mean / height_std_dev
@skrawcz
skrawcz / my_functions.py
Last active November 13, 2021 06:45
Hamilton with a single value node
# --- my_functions.py
import pandas as pd
def avg_3wk_spend(spend: pd.Series) -> pd.Series:
    """Rolling 3 week average spend.

    Computes the mean over a sliding window of three consecutive entries
    (three weeks, assuming one row per week -- confirm against the input data).
    The first two entries of the result are NaN because a full window is not
    yet available there.

    :param spend: spend values in chronological order.
    :return: a series of the same length holding the windowed means.
    """
    return spend.rolling(3).mean()
def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series: