Skip to content

Instantly share code, notes, and snippets.

@niwinz
Created November 18, 2012 16:06
Show Gist options
  • Save niwinz/4106007 to your computer and use it in GitHub Desktop.
Save niwinz/4106007 to your computer and use it in GitHub Desktop.
HL7 Prototype
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function
import io
SEGMENT_SEPARATOR = b'\r'
SECTION_SEPARATOR = b'^'
FIELD_SEPARATOR = b'|'
class Field(object):
def __init__(self, value, description=None, index=None):
self._raw_value = value
self._value = self.parse(value)
self._description = description
self._index = index
def parse(self, value):
return value
def __len__(self):
return len(self._raw_value)
def __bytes__(self):
return self.value
def __str__(self):
return self.value.decode("utf-8")
@property
def value(self):
return self._value
@property
def description(self):
return self._description
@property
def index(self):
return self._index
class STField(Field):
"""
String representation on hl7 protocol.
"""
def __bytes__(self):
return self.value
def __str__(self):
return self.value.decode('utf-8')
class IDField(Field):
pass
class ISField(Field):
pass
class CEField(Field):
pass
class SIField(Field):
pass
class CXField(Field):
pass
class XPNField(Field):
pass
class TSField(Field):
pass
class XADField(Field):
_definition = [
(0, STField, "Street address"),
(1, STField, "Other destination"),
(2, STField, "City"),
(3, STField, "State or Province"),
(4, STField, "Zip or postal code"),
(5, IDField, "Country"),
(6, IDField, "Address type"),
(7, STField, "Other geographical designation"),
(8, ISField, "country/parish code"),
(9, ISField, "census tract"),
(10, IDField, "address representation code"),
]
def parse(self, value):
parsed_value = []
for _def, _data in zip(self._definition, value.split(SECTION_SEPARATOR)):
cls_index, cls, cls_desc = _def
parsed_value.append(cls(_data, cls_desc, cls_index))
return parsed_value
@property
def value(self):
return b" ".join(bytes(x) for x in self._value)
class Segment(object):
_fields = None
_identifier = None
_definition = None
def __init__(self, data, message):
self._message = message
self._data = data.split(FIELD_SEPARATOR)
self._fields = self.parse(self._data)
def parse(self, data):
raise NotImplementedError
@property
def fields(self):
return self._fields
class PIDSegment(Segment):
_identifier = "PID"
_definition = [
# (seq, len, field, opt, desc)
(0, 4, SIField, "O", "setid - pid"),
(1, 20, CXField, "B", "patient id"),
(2, 20, CXField, "R", "patient identifier list"),
(3, 20, CXField, "B", "alternate patient id"),
(4, 48, XPNField, "R", "patient name"),
(5, 48, XPNField, "O", "mothers maiden name"),
(6, 26, TSField, "O", "date/time of birth"),
(7, 1, ISField, "O", "sex"),
(8, 48, XPNField, "O", "patient alias"),
(9, 80, CEField, "O", "race"),
(10, 106, XADField, "O", "patient address"),
# TODO: incomplete
]
def parse(self, data):
parsed_data = []
for _def, _data in zip(self._definition, self._data[1:]):
i, size, cls, opt, desc = _def
#if len(_data) > size:
# import pdb; pdb.set_trace()
# raise RuntimeError("Field '{0}' with incorect size.".format(i))
parsed_data.append((i, cls(_data, desc, i)))
return parsed_data
class Header(object):
def __init__(self, data, message):
self._message = message
self._data = data.split(FIELD_SEPARATOR)
@property
def sending_application(self):
return self._data[2]
@property
def sending_facility(self):
return self._data[3]
@property
def receiving_application(self):
return self._data[4]
@property
def receiving_facility(self):
return self._data[5]
@property
def timestamp(self):
return self._data[6]
@property
def message_type(self):
return self._data[8]
@property
def control_id(self):
return self._data[9]
@property
def processing_id(self):
return self._data[10]
@property
def version(self):
return self._data[11]
class Message(object):
header_cls = Header
_header = None
_segments = None
def __init__(self, message):
if not isinstance(message, bytes):
message = message.encode('utf-8')
self._message = message.split(SEGMENT_SEPARATOR)
self._parse_header()
def _parse_header(self):
return self.header_cls(self._message[0], self)
@property
def header(self):
if self._header is None:
self._header = self._parse_header()
return self._header
def parse(data=None, filename=None, cls=Message):
if filename is not None:
with io.open(filename, "rb") as f:
return cls(f.read())
# -*- coding: utf-8 -*-
import unittest
import pdb
import hl7
class TestPIDSegment(unittest.TestCase):
def setUp(self):
self.data = ("PID|||56782445^^^UAReg^PI||KLEINSAMPLE^BARRY^Q^JR||19620910|M||2028-9^^HL70005^RA99113^^XYZ"
"|260 GOODWIN CREST DRIVE^^BIRMINGHAM^AL^35 209^^M~NICKELL’S PICKLES^10000 W 100TH AVE^BIRMIN"
"GHAM^AL^35200^^O |||||||0105I30001^^^99DEF^AN")
def test_parse_01(self):
segment = hl7.PIDSegment(self.data.encode('utf-8'), None)
# Sex field
index, field = segment.fields[7]
self.assertEqual(field.description, "sex")
self.assertEqual(field.value, b"M")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment