Skip to content

Instantly share code, notes, and snippets.

@skrawcz
Created February 7, 2023 21:50
Show Gist options
  • Save skrawcz/8b505850bb1142f58a06b55dee9edf12 to your computer and use it in GitHub Desktop.
Save skrawcz/8b505850bb1142f58a06b55dee9edf12 to your computer and use it in GitHub Desktop.
Hamilton streaming pseudocode example
from hamilton import driver
import transforms
# load the client
kafka_client = KafkaClient()  # or whatever
try:
    # Placeholders: fill in the real DAG configuration and adapter.
    config = {...}
    dr = driver.Driver(config, transforms, adapter=...)
    while kafka_client.has_next():  # pseudo code, but you get the idea
        batch = kafka_client.get_batch()  # get a batch of inputs
        # A mean computed from a single batch is not meaningful when streaming,
        # so the price_mean node is overridden with a precomputed value
        # (SOME_VALUE is a placeholder) instead of being derived from `batch`.
        result = dr.execute(
            [transforms.price_zero_mean],
            inputs=batch,
            overrides={"price_mean": SOME_VALUE},
        )
        # save - do something else
finally:
    # Always release the client, even if building the driver or executing a
    # batch raises part-way through the stream.
    kafka_client.close()
# you can use these transforms in batch, or in streaming!
def price_per_sqft(price: pd.Series, sqft: pd.Series) -> pd.Series:
    """Return the price per square foot.

    Each output value depends only on the matching ``price`` and ``sqft``
    values, so this transform works the same on a full dataset or on a
    streaming batch.

    :param price: Prices.
    :param sqft: Floor areas corresponding to ``price``.
    :return: ``price`` divided by ``sqft`` (standard pandas Series division).
    """
    per_unit_area = price / sqft
    return per_unit_area
def price_mean(price: pd.Series) -> float:
    """Return the mean of ``price`` over the rows passed in.

    Computing this from a single streaming batch does not make sense, so in a
    streaming setting you can supply ``price_mean`` as part of DAG execution
    (e.g. as an override) instead of letting this function compute it.
    """
    average_price = price.mean()
    return average_price
def price_zero_mean(price: pd.Series, price_mean: float) -> pd.Series:
    """Center ``price`` by subtracting ``price_mean`` from every element.

    ``price_mean`` can come from the ``price_mean`` transform or be supplied
    from outside, e.g. via ``overrides={"price_mean": ...}`` when executing
    the driver. This keeps the transform usable on streaming batches.
    """
    centered_price = price - price_mean
    return centered_price
# more transforms...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment