Skip to content

Instantly share code, notes, and snippets.

@agricolab
Created June 7, 2023 20:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save agricolab/68263fe720551e35fae7c2a0398a5e49 to your computer and use it in GitHub Desktop.
Save agricolab/68263fe720551e35fae7c2a0398a5e49 to your computer and use it in GitHub Desktop.
NeuroPrax2LSL Python Client
# -*- coding: utf-8 -*-
"""TCP/IP client for receiving data from the neuroPrax,
and publishing it via lab streaming layer
Documentation
-------------
args
----
host:str
IP address of the machine where the neuroPrax software is running.
Defaults to localhost, i.e. '127.0.0.1'.
"""
import threading
import socket
import struct
from uuid import getnode
import pylsl
__version__ = '0.3.0'
__author__ = 'Robert Guggenberger'
__license__ = 'MIT'
# %%
class ProtocolHandler():
    '''Decode the block-structured protocol stream sent by the neuroPrax.

    Every block is read as a 36-byte header that names the protocol,
    followed by a protocol-specific payload and the four-byte terminator
    ``end$``. Text fields inside a block are separated by ``$``.

    args
    ----
    recv_foo: callable
        called as ``recv_foo(n)`` and expected to return exactly ``n``
        bytes, e.g. the ``recv_bytes`` method of a stream receiver
    '''

    def __init__(self, recv_foo):
        '''handle the neuroPrax Protocol stream

        args
        ----
        recv_foo: :meth:`socket.recv`
            the low level function for byte-wise receiving
        '''
        self.recv_bytes = recv_foo

    def flush_protocol(self):
        'reads and ignores each byte until the end of current protocol block'
        # Collect into a bytearray: appending is amortised O(1), whereas
        # growing a str byte by byte copies the whole string each time.
        skipped = bytearray()
        while not skipped.endswith(b'end$'):
            skipped += self.recv_bytes(1)
        # ASCII with errors='replace' maps every byte to exactly one
        # character, so slicing the text matches slicing the bytes.
        text = skipped.decode('ASCII', errors='replace')
        print(f'Flushed protocol from {text[:4]} to {text[-4:]}')

    def parse_header(self):
        '''reads the header of the current block

        returns
        -------
        header: str
            the protocol name, i.e. the text after the first ``-`` in the
            third ``$``-separated field of the 36-byte header

        raises
        ------
        ConnectionError
            if the header does not contain ``neuroConn``
        '''
        header = self.recv_bytes(36).decode()
        if 'neuroConn' not in header:
            raise ConnectionError(f'Not a valid header: {header} does not ' +
                                  'contain neuroConn')
        return header.split('$')[2].split('-')[1].strip()

    def parse_ender(self):
        '''reads the next four byte and checks whether it is a valid end

        raises
        ------
        ConnectionError
            if the four bytes are not ``end$``
        '''
        ende = self.recv_bytes(4)
        if ende != b'end$':
            raise ConnectionError(f'Not a valid ender: {ende} instead of end$ ')

    # -------------------------------------------------------------------------
    # GIP PROTOCOL
    # -------------------------------------------------------------------------
    def handle_GIP(self):
        '''handle a GIP: General Information Protocol

        order of keys is fixed according to protocol
        we therefore go through all keys, and store them
        in a dictionary

        returns
        -------
        paramDict: dict
            the general settings plus the per-channel lists
            ``ChannelNames``, ``ChannelTypes``, ``ChannelUnits`` and
            ``ChannelReference``; also stored as ``self.settings``
        '''
        settingList = ['recFileName',
                       'recPathName',
                       'DB_PatName',
                       'DB_PatFirstName',
                       'DB_PatBirthDay',
                       'DB_PatINR',
                       'elecSetup',
                       'fs',
                       'selAlgorithm',
                       'numChannels',
                       'numEXGChannels'
                       ]
        chanInfoList = ['ChannelNames',
                        'ChannelTypes',
                        'ChannelUnits',
                        'ChannelReference']
        # the settings which are transmitted as text but used as numbers
        integerKeys = ('fs', 'numChannels', 'numEXGChannels')

        # receive information on the general settings
        fields = self.recv_bytes(1576).decode().split('$')
        paramDict = {}
        for key, val in zip(settingList, fields):
            paramDict[key] = int(val) if key in integerKeys else val

        # receive information on the channels. As this depends on
        # channel number, it is within a separate block
        num = paramDict['numChannels']
        infoList = self.recv_bytes(36 * num).decode('cp1252').split('$')
        # the fields arrive grouped by kind: all names, then all types, ...
        for idx, key in enumerate(chanInfoList):
            paramDict[key] = infoList[(num * idx):(num * (idx + 1))]

        # store key information in the instance
        self.fs = paramDict['fs']
        paramDict['ChannelNames'] = [ch.strip() for
                                     ch in paramDict['ChannelNames']]
        paramDict['ChannelTypes'] = [t.strip() for
                                     t in paramDict['ChannelTypes']]
        # lsl xdf specification requests 'microvolts' instead of 'µV'
        paramDict['ChannelUnits'] = [
            'microvolts' if unit.strip() == 'µV' else unit.strip()
            for unit in paramDict['ChannelUnits']]
        self.settings = paramDict
        return paramDict

    # -------------------------------------------------------------------------
    # MNP PROTOCOL
    # -------------------------------------------------------------------------
    def handle_MNP(self):
        '''handle a MNP: Marker Name Protocol

        returns
        -------
        markerDict: dict
            maps the first field of every 40-byte marker entry to the
            second one, both stripped of whitespace
        '''
        # four bytes: the marker count followed by one separator byte
        markerCount = int(self.recv_bytes(4).decode('cp1252')[:-1])
        paramList = self.recv_bytes(40 * markerCount).decode('cp1252').split('$')
        markerDict = {mi.strip(): mn.strip()
                      for mi, mn in zip(paramList[0::2], paramList[1::2])}
        self.receivedMNP = True
        return markerDict

    # -------------------------------------------------------------------------
    # ISP PROTOCOL
    # -------------------------------------------------------------------------
    def handle_ISP(self):
        '''handle a ISP: Impedance Protocol

        returns
        -------
        msg: dict
            ``{'impedance': {channel name: impedance status}}``
        '''
        numChannels = int(self.recv_bytes(5).decode('cp1252').split('$')[0])
        paramList = self.recv_bytes(12 * numChannels).decode('cp1252').split('$')
        impedance = {name.strip(): status.strip()
                     for name, status in zip(paramList[0::2], paramList[1::2])}
        return {'impedance': impedance}

    # -------------------------------------------------------------------------
    # DP PROTOCOL
    # -------------------------------------------------------------------------
    def handle_DP(self):
        '''handle a DP: Data Protocol

        returns
        -------
        packet: list of list of float
            the little-endian float32 payload cut into rows of
            ``params[2]`` values each
        '''
        params = self.recv_bytes(36).decode('cp1252').split('$')[0:-1]
        params = [int(p) for p in params]
        # params[0] is read but not used here.
        # NOTE(review): params[1] looks like the number of samples and
        # params[2] like the number of channels - confirm against the
        # neuroPrax protocol description.
        rowLength = params[2]
        buffer = self.recv_bytes(params[1] * rowLength * 4)
        values = [v[0] for v in struct.iter_unpack('<f', buffer)]
        return [values[i:i + rowLength]
                for i in range(0, len(values), rowLength)]

    # -------------------------------------------------------------------------
    def handle_stream(self):
        '''read one complete block from the stream

        returns
        -------
        header: str
            the protocol name, or the text of the decoding error if the
            block had to be flushed
        msg: dict | list | None
            the parsed payload; None for BOP, unknown and flushed blocks

        raises
        ------
        ConnectionError
            if header or terminator are invalid, or if the block is
            malformed (missing fields, non-numeric counts)
        '''
        try:
            header = self.parse_header()
            if 'GIP' in header:
                msg = self.handle_GIP()  # dict
            elif 'MNP' in header:
                msg = self.handle_MNP()  # dict
            elif 'DP' in header:
                msg = self.handle_DP()
            elif 'BOP' in header:
                msg = None
            elif 'ISP' in header:
                msg = self.handle_ISP()  # dict
            else:
                msg = None
                print((f'Invalid protocol: {header}'))
            self.parse_ender()
        except UnicodeDecodeError as e:
            # must come first: UnicodeDecodeError is a ValueError
            self.flush_protocol()
            header = str(e)
            msg = None
        except (ValueError, IndexError, KeyError, struct.error) as e:
            # A block which cannot be parsed means the stream is out of
            # sync. Report it as a connection problem, so that the caller
            # can reconnect instead of dying from an unexpected exception.
            raise ConnectionError(f'Malformed protocol block: {e!r}') from e
        return header, msg
# %%
TimeOutError = socket.timeout


class StreamReceiver():
    '''Receives the TCP/IP stream from the neuroPrax PC

    The receiver keeps a small state machine (``standby``, ``connected``,
    ``receiving``, ``timeout_recv``). Assigning to :attr:`state` prints
    the matching status line whenever the state changes; reading it
    returns the status line for the current state.
    '''
    _state_was_sent = 0
    _state = 'standby'
    # pause in seconds after a connection attempt which failed immediately
    _retry_delay = 0.5

    def __init__(self, host=None, port=8574):
        '''initialize the TCP-IP StreamReceiver

        args
        ----
        host:str {None}
            the ip address of the neuroPrax device
        port: int {8574, 8575}
            the port for raw or corrected data stream

        raises
        ------
        ValueError
            if no host was given
        '''
        if host is None:
            raise ValueError('Specify a valid host IP')
        self.host = host
        self.port = port
        self.state = 'standby'
        self.settings = {}

    def get_state(self):
        'return the status line for the current state and mark it as sent'
        messages = {
            'timeout_recv': 'Standby: Waiting for neuroPrax to stream',
            'standby': 'Standby: Waiting for neuroPrax to connect',
            'connected': 'Connected with neuroPrax software',
            'receiving': 'Connected with neuroPrax stream',
        }
        msg = messages.get(self._state, 'Logging Error: Undefined state')
        self._state_was_sent = True
        return msg

    def set_state(self, state: str):
        'store the new state and print its status line if it changed'
        if state != self._state:
            self._state_was_sent = False
            self._state = state
            print('\n\r-------------------')
            print(self.state)

    state = property(get_state, set_state)

    def connect(self):
        '''connect the TCP-IP stream

        Blocks and retries until the connection is established.

        returns
        -------
        connected: bool
            always True
        '''
        import time
        while True:
            # Use a fresh socket for every attempt: after a failed or
            # timed-out connect() the state of a socket is unspecified, and
            # retrying on the same one can fail forever (EALREADY/EISCONN).
            interface = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            interface.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            interface.settimeout(1)
            try:
                interface.connect((self.host, self.port))
            except TimeOutError:
                interface.close()
                self.state = 'standby'
            except OSError:
                # if neuroPrax Software was not started yet, it throws OsError
                interface.close()
                self.state = 'standby'
                # a refused connection can return at once; do not spin
                time.sleep(self._retry_delay)
            else:
                self.interface = interface
                self.state = 'connected'
                return True

    def recv_bytes(self, numBytes=1):
        '''blocks until it read the next numBytes bytes

        A receive timeout only changes the state to ``timeout_recv``; the
        call keeps waiting afterwards.

        returns
        -------
        dataBuffer: bytes
            exactly ``numBytes`` bytes

        raises
        ------
        ConnectionResetError
            if the socket reports an error or the peer closed the
            connection
        '''
        dataBuffer = bytearray()
        while len(dataBuffer) < numBytes:
            try:
                msg = self.interface.recv(numBytes - len(dataBuffer))
            except TimeOutError:
                self.state = 'timeout_recv'
                continue
            except OSError as e:
                raise ConnectionResetError('Standby: Waiting for neuroPrax') from e
            if not msg:
                # recv() returns b'' once the peer has closed the stream;
                # without this check the loop would spin forever
                raise ConnectionResetError('Standby: Waiting for neuroPrax')
            dataBuffer += msg
            self.state = 'receiving'
        return bytes(dataBuffer)

    def close(self):
        '''close the TCP-IP stream

        The socket is always closed and forgotten, so that
        :attr:`is_connected` is False afterwards.

        returns
        -------
        None if there was nothing to close or the shutdown succeeded,
        False if the shutdown raised an OSError
        '''
        if not hasattr(self, 'interface'):
            return None
        interface = self.interface
        del self.interface
        try:
            interface.shutdown(socket.SHUT_RDWR)
        except OSError:
            # e.g. the peer already dropped the connection
            self.state = 'standby'
            return False
        finally:
            interface.close()
        return None

    def _is_connected(self):
        return hasattr(self, 'interface')

    is_connected = property(_is_connected)
# %%
class LSL_Publisher:
    '''Publishes data packets as lab streaming layer outlets

    Two outlets are created: a float32 stream named ``neuroPrax`` with one
    channel per entry of ``chan_names``, and a string stream named
    ``neuroPrax_Markers``. Markers are derived from the channels called
    ``MRK`` and ``DTRIG``, if present.

    args
    ----
    chan_names: list of str {['Mock']}
        the channel labels
    units: str or list of str {'microvolts'}
        the unit of every channel; a single string applies to all channels
    types: str or list of str {'EEG'}
        the type of every channel; a single string applies to all channels
    fs: int {0}
        the nominal sampling rate, 0 meaning irregular
    '''

    @classmethod
    def from_settings(cls, settings):
        '''create a publisher from a settings dictionary

        args
        ----
        settings: dict
            must contain ``ChannelNames``, ``ChannelUnits``,
            ``ChannelTypes`` and ``fs``

        raises
        ------
        KeyError
            if one of these keys is missing
        '''
        return cls(chan_names=settings['ChannelNames'],
                   units=settings['ChannelUnits'],
                   types=settings['ChannelTypes'],
                   fs=settings['fs'])

    @staticmethod
    def _per_channel(value, count):
        'return one entry per channel, repeating a single string if needed'
        # zip() over a bare string would hand out its characters instead
        if isinstance(value, str):
            return [value] * count
        return list(value)

    @staticmethod
    def _marker_onsets(samples, previous=0):
        '''find the samples at which a marker channel takes a new value

        args
        ----
        samples: iterable of numbers
            the values of one marker channel, oldest first
        previous: int
            the value of the sample preceding ``samples``

        returns
        -------
        onsets: list of (int, int)
            position and value of every positive sample which differs
            from the sample before it
        last: int
            the value of the final sample, or ``previous`` if empty
        '''
        onsets = []
        for position, raw in enumerate(samples):
            code = int(raw)
            if code > 0 and code != previous:
                onsets.append((position, code))
            previous = code
        return onsets, previous

    def __init__(self,
                 chan_names=None,
                 units='microvolts',
                 types='EEG',
                 fs=0):
        chan_names = ['Mock'] if chan_names is None else list(chan_names)
        units = self._per_channel(units, len(chan_names))
        types = self._per_channel(types, len(chan_names))
        self.fs = fs
        self.chan_names = chan_names
        self.mrk_chan_idx = [chan_names.index(label)
                             for label in ('MRK', 'DTRIG')
                             if label in chan_names]
        # last value seen on every marker channel, kept across packets so
        # that a marker spanning two packets is published only once
        self._last_code = {}
        source_id = str(hex(getnode()))
        # ---------------------------------------------------------------------
        # create eeg info and stream
        eeg_info = pylsl.StreamInfo(name='neuroPrax',
                                    type='EEG',
                                    channel_count=len(chan_names),
                                    nominal_srate=fs,
                                    channel_format='float32',
                                    source_id='neuroPrax_EEG_' + source_id)
        acq = eeg_info.desc().append_child("acquisition")
        acq.append_child_value('manufacturer', 'neuroConn')
        acq.append_child_value('model', 'neuroPrax')
        acq.append_child_value('precision', '24')
        acq.append_child_value('compensated_lag', '0')
        channels = eeg_info.desc().append_child("channels")
        for c, u, t in zip(chan_names, units, types):
            channels.append_child("channel") \
                    .append_child_value("label", c) \
                    .append_child_value("unit", u) \
                    .append_child_value("type", t)
        self.eeg_stream = pylsl.StreamOutlet(eeg_info,
                                             chunk_size=0,
                                             max_buffered=1)
        print(f'StreamOutlet created with source_id ' +
              f'"neuroPrax_EEG_{source_id}"')
        # ---------------------------------------------------------------------
        # create marker info and stream
        mrk_info = pylsl.StreamInfo(name='neuroPrax_Markers',
                                    type='Markers',
                                    channel_count=1,
                                    nominal_srate=0,
                                    channel_format='string',
                                    source_id='neuroPrax_MRK_' + source_id)
        self.mrk_stream = pylsl.StreamOutlet(mrk_info)
        # ---------------------------------------------------------------------
        # reference time for the 'Streaming for ...' progress output
        self.l0 = pylsl.local_clock()

    def publish(self, item):
        '''push one packet to the data outlet and its markers to the marker outlet

        args
        ----
        item: list of list of float
            one row per sample, one value per channel
        '''
        timestamp = pylsl.local_clock()
        # publish eeg
        self.eeg_stream.push_chunk(item, timestamp)
        reltime = timestamp - self.l0
        print('Streaming for {0:4.5f}s'.format(reltime), end='\r')
        # publish markers
        newest = len(item) - 1
        for idx in self.mrk_chan_idx:
            onsets, self._last_code[idx] = self._marker_onsets(
                (row[idx] for row in item), self._last_code.get(idx, 0))
            for position, code in onsets:
                # push_chunk assigns `timestamp` to the most recent sample
                # of the chunk, so earlier samples lie in the past. Without
                # a sampling rate the offset cannot be computed.
                lag = (newest - position) / self.fs if self.fs > 0 else 0.0
                mrk_val = str(code)
                self.mrk_stream.push_sample([mrk_val], timestamp - lag)
                print('Handling a marker at {0:>10.5f} : {1:>10.7s}'.format(
                    reltime - lag, mrk_val), end='\n')
# %%
class neuroPraxService(threading.Thread):
    '''Thread which receives neuroPrax blocks and publishes the data via LSL

    args
    ----
    host:str {None}
        the ip address of the neuroPrax device
    port: int {8574}
        the port of the data stream
    '''
    # Kept for backward compatibility only; every instance gets its own
    # dictionary in __init__, so that settings are not shared.
    settings = {}

    def __init__(self, host=None, port=8574):
        super().__init__()
        self.settings = {}
        self.receiver = StreamReceiver(host=host, port=port)
        self.handler = ProtocolHandler(self.receiver.recv_bytes)

    def get_item(self):
        '''read the next block and return its data packet, if it has one

        Blocks with settings (MNP, ISP, GIP) are merged into
        :attr:`settings` and discard the current publisher, so that the
        next data packet creates a new one. A BOP block closes and
        re-opens the connection.

        returns
        -------
        item: list of list of float | None
            the packet of a DP block, or None for every other block and
            while the settings required for a publisher are incomplete
        '''
        header, msg = self.handler.handle_stream()
        if header == 'DP':
            # create a publisher, if settings are already available
            # otherwise return None
            if not hasattr(self, 'publisher'):
                try:
                    self.publisher = LSL_Publisher.from_settings(self.settings)
                except KeyError:
                    return None
            return msg
        if header in ('MNP', 'ISP', 'GIP'):
            if hasattr(self, 'publisher'):
                del self.publisher
            self.settings.update(msg)
            return None
        if header == 'BOP':
            self.receiver.close()
            self.receiver.connect()
        print(f'Ignoring data from {header}')
        return None

    def run(self):
        'receive and publish forever, reconnecting after connection errors'
        while True:
            if not self.receiver.is_connected:
                self.receiver.connect()
            try:
                item = self.get_item()
            except ConnectionError as e:
                # ConnectionResetError is a ConnectionError and is
                # therefore handled here as well
                self.receiver.close()
                print(str(e))
                continue
            if item is not None:
                self.publisher.publish(item)
#%%
def _help():
    'print the module documentation as usage information'
    usage = __doc__
    print(usage)
def _start_server(host='127.0.0.1'):
    '''create the service thread for the given host and start it

    args
    ----
    host:str {'127.0.0.1'}
        handed to neuroPraxService together with the fixed port 8574
    '''
    service = neuroPraxService(port=8574, host=host)
    service.start()
def _build2exe(buildPath='..\\..\\..\\..\\bin\\build\\device'):
    '''builds the lab streaming server as a windows stand-alone executable

    Runs PyInstaller on this file and afterwards removes the intermediate
    build folder and the generated ``.spec`` file.

    args
    ----
    buildPath: str
        defaults to internal location in StimulationFramework
        set it to '.' to build within the source code folder

    raises
    ------
    RuntimeError
        if this file is already an executable
    subprocess.CalledProcessError
        if PyInstaller exits with an error
    '''
    import os
    import shutil
    import subprocess
    extension = os.path.splitext(__file__)[1]
    if extension == '.exe':
        raise RuntimeError('Will not build. Am already a stand-alone executable')
    CURDIR = os.path.dirname(__file__)
    DIR = os.path.join(CURDIR, buildPath)
    WORKPATH = DIR
    EXECNAME = 'neuroPraxLSL'
    DLLPATH = '..\\..\\..\\..\\dependencies\\bin\\pylsl\\'
    BINARIES = f'{DLLPATH}liblsl64.dll;pylsl'
    # An argument list instead of a shell string: paths with spaces stay
    # intact and the ';' inside BINARIES is never interpreted by a shell.
    command = ['python', '-O', '-m', 'PyInstaller', '--onefile',
               '--workpath', WORKPATH,
               '--distpath', DIR,
               __file__,
               '--name', EXECNAME,
               '--add-binary', BINARIES]
    print(' '.join(command))
    # check=True stops here if the build failed, before anything is deleted
    subprocess.run(command, check=True)
    print(f'Renaming executable {EXECNAME}')
    # PyInstaller keeps its intermediate files in <workpath>/<name>
    ARTIFACTS = os.path.join(WORKPATH, EXECNAME)
    print('Deleting build artifacts')
    shutil.rmtree(ARTIFACTS)
    # the spec file is looked up relative to the current working directory
    SPEC = os.path.splitext(EXECNAME)[0] + '.spec'
    os.remove(SPEC)
# %%
if __name__ == "__main__":
    import sys, os

    # everything after the script name is either an option or the host
    cli_args = sys.argv[1:]
    print(f'Starting neuroPrax-LSL converter Version: {__version__}')
    if '--help' in cli_args:
        _help()
    elif '--build' in cli_args:
        # building accepts at most one further argument: the target folder
        arg_count = len(cli_args)
        if arg_count == 1:
            _build2exe()
        elif arg_count == 2:
            target = cli_args[1]
            if not os.path.exists(target):
                print('Folder does not exist')
            else:
                _build2exe(target)
        else:
            print('Only one additional argument, is allowed, when ' +
                  'you want to build: \n the build directory')
    elif cli_args:
        # the first argument is taken as the address of the neuroPrax PC
        host = cli_args[0]
        print(f'I was told to search at {host}')
        _start_server(host)
    else:
        host = '127.0.0.1'
        print(f'Attempting to find the host at {host}')
        _start_server(host)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment