OSS Announcement
state = InMemoryStorage(LocalFileStorage())


def on_g_force_x(topic, stream_consumer: StreamConsumer, data: TimeseriesData):
    for row in data.timestamps:
        # Append G-Force sensor value to accumulated state (SUM).
        state[stream_consumer.stream_id] += abs(row.parameters["gForceX"].numeric_value)

        # Attach new column with aggregated values.
        row.add_value("gForceX_sum", state[stream_consumer.stream_id])

    # Send updated rows to the producer topic.
    topic_producer.get_or_create_stream(stream_consumer.stream_id).timeseries.publish(data)


# read streams
def read_stream(topic_consumer: TopicConsumer, stream_consumer: StreamConsumer):
    # If there is no record for this stream, create a default value.
    if stream_consumer.stream_id not in state:
        state[stream_consumer.stream_id] = 0

    # We subscribe to gForceX column.
    stream_consumer.timeseries.create_buffer("gForceX").on_read = on_g_force_x
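
The per-stream running sum kept in `state` can be tried out without a broker or the streaming library. The sketch below is only an illustration under stated assumptions: a plain `defaultdict` stands in for the persistent state store, and the helper `accumulate_abs` and the stream ids `car-1` and `car-2` are hypothetical names that do not appear in the snippet above.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def accumulate_abs(state: Dict[str, float], stream_id: str, values: Iterable[float]) -> List[float]:
    """Add |value| to the stream's running total and return the total after each value."""
    running = []
    for value in values:
        # Same update rule as the handler above: accumulate the absolute value.
        state[stream_id] += abs(value)
        running.append(state[stream_id])
    return running


# Hypothetical stand-in for the state store: missing stream ids default to 0.0.
state = defaultdict(float)

first = accumulate_abs(state, "car-1", [0.5, -1.5])   # first batch for car-1
second = accumulate_abs(state, "car-1", [2.0])        # later batch continues from the stored total
other = accumulate_abs(state, "car-2", [-3.0])        # a different stream has its own total

print(first, second, other)
```

Because the totals are keyed by stream id, a later batch for the same stream continues from the stored value, while other streams are unaffected.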