Last active
February 28, 2023 17:29
-
-
Save merlin-quix/38d090ecaf99f3cec0003a81a8895cbc to your computer and use it in GitHub Desktop.
OSS Announcement
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Running totals, keyed by stream id (seeded in read_stream, updated in on_g_force_x).
# NOTE(review): InMemoryStorage wrapping LocalFileStorage is presumably an
# in-memory layer over file-backed persistence -- confirm against the library docs.
state = InMemoryStorage(LocalFileStorage())
def on_g_force_x(topic, stream_consumer: StreamConsumer, data: TimeseriesData):
    """Accumulate absolute ``gForceX`` readings per stream and republish the rows.

    Assigned as the ``on_read`` handler of the ``gForceX`` buffer in
    ``read_stream``, which also seeds ``state`` for the stream with 0.

    Args:
        topic: Not used by this handler.
        stream_consumer: Source stream; its ``stream_id`` keys ``state`` and
            selects the producer stream the rows are published to.
        data: Batch of rows; each row in ``data.timestamps`` gets a
            ``gForceX_sum`` value added in place.
    """
    stream_id = stream_consumer.stream_id

    for row in data.timestamps:
        # Append G-Force sensor value to accumulated state (SUM).
        state[stream_id] += abs(row.parameters["gForceX"].numeric_value)

        # Attach new column with aggregated values.
        row.add_value("gForceX_sum", state[stream_id])

    # Send updated rows to the producer topic (once per batch, after all rows
    # have been annotated).
    topic_producer.get_or_create_stream(stream_id).timeseries.publish(data)
# Read streams.
def read_stream(topic_consumer: TopicConsumer, stream_consumer: StreamConsumer):
    """Seed per-stream state and subscribe to the stream's ``gForceX`` data.

    Args:
        topic_consumer: Not used here; presumably required by the callback
            signature -- confirm against the caller.
        stream_consumer: Stream being set up; its ``stream_id`` keys ``state``.
    """
    stream_id = stream_consumer.stream_id

    # If there is no record for this stream, create a default value; an
    # existing value is left untouched.
    if stream_id not in state:
        state[stream_id] = 0

    # We subscribe to gForceX column; reads are handled by on_g_force_x.
    stream_consumer.timeseries.create_buffer("gForceX").on_read = on_g_force_x
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment