Skip to content

Instantly share code, notes, and snippets.

@hanhha
Last active January 23, 2018 15:40
Show Gist options
  • Save hanhha/a0b6f9a6fa87c9843de70d8e5f18415f to your computer and use it in GitHub Desktop.
Save hanhha/a0b6f9a6fa87c9843de70d8e5f18415f to your computer and use it in GitHub Desktop.
Demo of Bokeh programmatic server
#!/usr/bin/env python3
from datetime import datetime
from time import sleep
from bokeh.plotting import figure
from bokeh.application import Application
from bokeh.application.handlers.function import FunctionHandler
from bokeh.document import without_document_lock
from bokeh.models import ColumnDataSource
from bokeh.models.glyphs import Line
from bokeh.server.server import Server
from tornado import gen, ioloop
from threading import Thread, Lock
class BPA(object):  # Base Processing Agent
    """Base node of a small observer pipeline.

    An agent may subscribe to an upstream agent (``source``) and may itself
    be subscribed to through :meth:`Bind`. Payloads passed to
    :meth:`BroadCast` are handed to every bound callback, in binding order.
    """

    def __init__(self, source=None):
        """Create the agent and, if ``source`` is given, subscribe to it.

        Args:
            source: Optional upstream object exposing ``Bind(callback)``.
                This agent's ``CallBack`` is registered on it.
        """
        self._source = source
        self._observer = []
        if self._source is not None:
            self._source.Bind(self.CallBack)

    def Bind(self, cb):
        """Register ``cb`` to be called with every broadcast payload."""
        self._observer.append(cb)

    def CallBack(self, data):
        """Receive a payload from the upstream source.

        Subclasses that are bound to a source must override this method.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        # An explicit raise is used instead of `assert`: asserts are stripped
        # under -O, and asserting a non-empty message string can never fail.
        raise NotImplementedError("Must be overridden")

    def BroadCast(self, data):
        """Call every bound callback with ``data``, in binding order."""
        for cb in self._observer:
            cb(data)
class DataGen(BPA):
    """Demo producer that alternately emits points for stream 0 and stream 1."""

    def start(self, interval=1, count=None):
        """Broadcast points until interrupted (or until ``count`` is reached).

        Every payload has the form ``{stream: {'x': idx, 'y': y}}`` where
        ``stream`` alternates 0, 1, 0, 1, ..., ``idx`` counts all points
        emitted so far and ``y`` counts the points emitted on that stream.
        A ``KeyboardInterrupt`` ends the loop and is reported on stdout.

        Args:
            interval: Seconds to sleep after each point (default 1).
            count: Total number of points to emit; ``None`` (default) means
                run until interrupted.
        """
        stream = 0        # stream whose turn it is
        idx = 0           # running index shared by both streams
        per_stream = [0, 0]  # next y value for stream 0 and stream 1
        try:
            while count is None or idx < count:
                y = per_stream[stream]
                self.BroadCast({stream: {'x': idx, 'y': y}})
                print("New data for stream {s}: {x} - {y}".format(s=stream, x=idx, y=y))
                idx += 1
                per_stream[stream] += 1
                stream = 1 - stream  # hand the turn to the other stream
                sleep(interval)
        except KeyboardInterrupt:
            print("\rUser interrupted.")
class Chart(BPA):
    """Bokeh server page that plots streams 0 and 1 received from ``source``.

    ``CallBack`` only buffers incoming points under ``_new_data_lock``; a
    periodic Bokeh callback (``update_data``) later schedules
    ``real_update_data``, which moves the buffer into the
    ``ColumnDataSource`` objects of the current document.
    """

    def __init__(self, source):
        """Set up buffers, locks and server settings, then bind to ``source``."""
        self._new_data = dict()   # buffered points: {stream: {column: [values]}}
        self._sources = dict()    # stream -> ColumnDataSource used by the plot
        self.port = 8888
        self.allow_websocket_origin = ['localhost:8888']
        # Held from the moment an update is scheduled until it has been applied,
        # and while a document is being built.
        self._cb_lock = Lock()
        self._new_data_lock = Lock()  # guards _new_data
        self._exec_t = None
        self.doc = None
        # Bind last: the source may start calling CallBack once bound, and
        # CallBack needs the attributes above.
        BPA.__init__(self, source)

    @gen.coroutine
    def real_update_data(self):
        """Stream the buffered points into the plot sources.

        Scheduled by ``update_data`` after it has taken ``_cb_lock``; the
        lock is released here even if streaming fails, so that later
        updates are not blocked forever.
        """
        try:
            # Swap the buffer out under the lock so CallBack is blocked only
            # briefly and never sees a half-consumed buffer.
            with self._new_data_lock:
                pending = self._new_data
                self._new_data = dict()
            for key, columns in pending.items():
                if key in self._sources:
                    # rollover = 288 is passed through to Bokeh's stream()
                    self._sources[key].stream(columns, rollover=288)
        finally:
            self._cb_lock.release()

    @gen.coroutine
    @without_document_lock
    def update_data(self):
        """Periodic callback: schedule one update unless one is still pending."""
        # Non-blocking acquire: skip this tick if the previous update has not
        # been applied yet (or a document is being built).
        if self._cb_lock.acquire(False):
            self.doc.add_next_tick_callback(self.real_update_data)

    def CallBack(self, data):
        """Buffer a payload of the form ``{stream: {column: value}}``.

        Each value is appended to the list kept for its stream and column.
        """
        with self._new_data_lock:
            for stream, point in data.items():
                columns = self._new_data.setdefault(stream, {})
                for name, value in point.items():
                    columns.setdefault(name, []).append(value)

    def modify_document(self, doc):
        """Build the plot in ``doc`` and register the periodic update.

        Existing sources are copied into fresh ``ColumnDataSource`` objects
        so that a new document starts with the data already plotted.

        Returns:
            The document that was passed in.
        """
        # NOTE(review): this is a blocking acquire. If it ever runs on the
        # same thread that must execute a pending real_update_data, it would
        # wait on itself - confirm against Bokeh's session threading.
        with self._cb_lock:
            self.doc = doc
            p = figure(sizing_mode='scale_width', plot_height=200)
            p.grid.grid_line_alpha = 0.6
            if self._sources:
                for key, old_source in list(self._sources.items()):
                    snapshot = old_source.to_df().to_dict(orient='list')
                    self._sources[key] = ColumnDataSource(snapshot)
            else:
                self._sources[0] = ColumnDataSource({'x': [], 'y': []})
                self._sources[1] = ColumnDataSource({'x': [], 'y': []})
            g0 = Line(x='x', y='y', line_color='blue')
            g1 = Line(x='x', y='y', line_color='red')
            p.add_glyph(self._sources[0], g0)
            p.add_glyph(self._sources[1], g1)
            self.doc.add_root(p)
            self.doc.add_periodic_callback(self.update_data, 1000)
        return self.doc

    def start(self):
        """Start the Bokeh server and run its IOLoop in a background thread."""
        app = {'/': Application(FunctionHandler(self.modify_document))}
        self.server = Server(app, port=self.port,
                             allow_websocket_origin=self.allow_websocket_origin)
        self.server.start()
        self._exec_t = Thread(name="bokeh_chart", target=self.server.io_loop.start)
        self._exec_t.start()

    def stop(self):
        """Stop the IOLoop and wait for the server thread to finish."""
        loop = self.server.io_loop
        # stop() is called from outside the loop's thread; Tornado documents
        # add_callback as the only IOLoop method safe to call from other
        # threads, so the stop request is handed over through it.
        loop.add_callback(loop.stop)
        self._exec_t.join()
if __name__ == '__main__':
    # Wire the demo pipeline: DataGen produces points, Chart plots them.
    datagen = DataGen()
    chart = Chart(datagen)
    chart.start()
    try:
        # Blocks until the user interrupts the generator.
        datagen.start()
    finally:
        # Always stop the server thread; it is not a daemon thread and would
        # otherwise keep the process alive after an unexpected error.
        chart.stop()
    # `exit` is a helper installed by the `site` module for interactive use;
    # raising SystemExit works regardless of how the interpreter was started.
    raise SystemExit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment