Skip to content

Instantly share code, notes, and snippets.

@tostasmistas
Created November 1, 2017 15:00
Show Gist options
  • Save tostasmistas/d2a6a901763e00c3edc4600cdf81ca9f to your computer and use it in GitHub Desktop.
Save tostasmistas/d2a6a901763e00c3edc4600cdf81ca9f to your computer and use it in GitHub Desktop.
Python middleware for connecting CloudBIT to Repovizz
import os
import socket
import time
import math
import numpy as np
import struct
import re
import shutil
import zipfile
import lxml.etree as ET
import requests
import datetime
import six
import binascii
from collections import OrderedDict
import h5py
host_ip = '192.168.4.1'
port_number = 8001
default_pktsize = 1024
sampling_rate = 1000
no_channels = 4
if no_channels <= 4:
no_bytes = int(math.ceil((12. + 10. * no_channels) / 8.))
else:
no_bytes = int(math.ceil((52. + 6. * (no_channels - 4)) / 8.))
# Dictionary used to add attributes to the XML nodes
extracting_rules = {
'Name': lambda hdf5, xml: hdf5.name.split('/')[-1][1:] if re.match('([0-9A-F]{2}[:-]){5}([0-9A-F]{2})', hdf5.name.split('/')[-1][1:]) is None else hdf5.attrs.get('device'),
'Category': lambda hdf5, xml: extracting_rules['Name'](hdf5, xml).replace(":", "").upper(),
'Expanded': lambda hdf5, xml: '1',
'_Extra': lambda hdf5, xml: '' if isinstance(hdf5, h5py.highlevel.Group) else 'canvas=-1,color=0,selected=1',
'DefaultPath': lambda hdf5, xml: '0',
'EstimatedSampleRate': lambda hdf5, xml: '0.0',
'FrameSize': lambda hdf5, xml: '',
'BytesPerSample': lambda hdf5, xml: '',
'NumChannels': lambda hdf5, xml: '',
'NumSamples': lambda hdf5, xml: str(hdf5.len()),
'ResampledFlag': lambda hdf5, xml: '-1',
'SpecSampleRate': lambda hdf5, xml: '0.0',
'FileType': lambda hdf5, xml: 'CSV',
'MinVal': lambda hdf5, xml: "",
'MaxVal': lambda hdf5, xml: ""
}
# Default anonymity preferences for OpenSignals users
anonymity_prefs = {
'channels': True,
'comments': True,
'date': True,
'device': True,
'device connection': True,
'device name': True,
'digital IO': True,
'duration': True,
'firmware version': True,
'mode': True,
'nsamples': True,
'resolution': True,
'sampling rate': True,
'sync interval': True,
'time': True
}
def create_tcp_client():
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
server_address = (host_ip, port_number)
client_socket.connect(server_address)
print(">> OK: TCP connection established\n")
return client_socket
def close():
client_socket.settimeout(1.0) # set a timeout of 1 second
try:
receive(default_pktsize) # receive any pending data
client_socket.shutdown(socket.SHUT_RDWR)
print(">> OK: TCP/IP connection to be gracefully shutdown")
client_socket.close()
print(">> OK: TCP/IP connection disconnected\n")
except socket.timeout:
print(">> ERROR: timed out on receive\n")
client_socket.shutdown(socket.SHUT_RDWR)
print(">> OK: TCP/IP connection to be gracefully shutdown")
client_socket.close()
print(">> OK: TCP/IP connection disconnected\n")
def receive(no_bytes_to_read):
data = b''
while len(data) < no_bytes_to_read:
data += client_socket.recv(1)
return data
def read(no_samples):
data_acquired = np.zeros((no_samples, 5 + no_channels))
for sample in range(no_samples):
raw_data = receive(no_bytes)
decoded_data = list(struct.unpack(no_bytes * "B ", raw_data))
crc = decoded_data[-1] & 0x0F
decoded_data[-1] = decoded_data[-1] & 0xF0
x = 0
for i in range(no_bytes):
for bit in range(7, -1, -1):
x = x << 1
if x & 0x10:
x = x ^ 0x03
x = x ^ ((decoded_data[i] >> bit) & 0x01)
if crc == x & 0x0F: # only fill data to the array if it passes CRC verification
data_acquired[sample, 0] = decoded_data[-1] >> 4 # sequence number
data_acquired[sample, 1] = decoded_data[-2] >> 7 & 0x01
data_acquired[sample, 2] = decoded_data[-2] >> 6 & 0x01
data_acquired[sample, 3] = decoded_data[-2] >> 5 & 0x01
data_acquired[sample, 4] = decoded_data[-2] >> 4 & 0x01
if no_channels > 0:
data_acquired[sample, 5] = ((decoded_data[-2] & 0x0F) << 6) | (decoded_data[-3] >> 2)
if no_channels > 1:
data_acquired[sample, 6] = ((decoded_data[-3] & 0x03) << 8) | decoded_data[-4]
if no_channels > 2:
data_acquired[sample, 7] = (decoded_data[-5] << 2) | (decoded_data[-6] >> 6)
if no_channels > 3:
data_acquired[sample, 8] = ((decoded_data[-6] & 0x3F) << 4) | (decoded_data[-7] >> 4)
if no_channels > 4:
data_acquired[sample, 9] = ((decoded_data[-7] & 0x0F) << 2) | (decoded_data[-8] >> 6)
if no_channels > 5:
data_acquired[sample, 10] = decoded_data[-8] & 0x3F
np.savetxt(dumpOS, [data_acquired[sample]], delimiter='\t', fmt='%i')
else:
print("CRC FAIL!")
return data_acquired
def enumerate_siblings(father_node, child_node):
""" Calculates the number of nodes on the same level that will have the same ID, and returns the final number to be
appended (_0, _1 etc) """
siblings = father_node.findall("./")
sibling_counter = 0
for node in siblings:
if node.get('Category')[:4] == child_node.get('Category')[:4]:
sibling_counter += 1
return father_node.get('ID')+'_'+child_node.get('Category')[:4]+str(sibling_counter-1)
def create_generic_node(hdf5_node, xml_node):
""" Creates a Generic node in the XML tree of the Repovizz datapack """
new_node = ET.SubElement(xml_node, 'Generic')
for id in ('Name', 'Category', 'Expanded', '_Extra'):
new_node.set(id, extracting_rules[id](hdf5_node, xml_node))
new_node.set('ID', enumerate_siblings(xml_node, new_node))
return new_node
def create_metadata_node(hdf5_node, xml_node, parent_xml_node):
""" Creates a Generic (METADATA) node in the XML tree of the Repovizz datapack """
new_node = ET.SubElement(parent_xml_node, 'Generic')
new_node.set('Category', 'METADATA')
new_node.set('Name', 'HDF5 Attributes')
for id in ('Expanded', '_Extra'):
new_node.set(id, extracting_rules[id](hdf5_node, xml_node))
new_node.set('ID', enumerate_siblings(parent_xml_node, new_node))
for id in anonymity_prefs:
if hdf5_node.attrs.get(id) is not None and anonymity_prefs[id] is True:
# Add a Description node for each attribute
new_desc_node = ET.SubElement(new_node, 'Description')
new_desc_node.set('Category', id.upper())
if isinstance(hdf5_node.attrs.get(id), np.ndarray):
new_desc_node.set('Text',
str(np.array([x.decode('utf-8') for x in hdf5_node.attrs.get(id)]).astype(np.int)))
else:
new_desc_node.set('Text',
str(hdf5_node.attrs.get(id).decode('utf-8')))
for id in ('Expanded', '_Extra'):
new_desc_node.set(id, extracting_rules[id](hdf5_node, xml_node))
new_desc_node.set('ID', enumerate_siblings(new_node, new_desc_node))
return new_node
def create_description_node(hdf5_node, xml_node):
""" Creates a Generic (METADATA) node in the XML tree of the Repovizz datapack """
new_node = ET.SubElement(xml_node, 'Description')
new_node.set('Category',hdf5_node.name.split('/')[-1].upper())
new_node.set('Text',str(hdf5_node.value))
for id in ('Expanded', '_Extra'):
new_node.set(id, extracting_rules[id](hdf5_node, xml_node))
new_node.set('ID', enumerate_siblings(xml_node, new_node))
def create_signal_node(hdf5_node, xml_node, sampling_rate):
""" Creates a Signal node in the XML tree of the Repovizz datapack """
new_node = ET.SubElement(xml_node, 'Signal')
for id in ('Name', 'Category', 'Expanded', '_Extra', 'DefaultPath', 'EstimatedSampleRate', 'FrameSize',
'BytesPerSample', 'NumChannels', 'NumSamples', 'ResampledFlag', 'SpecSampleRate', 'FileType',
'MinVal', 'MaxVal'):
new_node.set(id, extracting_rules[id](hdf5_node, xml_node))
new_node.set('ID', enumerate_siblings(xml_node, new_node))
new_node.set('Filename', new_node.get('ID').lower()+'.csv')
new_node.set('SampleRate', str(sampling_rate))
return new_node
def write_signal_node_to_disk(hdf5_node, signal_node, sampling_rate, directory):
""" Writes a repovizz-style .csv file to disk with the contents of a Signal node """
with open(os.path.join(directory, signal_node.get('ID').lower()+'.csv'), 'w') as text_file:
# Extract min and max values
[minimum, maximum] = get_min_max_values(hdf5_node)
# Write the contents of the HDF5 Dataset in a repovizz .csv file
text_file.write('repovizz,framerate='+str(sampling_rate) + ",minval=" + str(minimum) + ",maxval=" + str(maximum) + '\n')
for value in hdf5_node.value:
text_file.write(str(value[0]) + ',')
def get_min_max_values(hdf5_node):
minimum = float('inf')
maximum = -float('inf')
for value in hdf5_node.value:
if value[0] < minimum :
minimum = value[0]
if value[0] > maximum :
maximum = value[0]
if minimum == float('inf'):
minimum = -1.0
if maximum == -float('inf'):
maximum = 1.0
# repovizz assumes maxval > 0
# and minval = 0 or minval = -maxval
if maximum <= 0:
if minimum < 0:
maximum = -minimum
else: # min == 0 and max == 0
minimum = -1.
maximum = 1.
elif minimum >= 0:
minimum = 0.
else: # min < 0
maximum = max(maximum, -minimum)
minimum = -maximum
return [float(minimum), float(maximum)]
def traverse_hdf5(hdf5_node, xml_node, sampling_rate, directory):
print(hdf5_node)
if isinstance(hdf5_node, h5py.highlevel.Group):
# Add a Generic node for each HDF5 Group (used as a container for other nodes)
new_generic_node = create_generic_node(hdf5_node, xml_node)
# Add a Generic node for HDF5 Group attributes (used as a METADATA container)
new_metadata_node = create_metadata_node(hdf5_node, xml_node, new_generic_node)
for children in hdf5_node:
traverse_hdf5(hdf5_node[children], new_generic_node, sampling_rate, directory)
elif isinstance(hdf5_node, h5py.highlevel.Dataset):
if hdf5_node.len() > 0:
# Add a Signal node for each HDF5 Dataset
new_signal_node = create_signal_node(hdf5_node, xml_node, sampling_rate)
# Write the contents of the Signal node to a repovizz-style .csv file
write_signal_node_to_disk(hdf5_node, new_signal_node, sampling_rate, directory)
def zipdir(path, zip_handle):
for root, dirs, files in os.walk(path):
for file in files:
zip_handle.write(os.path.join(root, file), file)
def process_recording(path):
[input_directory, input_filename] = os.path.split(path)
output_directory = os.path.join(input_directory, input_filename[:-3])
if not os.path.exists(output_directory):
os.makedirs(output_directory)
output_xml = os.path.join(output_directory, input_filename[:-2] + 'xml')
f = h5py.File(path, 'r')
root = ET.Element('ROOT')
root.set('ID', 'ROOT0')
for device in enumerate(f):
traverse_hdf5(f[device[1]], root, sampling_rate, output_directory)
# delete all Generic nodes that do not contain Signal nodes
for empty_nodes in root.xpath(".//Generic[not(.//Signal|.//Description)]"):
empty_nodes.getparent().remove(empty_nodes)
with open(output_xml, 'wb') as text_file:
text_file.write(ET.tostring(root))
# zip the generated directory and then delete it
zipf = zipfile.ZipFile(path[:-2] + 'zip', 'w')
zipdir(output_directory, zipf)
zipf.close()
shutil.rmtree(output_directory)
def encode_hdf5_metadata():
message = bytearray.fromhex('0B')
client_socket.send(message)
print(">> OK: message sent")
status_data = receive(18)
if isinstance(status_data, six.string_types):
status_data = str(status_data).encode('hex')
else:
status_data = status_data.hex()
status_data = status_data[-4:]
binary_mode = int(status_data[1], 16) & 0x1
if binary_mode == 0:
cb_mode = 'live'
elif binary_mode == 1:
cb_mode = 'simulated'
mode = 0
if cb_mode == 'simulated':
mode = 1
binary_sampling_rate = (int(status_data[2], 16) & 0xC) >> 2
if binary_sampling_rate == 0:
cb_sampling_rate = 1
elif binary_sampling_rate == 1:
cb_sampling_rate = 10
elif binary_sampling_rate == 2:
cb_sampling_rate = 100
elif binary_sampling_rate == 3:
cb_sampling_rate = 1000
binary_channels = int(status_data[2:4], 16) & 0x3F
cb_channels = ''
if binary_channels & 0x01:
cb_channels += str(1)
if binary_channels & 0x02:
cb_channels += str(2)
if binary_channels & 0x04:
cb_channels += str(3)
if binary_channels & 0x08:
cb_channels += str(4)
if binary_channels & 0x10:
cb_channels += str(5)
if binary_channels & 0x20:
cb_channels += str(6)
cb_no_channels = len(cb_channels)
channels = []
for i in range(0, cb_no_channels):
channels.append('A' + str(cb_channels[i]))
if cb_no_channels == 6:
adc_resolution = [10, 10, 10, 10, 6, 6]
elif cb_no_channels == 5:
adc_resolution = [10, 10, 10, 10, 6]
else:
adc_resolution = [10] * cb_no_channels
metadata = OrderedDict()
metadata['channels'] = [int(i) for i in list(cb_channels)]
metadata['comments'] = ''
metadata['date'] = datetime.datetime.now().strftime('%Y-%m-%d')
metadata['device'] = 'bitalino_rev'
metadata['device connection'] = 'CloudBIT'
metadata['device name'] = '192.168.4.1:8001'
metadata['digital IO'] = [0, 0, 1, 1]
metadata['firmware version'] = 52
metadata['mode'] = mode
metadata['resolution'] = [4, 1, 1, 1, 1] + adc_resolution
metadata['sampling rate'] = cb_sampling_rate
metadata['sync interval'] = 2
metadata['time'] = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]
print(metadata)
return metadata
def convert_to_h5():
encoded_metadata = encode_hdf5_metadata()
close() # close the client socket
dumpOS.close() # close the OpenSignals dump file
data = np.loadtxt(pathDumpOS, delimiter='\t')
dumpH5 = h5py.File(pathDumpH5, 'w')
device = dumpH5.create_group('_bitalino')
dt = h5py.special_dtype(vlen=bytes)
device.attrs.create('channels', data=encoded_metadata['channels'], dtype='S10')
device.attrs.create('comments', data=encoded_metadata['comments'], dtype=dt)
device.attrs.create('date', data=encoded_metadata['date'], dtype=dt)
device.attrs.create('device', data=encoded_metadata['device'], dtype=dt)
device.attrs.create('device connection', data=encoded_metadata['device connection'], dtype=dt)
device.attrs.create('device name', data=encoded_metadata['device name'], dtype=dt)
device.attrs.create('digital IO', data=encoded_metadata['digital IO'], dtype='S10')
m, s = divmod(len(data[:, 0])/sampling_rate, 60)
h, m = divmod(m, 60)
duration = '%dh%02dm%02ds' % (h, m, s)
device.attrs.create('duration', data=duration, dtype=dt)
device.attrs.create('firmware version', data=encoded_metadata['firmware version'], dtype='S10')
device.attrs.create('mode', data=encoded_metadata['mode'], dtype='S10')
device.attrs.create('nsamples', data=len(data[:, 0]), dtype='S10')
device.attrs.create('resolution', data=encoded_metadata['resolution'], dtype='S10')
device.attrs.create('resolution', data=encoded_metadata['resolution'], dtype='S10')
device.attrs.create('sampling rate', data=encoded_metadata['sampling rate'], dtype='S10')
device.attrs.create('time', data=encoded_metadata['time'], dtype=dt)
nseq = device.create_group('1nseq')
nseq.create_dataset('1nseq' + str(0), data=np.reshape(data[:, 0], (len(data[:, 0]), 1)), dtype='u2')
digital = device.create_group('2digital')
for i in range(1, 5):
digital.create_dataset('2digital' + str(i), data=np.reshape(data[:, i], (len(data[:, i]), 1)), dtype='u2')
analog = device.create_group('3analog')
for i in range(5, data.shape[1]):
analog.create_dataset('3channel' + str(i - 4), data=np.reshape(data[:, i], (len(data[:, i]), 1)), dtype='u2')
dumpH5.close()
def post_datapack():
binaryZip = open(pathDumpZIP, 'rb')
url = 'https://repovizz.upf.edu/repo/api/datapacks/upload'
data = {'name': 'CloudBIT-dump2', 'folder': 'CloudBIT',
'user': 'tostasmistas', 'api_key': 'e2c0be24560d78c5e599c2a9c9d0bbd2',
'desc': 'TCP/IP direct streaming to cloud', 'keywords': ''}
files = {'file': ('datapack.zip', binaryZip)}
r = requests.post(url, data=data, files=files)
print()
print(r)
print(r.json())
def cleanup():
os.remove(pathDumpOS)
os.remove(pathDumpH5)
os.remove(pathDumpZIP)
if __name__ == '__main__':
pathDumpOS = os.path.join(os.getcwd(), 'RV' + '.TXT')
pathDumpH5 = pathDumpOS[:-4] + '.h5'
pathDumpZIP = pathDumpOS[:-4] + '.zip'
dumpOS = open(pathDumpOS, 'wb')
client_socket = create_tcp_client()
time.sleep(5)
no_samples = 10
try:
print(">> OK: now collecting data...\n")
while True:
data_acquired = read(no_samples)
except KeyboardInterrupt:
print('>> OK: stop data collecting\n')
message = bytearray.fromhex('00')
print(">> OK: message sent\n")
client_socket.send(message)
message = bytearray.fromhex('07')
client_socket.send(message)
print(">> OK: message sent")
version_str = ''
while True:
version_str += receive(1).decode('cp437')
if version_str[-1] == '\n' and 'BITalino' in version_str:
break
print(version_str[version_str.index("BITalino"):-1] + '\n')
print(">> OK: connect to the internet now...\n")
# time.sleep(30) # we need to have time to connect to the internet again
# send received data to Repovizz
convert_to_h5()
process_recording(pathDumpH5)
post_datapack()
cleanup()
print("\n>> DONE")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment