Skip to content

Instantly share code, notes, and snippets.

@nikhilsimha
Last active October 3, 2023 04:52
Show Gist options
  • Save nikhilsimha/13cf46b93116bc3b0b08b4adc1483bd1 to your computer and use it in GitHub Desktop.
Save nikhilsimha/13cf46b93116bc3b0b08b4adc1483bd1 to your computer and use it in GitHub Desktop.
/** Skeleton Chronon `Api` implementation: replace each `???` with your own integration code.
  *
  * `userConf` is generated from the command line automatically when you pass `-Z`-prefixed flags to
  * Driver.scala or run.py: `-Zkey1=value1 -Zkey2=value2` becomes
  * `Map("key1" -> "value1", "key2" -> "value2")`.
  *
  * @param userConf user-supplied settings; you can use them to initialize your KV store
  */
class MyApiImpl(userConf: Map[String, String]) extends ai.chronon.online.Api(userConf) {

  /** Tells Chronon how to convert the bytes in your Kafka stream into `ai.chronon.online.Mutation`s. */
  // A concrete class cannot leave members without bodies; `???` keeps the template compilable and
  // instantiable, and throws NotImplementedError until you supply a real implementation.
  override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): StreamDecoder = ???

  /** Tells Chronon how to talk to your KV store; `userConf` can be used to initialize it. */
  override def genKvStore: KVStore = ???
}
from ai.chronon import query
from ai.chronon.group_by import GroupBy, TimeUnit, Window
from ai.chronon.api.ttypes import EventSource, EntitySource, Aggregation, Operation, JoinPart
from ai.chronon.join import Join
# Average item rating over a 90-day window, built from a snapshot table plus its mutations table/topic.
ratings_features = GroupBy(
    sources=[
        EntitySource(
            snapshotTable="item_info.ratings_snapshots_table",
            # NOTE(review): the thrift EntitySource fields are mutationTable / mutationTopic (singular);
            # unknown keyword names are rejected by the generated constructor — confirm against api.thrift
            mutationTable="item_info.ratings_mutations_table",
            mutationTopic="ratings_mutations_topic",
            query=query.Query(
                selects={
                    # the GroupBy key must be selected too, otherwise it is projected away
                    "item": "item",
                    "rating": "CAST(rating as DOUBLE)",
                }))
    ],
    keys=["item"],
    aggregations=[
        # Aggregation is the thrift struct from ai.chronon.api.ttypes, so its fields are camelCase;
        # an AVERAGE needs to know which column to average
        Aggregation(inputColumn="rating", operation=Operation.AVERAGE,
                    windows=[Window(length=90, timeUnit=TimeUnit.DAYS)]),
    ])
# Number of item views per (user, item) over a 5-hour window, from the activity table and stream.
view_features = GroupBy(
    sources=[
        EventSource(
            table="user_activity.user_views_table",
            topic="user_views_stream",
            query=query.Query(
                selects={
                    # the GroupBy keys must be selected too, otherwise they are projected away
                    "user": "user",
                    "item": "item",
                    # 1 for an item view, 0 for any other activity
                    "view": "if(context['activity_type'] = 'item_view', 1 , 0)",
                },
                # `user != null` evaluates to NULL for every row in SQL and would drop all events
                wheres=["user IS NOT NULL"]))
    ],
    keys=["user", "item"],
    aggregations=[
        # SUM of the 0/1 flag counts views; COUNT would count every non-null row,
        # i.e. views and non-views alike
        Aggregation(inputColumn="view", operation=Operation.SUM,
                    windows=[Window(length=5, timeUnit=TimeUnit.HOURS)]),
    ])
# Enrich each row of user_activity.view_purchases (starting at partition 2021-06-30) with the
# view and rating features. Keys are mapped automatically from the left side to the right parts.
item_rec_features = Join(
    left=EventSource(
        table="user_activity.view_purchases",
        query=query.Query(start_partition='2021-06-30'),
    ),
    right_parts=[
        JoinPart(groupBy=view_features),
        JoinPart(groupBy=ratings_features),
    ],
)
# If you have set up the Chronon repo, it should be straightforward to use the CLI to fetch.
# -k: JSON map of key values, -n: the join's name, -t join: presumably the kind of conf being fetched
run.py --mode=fetch -k '{"user":"some_user","item":"some_item"}' -n your_team.chronon_example.item_rec_features -t join
// Somewhere in your service that you want to fetch features from.
// Do this once (not inside the request/response loop): build the Api and its Fetcher.
// Pass whatever userConf entries your KV store needs to be initialized; an empty map is a placeholder.
val apiImpl = new MyApiImpl(Map.empty[String, String])
val fetcher = apiImpl.buildFetcher()
// In the request/response loop of your service.
/** Fetches the `item_rec_features` join for a single (user, item) pair.
  *
  * `fetchJoin` is a batch API; this wraps one request and unwraps its response.
  * `Future.map` needs an implicit `ExecutionContext` in scope here.
  *
  * @param user value for the join's `user` key
  * @param item value for the join's `item` key
  * @return a future holding the response for this single request
  */
def yourFetchMethod(user: String, item: String): Future[ai.chronon.online.Response] = {
  val request = ai.chronon.online.Request(
    "your_team.chronon_example.item_rec_features", // the join name is derived from the conf variable
    Map("user" -> user, "item" -> item))
  // Return the future itself: ending the block with a `val` definition would give it type Unit,
  // which does not match the declared return type.
  fetcher
    .fetchJoin(Seq(request))
    // a single request was sent, so presumably a single response comes back
    .map(_.head)
}
from ai.chronon import query
from ai.chronon.group_by import GroupBy, TimeUnit, Window
from ai.chronon.api.ttypes import EventSource, EntitySource, Aggregation, Operation, JoinPart
from ai.chronon.join import Join
def build_group_by(*key_columns):
    """Build a GroupBy over payments.transactions keyed by ``key_columns``.

    Computes the AVERAGE and VARIANCE of ``amount_usd`` for successful, positive
    transactions, with no windows set. Reads from the daily snapshot table plus
    the mutations table and topic.

    :param key_columns: one or more column names to group by, e.g. ``"user"`` or
        ``"user", "merchant"``.
    :return: the configured GroupBy.
    """
    keys = list(key_columns)
    return GroupBy(
        sources=[
            EntitySource(
                snapshotTable="payments.transactions",  # hive daily table snapshot
                # NOTE(review): thrift EntitySource fields are mutationTable / mutationTopic (singular)
                # — confirm against api.thrift
                mutationTable="payments.transaction_mutations",  # hive mutations log
                mutationTopic="payments.transaction_mutations",  # kafka mutation events
                query=query.Query(
                    # Select the key columns alongside the aggregated column; anything not selected
                    # is projected away. (The original called an undefined `selects(...)` helper.)
                    selects={column: column for column in keys + ["amount_usd"]},
                    wheres=["amount_usd > 0", "transaction_status = 'SUCCESSFUL'"],
                ),
            )
        ],
        keys=keys,
        aggregations=[
            # Aggregation is the thrift struct from ai.chronon.api.ttypes, whose field is `inputColumn`
            Aggregation(operation=op, inputColumn="amount_usd")
            for op in (Operation.AVERAGE, Operation.VARIANCE)
        ],
    )
# One GroupBy per entity: the user, the merchant, and the (user, merchant) pair.
user_txn_features, merchant_txn_features, interaction_txn_features = (
    build_group_by(*group_keys)
    for group_keys in (("user",), ("merchant",), ("user", "merchant"))
)
# join has two sides left & right
# left represents the table, topic or the request we are enriching
# right is the data we are enriching the left with
txn_features = Join(
    # Join() needs a left source: each chargeback row is enriched, starting at start_partition
    left=EventSource(
        table="payments.transaction_chargebacks",
        query=query.Query(
            start_partition='2021-06-30'
        )
    ),
    # keys are automatically mapped from left to right_parts
    right_parts=[
        JoinPart(groupBy = user_txn_features),
        JoinPart(groupBy = merchant_txn_features),
        JoinPart(groupBy = interaction_txn_features),
    ],
    # z-score = (x - mean) / standard deviation, and the standard deviation is sqrt(variance).
    # (The original was also missing the comma after the merchant entry, a SyntaxError.)
    derivations={
        "user_z_score": "(amount_usd - user_txn_features_amount_usd_average)/sqrt(user_txn_features_amount_usd_variance)",
        "merchant_z_score": "(amount_usd - merchant_txn_features_amount_usd_average)/sqrt(merchant_txn_features_amount_usd_variance)",
        "interaction_z_score": "(amount_usd - interaction_txn_features_amount_usd_average)/sqrt(interaction_txn_features_amount_usd_variance)",
    }
)
# join has two sides left & right
# left represents the table, topic or the request we are enriching
# right is the data we are enriching the left with
txn_features = Join(
    # Join() needs a left source: each chargeback row is enriched, starting at start_partition
    left=EventSource(
        table="payments.transaction_chargebacks",
        query=query.Query(
            start_partition='2021-06-30'
        )
    ),
    # keys are automatically mapped from left to right_parts
    right_parts=[
        JoinPart(groupBy = user_txn_features),
        JoinPart(groupBy = merchant_txn_features),
        JoinPart(groupBy = interaction_txn_features),
    ],
    # z-score = (x - mean) / standard deviation, and the standard deviation is sqrt(variance)
    derivations={
        f"{name}_z_score": f"(amount_usd - {name}_txn_features_amount_usd_average)/sqrt({name}_txn_features_amount_usd_variance)"
        for name in ("user", "merchant", "interaction")
    }
)
# join has two sides left & right
# left represents the table, topic or the request we are enriching
# right is the data we are enriching the left with
txn_features = Join(
    left=EventSource(
        # enrich this table
        table="payments.transaction_chargebacks",
        query=query.Query(
            # we will backfill starting from this partition automatically
            start_partition='2021-06-30'
        )
    ),
    # keys are automatically mapped from left to right_parts
    right_parts=[
        JoinPart(groupBy = user_txn_features),
        JoinPart(groupBy = merchant_txn_features),
        JoinPart(groupBy = interaction_txn_features),
    ],
    # z-score = (x - mean) / standard deviation; dividing by the raw variance would give the wrong
    # scale, so take sqrt(variance) as the standard deviation
    derivations={
        f"{name}_z_score": f"(amount_usd - {name}_txn_features_amount_usd_average)/sqrt({name}_txn_features_amount_usd_variance)"
        for name in ("user", "merchant", "interaction")
    }
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment