Skip to content

Instantly share code, notes, and snippets.

@matham
Created March 6, 2015 18:46
Show Gist options
  • Save matham/16ce6e8c43b32d9a0f28 to your computer and use it in GitHub Desktop.
Save matham/16ce6e8c43b32d9a0f28 to your computer and use it in GitHub Desktop.
import time
from time import sleep
from threading import Thread
import tables as tb
from os.path import exists, isfile
from functools import partial
from re import match, compile
from Queue import Queue
from pybarst.core.server import BarstServer
from pybarst.ftdi import FTDIChannel
from pybarst.ftdi.adc import ADCSettings
from pybarst.ftdi.switch import SerializerSettings
from pybarst.serial import SerialChannel
# Default MFC configuration: one (serial port name, MFC id) pair per mass flow
# controller, in the order air, a, b. start_barst() asserts that the
# configuration has exactly 3 entries, so this single-entry value appears to
# be a placeholder showing the shape of one entry -- TODO confirm.
# An entry whose port name is None is treated as "MFC not present".
mfc_config = (('port', 'mfc_id'), )
# Stop flag polled by the reader loops (read_adc / read_mfc); run_program()
# resets it to False on entry and sets it to True once the program is done.
_finish_program = False
def set_mfc_rate(mfc, mfc_id, to, val):
    """Send a set-rate command to a MFC and verify the reply.

    Writes ``!<id>,S,<rate>`` (id as two upper-case hex digits, rate with
    three decimals) to ``mfc`` and then reads back exactly as many characters
    as the expected reply ``!<id>,S<rate>``.

    :param mfc: channel object providing ``write(data, to)`` and
        ``read(length, to)``; ``read`` must return a 2-tuple whose second
        item is the received text.
    :param mfc_id: integer id of the MFC.
    :param to: value forwarded as the second argument of ``write``/``read``
        (presumably the timeout -- verify against the channel API).
    :param val: the rate to set.
    :raises Exception: if the reply differs from the expected text.
    """
    command = '!{:02X},S,{:.3f}\r\n'.format(mfc_id, val)
    expected = '!{:02X},S{:.3f}\r\n'.format(mfc_id, val)

    mfc.write(command, to)
    _, reply = mfc.read(len(expected), to)
    if reply == expected:
        return
    raise Exception('Failed setting MFC rate. '
                    'Expected "{}", got "{}"'.format(expected, reply))
def get_mfc_rate(mfc, mfc_id, to, rate_pat):
    """Query a MFC for its current rate.

    Writes ``!<id>,F`` to ``mfc``, reads up to 24 characters (stopping at a
    newline) and extracts the rate from the reply using ``rate_pat``.

    :param mfc: channel object providing ``write(data, to)`` and
        ``read(length, stop_char=..., timeout=...)``; ``read`` must return a
        ``(timestamp, text)`` pair.
    :param mfc_id: integer id of the MFC.
    :param to: timeout passed to the write and read calls.
    :param rate_pat: regex (string or compiled) whose first group captures
        the rate in the reply.
    :returns: ``(timestamp, rate)`` where the timestamp is the one returned by
        the read and the rate is the captured group converted to float.
    :raises Exception: if the reply does not match ``rate_pat``.
    """
    request = '!{:02X},F\r\n'.format(mfc_id)
    mfc.write(request, to)

    timestamp, reply = mfc.read(24, stop_char='\n', timeout=to)
    parsed = match(rate_pat, reply)
    if parsed is not None:
        return timestamp, float(parsed.group(1))
    raise Exception('Failed getting MFC rate. '
                    'Got "{}"'.format(reply))
def _confirm_mfc_command(mfc, command, expected, to, failure):
    """Write ``command`` to ``mfc`` and check that the reply is ``expected``.

    ``failure`` is a format string taking the expected and received text; it
    becomes the message of the Exception raised on a mismatch.
    """
    mfc.write(command, to)
    _, reply = mfc.read(len(expected), to)
    if reply != expected:
        raise Exception(failure.format(expected, reply))


def start_barst(mfc_config):
    """Start the Barst server and open the FTDI and MFC channels.

    :param mfc_config: sequence of exactly three ``(port name, mfc id)``
        pairs. A pair whose port name is None produces a
        ``(None, None, None)`` placeholder instead of an opened channel.
    :returns: ``(server, ftdi, adc, odor, mfcs)`` where ``adc`` and ``odor``
        are the opened and activated FTDI sub-channels, and ``mfcs`` holds one
        ``(channel, setter, getter)`` tuple per configuration entry. The
        setter takes a rate, the getter takes no arguments.
    :raises Exception: if a MFC does not acknowledge a setup command.
    """
    assert len(mfc_config) == 3

    server = BarstServer(
        barst_path=r'C:\Program Files\Barst\Barst64.exe',
        pipe_name=r'\\.\pipe\TestPipe')
    # Close any server already reachable on the pipe, wait, then start fresh.
    server.close_server()
    time.sleep(1.)
    server.open_server()
    server.get_manager('ftdi')

    adc_settings = ADCSettings(
        clock_bit=3, lowest_bit=4, num_bits=3, sampling_rate=1000, chan1=True,
        chan2=False, transfer_size=100, data_width=16, hw_buff_size=0)
    odor_settings = SerializerSettings(
        clock_bit=0, data_bit=1, latch_bit=2, num_boards=2, output=True)
    ftdi = FTDIChannel(
        channels=[adc_settings, odor_settings], server=server,
        desc='Birch Board rev1 B')

    timeout = 4000
    mfcs = []
    for port, mfc_id in mfc_config:
        if port is None:
            # Unused MFC slot; keep the position so the order is preserved.
            mfcs.append((None, None, None))
            continue

        mfc = SerialChannel(
            server=server, port_name=port, max_write=96, max_read=96,
            baud_rate=9600, stop_bits=1, parity='none', byte_size=8)
        mfc.open_channel()

        # set digital mode
        _confirm_mfc_command(
            mfc, '!{:02X},M,D\r\n'.format(mfc_id),
            '!{:02X},MD\r\n'.format(mfc_id), timeout,
            'Failed setting MFC to digital mode. '
            'Expected "{}", got "{}"')
        # set to standard LPM
        _confirm_mfc_command(
            mfc, '!{:02X},U,SLPM\r\n'.format(mfc_id),
            '!{:02X},USLPM\r\n'.format(mfc_id), timeout,
            'Failed setting MFC to use SLPM units. '
            'Expected "{}", got "{}"')

        rate_pattern = compile(r'\!{:02X},([0-9\.]+)\r\n'.format(mfc_id))
        setter = partial(set_mfc_rate, mfc, mfc_id, timeout)
        getter = partial(get_mfc_rate, mfc, mfc_id, timeout, rate_pattern)
        # Start every MFC from a rate of zero.
        setter(0)
        mfcs.append((mfc, setter, getter))

    adc, odor = ftdi.open_channel(alloc=True)
    for channel in (adc, odor):
        channel.open_channel()
        channel.set_state(True)
    return server, ftdi, adc, odor, mfcs
def create_file(hdf_filename, odor_names, odor_mfc_map):
    """Create the HDF5 log file and its group/array layout.

    Layout created under ``/data``:

    * ``odors/p<i>`` for every valve, each with extendable ``ts`` (float64)
      and ``values`` (bool) arrays; the group title is the odor name.
    * ``pid`` with extendable ``ts`` (float64), ``values`` (float32) and
      ``idx`` (uint64) arrays.
    * ``mfc/air``, ``mfc/a``, ``mfc/b``, each with extendable float64 ``ts``
      and ``values`` arrays. ``odor_mfc_map`` is stored as an attribute of
      the ``mfc`` group.

    :param hdf_filename: path of the file to create; it must not exist yet.
    :param odor_names: names of the odors; the list is padded to 16 entries
        and every empty name is replaced by ``p<index>``.
    :param odor_mfc_map: value stored in the ``odor_mfc_map`` attribute.
    :returns: the open file handle.
    :raises Exception: if ``hdf_filename`` already exists.
    """
    if exists(hdf_filename):
        raise Exception('{} already exists'.format(hdf_filename))

    padded = list(odor_names) + ['', ] * (16 - len(odor_names))
    titles = [
        name if name else 'p{}'.format(idx)
        for idx, name in enumerate(padded)]

    fh = tb.open_file(
        filename=hdf_filename, mode='w', title='PID Olfactometer diagnostics')
    data_group = fh.create_group(
        fh.root, name='data', title='Holds the data')
    odors_group = fh.create_group(
        data_group, name='odors', title='Holds valve status')
    pid_group = fh.create_group(
        data_group, name='pid', title='Holds the PID output')
    mfc_group = fh.create_group(
        data_group, name='mfc', title='Holds the MFC state')
    mfc_group._v_attrs.odor_mfc_map = odor_mfc_map

    def add_arrays(group, columns):
        # One empty, extendable array per (name, atom class) pair.
        for col_name, atom_cls in columns:
            fh.create_earray(
                group, name=col_name, atom=atom_cls(shape=1), shape=(0, ))

    for idx, title in enumerate(titles):
        valve_group = fh.create_group(
            odors_group, name='p{}'.format(idx), title=title)
        add_arrays(
            valve_group, (('ts', tb.Float64Atom), ('values', tb.BoolAtom)))

    add_arrays(
        pid_group,
        (('ts', tb.Float64Atom), ('values', tb.Float32Atom),
         ('idx', tb.UInt64Atom)))

    for mfc_type in ('air', 'a', 'b'):
        rate_group = fh.create_group(mfc_group, name=mfc_type)
        add_arrays(
            rate_group, (('ts', tb.Float64Atom), ('values', tb.Float64Atom)))
    return fh
def close_barst(server, ftdi, odor, mfcs):
    """Reset the hardware and shut down the channels and the server.

    Every configured MFC is set to a rate of zero and its channel is closed
    on the server; all 16 valves are then set low, the FTDI channel is closed
    and finally the server itself is closed.

    :param server: the server object to close last.
    :param ftdi: the FTDI channel to close.
    :param odor: the valve channel; must provide ``write(set_low=...)``.
    :param mfcs: iterable of ``(channel, setter, getter)`` tuples as returned
        by start_barst(). Entries of the form ``(None, None, None)`` mark
        unused MFCs and are skipped.
    """
    for mfc, setter, _ in mfcs:
        if mfc is None:
            # Placeholder for an MFC that was never opened; nothing to reset.
            continue
        setter(0)
        mfc.close_channel_server()
    odor.write(set_low=range(16))
    ftdi.close_channel_server()
    server.close_server()
def read_adc(adc, hdf_data, hdf_idx, hdf_ts):
    """Log ADC reads until the module-level stop flag is set.

    Each pass reads once from ``adc`` and appends the channel 1 data, the
    channel 1 timestamp index and the timestamp of that read to the three
    given arrays. The flag is only checked between reads.

    :param adc: object whose ``read()`` returns a result with ``chan1_data``,
        ``chan1_ts_idx`` and ``ts`` attributes.
    :param hdf_data: array receiving ``[chan1_data]``.
    :param hdf_idx: array receiving ``[[chan1_ts_idx]]``.
    :param hdf_ts: array receiving ``[[ts]]``.
    """
    global _finish_program
    while True:
        if _finish_program:
            break
        sample = adc.read()
        hdf_data.append([sample.chan1_data])
        hdf_idx.append([[sample.chan1_ts_idx]])
        hdf_ts.append([[sample.ts]])
def read_mfc(getter, hdf_data, hdf_ts):
    """Log MFC rate readings until the module-level stop flag is set.

    :param getter: callable taking no arguments and returning a
        ``(timestamp, rate)`` pair.
    :param hdf_data: array receiving ``[[rate]]`` for every reading.
    :param hdf_ts: array receiving ``[[timestamp]]`` for every reading.
    """
    global _finish_program
    while True:
        if _finish_program:
            break
        stamp, rate = getter()
        hdf_data.append([[rate]])
        hdf_ts.append([[stamp]])
def write_mfc(setter, mfc_queue):
    """Apply queued MFC rates until the ``'eof'`` sentinel arrives.

    :param setter: callable taking the rate to set.
    :param mfc_queue: queue of ``(rate, callback)`` pairs. A rate equal to
        ``'eof'`` ends the loop; a callback that is not None is called with
        no arguments after the rate has been set.
    """
    while True:
        rate, on_done = mfc_queue.get(block=True)
        if rate == 'eof':
            break
        setter(rate)
        if on_done is None:
            continue
        on_done()
def write_odor(odor, odor_queue, hdf_odors):
    """Apply queued valve changes and log every actual state change.

    :param odor: valve channel; ``write(set_high=..., set_low=...)`` must
        return the timestamp of the write.
    :param odor_queue: queue of ``(high valves, low valves, callback)``
        triples. A first item equal to ``'eof'`` ends the loop; a callback
        that is not None is called with no arguments after the write has been
        logged.
    :param hdf_odors: object with attributes ``p0`` .. ``p15``, each having
        ``ts`` and ``values`` arrays to append to.
    """
    # Last state written per valve; None until the valve is first written.
    valve_state = [None, ] * 16
    valve_logs = [
        (group.ts, group.values)
        for group in (
            getattr(hdf_odors, 'p{}'.format(idx)) for idx in range(16))]

    while True:
        high_vals, low_vals, callback = odor_queue.get(block=True)
        if high_vals == 'eof':
            return

        stamp = odor.write(set_high=high_vals, set_low=low_vals)
        for new_state, valves in ((True, high_vals), (False, low_vals)):
            for valve in valves:
                # Only log a valve when its state actually changes.
                if valve_state[valve] is not new_state:
                    valve_state[valve] = new_state
                    ts_log, value_log = valve_logs[valve]
                    ts_log.append([[stamp]])
                    value_log.append([[new_state]])

        if callback is not None:
            callback()
def _queue_mfc_rate(mfc_queue, item):
    """Queue one MFC rate for write_mfc(), which expects (rate, callback).

    Does nothing when ``mfc_queue`` is None (MFC not configured). A bare rate
    is wrapped as ``(rate, None)``; a tuple is queued unchanged.
    """
    if mfc_queue is None:
        return
    if not isinstance(item, tuple):
        item = (item, None)
    mfc_queue.put(item, block=False)


def run_program(
        program, hdf_filename, odor_names, odor_mfc_map=None,
        mfc_config=mfc_config):
    """Run an olfactometer program while logging PID, valve and MFC data.

    Starts the hardware, creates the HDF5 file, starts one thread logging the
    ADC, one thread writing the valves and, per configured MFC, one thread
    logging its rate and one thread setting its rate. The program steps are
    then fed to the writer threads. On exit (normal or through an exception
    raised while stepping through the program) all threads are stopped and
    joined, and only then are the file and the hardware closed.

    :param program: iterable of ``(delay, high_odors, low_odors, mfc_air,
        mfc_a, mfc_b)`` steps. ``delay`` is slept (``time.sleep``) before the
        step is queued when it is truthy; ``high_odors`` / ``low_odors`` are
        the valves to set high / low; each ``mfc_*`` item is either a rate or
        a ``(rate, callback)`` tuple and is ignored for unconfigured MFCs.
    :param hdf_filename: path of the HDF5 file to create (must not exist).
    :param odor_names: odor names, forwarded to create_file().
    :param odor_mfc_map: value stored in the file by create_file(); defaults
        to an empty list.
    :param mfc_config: MFC configuration, forwarded to start_barst().
    """
    global _finish_program
    _finish_program = False
    if odor_mfc_map is None:
        odor_mfc_map = []

    server, ftdi, adc, odor, mfcs = start_barst(mfc_config)
    fh = create_file(hdf_filename, odor_names, odor_mfc_map)
    data = fh.root.data

    adc_thread = Thread(
        target=read_adc, name='adc',
        args=(adc, data.pid.values, data.pid.idx, data.pid.ts))
    adc_thread.start()

    odor_queue = Queue()
    odor_thread = Thread(
        target=write_odor, name='odor', args=(odor, odor_queue, data.odors))
    odor_thread.start()

    # The three lists stay index-aligned with (air, a, b); unconfigured MFCs
    # get None in every list.
    mfc_queues = []
    mfc_read_threads = []
    mfc_write_threads = []
    for (mfc, setter, getter), name in zip(mfcs, ('air', 'a', 'b')):
        if mfc is None:
            mfc_queues.append(None)
            mfc_read_threads.append(None)
            mfc_write_threads.append(None)
            continue

        group = getattr(data.mfc, name)
        thread = Thread(
            target=read_mfc, name='read_mfc',
            args=(getter, group.values, group.ts))
        thread.start()
        mfc_read_threads.append(thread)

        q = Queue()
        thread = Thread(target=write_mfc, name='write_mfc', args=(setter, q))
        thread.start()
        mfc_queues.append(q)
        mfc_write_threads.append(thread)

    q_air, q_a, q_b = mfc_queues
    try:
        for delay, high_odors, low_odors, mfc_air, mfc_a, mfc_b in program:
            if delay:
                sleep(delay)
            odor_queue.put((high_odors, low_odors, None), block=False)
            _queue_mfc_rate(q_air, mfc_air)
            _queue_mfc_rate(q_a, mfc_a)
            _queue_mfc_rate(q_b, mfc_b)
    finally:
        # Stop the reader loops first, then drain and stop the writers. The
        # file and the hardware are closed only after every thread that uses
        # them has been joined.
        _finish_program = True
        adc_thread.join()
        odor_queue.put(('eof', None, None), block=False)
        odor_thread.join()
        for thread in mfc_read_threads:
            if thread is not None:
                thread.join()
        for thread, q in zip(mfc_write_threads, mfc_queues):
            if thread is not None:
                q.put(('eof', None), block=False)
                thread.join()
        fh.close()
        close_barst(server, ftdi, odor, mfcs)
if __name__ == '__main__':
    odor_names = ['NO', 'Limonene', 'Mix']
    # One (port name, MFC id) pair per MFC, in the order air, a, b.
    # start_barst() unpacks exactly two items per entry and skips entries
    # whose port name is None, so this runs without any MFC attached.
    mfc_config = [(None, None), ] * 3
    filename = 'data.h5'
    # Empty program: start everything up and shut it down again.
    program = ()
    run_program(
        program, hdf_filename=filename, odor_names=odor_names,
        odor_mfc_map=[], mfc_config=mfc_config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment