Skip to content

Instantly share code, notes, and snippets.

@peterhillyard
Last active April 15, 2022 04:32
Show Gist options
  • Save peterhillyard/027f6ec86f9603ee76d2212a4c6a98f4 to your computer and use it in GitHub Desktop.
Save peterhillyard/027f6ec86f9603ee76d2212a4c6a98f4 to your computer and use it in GitHub Desktop.
A pyqtgraph example for plotting realtime data
from accelerometer import AccelerometerLiveM5, AccelerometerLiveM4
from PyQt5.Qt import QMutex
from pyqtgraph.Qt import QtGui, QtCore
from copy import copy
import pyqtgraph as pg
import sys
import time
import numpy as np
import const as c
pg.setConfigOptions(useOpenGL=False)
class StreamEmitterM4(QtCore.QObject):
new_data_signal = QtCore.pyqtSignal(object)
def __init__(self, parent=None):
QtCore.QObject.__init__(self)
self.parent = parent
self.mutex = QMutex()
self.num_samples = 1000
self.ac_live = AccelerometerLiveM4(c.zmq_obj, 0.05)
self.setup_data_dicts()
def setup_data_dicts(self):
self.stream_obj = {}
self.stream_obj['singles'] = {
'oldest_index': 0,
'ts_vec': np.nan*np.ones(self.num_samples),
}
for ax in self.ac_live.axes:
self.stream_obj[ax] = {
'raw': np.zeros(self.num_samples),
'st_avg': np.zeros(self.num_samples),
'derivative': np.zeros(self.num_samples),
'oldest_index': 0
}
def recv_and_update_motion(self):
prev_ts = 0
is_running = True
while is_running:
# Get list of valid updated links
self.ac_live.process_data()
self.mutex.lock()
ts = time.time()
index = self.stream_obj['singles']['oldest_index']
self.stream_obj['singles']['ts_vec'][index] = ts
self.stream_obj['singles']['oldest_index'] = (index + 1) % self.num_samples
for ax in self.ac_live.axes:
index = self.stream_obj[ax]['oldest_index']
self.stream_obj[ax]['raw'][index] = self.ac_live.accel[ax]
self.stream_obj[ax]['st_avg'][index] = self.ac_live.accel_st_avg[ax]
self.stream_obj[ax]['derivative'][index] = self.ac_live.accel_derivative[ax]
self.stream_obj[ax]['oldest_index'] = (index + 1) % self.num_samples
self.mutex.unlock()
# Send out new data
if ts - prev_ts > 0.1:
self.new_data_signal.emit(self.stream_obj)
prev_ts = ts
class MainWinM4(QtGui.QMainWindow):
"""Holds a couple of tone power plots. Updates the plots whenever DataReceiver
signals that new data is ready."""
def __init__(self):
QtGui.QMainWindow.__init__(self)
self.data_generator = StreamEmitterM4(self)
self.qt_thread = QtCore.QThread()
self.data_generator.moveToThread(self.qt_thread)
self.data_generator.new_data_signal.connect(self.update_plots)
self.qt_thread.started.connect(self.data_generator.recv_and_update_motion)
self.qt_thread.start()
self.setup_plot()
def setup_plot(self):
self.win = {}
self.win['accel_and_st_avg'] = pg.GraphicsWindow(title="")
self.win['accel_derivative'] = pg.GraphicsWindow(title="")
# win.resize(640, 480)
pg.setConfigOptions(antialias=True)
self.setCentralWidget(self.win['accel_and_st_avg'])
self.curve_dict = {}
self.plot_dict = {}
for ii, ax in enumerate(self.data_generator.ac_live.axes):
self.plot_dict[ax] = self.win['accel_and_st_avg'].addPlot()
self.plot_dict[ax].addLegend()
self.plot_dict[ax].showGrid(x=True, y=True)
self.plot_dict[ax].setYRange(-20000, 20000)
self.plot_dict[ax].setLabel(axis='left', text='Raw x', units='')
self.plot_dict[ax].setLabel(axis='bottom', text='time', units='s')
self.curve_dict[ax] = {}
self.curve_dict[ax]['raw'] = self.plot_dict[ax].plot(pen=pg.mkPen(color='r', width=3), name='raw')
self.curve_dict[ax]['raw'].setData([0, 1], [0, 1])
self.curve_dict[ax]['st_avg'] = self.plot_dict[ax].plot(pen=pg.mkPen(color='w', width=2), name='st_avg')
self.curve_dict[ax]['st_avg'].setData([0, 1], [0, 1])
if ii < len(self.data_generator.ac_live.axes) - 1:
self.win['accel_and_st_avg'].nextRow()
for ii, ax in enumerate(self.data_generator.ac_live.axes):
self.plot_dict[ax] = self.win['accel_derivative'].addPlot()
self.plot_dict[ax].addLegend()
self.plot_dict[ax].showGrid(x=True, y=True)
self.plot_dict[ax].setYRange(-2000, 2000)
self.plot_dict[ax].setLabel(axis='left', text='Raw x', units='')
self.plot_dict[ax].setLabel(axis='bottom', text='time', units='s')
self.curve_dict[ax]['derivative'] = self.plot_dict[ax].plot(pen=pg.mkPen(color='r', width=3), name='derivative')
self.curve_dict[ax]['derivative'].setData([0, 1], [0, 1])
if ii < len(self.data_generator.ac_live.axes) - 1:
self.win['accel_derivative'].nextRow()
def update_plots(self, data):
if self.data_generator.mutex.tryLock():
stream_obj = copy(data)
self.data_generator.mutex.unlock()
for ax in self.data_generator.ac_live.axes:
oldest_index = stream_obj[ax]['oldest_index']
raw_vec = np.roll(stream_obj[ax]['raw'], -oldest_index)
st_avg_vec = np.roll(stream_obj[ax]['st_avg'], -oldest_index)
derivative_vec = np.roll(stream_obj[ax]['derivative'], -oldest_index)
self.curve_dict[ax]['raw'].setData(raw_vec)
self.curve_dict[ax]['st_avg'].setData(st_avg_vec)
self.curve_dict[ax]['derivative'].setData(derivative_vec)
if __name__ == '__main__':
app = QtGui.QApplication(sys.argv)
main = MainWinM4()
main.show()
sys.exit(app.exec_())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment