Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Streaming (faked) LabJack data to Bokeh plot
import csv, datetime, os
from threading import Timer, Lock
import re
import numpy as np
import bokeh.plotting as bk
from bokeh.models import Range1d
# import u3 # LabJack library
# from LabJackPython import LabJackException
class TempMonitor(object):
def __init__(self, log_interval=None):
if log_interval == None:
self.log_interval = 10
else:
self.log_interval = log_interval
self.refresh_interval = 1
self.logdir = "/Applications/Tempura/logs"
try:
os.makedirs(self.logdir)
except OSError:
if not os.path.isdir(self.logdir):
raise
self.update_timer = None
self.log_timer = None
self.is_logging = False
self.input_channels = [0,1,2,3,4,5]
self.num_channels = len(self.input_channels)
# try to open the U3
# self.daq = u3.U3()
# self.daq.getCalibrationData()
# self.daq.configIO(FIOAnalog=0x3F) # AIN 0-5
# self.daq.streamConfig(NumChannels = self.num_channels,
# PChannels = self.input_channels,
# NChannels = [32]*self.num_channels,
# Resolution = 1,
# ScanFrequency = 10000 / self.num_channels)
self.sample_lock = Lock()
self.dataCount = 0
self.V_samples = np.zeros((self.num_channels,))
buffer_length_in_minutes = 10
self.buffer_length = buffer_length_in_minutes * 60 // self.refresh_interval
self.sample_time_buffer = self.buffer_length * [datetime.datetime.now()]
self.temperature_buffer = 20. * np.ones((6, self.buffer_length))
self.make_plot()
def make_plot(self):
bk.output_server('Temperature Monitor')
tools = 'pan,xwheel_zoom,ywheel_zoom,box_zoom,save,reset'
self.plot = bk.figure(title="Gradient Temperature Readings",
x_axis_label="Time [minutes]",
x_axis_type='datetime',
y_axis_label="Temperature [degrees C]",
h_symmetry=True,
height=500,
width=1100,
logo='grey',
tools=tools)
# self.plot.y_range = Range1d(start=20, end=65)
# self.plot.x_range = Range1d(start=-1, end=0)
colors = ['darkblue', 'darkcyan', 'darkmagenta',
'maroon','darkgoldenrod','darkolivegreen']
for ch in self.input_channels:
self.plot.line(self.sample_time_buffer,
self.temperature_buffer[ch,:],
name="ch{}".format(ch),
legend="T{}".format(ch+1),
line_width=4,
line_join='round',
line_cap='round',
line_alpha=0.85,
color=colors[ch])
self.plot.legend[0].orientation = 'top_left'
bk.show(self.plot)
def update_plot(self):
start_time = datetime.datetime.now()
# update the temperature buffer with the averaged collected data
# multiply by 20 to convert V to degrees C
with self.sample_lock:
self.temperature_buffer = np.roll(self.temperature_buffer, -1, 1)
if self.dataCount > 0:
self.temperature_buffer[:,-1] = 20. * np.asarray(self.V_samples) / self.dataCount
else:
# if the DAQ messes up, just assume the temp didn't change
self.temperature_buffer[:,-1] = self.temperature_buffer[:,-2]
self.V_samples = np.zeros((self.num_channels,))
self.dataCount = 0
self.sample_time_buffer = np.roll(self.sample_time_buffer, -1)
self.sample_time_buffer[-1] = start_time
ch_re = re.compile(r'ch(?P<ch>\d+)')
for r in self.plot.renderers:
if r.name:
match = re.match(ch_re, r.name)
if match:
ch = int(match.group('ch'))
r.data_source.data['x'] = self.sample_time_buffer
r.data_source.data['y'] = self.temperature_buffer[ch,:]
bk.push()
# restart the collection timer
dt = start_time.now() - start_time
next_refresh = max(0, self.refresh_interval - dt.seconds - dt.microseconds/1e6)
self.update_timer = Timer(next_refresh, self.update_plot)
self.update_timer.daemon = True
self.update_timer.start()
def log_data(self):
sample_time = self.sample_time_buffer[-1]
temperature_samples = self.temperature_buffer[:,-1]
logfilename = "gradient_temps_{}.log".format(sample_time.strftime('%Y-%m-%d'))
with open(os.path.join(self.logdir, logfilename), 'a') as csv_log_file:
logwriter = csv.writer(csv_log_file)
row = [sample_time.strftime('%H:%M:%S')]
row += ["{:.3f}".format(s) for s in temperature_samples]
logwriter.writerow(row)
self.is_logging = False
self.start_logging()
def start_collecting(self, recover=False):
self.update_timer = Timer(self.refresh_interval, self.update_plot)
self.update_timer.daemon = True
self.update_timer.start()
if not recover:
with self.sample_lock:
self.dataCount = 0
self.V_samples = np.zeros((self.num_channels,))
# continuously sample data using the U3 streaming mode
try:
# if not self.daq.streamStarted:
# self.daq.streamStart()
# continuously running loop
# for r in self.daq.streamData():
while True:
r = 1
if r is not None:
# if r['errors'] != 0:
# print("STREAM ERROR : ", r['errors'])
# if r['numPackets'] != self.daq.packetsPerRequest:
# print("---- UNDERFLOW : ", r['numPackets'])
# if r['missed'] != 0:
# print("++++ MISSED : ", r['missed'])
# average samples
with self.sample_lock:
self.V_samples += [2. * np.random.random() for ch in self.input_channels]
self.dataCount += 1
else:
# no data from read
print("No data")
except LabJackException as e:
print(e)
# self.daq.streamStop()
print "Restarting stream..."
self.start_collecting(recover=True)
finally:
pass
# self.daq.streamStop()
def start_logging(self):
if not self.is_logging:
self.log_timer = Timer(self.log_interval, self.log_data)
self.log_timer.daemon = True
self.log_timer.start()
self.is_logging = True
def start(self):
self.start_collecting()
self.start_logging()
def stop(self):
self.update_timer.cancel()
self.log_timer.cancel()
if __name__ == '__main__':
T = TempMonitor()
T.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.