Skip to content

Instantly share code, notes, and snippets.

@louismullie
Last active June 15, 2023 20:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save louismullie/b111be352e4f581670ed8dedf00cfd13 to your computer and use it in GitHub Desktop.
Save louismullie/b111be352e4f581670ed8dedf00cfd13 to your computer and use it in GitHub Desktop.
CareScape parser
import re
from struct import iter_unpack
from quopri import decodestring
import magic
import matplotlib.pyplot as plt
import numpy as np
# Constants

# Unit string written on the y-axis label of each plotted channel.
UNITS = dict([
    ('EKG', 'uV'),
    ('Pulse oximetry', '%'),
    ('Pressure - artery', 'mmHg'),
    ('Pressure - central_venous', 'mmHg'),
])

# Factor the raw sample values are multiplied by before being plotted.
UNIT_SCALE = 0.001

# Plotting constants

# (lower, upper) y-axis bounds per channel; split below into the two
# lookup tables the plotting code reads.
_Y_LIMITS = {
    'EKG': (-0.75, 0.75),
    'Pulse oximetry': (0, 150),
    'Pressure - artery': (0, 150),
    'Pressure - central_venous': (0, 30),
}
Y_LIM_LOW = {channel: bounds[0] for channel, bounds in _Y_LIMITS.items()}
Y_LIM_HIGH = {channel: bounds[1] for channel, bounds in _Y_LIMITS.items()}
def load_data(file_name):
    """
    Open *file_name* in binary mode and return its lines.

    Returns a list of ``bytes`` objects, one per line, each still carrying
    its line terminator.
    """
    with open(file_name, 'rb') as handle:
        # Iterating a binary file yields the same lines as readlines().
        return list(handle)
def process_data(lines):
    """
    Run the full parsing pipeline over the raw file lines.

    The lines are grouped into chunks, the chunks are turned into binary
    records plus a description of their byte layout, and the records are
    then decoded. Returns the ``(leads, sources)`` pair produced by
    process_binary_data.
    """
    binary_lines, byte_sections, endian = process_chunks(chunk_lines(lines))
    return process_binary_data(binary_lines, byte_sections, endian)
def chunk_lines(lines):
    """
    Group the input lines into chunks delimited by lines starting with '--'.

    A separator line closes the chunk collected so far and is itself
    discarded. A separator seen while nothing has been collected yet is kept
    as an ordinary line. Lines following the last separator are not
    returned.
    """
    groups = []
    pending = []
    for raw_line in lines:
        closes_chunk = raw_line[:2] == b'--' and len(pending) > 0
        if not closes_chunk:
            pending.append(raw_line)
            continue
        groups.append(pending)
        pending = []
    return groups
def process_chunks(chunks):
    """
    Dispatch every chunk to the handler matching its guessed MIME type.

    HTML/XML chunks are handed to process_textual_data, which reports the
    byte order and the byte-section layout. 'application/octet-stream'
    chunks are handed to process_binary_data_chunk, which collects the
    binary payloads. 'text/plain' chunks are ignored.

    Returns a ``(binary_lines, byte_sections, endian)`` tuple; ``endian`` is
    None when no textual chunk declared a byte order.

    Raises:
        ValueError: if a chunk has any other MIME type. ValueError is a
            subclass of Exception, so existing ``except Exception``
            handlers still apply.
    """
    # 'text/xml' is accepted alongside 'application/xml' because the MIME
    # string reported for XML depends on the installed libmagic database.
    textual_types = ('text/html', 'application/xml', 'text/xml')
    binary_lines = []
    byte_sections = []
    endian = None
    for chunk in chunks:
        guessed_mime_type, joined_chunk = guess_mime_type(chunk)
        if guessed_mime_type in textual_types:
            chunk_endian, byte_sections = process_textual_data(joined_chunk, byte_sections)
            # Keep a byte order found earlier when this chunk declares none;
            # otherwise a later textual chunk would reset it to None.
            if chunk_endian is not None:
                endian = chunk_endian
        elif guessed_mime_type == 'application/octet-stream':
            binary_lines = process_binary_data_chunk(chunk, binary_lines)
        elif guessed_mime_type == 'text/plain':
            continue
        else:
            raise ValueError(f'ERROR: unrecognized MIME type {guessed_mime_type}')
    return binary_lines, byte_sections, endian
def guess_mime_type(chunk):
    """
    Concatenate the lines of a chunk and ask the magic module for its MIME type.

    Returns a ``(mime_type, payload)`` tuple where ``payload`` is the
    concatenated bytes that were inspected.
    """
    payload = b''.join(chunk)
    return magic.from_buffer(payload, mime=True), payload
def process_textual_data(joined_chunk, byte_sections):
    """
    Extract the byte order and the byte-section layout from a textual chunk.

    The chunk is quoted-printable decoded first. The byte order is '>' when
    the text contains 'bigEndian', '<' when it contains 'littleEndian', and
    None otherwise. The text is then scanned line by line: the most recent
    ``site="..."`` attribute is remembered, and every line containing 'BT='
    is turned into a section descriptor by process_byte_data and appended to
    ``byte_sections``.

    Args:
        joined_chunk: the chunk as a single bytes object.
        byte_sections: list of section descriptors collected so far; it is
            extended in place and also returned.

    Returns:
        ``(endian, byte_sections)``.
    """
    decoded = decodestring(joined_chunk)

    endian = None
    if b'bigEndian' in decoded:
        endian = '>'
    elif b'littleEndian' in decoded:
        endian = '<'

    current_ip_site = None
    for chunk_line in decoded.split(b'\n'):
        # Match on the bytes directly instead of on their repr().
        site_match = re.search(rb'site="([a-zA-Z0-9_]+)"', chunk_line)
        if site_match is not None:
            current_ip_site = site_match.group(1).decode('ascii')
        if b'BT=' in chunk_line:
            # process_byte_data takes (line, site) and returns ONE section
            # descriptor, so it is appended here rather than assigned over
            # the accumulated list.
            byte_sections.append(process_byte_data(chunk_line, current_ip_site))
    return endian, byte_sections
def process_byte_data(chunk_line, current_ip_site):
    """
    Build a section descriptor from one layout line containing a 'BT=' type.

    Args:
        chunk_line: the line as bytes.
        current_ip_site: site name used to label 'ipWaveform' sections, or
            None when no site has been seen (the section is then named
            'Pressure - unknown').

    Returns:
        ``[unit_size, unit_num, format_char, name, lead]`` where
        ``unit_size`` is the element width in bits, ``unit_num`` the element
        count ('asizeBT', default 1), ``format_char`` the struct format
        character, ``name`` the element name (renamed for known waveforms)
        and ``lead`` the 'lead' attribute or ''.

    Raises:
        ValueError: if the line declares none of the supported binary types
            (ValueError is a subclass of Exception).
    """
    # (marker, element width in bits, struct format character).
    # unsignedByte is decoded with 'B' so samples are integers rather than
    # the length-1 bytes objects that 'c' produces; bool-8 uses '?'.
    binary_types = (
        (b'BT="xs:unsignedByte', 8, 'B'),
        (b'BT="xs:short', 16, 'h'),
        (b'BT="xs:unsignedShort', 16, 'H'),
        (b'BT="xs:unsignedInt', 32, 'I'),
        (b'BT="xb:NTP-32', 32, 'L'),
        (b'BT="xb:bool-8', 8, '?'),
    )
    for marker, unit_size, format_char in binary_types:
        if marker in chunk_line:
            break
    else:
        raise ValueError('Unrecognized data type')

    # Extract array size
    size_match = re.search(rb'asizeBT="([0-9]+)"', chunk_line)
    unit_num = int(size_match.group(1)) if size_match is not None else 1

    # Extract name
    name_match = re.search(rb'<([a-zA-Z]+) ', chunk_line)
    name = name_match.group(1).decode('ascii') if name_match is not None else ''

    # Extract lead
    lead_match = re.search(rb'lead="([a-zA-Z0-9]+)"', chunk_line)
    lead = lead_match.group(1).decode('ascii') if lead_match is not None else ''

    # Rename if necessary
    if name == 'ipWaveform':
        # A missing site must not crash the string concatenation.
        site = current_ip_site if current_ip_site is not None else 'unknown'
        name = 'Pressure - ' + site
    elif name == 'pleth':
        name = 'Pulse oximetry'
    elif name == 'ecgWaveform':
        name = 'Electrocardiogram'

    return [unit_size, unit_num, format_char, name, lead]
def process_binary_data(binary_lines, byte_sections, endian):
    """
    Decode every binary record according to the byte-section layout.

    Each record is walked front to back; every section consumes
    ``unit_size * unit_num / 8`` bytes, which are unpacked with the section's
    struct format character prefixed by ``endian``. Sections whose values
    are all identical (or empty) are skipped. Other sections are appended to
    ``sources`` when their name is a known source, and additionally to
    ``leads`` when they are an 'Electrocardiogram' section with a known lead.

    Args:
        binary_lines: iterable of bytes records.
        byte_sections: section descriptors
            ``[unit_size, unit_num, format_char, name, lead]``.
        endian: struct byte-order prefix ('<' or '>').

    Returns:
        ``(leads, sources)``, two dicts mapping channel name to a list of
        decoded sample values.

    Raises:
        ValueError: if the sections do not account for exactly the length of
            a record. This replaces an ``assert`` that would be stripped
            when Python runs with -O.
    """
    leads = {'I': [], 'II': [], 'III': [], 'AVR': [], 'AVF': [], 'AVL': [], 'V1': []}
    sources = {'Electrocardiogram': [], 'Pulse oximetry': [],
               'Pressure - artery': [], 'Pressure - central_venous': []}

    for binary_data in binary_lines:
        pointer = 0
        for byte_section in byte_sections:
            unit_size, unit_num, format_char, section_name, section_lead = byte_section[:5]
            byte_length = (unit_size * unit_num) // 8
            raw = binary_data[pointer:pointer + byte_length]
            pointer += byte_length

            values = [item[0] for item in iter_unpack(endian + format_char, raw)]
            # A section carrying a single repeated value (or nothing) is
            # not recorded.
            if len(set(values)) <= 1:
                continue

            # extend() grows the lists in place; rebuilding them with '+'
            # on every record is quadratic in the number of samples.
            if section_name in sources:
                sources[section_name].extend(values)
            if section_name == 'Electrocardiogram' and section_lead in leads:
                leads[section_lead].extend(values)

        if pointer != len(binary_data):
            raise ValueError(
                f'byte sections cover {pointer} bytes but the binary record '
                f'is {len(binary_data)} bytes long'
            )
    return leads, sources
def process_binary_data_chunk(chunk, binary_lines):
    """
    Extract the binary payload of one chunk and append it to ``binary_lines``.

    Lines are collected once the 'Content-Transfer-Encoding: binary' header
    line has been seen. Lines equal to b'\\r\\n' are skipped everywhere. The
    collected lines are joined, and one trailing line feed followed by one
    trailing carriage return are removed.

    A chunk that yields no payload bytes adds nothing to ``binary_lines``
    (indexing the empty payload previously raised IndexError).

    NOTE(review): a b'\\r\\n' line occurring inside the binary payload is
    dropped as well; confirm the payload can never contain the byte
    sequence LF CR LF.

    Args:
        chunk: list of bytes lines belonging to one chunk.
        binary_lines: list of payloads collected so far; extended in place.

    Returns:
        ``binary_lines``.
    """
    binary_started = False
    payload_lines = []
    for chunk_line in chunk:
        if chunk_line == b'Content-Transfer-Encoding: binary\r\n':
            binary_started = True
            continue
        if chunk_line == b'\r\n':
            continue
        if binary_started:
            payload_lines.append(chunk_line)

    payload = b''.join(payload_lines)
    # trailing \n (byte 10)
    if payload.endswith(b'\n'):
        payload = payload[:-1]
    # trailing \r (byte 13)
    if payload.endswith(b'\r'):
        payload = payload[:-1]

    if payload:
        binary_lines.append(payload)
    return binary_lines
def plot_data(leads, sources):
    """
    Plot every lead and source that contains samples, one subplot each.

    Leads are drawn first, titled 'EKG - <lead>' and using the 'EKG' entries
    of UNITS / Y_LIM_LOW / Y_LIM_HIGH. Sources follow, titled with their own
    name; the 'Electrocardiogram' source also uses the 'EKG' entries because
    the constant tables have no key of that name. Nothing is drawn when no
    channel has data.

    Args:
        leads: dict mapping lead name to a list of samples.
        sources: dict mapping source name to a list of samples.
    """
    leads_with_data = [name for name in leads if len(leads[name]) > 0]
    sources_with_data = [name for name in sources if len(sources[name]) > 0]
    num_channels = len(leads_with_data) + len(sources_with_data)
    if num_channels == 0:
        return

    # squeeze=False always yields a 2-D array of axes, so indexing also
    # works when there is exactly one channel.
    _, axs = plt.subplots(num_channels, squeeze=False)
    axes = axs.ravel()

    for i, lead in enumerate(leads_with_data):
        plot_series(axes[i], leads[lead], 'EKG - ' + lead, UNITS['EKG'],
                    UNIT_SCALE, Y_LIM_LOW['EKG'], Y_LIM_HIGH['EKG'])

    # Sources continue after the lead subplots; computing the offset from
    # the list length avoids relying on the loop variable above, which is
    # undefined when no lead has data.
    offset = len(leads_with_data)
    for j, source in enumerate(sources_with_data):
        key = 'EKG' if source == 'Electrocardiogram' else source
        plot_series(axes[offset + j], sources[source], source, UNITS[key],
                    UNIT_SCALE, Y_LIM_LOW[key], Y_LIM_HIGH[key])

    plt.subplots_adjust(hspace=2.5)
    plt.show()
def plot_series(ax, data, title, units, unit_scale, ylim_low=None, ylim_high=None):
    """
    Plot one series of samples on the given axes.

    Args:
        ax: axes object to draw on.
        data: sequence of sample values.
        title: subplot title.
        units: text for the y-axis label.
        unit_scale: factor every sample is multiplied by before plotting.
        ylim_low: lower y-axis bound, or None.
        ylim_high: upper y-axis bound, or None.

    The y-axis limits are applied only when both bounds are given. A bound
    of 0 counts as given; the previous truthiness test silently ignored it.
    """
    ax.set_title(title)
    ax.tick_params(labelsize=5)
    ax.plot(np.asarray(data) * unit_scale)
    ax.set_ylabel(units, fontsize=7)
    if ylim_low is not None and ylim_high is not None:
        ax.set_ylim([ylim_low, ylim_high])
def main(file_name='ecg_test2.txt'):
    """
    Load a recording, process it, and plot the results.

    Args:
        file_name: path of the file to read. Defaults to 'ecg_test2.txt',
            the path that was previously hard-coded.
    """
    # Load data
    lines = load_data(file_name)
    # Process data
    leads, sources = process_data(lines)
    # Plot data
    plot_data(leads, sources)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment