# CareScape parser
#
# Copied from the GitHub gist louismullie/b111be352e4f581670ed8dedf00cfd13
# (last active June 15, 2023). The gist page's navigation text was removed.
import re
from struct import iter_unpack
from quopri import decodestring
import magic
import matplotlib.pyplot as plt
import numpy as np
# Constants
# Y-axis label (display unit) for each plotted channel type.
UNITS = {
    'EKG': 'uV',
    'Pulse oximetry': '%',
    'Pressure - artery': 'mmHg',
    'Pressure - central_venous': 'mmHg',
}
# Multiplier applied to the raw sample values before they are plotted.
UNIT_SCALE = 0.001

# Plotting constants
# Lower y-axis limit for each plotted channel type.
Y_LIM_LOW = {
    'EKG': -0.75,
    'Pulse oximetry': 0,
    'Pressure - artery': 0,
    'Pressure - central_venous': 0,
}
# Upper y-axis limit for each plotted channel type.
Y_LIM_HIGH = {
    'EKG': 0.75,
    'Pulse oximetry': 150,
    'Pressure - artery': 150,
    'Pressure - central_venous': 30,
}
def load_data(file_name):
    """Read a file in binary mode and return its content as a list of lines.

    Args:
        file_name: Path of the file to read.

    Returns:
        A list of ``bytes`` objects, one per line, each still carrying its
        line terminator (exactly what ``readlines()`` produces).
    """
    with open(file_name, 'rb') as file:
        return file.readlines()
def process_data(lines):
    """Run the full decoding pipeline on the raw lines of a recording.

    The lines are split into chunks, the chunks are turned into binary
    payloads plus a description of the binary layout, and the payloads are
    then decoded according to that layout.

    Args:
        lines: List of ``bytes`` lines, as returned by ``load_data``.

    Returns:
        A ``(leads, sources)`` tuple as produced by ``process_binary_data``.
    """
    chunks = chunk_lines(lines)
    binary_lines, byte_sections, endian = process_chunks(chunks)
    leads, sources = process_binary_data(binary_lines, byte_sections, endian)
    return leads, sources
def chunk_lines(lines):
    """Split the input lines into chunks at lines that start with ``--``.

    A line beginning with ``b'--'`` closes the chunk collected so far (if it
    is non-empty) and becomes the first line of the next chunk. Lines that
    precede the first separator form a chunk of their own.

    Args:
        lines: List of ``bytes`` lines.

    Returns:
        A list of chunks, each chunk being a non-empty list of ``bytes``
        lines. An empty input yields an empty list.
    """
    chunks = []
    current_chunk = []
    for line in lines:
        if line[0:2] == b'--' and len(current_chunk) > 0:
            chunks.append(current_chunk)
            current_chunk = []
        current_chunk.append(line)
    # Flush whatever follows the last separator so no input line is dropped.
    if current_chunk:
        chunks.append(current_chunk)
    return chunks
def process_chunks(chunks):
    """Dispatch every chunk to the handler that matches its guessed MIME type.

    * ``text/html``, ``application/xml`` and ``text/xml`` chunks are treated
      as the textual description of the binary layout.
    * ``application/octet-stream`` chunks are treated as binary payloads.
    * ``text/plain`` chunks are ignored.

    Args:
        chunks: List of chunks (lists of ``bytes`` lines).

    Returns:
        A ``(binary_lines, byte_sections, endian)`` tuple: the collected
        binary payloads, the collected section descriptions, and the byte
        order prefix (``'>'``, ``'<'``, or ``None`` when no textual chunk
        declared one).

    Raises:
        Exception: If a chunk has any other MIME type.
    """
    binary_lines = []
    byte_sections = []
    endian = None
    for chunk in chunks:
        guessed_mime_type, joined_chunk = guess_mime_type(chunk)
        if guessed_mime_type in ('text/html', 'application/xml', 'text/xml'):
            chunk_endian, byte_sections = process_textual_data(joined_chunk, byte_sections)
            # Keep a byte order found earlier when a later textual chunk
            # does not declare one.
            if chunk_endian is not None:
                endian = chunk_endian
        elif guessed_mime_type == 'application/octet-stream':
            binary_lines = process_binary_data_chunk(chunk, binary_lines)
        elif guessed_mime_type == 'text/plain':
            continue
        else:
            raise Exception(f'ERROR: unrecognized MIME type {guessed_mime_type}')
    return binary_lines, byte_sections, endian
def guess_mime_type(chunk):
    """Join a chunk into one bytes object and guess its MIME type.

    Args:
        chunk: List of ``bytes`` lines.

    Returns:
        A ``(guessed_mime_type, joined_chunk)`` tuple, where the MIME type is
        whatever ``magic.from_buffer(..., mime=True)`` reports for the joined
        bytes.
    """
    joined_chunk = b''.join(chunk)
    guessed_mime_type = magic.from_buffer(joined_chunk, mime=True)
    return guessed_mime_type, joined_chunk
def process_textual_data(joined_chunk, byte_sections):
    """Extract the byte order and the binary section layout from a text chunk.

    The chunk is quoted-printable decoded and then scanned line by line.
    The most recent ``site="..."`` attribute seen is remembered and passed
    along with every line containing ``BT=``; each such line is turned into
    one section description by ``process_byte_data``.

    Args:
        joined_chunk: The whole chunk as a single ``bytes`` object.
        byte_sections: Section descriptions collected so far. The list is
            not modified; a new list is returned.

    Returns:
        An ``(endian, byte_sections)`` tuple. ``endian`` is ``'>'`` when the
        decoded text contains ``bigEndian``, ``'<'`` when it contains
        ``littleEndian``, and ``None`` otherwise. ``byte_sections`` is the
        input list extended with the sections found in this chunk.
    """
    endian = None
    current_ip_site = None
    sections = list(byte_sections)

    decoded_str = decodestring(joined_chunk)
    if b'bigEndian' in decoded_str:
        endian = '>'
    elif b'littleEndian' in decoded_str:
        endian = '<'

    for chunk_line in decoded_str.split(b'\n'):
        site_match = re.search(rb'site="([a-zA-Z0-9_]+)"', chunk_line)
        if site_match is not None:
            current_ip_site = site_match.group(1).decode('ascii')
        if b'BT=' in chunk_line:
            # process_byte_data returns ONE section description, so it is
            # appended to the running list rather than replacing it.
            sections.append(process_byte_data(chunk_line, current_ip_site))
    return endian, sections
def process_byte_data(chunk_line, current_ip_site):
    """Turn one ``BT=`` description line into a binary section description.

    Args:
        chunk_line: One ``bytes`` line of the textual chunk containing a
            ``BT="..."`` binary-type attribute.
        current_ip_site: Value of the most recent ``site`` attribute, used to
            name ``ipWaveform`` sections; may be ``None``.

    Returns:
        A list ``[unit_size, unit_num, format_char, name, lead]``:
        ``unit_size`` is the size of one value in bits, ``unit_num`` the
        number of values (``asizeBT`` attribute, default 1), ``format_char``
        the ``struct`` format character for one value, ``name`` the element
        name (renamed for the known waveform elements) and ``lead`` the value
        of the ``lead`` attribute (``''`` when absent).

    Raises:
        ValueError: If the line declares none of the known binary types.
    """
    # (marker, size in bits, struct format character), checked in order.
    # unsignedByte and bool-8 use 'B' and '?' so that they unpack to values
    # instead of the 1-byte ``bytes`` objects that 'c' would give.
    binary_types = (
        (b'BT="xs:unsignedByte', 8, 'B'),
        (b'BT="xs:short', 16, 'h'),
        (b'BT="xs:unsignedShort', 16, 'H'),
        (b'BT="xs:unsignedInt', 32, 'I'),
        (b'BT="xb:NTP-32', 32, 'L'),
        (b'BT="xb:bool-8', 8, '?'),
    )
    for marker, unit_size, format_char in binary_types:
        if marker in chunk_line:
            break
    else:
        raise ValueError(f'Unrecognized data type: {chunk_line!r}')

    # Extract array size
    size_match = re.search(rb'asizeBT="([0-9]+)"', chunk_line)
    unit_num = int(size_match.group(1)) if size_match is not None else 1

    # Extract name
    name_match = re.search(rb'<([a-zA-Z]+) ', chunk_line)
    name = name_match.group(1).decode('ascii') if name_match is not None else ''

    # Extract lead
    lead_match = re.search(rb'lead="([a-zA-Z0-9]+)"', chunk_line)
    lead = lead_match.group(1).decode('ascii') if lead_match is not None else ''

    # Rename if necessary
    if name == 'ipWaveform':
        # Fall back to a placeholder instead of failing on ``str + None``
        # when no site attribute has been seen yet.
        site = current_ip_site if current_ip_site is not None else 'unknown'
        name = 'Pressure - ' + site
    elif name == 'pleth':
        name = 'Pulse oximetry'
    elif name == 'ecgWaveform':
        name = 'Electrocardiogram'

    return [unit_size, unit_num, format_char, name, lead]
def process_binary_data(binary_lines, byte_sections, endian):
    """Decode the binary payloads according to the section layout.

    Every payload is read front to back: each section consumes
    ``unit_size * unit_num / 8`` bytes, which are unpacked with the section's
    ``struct`` format character. Sections whose values are all identical (or
    empty) are skipped. Remaining values are appended to ``sources`` when the
    section name is a known source, and to ``leads`` when the section is an
    ``Electrocardiogram`` section with a known lead.

    Args:
        binary_lines: List of ``bytes`` payloads.
        byte_sections: List of ``[unit_size, unit_num, format_char, name,
            lead]`` section descriptions.
        endian: ``struct`` byte-order prefix, ``'<'`` or ``'>'``.

    Returns:
        A ``(leads, sources)`` tuple of dicts mapping lead / source names to
        lists of decoded values.

    Raises:
        ValueError: If there is data to decode but ``endian`` is ``None``, or
            if the sections do not consume a payload exactly.
        struct.error: If a section's slice of the payload is not a whole
            number of values (for example, a truncated payload).
    """
    leads = {'I': [], 'II': [], 'III': [], 'AVR': [], 'AVF': [], 'AVL': [], 'V1': []}
    sources = {'Electrocardiogram': [], 'Pulse oximetry': [],
               'Pressure - artery': [], 'Pressure - central_venous': []}
    if not binary_lines:
        return leads, sources
    if endian is None and byte_sections:
        raise ValueError('Byte order is unknown; cannot decode binary data')

    # The layout is identical for every payload, so derive it once.
    layout = [
        ((unit_size * unit_num) // 8, endian + format_char, name, lead)
        for unit_size, unit_num, format_char, name, lead in byte_sections
    ]

    for binary_data in binary_lines:
        pointer = 0
        for byte_length, section_format, section_name, section_lead in layout:
            raw_section = binary_data[pointer:pointer + byte_length]
            pointer += byte_length
            section_data = [item[0] for item in iter_unpack(section_format, raw_section)]
            # A section without at least two distinct values carries no signal.
            if len(set(section_data)) <= 1:
                continue
            if section_name in sources:
                sources[section_name].extend(section_data)
            if section_name == 'Electrocardiogram' and section_lead in leads:
                leads[section_lead].extend(section_data)
        if pointer != len(binary_data):
            raise ValueError(
                f'Section layout covers {pointer} bytes but the payload has '
                f'{len(binary_data)} bytes')
    return leads, sources
def process_binary_data_chunk(chunk, binary_lines):
    """Extract the binary payload of a chunk and append it to ``binary_lines``.

    The payload starts after the first blank line that follows the
    ``Content-Transfer-Encoding: binary`` header and runs to the end of the
    chunk. All payload lines are joined back together, so blank lines that
    occur inside the binary data are preserved. One trailing line terminator
    (``\\r\\n`` or ``\\n``) is removed from the joined payload.

    Args:
        chunk: List of ``bytes`` lines making up one chunk.
        binary_lines: List of payloads collected so far; extended in place.

    Returns:
        ``binary_lines``, with the payload of this chunk appended. Nothing is
        appended when the chunk has no binary header or an empty payload.
    """
    header_seen = False
    in_payload = False
    binary_lines_chunk = []
    for chunk_line in chunk:
        if in_payload:
            binary_lines_chunk.append(chunk_line)
        elif header_seen:
            # Remaining header lines are skipped; the blank line ends them.
            if chunk_line.strip(b'\r\n') == b'':
                in_payload = True
        elif chunk_line.strip().lower() == b'content-transfer-encoding: binary':
            header_seen = True

    binary_line = b''.join(binary_lines_chunk)
    # Drop the single line terminator that ends the payload:
    # byte 13 is "\r", byte 10 is "\n".
    if binary_line.endswith(b'\r\n'):
        binary_line = binary_line[:-2]
    elif binary_line.endswith(b'\n'):
        binary_line = binary_line[:-1]

    if binary_line:
        binary_lines.append(binary_line)
    return binary_lines
def plot_data(leads, sources):
    """Plot every non-empty lead and source on its own subplot.

    Leads are plotted first, titled ``'EKG - <lead>'`` and drawn with the
    ``'EKG'`` units and y-limits; sources follow, titled with their own name.
    Nothing is drawn when there is no data at all.

    Args:
        leads: Dict mapping lead names to lists of samples.
        sources: Dict mapping source names to lists of samples.
    """
    leads_with_data = [name for name, samples in leads.items() if len(samples) > 0]
    sources_with_data = [name for name, samples in sources.items() if len(samples) > 0]
    num_channels = len(leads_with_data) + len(sources_with_data)
    if num_channels == 0:
        # plt.subplots() rejects a zero-row grid.
        return

    # squeeze=False always returns a 2-D array of axes, so a single channel
    # can be indexed the same way as several.
    fig, axs = plt.subplots(num_channels, squeeze=False)
    axes = axs[:, 0]

    for i, lead in enumerate(leads_with_data):
        plot_series(axes[i], leads[lead], 'EKG - ' + lead, UNITS['EKG'],
                    UNIT_SCALE, Y_LIM_LOW['EKG'], Y_LIM_HIGH['EKG'])

    # Sources go after the leads; the offset does not depend on the lead
    # loop having run at least once.
    offset = len(leads_with_data)
    for j, source in enumerate(sources_with_data):
        # The combined ECG source is named 'Electrocardiogram', while the
        # plotting constants are keyed by 'EKG'.
        settings_key = 'EKG' if source == 'Electrocardiogram' else source
        plot_series(axes[offset + j], sources[source], source,
                    UNITS.get(settings_key, ''), UNIT_SCALE,
                    Y_LIM_LOW.get(settings_key), Y_LIM_HIGH.get(settings_key))
def plot_series(ax, data, title, units, unit_scale, ylim_low=None, ylim_high=None):
    """Plot one series of samples on the given axes.

    Args:
        ax: Axes-like object to draw on (``plot``, ``set_title``,
            ``set_ylabel`` and ``set_ylim`` are called on it).
        data: Sequence of numeric samples.
        title: Title shown above the plot.
        units: Text used as the y-axis label.
        unit_scale: Factor every sample is multiplied by before plotting.
        ylim_low: Lower y-axis limit, or ``None``.
        ylim_high: Upper y-axis limit, or ``None``.

    The y-axis limits are applied only when both are given.
    """
    ax.plot(np.asarray(data) * unit_scale)
    ax.set_title(title)
    ax.set_ylabel(units, fontsize=7)
    # Compare against None explicitly: a limit of 0 is a valid value and
    # must not disable the limits.
    if ylim_low is not None and ylim_high is not None:
        ax.set_ylim([ylim_low, ylim_high])
def main(file_name='ecg_test2.txt'):
    """Load a recording, decode it, and display the resulting plots.

    Args:
        file_name: Path of the recording to read. Defaults to the sample file
            ``ecg_test2.txt`` in the current working directory.
    """
    # Load data
    lines = load_data(file_name)
    # Process data
    leads, sources = process_data(lines)
    # Plot data
    plot_data(leads, sources)
    # Open the figure window; a plain script run would otherwise exit
    # without displaying anything.
    plt.show()


if __name__ == "__main__":
    main()
# End of gist. (GitHub page footer: "Sign up for free to join this conversation on GitHub.")