Skip to content

Instantly share code, notes, and snippets.

@ankona
Last active February 15, 2024 16:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ankona/1b18f5c7d775b286d3b437e8b0fc4aaa to your computer and use it in GitHub Desktop.
Save ankona/1b18f5c7d775b286d3b437e8b0fc4aaa to your computer and use it in GitHub Desktop.
0MQ synthetic data publisher
import numpy as np
import time
import zmq
def gen(min_value=0, max_value=2000, normal_dy=10, spike_prob=0.01, spike_scale=2):
    """Yield an endless synthetic random-walk signal as ``(i, y)`` pairs.

    The walk starts at ``min_value`` plus a normally distributed offset
    scaled by ``max_value - min_value``.  Each step adds a normally
    distributed increment with standard deviation ``normal_dy``; with
    probability ``spike_prob`` the increment is multiplied by
    ``spike_scale`` to simulate an occasional spike.

    Args:
        min_value: Base of the starting value.
        max_value: Together with ``min_value`` defines the scale of the
            random starting offset.
        normal_dy: Standard deviation of an ordinary step.
        spike_prob: Probability, in ``[0, 1]``, that a step is a spike.
        spike_scale: Factor applied to a step when it is a spike.

    Yields:
        ``(i, y)`` where ``i`` is a 1-based sample counter and ``y`` is the
        current value of the walk.  The generator never terminates.
    """
    total_range = max_value - min_value
    # NOTE(review): randn() is unbounded, so the starting value may fall
    # outside [min_value, max_value]; switch to np.random.rand() if a start
    # inside that range was intended.
    last_value = min_value + np.random.randn() * total_range
    i = 0
    while True:
        difference = np.random.randn() * normal_dy
        # A probability threshold needs a uniform [0, 1) draw; a standard
        # normal draw is below 0.01 roughly half the time.
        if np.random.rand() < spike_prob:
            # let's amp up the signal
            difference *= spike_scale
        last_value += difference
        i += 1
        yield i, last_value
import json
def main(endpoint="tcp://127.0.0.1:5678", batch_size=100, sample_delay=0.05):
    """Publish batches of synthetic samples from ``gen()`` on a ZeroMQ PUB socket.

    Samples are accumulated until ``batch_size`` of them are available, then
    sent as one JSON string of the form ``{"x": [...], "y": [...]}``.  Runs
    until interrupted (e.g. Ctrl-C); the socket and context are always
    released on the way out.

    Args:
        endpoint: Address the PUB socket binds to.
        batch_size: Number of samples per published message.
        sample_delay: Seconds to pause after each generated sample.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    try:
        socket.bind(endpoint)
        x_list, y_list = [], []
        for x, y in gen():
            x_list.append(x)
            y_list.append(y)
            if len(x_list) >= batch_size:
                batch = json.dumps({"x": x_list, "y": y_list})
                print("sending batch...", batch)
                socket.send_string(batch)
                x_list, y_list = [], []
            # NOTE(review): the pause is applied per sample (loop level);
            # confirm this matches the intended data rate.
            time.sleep(sample_delay)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the endless publisher.
        pass
    finally:
        # linger=0 drops unsent messages so term() cannot block on shutdown.
        socket.close(linger=0)
        context.term()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment