Skip to content

Instantly share code, notes, and snippets.

@heitorfr
Last active June 25, 2018 03:37
Show Gist options
  • Save heitorfr/689806e404b8afd1e87e to your computer and use it in GitHub Desktop.
Save heitorfr/689806e404b8afd1e87e to your computer and use it in GitHub Desktop.
Plots BITalino signals in real time and saves the plot as a video.
import bitalino
import numpy
import signal
import sys
import time
import threading
import datetime
from matplotlib import pyplot as plt
from matplotlib import animation
from multiprocessing import Process,Value,Queue
from Queue import Empty
# Device address handed to device.open() in read_data.
addr = '/dev/cu.bitalino-DevB'
# Analog channel to acquire; handed to device.start() in read_data.
# Channel
# 0 - EMG
# 1 - EDA
# 2 - ECG
# 3 - Accelerometer
# 4 - Lux
channel = 2
# Sampling rate handed to device.open() -- presumably in Hz; confirm against the bitalino API.
sampling_rate = 100
# Number of samples requested per device.read() call, i.e. per queue item.
read_buffer_size = 10
# Length of the rolling plot window, in samples (five times sampling_rate).
plot_size = sampling_rate*5
# Output video name: bitalino_<channel>_<YYYYmmdd_HHMMSS>.mp4, stamped at start-up.
video_file_name = 'bitalino_'+str(channel)+'_'+datetime.datetime.now().strftime("%Y%m%d_%H%M%S")+'.mp4'
# Shared flag ('b' typecode); exit() sets it to 1 to end the acquisition loop.
should_exit = Value('b',0)
# Carries lists of samples from the acquisition process to the plotting code.
q = Queue()
# Rolling window of plotted samples, pre-filled with zeros.
plot_data = [0]*plot_size
# Running minimum / maximum of the samples seen; used for the y-axis limits.
miny,maxy = 0,0
# Total number of samples received so far; used to position the x axis.
num_samples = 0
fig = plt.figure(figsize=(15, 6))
ax = fig.add_subplot(111)
# Initial x values run from -plot_size to -1; plot_set_data replaces them later.
line, = ax.plot(range(-plot_size,0), plot_data, 'b-')
#extra_args=['-vcodec', 'libx264']
# plot_update grabs one frame per call; fps is sampling_rate/read_buffer_size.
writer = animation.writers['ffmpeg'](fps=sampling_rate/read_buffer_size )
# NOTE(review): the third positional argument (100) looks like the dpi of
# matplotlib's writer.setup() -- confirm for the installed matplotlib version.
writer.setup(fig, video_file_name, 100,)
def plot_add_samples(samples):
    """Append a batch of samples to the rolling plot window.

    The module-level ``plot_data`` list keeps its current length: the oldest
    values are dropped to make room for the new ones.  The module-level
    ``num_samples`` counter is advanced by the full batch size.

    samples -- sequence of new sample values, oldest first.
    """
    global plot_data, num_samples
    # Accept any sequence (tuple, array converted by the caller, ...).
    samples = list(samples)
    window = len(plot_data)
    combined = plot_data + samples
    # Trim from the end rather than dropping len(samples) items from the
    # front: this keeps the window length fixed even when a batch is longer
    # than the window, so the y data never outgrows the x range built in
    # plot_set_data.
    plot_data = combined[len(combined) - window:]
    num_samples += len(samples)
def plot_set_data():
    """Push the current window and axis limits to the matplotlib artists.

    Reads the module-level ``miny``/``maxy``, ``num_samples``, ``plot_size``
    and ``plot_data`` and updates ``ax`` and ``line`` accordingly.
    """
    # Pad the y range by 5% of the observed span; skip while the span is
    # still zero (or negative) so the current limits are left alone.
    padding = (maxy - miny) * 0.05
    if padding > 0:
        lower = miny - padding
        upper = maxy + padding
        ax.set_ylim([lower, upper])

    # The x axis shows absolute sample indices for the current window.
    first_index = num_samples - plot_size
    line.set_xdata(range(first_index, num_samples))
    line.set_ydata(plot_data)

    ax.relim()
    ax.autoscale(True, 'x', True)
def plot_init():
    """Initialise the plot from the first batch of samples.

    Blocks on the queue until a batch is available, seeds the running
    minimum and maximum from that batch, and returns a one-element tuple
    holding the plotted line.
    """
    global miny, maxy
    first_batch = q.get()
    plot_add_samples(first_batch)
    # The y range starts from this batch alone.
    miny = min(first_batch)
    maxy = max(first_batch)
    plot_set_data()
    return (line,)
def plot_update(i):
    """Animation callback: drain the queue, redraw, and record one frame.

    i -- frame argument supplied by the animation; not used.

    Returns a one-element tuple holding the plotted line.
    """
    global miny, maxy
    # Consume every batch that is already waiting, without blocking.
    while True:
        try:
            batch = q.get_nowait()
        except Empty:
            break
        plot_add_samples(batch)
        miny = min(miny, min(batch))
        maxy = max(maxy, max(batch))

    plot_set_data()
    writer.grab_frame()
    return (line,)
def read_data(q, should_exit):
    """Acquire samples from the BITalino device and push them onto ``q``.

    Used as the target of the acquisition process.  The device is stopped
    and closed even if starting it or the read loop raises.

    q           -- queue receiving one list of samples per device read.
    should_exit -- shared value; the loop runs while its ``.value`` is 0.
    """
    # Ignore Ctrl-C here; shutdown is requested through should_exit instead.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    print("Started acquisition process")
    device = bitalino.BITalino()
    print(device.find(True))
    device.open(addr, sampling_rate)
    try:
        device.start([channel])
        try:
            print("Bitalino started")
            while should_exit.value == 0:
                try:
                    data_acquired = device.read(read_buffer_size)
                    # NOTE(review): row 5 is presumably the first acquired
                    # analog channel in the matrix returned by read() --
                    # confirm against the bitalino API version in use.
                    samples = data_acquired[5, :].tolist()
                    q.put(samples)
                except Exception as e:
                    # Best effort: report the failed read and keep acquiring.
                    print("Acquisiton:  %s" % (e,))
        finally:
            device.stop()
    finally:
        device.close()
    print("Finished acquisition process")
# Run read_data in a child process, handing it the shared queue and exit flag.
# NOTE(review): this executes at import time without an
# `if __name__ == '__main__':` guard -- multiprocessing start methods that
# re-import the main module would run it again; confirm the target platform.
p = Process(target=read_data,args=(q,should_exit))
p.start()
def exit():
    """Ask the acquisition process to stop and wait for it to finish.

    Sets the shared ``should_exit`` flag to 1 and joins process ``p``.
    Note that this name shadows the built-in ``exit`` within this module.
    """
    print("Exiting")
    should_exit.value = 1
    p.join()
def signal_handler(signal, frame):
    """Signal handler that triggers the module's shutdown routine.

    signal -- signal number (unused; shadows the ``signal`` module locally).
    frame  -- current stack frame (unused).
    """
    exit()
# On Ctrl-C in this process, signal_handler stops and joins the acquisition process.
signal.signal(signal.SIGINT, signal_handler)
# plot_init is the animation's init function and plot_update its per-frame
# callback; interval=100 is in milliseconds per matplotlib's FuncAnimation.
# The result is bound to a name so the animation object stays referenced.
anim = animation.FuncAnimation(fig, plot_update, init_func=plot_init, interval=100, blit=False)
# Normally blocks until the plot window is closed.
plt.show()
# Finalise the video file, then stop and join the acquisition process.
writer.finish()
exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment