Last active
April 15, 2022 04:32
-
-
Save peterhillyard/027f6ec86f9603ee76d2212a4c6a98f4 to your computer and use it in GitHub Desktop.
A pyqtgraph example for plotting realtime data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from accelerometer import AccelerometerLiveM5, AccelerometerLiveM4 | |
from PyQt5.Qt import QMutex | |
from pyqtgraph.Qt import QtGui, QtCore | |
from copy import copy | |
import pyqtgraph as pg | |
import sys | |
import time | |
import numpy as np | |
import const as c | |
# Module-wide pyqtgraph configuration: turn the OpenGL option off before any plot widgets are created.
pg.setConfigOptions(useOpenGL=False) | |
class StreamEmitterM4(QtCore.QObject):
    """Worker object that samples the accelerometer into ring buffers.

    ``recv_and_update_motion`` is meant to run in a worker thread.  On every
    iteration it asks the ``AccelerometerLiveM4`` object to process data,
    writes the newest values into fixed-size circular buffers held in
    ``self.stream_obj`` and, at most once per ``EMIT_INTERVAL_S`` seconds,
    emits ``new_data_signal`` carrying ``self.stream_obj``.

    ``self.mutex`` guards every write to ``self.stream_obj``.  The emitted
    object is the live dictionary (not a copy), so receivers must hold
    ``self.mutex`` while reading or copying the arrays.

    Layout of ``self.stream_obj``:
        ``'singles'``: ``{'oldest_index': int, 'ts_vec': ndarray}`` where
            ``ts_vec`` holds ``time.time()`` stamps (NaN until written).
        one entry per axis in ``self.ac_live.axes``:
            ``{'raw', 'st_avg', 'derivative': ndarray, 'oldest_index': int}``.
    ``oldest_index`` is the slot that will be overwritten next, i.e. the
    oldest sample once the buffer has wrapped.
    """

    new_data_signal = QtCore.pyqtSignal(object)

    # Minimum time, in seconds, between two emissions of ``new_data_signal``.
    EMIT_INTERVAL_S = 0.1

    def __init__(self, parent=None):
        """Create the accelerometer reader and allocate the ring buffers.

        Args:
            parent: Object kept in ``self.parent`` for reference only.  It is
                intentionally not passed to ``QObject.__init__``, so this
                object has no Qt parent (Qt refuses ``moveToThread`` on
                objects that have one).
        """
        QtCore.QObject.__init__(self)
        # NOTE(review): this attribute shadows the QObject.parent() method on
        # the instance; kept as-is because callers may read it.
        self.parent = parent
        self.mutex = QMutex()
        # Number of slots in every ring buffer.
        self.num_samples = 1000
        # NOTE(review): the meaning of 0.05 is defined by AccelerometerLiveM4
        # and is not visible here -- confirm against that class.
        self.ac_live = AccelerometerLiveM4(c.zmq_obj, 0.05)
        # Loop flag for recv_and_update_motion(); cleared by stop().
        self.is_running = True
        self.setup_data_dicts()

    def setup_data_dicts(self):
        """(Re)allocate ``self.stream_obj`` with empty ring buffers."""
        self.stream_obj = {}
        self.stream_obj['singles'] = {
            'oldest_index': 0,
            'ts_vec': np.nan*np.ones(self.num_samples),
        }
        for ax in self.ac_live.axes:
            self.stream_obj[ax] = {
                'raw': np.zeros(self.num_samples),
                'st_avg': np.zeros(self.num_samples),
                'derivative': np.zeros(self.num_samples),
                'oldest_index': 0
            }

    def stop(self):
        """Ask ``recv_and_update_motion`` to return after its current iteration.

        Safe to call from another thread.  The loop only notices the request
        once the ``process_data()`` call in progress has returned.
        """
        self.is_running = False

    def recv_and_update_motion(self):
        """Run the acquisition loop until ``stop()`` is called.

        Each iteration processes accelerometer data, stores one sample per
        axis in the ring buffers and emits ``new_data_signal`` when more than
        ``EMIT_INTERVAL_S`` seconds have passed since the previous emission.
        """
        prev_ts = 0
        self.is_running = True
        while self.is_running:
            # Let the accelerometer object refresh accel / accel_st_avg /
            # accel_derivative before they are read below.
            self.ac_live.process_data()

            self.mutex.lock()
            try:
                ts = time.time()

                singles = self.stream_obj['singles']
                index = singles['oldest_index']
                singles['ts_vec'][index] = ts
                singles['oldest_index'] = (index + 1) % self.num_samples

                for ax in self.ac_live.axes:
                    buffers = self.stream_obj[ax]
                    index = buffers['oldest_index']
                    buffers['raw'][index] = self.ac_live.accel[ax]
                    buffers['st_avg'][index] = self.ac_live.accel_st_avg[ax]
                    buffers['derivative'][index] = self.ac_live.accel_derivative[ax]
                    buffers['oldest_index'] = (index + 1) % self.num_samples
            finally:
                # Always release the mutex: an exception above must not leave
                # it locked, or every later tryLock()/lock() would fail/block.
                self.mutex.unlock()

            # Send out new data, throttled so the GUI is not flooded.
            if ts - prev_ts > self.EMIT_INTERVAL_S:
                self.new_data_signal.emit(self.stream_obj)
                prev_ts = ts
class MainWinM4(QtGui.QMainWindow):
    """Main window that plots the accelerometer stream from StreamEmitterM4.

    Two pyqtgraph windows are created, each with one plot per accelerometer
    axis stacked in rows:

    * ``self.win['accel_and_st_avg']`` (the central widget) shows the
      ``raw`` and ``st_avg`` curves.
    * ``self.win['accel_derivative']`` shows the ``derivative`` curve.

    A ``StreamEmitterM4`` is moved to a ``QThread``; every time it emits
    ``new_data_signal`` the curves are refreshed by ``update_plots``.
    """

    def __init__(self):
        """Build the plots, then start the acquisition thread."""
        QtGui.QMainWindow.__init__(self)
        self.data_generator = StreamEmitterM4(self)

        # Create every curve before the worker starts so update_plots() can
        # never run against a half-initialised window.
        self.setup_plot()

        self.qt_thread = QtCore.QThread()
        self.data_generator.moveToThread(self.qt_thread)
        self.data_generator.new_data_signal.connect(self.update_plots)
        self.qt_thread.started.connect(self.data_generator.recv_and_update_motion)
        self.qt_thread.start()

    def _add_plot(self, window, y_limit):
        """Add one plot to ``window`` with legend, grid, labels and a
        symmetric y-range of ``[-y_limit, y_limit]``; return the plot."""
        plot = window.addPlot()
        plot.addLegend()
        plot.showGrid(x=True, y=True)
        plot.setYRange(-y_limit, y_limit)
        plot.setLabel(axis='left', text='Raw x', units='')
        plot.setLabel(axis='bottom', text='time', units='s')
        return plot

    def _add_curve(self, plot, name, color, width):
        """Add a named curve to ``plot``, seed it with placeholder data and
        return it."""
        curve = plot.plot(pen=pg.mkPen(color=color, width=width), name=name)
        curve.setData([0, 1], [0, 1])
        return curve

    def setup_plot(self):
        """Create both graphics windows and one set of curves per axis.

        Populates ``self.win``, ``self.plot_dict`` and ``self.curve_dict``.
        ``self.curve_dict[ax]`` maps ``'raw'``, ``'st_avg'`` and
        ``'derivative'`` to their curve objects.  ``self.plot_dict[ax]`` ends
        up referring to the derivative plot of each axis, because the second
        loop reuses the same key.
        """
        self.win = {}
        self.win['accel_and_st_avg'] = pg.GraphicsWindow(title="")
        self.win['accel_derivative'] = pg.GraphicsWindow(title="")

        pg.setConfigOptions(antialias=True)
        self.setCentralWidget(self.win['accel_and_st_avg'])

        self.curve_dict = {}
        self.plot_dict = {}

        axes = self.data_generator.ac_live.axes
        last_row = len(axes) - 1

        # Raw signal and short-term average share one plot per axis.
        accel_win = self.win['accel_and_st_avg']
        for ii, ax in enumerate(axes):
            plot = self._add_plot(accel_win, 20000)
            self.plot_dict[ax] = plot
            self.curve_dict[ax] = {
                'raw': self._add_curve(plot, 'raw', 'r', 3),
                'st_avg': self._add_curve(plot, 'st_avg', 'w', 2),
            }
            # Stack the plots vertically; no empty row after the last one.
            if ii < last_row:
                accel_win.nextRow()

        # The derivative gets its own window, again one row per axis.
        derivative_win = self.win['accel_derivative']
        for ii, ax in enumerate(axes):
            plot = self._add_plot(derivative_win, 2000)
            self.plot_dict[ax] = plot
            self.curve_dict[ax]['derivative'] = self._add_curve(
                plot, 'derivative', 'r', 3)
            if ii < last_row:
                derivative_win.nextRow()

    def update_plots(self, data):
        """Redraw every curve from the ring buffers in ``data``.

        Args:
            data: The dictionary emitted by ``new_data_signal``; for each
                axis it holds the ``raw``, ``st_avg`` and ``derivative``
                arrays plus ``oldest_index``.

        The buffers may still be written by the worker thread, so they are
        copied while holding the emitter's mutex.  If the mutex is busy this
        update is skipped; the next emission redraws the plots.
        """
        mutex = self.data_generator.mutex
        if not mutex.tryLock():
            return

        try:
            unrolled = {}
            for ax in self.data_generator.ac_live.axes:
                buffers = data[ax]
                # Rotate so the oldest sample comes first.  np.roll returns a
                # new array, which doubles as the private copy we need.
                shift = -buffers['oldest_index']
                unrolled[ax] = {
                    name: np.roll(buffers[name], shift)
                    for name in ('raw', 'st_avg', 'derivative')
                }
        finally:
            mutex.unlock()

        # Drawing happens outside the lock so the worker is never held up.
        for ax, series in unrolled.items():
            for name, values in series.items():
                self.curve_dict[ax][name].setData(values)
if __name__ == '__main__':
    # Script entry point: create the Qt application, show the main window and
    # hand control to the Qt event loop; its return value becomes the process
    # exit status.
    qt_app = QtGui.QApplication(sys.argv)
    window = MainWinM4()
    window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment