Skip to content

Instantly share code, notes, and snippets.

@bowbahdoe
Last active February 10, 2017 23:04
Show Gist options
  • Save bowbahdoe/197615ae9c46d674c62b8c6d0f3a5c85 to your computer and use it in GitHub Desktop.
Save bowbahdoe/197615ae9c46d674c62b8c6d0f3a5c85 to your computer and use it in GitHub Desktop.
'''
Live plots data received over serial
'''
import collections
import matplotlib as mpl
import matplotlib.pyplot as plt
import serial
import serial.tools.list_ports as list_ports
import threading
import atexit
import time
class LivePlot:
    '''
    Thin wrapper over the default plot to provide an interface for live plotting

    data_source is any iterable yielding (x, y) pairs. plot_forever() drains
    it on a background thread while the calling thread redraws the figure.
    At most max_points of the most recent pairs are kept in self.data.
    '''
    def __init__(self, data_source,
                 *, title = None,
                 xlabel = None,
                 ylabel = None,
                 max_points = 1000):
        '''
        data_source: iterable of (x, y) pairs to plot
        title, xlabel, ylabel: optional text for the axes, skipped when falsy
        max_points: maximum number of points kept (and drawn) at once
        '''
        self._data_source = data_source
        self._max_points = max_points
        # Bounded buffer of (x, y) pairs; the deque itself evicts the oldest
        # pair once max_points is reached.
        self.data = collections.deque(maxlen=max_points)
        # Guards self.data: it is appended to by the collector thread while
        # the plotting thread takes snapshots of it.
        self._lock = threading.Lock()
        ###################################################################
        # The figure holds a single subplot (a 1x1 grid).                 #
        ###################################################################
        self._figure = plt.figure()
        self._axis = self._figure.add_subplot(1, 1, 1)
        if title: self._axis.set_title(title)
        if xlabel: self._axis.set_xlabel(xlabel)
        if ylabel: self._axis.set_ylabel(ylabel)

    def add_data(self, x, y):
        '''
        adds the arbitrary x and y data points to the data set used by the plot
        If adding the data would have the plot exceed max_points, the least
        recently added data point is removed
        '''
        with self._lock:
            self.data.append((x, y))

    def _add_data_thread(self, shutdown_event):
        '''
        Body of the collector thread: copies pairs from the data source into
        the plot's data set until the source is exhausted or shutdown_event
        is set
        '''
        for new_data in self._data_source:
            if shutdown_event.is_set():
                return
            self.add_data(*new_data)
            # brief pause so a fast source does not starve the plotting thread
            time.sleep(0.001)

    def plot_forever(self):
        '''
        Continuously plots data from the data source

        Blocks, redrawing the figure, until the figure is closed or an
        exception (e.g. KeyboardInterrupt) is raised. The collector thread
        is signalled to stop in either case.
        '''
        #################################################################
        # Since the target model for a live plot is a continuous        #
        # data source, we start a new thread to do that data collection #
        #################################################################
        shutdown_event = threading.Event()
        # daemon: a data source that blocks forever must not keep the
        # interpreter alive after plotting has ended
        data_col_thread = threading.Thread(target=self._add_data_thread,
                                           args = (shutdown_event,),
                                           daemon = True)
        data_col_thread.start()
        plt.ion()
        line, = self._axis.plot([], [], 'r-')
        try:
            while plt.fignum_exists(self._figure.number):
                with self._lock:
                    points = list(self.data)
                # Update the existing line instead of clearing the axis, which
                # would also wipe the title and labels set in __init__.
                line.set_data([x for x, _ in points],
                              [y for _, y in points])
                self._axis.relim()
                self._axis.autoscale_view()
                plt.pause(0.00001)
        finally:
            shutdown_event.set()
            # bounded wait: the source may be blocked producing its next item
            data_col_thread.join(timeout = 1.0)
import random


def a():
    '''
    Demo data source: endlessly yields (x, y) tuples of random floats
    drawn from random.random()
    '''
    while True:
        yield (random.random(), random.random())


# Run the demo only when executed as a script. Unguarded, plot_forever()
# blocks while the module is still loading, so nothing defined further down
# the file is ever reached.
if __name__ == "__main__":
    v = LivePlot(a(), max_points = 100, title = "TestPlot", xlabel="a")
    v.plot_forever()
def establish_serial(baud_rate = None, serial_path = None):
    '''
    Builds a BeeConnection, prompting on stdin for any setting not supplied

    baud_rate: integer baud rate; when falsy the user is asked until a
    non-zero integer is entered
    serial_path: device path of the serial port; when falsy the detected
    ports are listed and the user is asked to type one

    Returns the BeeConnection for the chosen path and baud rate
    Raises EOFError if a prompt is needed but stdin is closed
    '''
    while not baud_rate:
        try:
            baud_rate = int(input("What is the baud rate: "))
        except ValueError:
            # EOFError is deliberately not caught here: with stdin closed
            # every retry would fail the same way and the loop would spin.
            baud_rate = None
            print("Entered baud rate was not a number, please try again")
    if not serial_path:
        # show what is available so the user has something to choose from
        for p in list_ports.comports():
            print(p)
        while not serial_path:
            serial_path = input("What is the serial port path: ").strip()
    return BeeConnection(serial_path, baud_rate)
class BeeConnection:
    '''
    Iterator that represents a view of the data being sent over serial
    by the exBee

    Each next() blocks until one complete frame has been read and returns
    its fields as a tuple of floats. A frame is read as STARTBYTE, then
    fields separated by DELIMITER, then ENDBYTE (e.g. b"/1.5^2.0;").
    NOTE(review): this framing is inferred from the three marker constants;
    confirm it against what the device actually sends.

    Usable as a context manager; the port is closed when the block exits.
    '''
    DELIMITER = b"^"
    ENDBYTE = b';'
    STARTBYTE = b"/"
    # number of retries after the first failed open, and the pause before each
    _MAX_RETRIES = 10
    _RETRY_DELAY = 0.5

    def __init__(self, serial_path, baud_rate, timeout = 1):
        '''
        initializes serial connection. If initial connect fails, waits half
        a second and tries again. If the connection still is not established
        after 10 such additional attempts, raises ConnectionError

        serial_path: device path of the serial port
        baud_rate: baud rate handed to pyserial
        timeout: read timeout in seconds handed to pyserial
        '''
        # Assigned before touching the port so close()/__del__ stay safe
        # even when opening fails part-way through.
        self._connection = None
        self.raw_data = collections.deque()
        # timeout must go by keyword: the third positional parameter of
        # serial.Serial is bytesize. Leaving the port out of the constructor
        # defers opening, so a failed open can be retried below.
        connection = serial.Serial(baudrate=baud_rate, timeout=timeout)
        connection.port = serial_path
        self._connection = connection
        last_error = None
        for attempt in range(self._MAX_RETRIES + 1):
            if attempt:
                time.sleep(self._RETRY_DELAY)
            try:
                connection.open()
            except serial.SerialException as error:
                last_error = error
            else:
                return
        raise ConnectionError("Failed to connect to serial device") from last_error

    def close(self):
        '''
        cleans up and closes the serial connection
        Does nothing if the connection was never created
        '''
        connection = getattr(self, "_connection", None)
        if connection is None:
            return
        if connection.isOpen():
            connection.flush()
            connection.close()
        if not connection.isOpen():
            print("Serial port successfully closed")
        else:
            print("Something went wrong closing the connection")

    def __enter__(self):
        '''
        returns the connection itself when the object is used in a with block
        '''
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        '''
        closes connection when the object is used in a with block
        Exceptions raised inside the block are not suppressed
        '''
        self.close()

    def __del__(self):
        '''
        closes connection if the object is deleted
        '''
        try:
            self.close()
        except Exception:
            # best effort only: a finalizer has no caller to report to
            pass

    def _read_frame(self):
        '''
        Reads bytes until one complete frame has arrived and returns the
        bytes between its STARTBYTE and ENDBYTE
        Raises StopIteration when the port fails or is no longer open
        '''
        while True:
            try:
                byte = self._connection.read(1)
            except OSError as error:
                # pyserial's SerialException derives from IOError/OSError
                raise StopIteration from error
            if not byte:
                # read timed out; keep waiting only while the port is open
                if not self._connection.isOpen():
                    raise StopIteration
                continue
            if byte == self.STARTBYTE:
                # (re)start the frame, dropping any unfinished one
                self.raw_data.clear()
                self.raw_data.append(byte)
            elif not self.raw_data:
                # noise between frames; wait for the next STARTBYTE
                continue
            elif byte == self.ENDBYTE:
                payload = b''.join(self.raw_data)[len(self.STARTBYTE):]
                self.raw_data.clear()
                return payload
            else:
                self.raw_data.append(byte)

    @classmethod
    def _parse_frame(cls, payload):
        '''
        Splits a frame payload on DELIMITER and returns the fields as a tuple
        of floats. Raises ValueError if any field is not a number
        '''
        return tuple(float(field.decode("ascii"))
                     for field in payload.split(cls.DELIMITER))

    def __next__(self):
        '''
        Returns the next well-formed frame as a tuple of floats, skipping
        frames that fail to parse
        Raises StopIteration once the port can no longer be read
        '''
        while True:
            payload = self._read_frame()
            try:
                return self._parse_frame(payload)
            except ValueError:
                # corrupt frame; resynchronise on the next one
                continue

    def __iter__(self):
        return self
matplotlib
pyserial
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment