-
-
Save nikhilsimha/13cf46b93116bc3b0b08b4adc1483bd1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The userConf map is generated automatically from the command line when you pass
// -Z prefixed flags to Driver.scala or run.py:
//   -Zkey1=value1 -Zkey2=value2 becomes Map("key1" -> "value1", "key2" -> "value2")
//
// Skeleton of the Api implementation you provide. Each `???` is a placeholder that
// throws NotImplementedError until you replace it with your own implementation.
// NOTE(review): the Api base class may declare further abstract members depending on
// your chronon version - confirm and implement those as well.
class MyApiImpl(userConf: Map[String, String]) extends ai.chronon.online.Api(userConf) {

  // Tell chronon how to convert the bytes in your kafka stream into ai.chronon.online.Mutation.
  override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): StreamDecoder = ???

  // Tell chronon how to talk to your kv store - you can use userConf to initialize your kvstore.
  override def genKvStore: KVStore = ???
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ai.chronon import query | |
from ai.chronon.group_by import GroupBy, TimeUnit, Window | |
from ai.chronon.api.ttypes import EventSource, EntitySource, Aggregation, Operation, JoinPart | |
from ai.chronon.join import Join | |
# Average of the `rating` column per `item` over a 90-day window.
# The entity source reads a snapshot table, a mutations table and a mutations topic.
# NOTE(review): `input_column` is the keyword used by the other Aggregation call in this
# file; confirm that the imported `Aggregation` is the helper that accepts it.
ratings_features = GroupBy(
    sources=[
        EntitySource(
            snapshotTable="item_info.ratings_snapshots_table",
            mutationsTable="item_info.ratings_mutations_table",
            mutationsTopic="ratings_mutations_topic",
            query=query.Query(
                selects={
                    "rating": "CAST(rating as DOUBLE)",
                },
            ),
        ),
    ],
    keys=["item"],
    aggregations=[
        Aggregation(
            # The aggregation must name the selected column it averages.
            input_column="rating",
            operation=Operation.AVERAGE,
            windows=[Window(length=90, timeUnit=TimeUnit.DAYS)],
        ),
    ],
)
# 5-hour windowed aggregation over the `view` column, keyed by (user, item).
# `view` is 1 for 'item_view' activity rows and 0 for every other row.
# NOTE(review): COUNT counts rows regardless of whether `view` is 0 or 1; if the intent
# is "number of item views", Operation.SUM over `view` is probably what is wanted - confirm.
view_features = GroupBy(
    sources=[
        EventSource(
            table="user_activity.user_views_table",
            topic="user_views_stream",
            query=query.Query(
                selects={
                    "view": "if(context['activity_type'] = 'item_view', 1 , 0)",
                },
                # `user != null` evaluates to NULL (never true) in SQL and would drop
                # every row; IS NOT NULL is the correct null check.
                wheres=["user IS NOT NULL"],
            ),
        ),
    ],
    keys=["user", "item"],
    aggregations=[
        Aggregation(
            # The aggregation must name the selected column it operates on.
            input_column="view",
            operation=Operation.COUNT,
            windows=[Window(length=5, timeUnit=TimeUnit.HOURS)],
        ),
    ],
)
# Join that enriches rows of `user_activity.view_purchases` (the left side) with the
# view and rating features defined above (the right parts).
item_rec_features = Join(
    left=EventSource(
        table="user_activity.view_purchases",
        query=query.Query(
            start_partition='2021-06-30',
        ),
    ),
    # keys are automatically mapped from left to right_parts
    right_parts=[
        JoinPart(groupBy=view_features),
        JoinPart(groupBy=ratings_features),
    ],
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# If you have set up the chronon repo, it should be straightforward to use the CLI to fetch:
run.py --mode=fetch -k '{"user":"some_user","item":"some_item"}' -n your_team.chronon_example.item_rec_features -t join
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Somewhere in the service that you want to fetch features from.
// Do this once, not per request.
// userConf carries any settings needed to initialize the kvstore; fill in your own entries.
val userConf: Map[String, String] = Map.empty
val apiImpl = new MyApiImpl(userConf)
val fetcher = apiImpl.buildFetcher()
// In the request/response loop of your service.
//
// Fetches the features of the `item_rec_features` join for a single (user, item) pair.
// fetchJoin is a batch api; for this simple example we send a one-element batch and
// unwrap the single response.
// NOTE(review): `.map` on a Future needs an implicit ExecutionContext in scope - confirm
// the surrounding service provides one.
def yourFetchMethod(user: String, item: String): Future[ai.chronon.online.Response] = {
  val request = ai.chronon.online.Request(
    "your_team.chronon_example.item_rec_features", // the join name is derived from the conf variable
    Map("user" -> user, "item" -> item)
  )
  // The mapped future must be the last expression of the block: binding it to a `val`
  // would make the method body evaluate to Unit instead of the declared Future.
  // `.head` is applied to the response Seq for the one-element request batch built above.
  fetcher.fetchJoin(Seq(request)).map(_.head)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ai.chronon import query | |
from ai.chronon.group_by import GroupBy, TimeUnit, Window | |
from ai.chronon.api.ttypes import EventSource, EntitySource, Aggregation, Operation, JoinPart | |
from ai.chronon.join import Join | |
def build_group_by(*key_columns):
    """Build a GroupBy over successful, positive-amount transactions.

    Args:
        *key_columns: names of the columns to group by, e.g. "user" or
            "user", "merchant".

    Returns:
        A GroupBy that computes the AVERAGE and the VARIANCE of `amount_usd`
        for each distinct combination of `key_columns`.
    """
    return GroupBy(
        sources=[
            EntitySource(
                snapshotTable="payments.transactions",  # hive daily table snapshot
                mutationsTable="payments.transaction_mutations",  # hive mutations log
                mutationsTopic="payments.transaction_mutations",  # kafka mutation events
                query=query.Query(
                    # `selects` was called as a bare function but no such name is in
                    # scope; the helper lives in the imported `query` module.
                    selects=query.select("amount_usd"),
                    wheres=["amount_usd > 0", "transaction_status = 'SUCCESSFUL'"],
                ),
            )
        ],
        # *key_columns arrives as a tuple; pass a list like the other GroupBy definitions do.
        keys=list(key_columns),
        aggregations=[
            Aggregation(operation=op, input_column="amount_usd")
            for op in (Operation.AVERAGE, Operation.VARIANCE)
        ],
    )


user_txn_features = build_group_by("user")
merchant_txn_features = build_group_by("merchant")
interaction_txn_features = build_group_by("user", "merchant")
# join has two sides left & right
# left represents the table, topic or the request we are enriching
# right is the data we are enriching the left with
# NOTE(review): this version passes no `left`; the last definition of txn_features
# in this file supplies one.
txn_features = Join(
    # keys are automatically mapped from left to right_parts
    right_parts=[
        JoinPart(groupBy=user_txn_features),
        JoinPart(groupBy=merchant_txn_features),
        JoinPart(groupBy=interaction_txn_features),
    ],
    # A z-score is (value - mean) / standard deviation, so divide by sqrt(variance),
    # not by the variance itself.
    derivations={
        "user_z_score": "(amount_usd - user_txn_features_amount_usd_average)/sqrt(user_txn_features_amount_usd_variance)",
        "merchant_z_score": "(amount_usd - merchant_txn_features_amount_usd_average)/sqrt(merchant_txn_features_amount_usd_variance)",
        "interaction_z_score": "(amount_usd - interaction_txn_features_amount_usd_average)/sqrt(interaction_txn_features_amount_usd_variance)",
    },
)
# join has two sides left & right
# left represents the table, topic or the request we are enriching
# right is the data we are enriching the left with
# NOTE(review): this version passes no `left`; the last definition of txn_features
# in this file supplies one.
txn_features = Join(
    # keys are automatically mapped from left to right_parts
    right_parts=[
        JoinPart(groupBy=user_txn_features),
        JoinPart(groupBy=merchant_txn_features),
        JoinPart(groupBy=interaction_txn_features),
    ],
    # One z-score derivation per group-by, generated from the shared column-name prefix.
    # A z-score is (value - mean) / standard deviation, so divide by sqrt(variance),
    # not by the variance itself.
    derivations={
        f"{name}_z_score": f"(amount_usd - {name}_txn_features_amount_usd_average)/sqrt({name}_txn_features_amount_usd_variance)"
        for name in ("user", "merchant", "interaction")
    },
)
# join has two sides left & right
# left represents the table, topic or the request we are enriching
# right is the data we are enriching the left with
txn_features = Join(
    left=EventSource(
        # enrich this table
        table="payments.transaction_chargebacks",
        query=query.Query(
            # we will backfill starting from this partition automatically
            start_partition='2021-06-30',
        ),
    ),
    # keys are automatically mapped from left to right_parts
    right_parts=[
        JoinPart(groupBy=user_txn_features),
        JoinPart(groupBy=merchant_txn_features),
        JoinPart(groupBy=interaction_txn_features),
    ],
    # One z-score derivation per group-by, generated from the shared column-name prefix.
    # A z-score is (value - mean) / standard deviation, so divide by sqrt(variance),
    # not by the variance itself.
    derivations={
        f"{name}_z_score": f"(amount_usd - {name}_txn_features_amount_usd_average)/sqrt({name}_txn_features_amount_usd_variance)"
        for name in ("user", "merchant", "interaction")
    },
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment