Skip to content

Instantly share code, notes, and snippets.

# Callback triggered for each new data frame.
def on_data_frame_handler(topic, stream: StreamConsumer, df: pd.DataFrame):
    """Check an incoming data frame for speeding records.

    Args:
        topic: Not used by the visible part of this handler.
        stream: Not used by the visible part of this handler.
        df: Data frame that must contain a "speed" column.
    """
    # We filter rows where the driver was speeding.
    above_speed_limit = df[df["speed"] > 130]
    # If there is no record of speeding, there is no ticket to send.
    # The check is made on the filtered rows, not on the whole frame, so a
    # frame without any speeding row is ignored.
    if above_speed_limit.empty:
        return
    # We find the moment with the highest speed.
    # NOTE(review): the rest of the handler (locating the top-speed row and
    # sending the ticket) is missing from this excerpt and must be restored.
# Callback triggered for each new data frame
def on_parameter_data_handler(df: pd.DataFrame):
    """Flag hard-braking rows and publish the frame.

    Adds a "HardBraking" column to ``df`` in place, holding the string
    "True" where ``Brake`` is greater than 0.5 and "False" otherwise, then
    publishes ``df`` through ``stream_producer.timeseries``.

    Args:
        df: Data frame that must contain a "Brake" column.
    """
    # If the braking force applied is more than 50%, we mark HardBraking with True.
    # The comparison is vectorized over the whole column instead of calling a
    # Python lambda per row; it is also well-defined for a frame with no rows.
    hard_braking = df["Brake"] > 0.5
    df["HardBraking"] = hard_braking.map({True: "True", False: "False"})
    stream_producer.timeseries.publish(df)  # Send data back to the stream
def on_read_dataframe(stream: StreamConsumer, df: pd.DataFrame):
    """Add a "total" column to the frame and write it to the output stream.

    "total" is the element-wise product of the "price" and "items_count"
    columns; ``df`` is modified in place. The frame is then written to the
    producer stream that has the same stream id as the incoming ``stream``.
    """
    product = df["price"] * df["items_count"]
    df["total"] = product

    # Look up (or create) the outgoing stream matching the incoming stream id.
    output_stream = topic_producer.get_or_create_stream(stream.stream_id)
    output_stream.timeseries_data.write(df)
# Register the data frame callback. The name must match the handler's `def`
# (`on_read_dataframe`); the previous `on_read_dataframe_handler` is not defined.
buffer.on_received_dataframe = on_read_dataframe
topic_producer = client.get_topic_producer("data")
stream = topic_producer.create_stream("bus-123AAAV")
# Message 1 sent (the stream context)
stream.properties.name = "BUS 123 AAAV"
# Message 2 sent (the human-readable identifier of the bus)
stream.timeseries \
.buffer \