This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Callback triggered for each new data frame. | |
def on_data_frame_handler(topic, stream: StreamConsumer, df: pd.DataFrame): | |
# We filter rows where the driver was speeding. | |
above_speed_limit = df[df["speed"] > 130] | |
# If there is a record of speeding, we sent a ticket. | |
if df.shape[0] > O: | |
# We find the moment with the highest speed. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Callback triggered for each new data frame | |
def on_parameter_data_handler(df: pd.DataFrame): | |
# If the braking force applied is more than 50%, we mark HardBraking with True | |
df["HardBraking"] = df.apply(lambda row: "True" if row.Brake > 0.5 else "False", axis=1) | |
stream_producer.timeseries.publish(df) # Send data back to the stream |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def on_read_dataframe(stream: StreamConsumer, df: pd.DataFrame): | |
df["total"] = df["price"] * df["items_count"] | |
topic_producer.get_or_create_stream(stream.stream_id).timeseries_data.write(df) | |
buffer.on_received_dataframe = on_read_dataframe_handler |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
topic_producer = client.get_topic_producer("data") | |
stream = topic_producer.create_stream("bus-123AAAV") | |
# Message 1 sent (the stream context) | |
stream.properties.name = "BUS 123 AAAV" | |
# Message 2 sent (the human-readable identifier the bus) | |
stream.timeseries \ | |
.buffer \ |