Skip to content

Instantly share code, notes, and snippets.

@ruoyu0088
Created May 12, 2018 04:23
Show Gist options
  • Save ruoyu0088/ea9a199762993b91f7ca13d0dbdbcdaa to your computer and use it in GitHub Desktop.
Save ruoyu0088/ea9a199762993b91f7ca13d0dbdbcdaa to your computer and use it in GitHub Desktop.
A demo of running a ZMQ-connected subprocess alongside a Bokeh server
from os import path
from bokeh.models import Button, Div
from bokeh.layouts import column
from bokeh.document import without_document_lock
from bokeh.io import curdoc
from zmq_subprocess import ZmqSubProcessClient
# Button whose click handler (ok_button_clicked) sends "reset" to the subprocess.
ok_button = Button(label="ok")
# Div whose text is overwritten with each message handled by process_message.
div = Div()
def ok_button_clicked():
    """Button click handler: send the "reset" command to the subprocess."""
    subproc.send("reset")
# Register ok_button_clicked as the button's click callback.
ok_button.on_click(ok_button_clicked)
def process_message(message, doc=curdoc()):
    """Display a message received from the subprocess in ``div``.

    The update is not applied directly; it is scheduled on ``doc`` as a
    next-tick callback, and ``str(message)`` is evaluated when that
    callback runs.

    Args:
        message: Object received from the subprocess.
        doc: Document to schedule the update on. The default is evaluated
            once, when this function is defined.
            NOTE(review): presumably this pins the session's document so
            later calls made outside the session context still target
            it -- confirm.
    """
    def _update_div():
        div.text = str(message)

    doc.add_next_tick_callback(_update_div)
# Create the ZMQ client end for the current document.
subproc = ZmqSubProcessClient(curdoc())
# Launch calc_process.py (located next to this file) with no extra arguments;
# every message it sends is handed to process_message.
subproc.start_subprocess(path.join(path.dirname(__file__), "calc_process.py"), (), process_message)
# Add a column layout holding the button and the div to the document.
curdoc().add_root(column(ok_button, div))
import sys
import time
import zmq
from zmq_subprocess import ZmqSubProcess
# Connect to the parent; the port is read from the command line by
# ZmqSubProcess when none is passed explicitly.
zsp = ZmqSubProcess()

count = 0
while True:
    # poll() uses a zero timeout, so this check does not wait; recv() is
    # only called when poll() reported something other than 0.
    has_message = zsp.poll() != 0
    if has_message and zsp.recv() == "reset":
        count = 0

    # Report the current counter value, then advance it once per second.
    print("send", count)
    zsp.send(count)
    count += 1
    time.sleep(1.0)
import sys
import subprocess
import zmq
import zmq.asyncio
from bokeh.document import without_document_lock
from bokeh.application.handlers import Handler
def on_session_destroyed(self, session_context):
    """Forward session teardown to a hook stored on the session context.

    Args:
        self: The handler instance (unused).
        session_context: Object that may carry an ``on_destroyed``
            attribute; when present it is called with the context itself.

    Returns:
        Whatever ``session_context.on_destroyed`` returns, or ``None`` when
        the context has no such attribute.
    """
    if not hasattr(session_context, "on_destroyed"):
        return None
    hook = session_context.on_destroyed
    return hook(session_context)
# Monkey-patch: replace Handler.on_session_destroyed on the class itself with
# the module-level on_session_destroyed function, so the per-session
# `on_destroyed` hook is looked up on the session context.
Handler.on_session_destroyed = on_session_destroyed
class ZmqSubProcessClient:
    """Server-side end of a ZMQ PAIR link between a document and a child process.

    The constructor binds an asyncio PAIR socket on 127.0.0.1.
    ``start_subprocess`` then launches a Python script, passing the bound
    port as its first command-line argument, and schedules a receive loop
    on the document.

    NOTE: ``recv_pyobj`` unpickles whatever arrives on the socket. The
    socket is bound to the loopback interface only, but any local process
    that connects to the port can deliver a pickle -- use only with
    trusted peers.
    """

    def __init__(self, doc, port=0):
        """Bind the PAIR socket.

        Args:
            doc: Document used to schedule the receive loop and sends.
            port: TCP port to bind on 127.0.0.1. ``0`` (the default) binds
                to a random free port; the chosen port is stored in
                ``self.port``.
        """
        ctx = zmq.asyncio.Context.instance()
        self.socket = ctx.socket(zmq.PAIR)
        if port == 0:
            port = self.socket.bind_to_random_port("tcp://127.0.0.1")
        else:
            addr = "tcp://127.0.0.1:{}".format(port)
            self.socket.bind(addr)
        self.port = port
        self.doc = doc

    def start_subprocess(self, pyfile, args, message_callback):
        """Launch ``pyfile`` as a child process and start receiving from it.

        Args:
            pyfile: Path of the Python script to run.
            args: Iterable of extra command-line arguments, placed after
                the port number.
            message_callback: Callable invoked with each object received
                from the child.
        """
        # Run the child with the interpreter that runs this server, so it
        # sees the same environment and installed packages; a bare "python"
        # resolved through PATH may be a different interpreter. Fall back
        # to "python" if the interpreter path is unavailable.
        interpreter = sys.executable or "python"
        command = [interpreter, pyfile, str(self.port)] + list(args)
        self.process = subprocess.Popen(command)
        self.message_callback = message_callback
        self.doc.add_next_tick_callback(self.message_loop)
        # Store the teardown hook on the session context, where the patched
        # Handler.on_session_destroyed looks for it.
        self.doc.session_context.on_destroyed = self.destroy

    def destroy(self, session_context):
        """Kill the child process when the session is destroyed.

        Args:
            session_context: The destroyed session's context (unused).
        """
        self.process.kill()
        # Reap the killed child so it does not linger as a zombie. The wait
        # is bounded so a stuck process cannot stall the caller.
        try:
            self.process.wait(timeout=1.0)
        except subprocess.TimeoutExpired:
            pass

    @without_document_lock
    async def message_loop(self):
        """Receive objects from the child forever, passing each to the callback."""
        while True:
            message = await self.socket.recv_pyobj()
            self.message_callback(message)

    def send(self, message):
        """Schedule ``message`` to be sent to the child on the next tick.

        Args:
            message: Object to send with ``send_pyobj``.
        """
        @without_document_lock
        async def _send_message():
            await self.socket.send_pyobj(message)

        self.doc.add_next_tick_callback(_send_message)
class ZmqSubProcess:
    """Child-process end of the PAIR link: connects to a port on 127.0.0.1."""

    def __init__(self, port=None):
        """Connect a PAIR socket to ``tcp://127.0.0.1:<port>``.

        Args:
            port: Port to connect to. When ``None``, the port is parsed
                from the first command-line argument (``sys.argv[1]``).
        """
        port_number = int(sys.argv[1]) if port is None else port
        endpoint = "tcp://127.0.0.1:{}".format(port_number)
        self.socket = zmq.Context.instance().socket(zmq.PAIR)
        self.socket.connect(endpoint)

    def send(self, obj):
        """Send ``obj`` to the peer with ``send_pyobj``."""
        self.socket.send_pyobj(obj)

    def poll(self):
        """Poll the socket with a zero timeout and return the poll result."""
        events = self.socket.poll(timeout=0)
        return events

    def recv(self):
        """Receive one object from the peer with ``recv_pyobj`` and return it."""
        received = self.socket.recv_pyobj()
        return received
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment