Skip to content

Instantly share code, notes, and snippets.

@telegraphic
Last active January 12, 2023 02:01
Show Gist options
  • Save telegraphic/2709b7e6edc3a0c39ed9b75452da205e to your computer and use it in GitHub Desktop.
Save telegraphic/2709b7e6edc3a0c39ed9b75452da205e to your computer and use it in GitHub Desktop.
Dash (plot.ly) with ZMQ PubSub, sending numpy arrays with msgpack
"""
# app.py - example Dash + ZMQ + msgpack + numpy monitor
This app receives data from zmq_pub.py, and plots it.
Run the app, browse to localhost:8085 in your web browser, and run zmq_pub.py in a different terminal.
"""
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import dash_daq as daq
import zmq
import msgpack
import msgpack_numpy as m
m.patch()
def create_zmq_socket(zmq_port="5556", topicfilter="data"):
""" Create a ZMQ SUBSCRIBE socket """
context = zmq.Context()
zmq_socket = context.socket(zmq.SUB)
zmq_socket.connect ("tcp://localhost:%s" % zmq_port)
zmq_socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
return zmq_socket
def recv_zmq(topic='data'):
""" Receive data over ZMQ PubSub socket
Args:
socket: zmq.socket
topic: topic subscribed to
Returns numpy array data
"""
# Note - context managing socket as it can't be shared between threads
# This makes sure the socket is opened and closed by whatever thread Dash gives it
with create_zmq_socket() as socket:
msg = socket.recv()
msgdata = msg[len(topic)+1:]
return msgpack.unpackb(msgdata)
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(children=[
html.H1(children='Hello Dash'),
html.Div(children='''
Dash: A web application framework for Python.
'''),
dcc.Graph(
id='example-graph',
figure={
'data': [
{'x': [1, 2, 3, 4, 5], 'y': [4, 1, 2, 1, 2],
'type': 'scatter', 'name': 'SF'},
],
'layout': {
'title': 'Dash Data Visualization'
}
}
),
dcc.Interval(
id='interval-component',
interval=1*1000, # in milliseconds
n_intervals=0
)
])
# The updating is done with this callback
@app.callback(
Output('example-graph', 'figure'),
[Input('interval-component', 'n_intervals')])
def update(n):
d = recv_zmq('data')
return {'data': [
{'x': d['x'], 'y': d['y'],
'type': 'scatter', 'name': 'SF'},
]}
if __name__ == '__main__':
app.run_server(debug=True)
import zmq
import random
import sys
import time
import msgpack
import msgpack_numpy as m
import numpy as np
m.patch()
# Create ZMQ socket
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
def send_msg(socket, d, topic='data'):
""" Send data over ZMQ PubSub socket
Args:
socket: zmq.socket instance
topic: topic to put in ZMQ topic field (str)
"""
msg = msgpack.packb(d)
return socket.send("%s %s" % (topic, msg))
while True:
topic = "data"
messagedata = {'x': np.arange(1024), 'y': np.random.randint(5, size=1024)}
print "%s %s" % (topic, messagedata)
send_msg(socket, messagedata, topic)
time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment