Skip to content

Instantly share code, notes, and snippets.

@benongithub
Created February 2, 2023 16:01
Show Gist options
  • Save benongithub/5ce06890f10c9187bfaa4ec30ad4ed72 to your computer and use it in GitHub Desktop.
Save benongithub/5ce06890f10c9187bfaa4ec30ad4ed72 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from threading import Thread
import serial
import time
import collections
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import struct
import copy
import pandas as pd
from matplotlib.widgets import Slider
class serialPlot:
def __init__(self, serialPort='COM7', serialBaud=115200, plotLength=100, dataNumBytes=8, numPlots=3):
# servo_one_slider, servo_two_slider,
self.port = serialPort
self.baud = serialBaud
self.plotMaxLength = plotLength
self.dataNumBytes = dataNumBytes
self.numPlots = numPlots
self.rawData = bytearray(numPlots * dataNumBytes)
self.dataType = None
self.dataType = 'd'
# if dataNumBytes == 2:
# self.dataType = 'h' # 2 byte integer
# elif dataNumBytes == 4:
# self.dataType = 'f' # 4 byte float
# elif dataNumBytes == 8:
# self.dataType = 'd' # 8 byte double
self.data = []
for i in range(numPlots): # give an array for each type of data and store them in a list
self.data.append(collections.deque([0] * plotLength, maxlen=plotLength))
self.isRun = True
self.isReceiving = False
self.thread = None
self.plotTimer = 0
self.previousTimer = 0
# self.csvData = []
# self.servo_one_slider = servo_one_slider
# self.servo_two_slider = servo_two_slider
print('Trying to connect to: ' + str(serialPort) + ' at ' + str(serialBaud) + ' BAUD.')
try:
self.serialConnection = serial.Serial(serialPort, serialBaud, timeout=4)
print('Connected to ' + str(serialPort) + ' at ' + str(serialBaud) + ' BAUD.')
except:
print("Failed to connect with " + str(serialPort) + ' at ' + str(serialBaud) + ' BAUD.')
def readSerialStart(self):
if self.thread == None:
self.thread = Thread(target=self.backgroundThread)
self.thread.start()
# Block till we start receiving values
while self.isReceiving != True:
time.sleep(0.1)
def getSerialData(self, frame, lines, lineValueText, lineLabel, timeText):
currentTimer = time.perf_counter()
self.plotTimer = int((currentTimer - self.previousTimer) * 1000) # the first reading will be erroneous
self.previousTimer = currentTimer
timeText.set_text('Plot Interval = ' + str(self.plotTimer) + 'ms')
privateData = copy.deepcopy(self.rawData[:]) # so that the 3 values in our plots will be synchronized to the same sample time
for i in range(self.numPlots):
data = privateData[(i*self.dataNumBytes):(self.dataNumBytes + i*self.dataNumBytes)]
value, = struct.unpack(self.dataType, data)
self.data[i].append(value) # we get the latest data point and append it to our array
lines[i].set_data(range(self.plotMaxLength), self.data[i])
lineValueText[i].set_text('[' + lineLabel[i] + '] = ' + str(value))
# self.csvData.append([self.data[0][-1], self.data[1][-1], self.data[2][-1]])
def backgroundThread(self): # retrieve data
time.sleep(1.0) # give some buffer time for retrieving data
self.serialConnection.reset_input_buffer()
while (self.isRun):
self.serialConnection.readinto(self.rawData)
self.isReceiving = True
# print(self.rawData)
def update_servos(self, val):
#print(b'%d,%d\n' % (self.servo_one_slider.val, self.servo_two_slider.val))
pitch_angle = self.servo_one_slider.val
yaw_angle = self.servo_two_slider.val
# x = 20 + pitch_angle * 140
# y = 160 - pitch_angle * 140
x = 60 + pitch_angle * 120
y = 60 + pitch_angle * 120
print(x, y)
self.serialConnection.write(
b'%d,%d\n' % (x, y)
)
def close(self):
self.isRun = False
self.thread.join()
self.serialConnection.close()
print('Disconnected...')
# df = pd.DataFrame(self.csvData)
# df.to_csv('/home/rikisenia/Desktop/data.csv')
def main():
# portName = 'COM5'
portName = 'COM7'
baudRate = 115200
maxPlotLength = 100 # number of points in x-axis of real time plot
dataNumBytes = 8 # number of bytes of 1 data point
numPlots = 3 # number of plots in 1 graph
# plotting starts below
pltInterval = 50 # Period at which the plot animation updates [ms]
xmin = 0
xmax = maxPlotLength
ymin = -(180)
ymax = 360
fig = plt.figure(figsize=(10, 8))
ax = plt.axes(xlim=(xmin, xmax), ylim=(float(ymin - (ymax - ymin) / 10), float(ymax + (ymax - ymin) / 10)))
ax.set_title('Angle')
ax.set_xlabel("Time")
ax.set_ylabel("Servo Angle Output")
lineLabel = ['X', 'Y', 'Z']
style = ['r-', 'c-', 'b-'] # linestyles for the different plots
timeText = ax.text(0.70, 0.95, '', transform=ax.transAxes)
lines = []
lineValueText = []
for i in range(numPlots):
lines.append(ax.plot([], [], style[i], label=lineLabel[i])[0])
lineValueText.append(ax.text(0.70, 0.90-i*0.05, '', transform=ax.transAxes))
plt.legend(loc="upper left")
# fig.subplots_adjust(left=0.25, bottom=0.25)
#
# #servo_one_angle = 180 / 2
# #servo_two_angle = 180 / 2
# servo_one_angle = 0.5
# servo_two_angle = 0.5
#
# axservo_one = fig.add_axes([0.1, 0.25, 0.0225, 0.63])
# servo_one_slider = Slider(
# ax=axservo_one,
# label="Pitch Angle",
# valmin=0,
# valmax=1,
# valinit=servo_one_angle,
# orientation="vertical"
# )
# axservo_two = fig.add_axes([0, 0.25, 0.0225, 0.63])
# servo_two_slider = Slider(
# ax=axservo_two,
# label="Yaw Angle",
# valmin=0,
# valmax=1,
# valinit=servo_two_angle,
# orientation="vertical"
# )
#
# s = serialPlot(servo_one_slider, servo_two_slider, portName, baudRate, maxPlotLength, dataNumBytes, numPlots) # initializes all required variables
#
# servo_one_slider.on_changed(s.update_servos)
# servo_two_slider.on_changed(s.update_servos)
s = serialPlot(portName, baudRate, maxPlotLength, dataNumBytes, numPlots) # initializes all required variables
s.readSerialStart()
anim = animation.FuncAnimation(fig, s.getSerialData, fargs=(lines, lineValueText, lineLabel, timeText), interval=pltInterval) # fargs has to be a tuple
plt.show()
s.close()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment