Skip to content

Instantly share code, notes, and snippets.

@JEnoch
Last active May 9, 2023 17:33
Show Gist options
  • Save JEnoch/9c0690d6af83d94bbeeef8a26c38fa7a to your computer and use it in GitHub Desktop.
Save JEnoch/9c0690d6af83d94bbeeef8a26c38fa7a to your computer and use it in GitHub Desktop.
A Zenoh Python script that queries a time-series Storage (typically InfluxDB) for past publications and replays them
#
# A Zenoh Python script that queries a time-series Storage (typically InfluxDB)
# for past publications and replays them
#
import sys
import time, datetime
import argparse
import json
import zenoh
from zenoh import config, QueryTarget
# --- Command line argument parsing --- --- --- --- --- ---
# Every option is described once in the table below as
# (option strings, add_argument keyword settings) and registered in that
# order, so the generated help lists them exactly as declared here.
_CLI_OPTIONS = (
    (('--mode', '-m'), {
        'dest': 'mode',
        'type': str,
        'choices': ['peer', 'client'],
        'help': 'The zenoh session mode.',
    }),
    # --connect / --listen may be repeated; each occurrence is appended.
    (('--connect', '-e'), {
        'dest': 'connect',
        'type': str,
        'action': 'append',
        'metavar': 'ENDPOINT',
        'help': 'Endpoints to connect to.',
    }),
    (('--listen', '-l'), {
        'dest': 'listen',
        'type': str,
        'action': 'append',
        'metavar': 'ENDPOINT',
        'help': 'Endpoints to listen on.',
    }),
    (('--selector', '-s'), {
        'dest': 'selector',
        'type': str,
        'default': 'demo/example/**',
        'help': 'The selection of resources to query.',
    }),
    (('--target', '-t'), {
        'dest': 'target',
        'type': str,
        'default': 'BEST_MATCHING',
        'choices': ['ALL', 'BEST_MATCHING', 'ALL_COMPLETE', 'NONE'],
        'help': 'The target queryables of the query.',
    }),
    (('--config', '-c'), {
        'dest': 'config',
        'type': str,
        'metavar': 'FILE',
        'help': 'A configuration file.',
    }),
    (('--time-filter',), {
        'dest': 'time_filter',
        'type': str,
        'default': '[now(-5m)..]',
        'help': 'A time filter used for query (e.g.: "[now(-1h)..]")',
    }),
    (('--replay-prefix',), {
        'dest': 'replay_prefix',
        'type': str,
        'default': 'replay/',
        'help': 'A prefix added to each key expression when re-publishing',
    }),
    (('--time-scale',), {
        'dest': 'time_scale',
        'type': float,
        'default': 1.0,
        'help': 'The time scale (i.e. multiplier of time interval between each re-publication)',
    }),
)

parser = argparse.ArgumentParser(
    description='zenoh replay example',
    prog='z_replay',
)
for _option_strings, _settings in _CLI_OPTIONS:
    parser.add_argument(*_option_strings, **_settings)
args = parser.parse_args()

# Start from the configuration file when one was given, otherwise from an
# empty configuration.
if args.config is None:
    conf = zenoh.Config()
else:
    conf = zenoh.Config.from_file(args.config)

# Options given on the command line are inserted into the configuration as
# JSON values; options that were not given leave the configuration untouched.
for _config_key, _cli_value in (
    (zenoh.config.MODE_KEY, args.mode),
    (zenoh.config.CONNECT_KEY, args.connect),
    (zenoh.config.LISTEN_KEY, args.listen),
):
    if _cli_value is not None:
        conf.insert_json5(_config_key, json.dumps(_cli_value))

selector = args.selector

# Translate the --target name into a QueryTarget. 'NONE' has no entry in the
# table, so in that case `target` is None.
_query_targets = {
    'ALL': QueryTarget.ALL(),
    'BEST_MATCHING': QueryTarget.BEST_MATCHING(),
    'ALL_COMPLETE': QueryTarget.ALL_COMPLETE(),
}
target = _query_targets.get(args.target)
# Zenoh code --- --- --- --- --- --- --- --- --- --- ---
# Query for past publications matching `selector` within the time filter,
# then re-publish each one in timestamp order under `args.replay_prefix`,
# sleeping between publications for the original interval multiplied by
# `args.time_scale`.

# initiate logging
zenoh.init_logger()

print("Opening session...")
session = zenoh.open(conf)

# From here on the session is closed on every exit path: normal completion,
# the early "nothing to replay" exit, and any exception raised on the way.
try:
    # Get past publications from a storage with the selector and the time filter
    query = selector + "?_time=" + args.time_filter
    print("Query on '{}'...".format(query))
    replies = session.get(query, zenoh.Queue(), target=target)

    # Sort replies by timestamp.
    # NOTE(review): `reply.ok` is read on every reply; presumably an error
    # reply or a sample without timestamp would fail here - confirm.
    sorted_replies = sorted(replies.receiver, key=lambda reply: reply.ok.timestamp)

    # If no reply, exit
    if not sorted_replies:
        print("No publications found - nothing to replay.")
        # sys.exit raises SystemExit, so the `finally` clause below still runs.
        sys.exit(0)

    # Get first and last timestamps
    first_ts = sorted_replies[0].ok.timestamp.seconds_since_unix_epoch
    last_ts = sorted_replies[-1].ok.timestamp.seconds_since_unix_epoch
    print("Replay {} publications made between {} and {} ".format(
        len(sorted_replies),
        datetime.datetime.utcfromtimestamp(first_ts),
        datetime.datetime.utcfromtimestamp(last_ts)))
    print(" Initial duration: {:.1f} seconds => with time-scale={}, new duration: {:.1f} seconds".format(
        last_ts - first_ts,
        args.time_scale,
        (last_ts - first_ts) * args.time_scale))

    # Iterate through the replies, re-publishing each and sleeping between each
    previous_ts = first_ts
    for reply in sorted_replies:
        # sleep the interval between this reply and the previous reply,
        # multiplied by time_scale (the first iteration sleeps for 0 seconds)
        current_ts = reply.ok.timestamp.seconds_since_unix_epoch
        time.sleep((current_ts - previous_ts) * args.time_scale)

        # re-publish, adding the replay_prefix to the key expression
        replay_ke = args.replay_prefix + str(reply.ok.key_expr)
        print(" - [{}] Replay publication from '{}' to '{}'".format(
            datetime.datetime.utcfromtimestamp(current_ts),
            reply.ok.key_expr,
            replay_ke))
        session.put(replay_ke, reply.ok.payload, encoding=reply.ok.encoding)

        # update previous_ts
        previous_ts = current_ts
finally:
    session.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment