Skip to content

Instantly share code, notes, and snippets.

@elijahbenizzy
Created June 26, 2023 23:23
Show Gist options
  • Save elijahbenizzy/52dacb97f4c3513090ad6acaceda0ddc to your computer and use it in GitHub Desktop.
Save elijahbenizzy/52dacb97f4c3513090ad6acaceda0ddc to your computer and use it in GitHub Desktop.
import numpy as np
from pandas.core.indexes.datetimes import DatetimeIndex
import pandas as pd
from typing import Tuple, List, Dict
import functools as ft
from hamilton.function_modifiers import inject, resolve, ResolveAt, parameterized_subdag, source, value, group
# Quick hack to keep everything in the same runnable file
# But you should probably store these in their own modules :)
from hamilton.ad_hoc_utils import create_temporary_module
from hamilton import driver
####### BASE MODULE ########
def spine(start_date: str, end_date: str) -> DatetimeIndex:
    """Build the daily date index every feature in this module is aligned to.

    Uses ``pd.date_range``'s defaults, so the index is daily and includes both
    ``start_date`` and ``end_date``.
    """
    daily_index = pd.date_range(start=start_date, end=end_date)
    return daily_index
def _random_series(spine: DatetimeIndex) -> pd.Series:
    """Return uniform random floats in [0, 1), one per date in ``spine``.

    Shared placeholder body for the demo features below; a real feature would
    compute something meaningful from its inputs.
    """
    return pd.Series(index=spine, data=np.random.random(spine.shape))


def feature_1(spine: DatetimeIndex) -> pd.Series:
    """Demo feature: random values indexed by ``spine``."""
    return _random_series(spine)


def feature_2(spine: DatetimeIndex) -> pd.Series:
    """Demo feature: random values indexed by ``spine``."""
    return _random_series(spine)


def feature_3(spine: DatetimeIndex) -> pd.Series:
    """Demo feature: random values indexed by ``spine``."""
    return _random_series(spine)


def feature_4(spine: DatetimeIndex) -> pd.Series:
    """Demo feature: random values indexed by ``spine``."""
    return _random_series(spine)
# The plain feature DAG: four demo features plus the date spine they share.
base_module = create_temporary_module(
    feature_1,
    feature_2,
    feature_3,
    feature_4,
    spine,
)
####### MODULE WITH PARAMETERIZED SUBDAG #########
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    # The lambda's parameter name matches the `features_requested` config key
    # the drivers / subdag config set, which is presumably how @resolve binds
    # it -- do not rename it.
    decorate_with=lambda features_requested: inject(
        features=group(
            **{name: source(name) for name in features_requested}
        )
    ),
)
def all_features(features: Dict[str, pd.Series]) -> pd.DataFrame:
    """Combine the requested feature Series into one DataFrame.

    ``features`` is injected by the decorator above: one entry per name in the
    ``features_requested`` config, each wired to the node of the same name.
    Each entry becomes one column of the result.
    """
    combined = pd.DataFrame(features)
    return combined
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    # `time_ranges` must keep this name: it matches the driver config key.
    decorate_with=lambda time_ranges: parameterized_subdag(
        all_features,
        base_module,
        **{
            window_name: {
                "inputs": {"start_date": value(window_start), "end_date": value(window_end)},
                # Forwarded as subdag config so all_features' own @resolve can
                # pick which feature nodes to inject.
                "config": {"features_requested": window_features},
            }
            for window_name, (window_features, window_start, window_end) in time_ranges.items()
        },
    ),
)
def all_data(all_features: pd.DataFrame) -> pd.DataFrame:
    """Repeat the base-module subdag once per entry of the ``time_ranges`` config.

    Each repetition gets its own start/end dates and feature list. Its output
    is exposed under the time-range key; with_power_user_mode requests those
    keys as outputs. The body itself is only a pass-through of the subdag's
    ``all_features`` result.
    """
    return all_features
# all_data is the only function registered here; all_features and base_module
# reach the DAG through its parameterized_subdag decorator.
power_user_module = create_temporary_module(
    all_data,
)
####### MODULE WITH HYBRID APPROACH, FILTER AT END #########
def features_filtered_on_time(
    all_features: pd.DataFrame,
    time_ranges: Dict[str, Tuple[List[str], str, str]],
) -> pd.DataFrame:
    """Cut each requested window out of ``all_features`` and join the windows.

    :param all_features: the requested features over a date range covering every
        window. In with_filter_at_end, this is the min start to the max end.
    :param time_ranges: maps a window name to ``(features, start_date, end_date)``.
    :return: the per-window slices outer-joined on the date index by ``_ts_join``,
        using the window names as column labels.
    """
    window_names = list(time_ranges)
    # A single .loc label slice selects rows and columns together, and is
    # inclusive of both endpoints. This avoids chained indexing with an
    # intermediate column-subset copy.
    windows = [
        all_features.loc[start_date:end_date, features]
        for features, start_date, end_date in time_ranges.values()
    ]
    return _ts_join(windows, window_names)
# Hybrid approach: build all requested features once, then slice per window.
# with_filter_at_end pairs this module with base_module in its Driver.
hybrid_module = create_temporary_module(
    features_filtered_on_time,
    all_features,
)
######## UTILITY FUNCTIONS ################
def _ts_join(dfs: List[pd.DataFrame], names: List[str]) -> pd.DataFrame:
    """Outer-join index-aligned frames, namespacing each frame's columns by name.

    Each ``dfs[i]`` has its columns prefixed with ``f"{names[i]}_"`` before
    joining. A feature requested by several windows therefore stays distinct,
    however many frames are joined.

    :param dfs: frames to join on their index (outer join, so no rows are lost).
    :param names: one label per frame, used as that frame's column prefix.
    :return: the joined frame, or an empty DataFrame when ``dfs`` is empty.
    :raises ValueError: if ``dfs`` and ``names`` differ in length.
    """
    if len(dfs) != len(names):
        raise ValueError(
            f"Expected one name per frame, got {len(dfs)} frames and {len(names)} names"
        )
    if not dfs:
        # ft.reduce over an empty sequence with no initial value would raise.
        return pd.DataFrame()
    labelled = [df.add_prefix(f"{name}_") for df, name in zip(dfs, names)]
    return ft.reduce(
        lambda left, right: pd.merge(
            left,
            right,
            how='outer',
            left_index=True,
            right_index=True,
        ),
        labelled,
    )
######## DRIVERS FOR ALL APPROACHES #######
def current_approach(**time_ranges: Tuple[List[str], str, str]):
    """
    As you're doing it now:
    - pros are it's simple/easy to handle
    - cons are it's not all in the DAG/not parallelizable/whatnot

    Each keyword is a window name mapped to ``(features, start_date, end_date)``.
    The base-module DAG is executed once per window, in keyword order, and the
    per-window results are joined with ``_ts_join`` under the window names.
    """
    dr = driver.Driver({}, base_module)
    window_names = list(time_ranges)
    frames = [
        dr.execute(features, inputs={'start_date': start, 'end_date': end})
        for features, start, end in time_ranges.values()
    ]
    return _ts_join(frames, window_names)
def with_power_user_mode(**time_ranges: Tuple[List[str], str, str]):
    """
    With subdags
    - pros are it's very powerful and uses all the same code except some extra fns. It is also
      quite efficient: it only calculates what it needs.
    - cons are it's a bit complex. That said, the complexity is all in two functions
    What this is doing is:
    1. Dynamically creating an "inject", which chooses which nodes to execute
    2. Repeating the subdag with "inject" once for each group of features/times

    Each keyword is a window name mapped to ``(features, start_date, end_date)``.
    Returns the driver's result for one output node per window name.
    """
    config = {'time_ranges': time_ranges, 'hamilton.enable_power_user_mode': True}
    dr = driver.Driver(config, power_user_module)
    # all_data's parameterized_subdag creates one node per window name.
    return dr.execute(list(time_ranges))
def with_filter_at_end(**time_ranges: Tuple[List[str], str, str]):
    """
    With a filter step
    - pros are it's simple and runs the DAG once: each requested feature is computed a
      single time, over one date range spanning every window.
    - cons are it computes more than it returns: features are generated over the whole
      spanning range (including any gaps between windows) and only then cut down.
    What this is doing is:
    1. Generating all the data
    2. Cutting out just the pieces the user requested
    Note the last step could be replaced by a results builder, I just put it in here to demonstrate.

    Each keyword is a window name mapped to ``(features, start_date, end_date)``.

    :raises ValueError: if no windows are given.
    """
    if not time_ranges:
        raise ValueError('with_filter_at_end needs at least one time range')
    # Union of every feature any window asks for. A set comprehension avoids the
    # quadratic list concatenation of sum(lists, []).
    features_requested = {
        feature
        for features, _, _ in time_ranges.values()
        for feature in features
    }
    # Choose the earliest start / latest end by actual date. Plain string order
    # only matches date order for zero-padded ISO dates. The original strings
    # are still what gets passed to the DAG.
    start_date = min((start for _, start, _ in time_ranges.values()), key=pd.Timestamp)
    end_date = max((end for _, _, end in time_ranges.values()), key=pd.Timestamp)
    inputs = {
        'start_date': start_date,
        'end_date': end_date,
        'time_ranges': time_ranges,
    }
    dr = driver.Driver(
        {'features_requested': features_requested, 'hamilton.enable_power_user_mode': True},
        hybrid_module,
        base_module,
    )
    # NOTE(review): renders the execution graph to ./out.png on every call
    # (presumably requires graphviz) -- consider making this opt-in.
    dr.visualize_execution(['features_filtered_on_time'], "./out.png", {}, inputs=inputs)
    return dr.execute(['features_filtered_on_time'], inputs=inputs)
if __name__ == '__main__':
    # One set of windows shared by every approach, so their outputs are
    # directly comparable. feature_2 and feature_3 appear in both windows.
    example_time_ranges = {
        'foo': (['feature_1', 'feature_2', 'feature_3'], '2022-06-26', '2023-06-26'),
        'bar': (['feature_2', 'feature_3', 'feature_4'], '2020-03-01', '2021-03-01'),
    }
    for approach in (current_approach, with_power_user_mode, with_filter_at_end):
        print(approach(**example_time_ranges))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment