Last active
April 25, 2022 11:45
-
-
Save tuxite/428f66017f7123f777cf to your computer and use it in GitHub Desktop.
A very simple ISO 8211 Decoder
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""A very simple ISO8211 decoder.""" | |
# Done without the ISO 8211 standard. | |
# Imports | |
import binascii
import re
# Constants | |
DDF_LEADER_SIZE = 24 | |
DDF_FIELD_TERMINATOR = chr(30) | |
DDF_UNIT_TERMINATOR = chr(31) | |
# Methods | |
def bin2int(data, sign=None):
    """Return the integer value of a binary (byte-string) field.

    Parameters:
        data: the raw field content (a byte string).
        sign: '1' for an unsigned integer, '2' for a signed
              (two's-complement) integer.

    Returns the decoded integer, False when the bytes cannot be
    decoded, or None when *sign* is not recognised.
    """
    raw = None  # pre-bind so the error report below cannot fail on it
    try:
        # hexlify works on both Python 2 str and Python 3 bytes,
        # unlike the former str.encode('hex').
        raw = binascii.hexlify(data)
        integer = int(raw, 16)
    except (ValueError, TypeError) as error:
        print("b ERROR", error, raw, sign)
        return False
    if sign == '1':
        # Unsigned integer
        return integer
    elif sign == '2':
        # Signed integer: interpret the bytes as two's complement.
        bits = 8 * len(data)
        if integer >= 1 << (bits - 1):
            # Sign bit set -> subtract the modulus to obtain the
            # negative value.  The previous `~integer + 1` merely
            # negated the raw magnitude (b"\xff" gave -255, not -1).
            return integer - (1 << bits)
        return integer
    else:
        print("b ERROR", sign, raw)
        return None
# Dispatch table: ISO 8211 format-control letter -> converter callable.
DATA_TYPES = {
    "A": str,      # character data
    "I": int,      # implicit-point (integer) data
    "R": float,    # explicit-point (real) data
    "B": str,      # bit-string data, kept as-is
    "b": bin2int,  # binary integer data
}
def data_parser(function_type, data, sign=None):
    """Convert one raw field to a Python value.

    Parameters:
        function_type: one of the DATA_TYPES keys ('A', 'I', 'R',
                       'B' or 'b').
        data: the raw field content.
        sign: forwarded to the binary decoder ('b') only.

    Returns None for blank data, False when the data cannot be
    converted or the format letter is unknown (no match as per
    ISO 8211), otherwise the converted value.
    """
    if not data.strip():
        # Blank/empty field: no value.
        return None
    try:
        converter = DATA_TYPES[function_type]
    except KeyError:
        # Unknown format letter: previously this raised a bare
        # KeyError; report failure like the other error paths.
        return False
    try:
        if sign:
            result = converter(data, sign)
        else:
            result = converter(data)
    except ValueError:
        result = False
    return result
#~ def data_parser(function_type, data, sign=None): | |
#~ """Returns the parsed data according to the function type | |
#~ (interger, string, float or binary). | |
#~ """ | |
#~ if not data.strip(): | |
#~ return None | |
#~ | |
#~ if function_type == "A": | |
#~ try: | |
#~ result = str(data) | |
#~ except ValueError, error: | |
#~ print "A ERROR", error, data | |
#~ result = False | |
#~ elif function_type == "I": | |
#~ try: | |
#~ result = int(data) | |
#~ except ValueError, error: | |
#~ print "I ERROR", error, data | |
#~ result = False | |
#~ elif function_type == "R": | |
#~ try: | |
#~ result = float(data) | |
#~ except ValueError, error: | |
#~ print "R ERROR", error, data | |
#~ result = False | |
#~ elif function_type == "B": | |
#~ try: | |
#~ result = str(data) | |
#~ except ValueError, error: | |
#~ print "B ERROR", error, data | |
#~ result = False | |
#~ elif function_type == "b": | |
#~ try: | |
#~ raw = data.encode('hex') | |
#~ integer = int(raw, 16) | |
#~ except ValueError, error: | |
#~ print "b ERROR", error, raw, sign | |
#~ return False | |
#~ | |
#~ if sign == '1': | |
#~ # Unsigned integer | |
#~ result = integer | |
#~ elif sign == '2': | |
#~ # Signed integer | |
#~ if ord(data[0]) > 127: | |
#~ # Negative number | |
#~ result = ~integer +1 | |
#~ else: | |
#~ # Positive number | |
#~ result = integer | |
#~ else: | |
#~ print "b ERROR", sign, raw | |
#~ result = None | |
#~ | |
#~ else: | |
#~ # No matches as per ISO 8211. | |
#~ result = False | |
#~ | |
#~ return result | |
def parse_formats(formats):
    """Parse an ISO 8211 format-controls string into a list of dicts.

    *formats* looks like "(3A(2),I(10),b14)".  Each resulting dict
    has the keys:
        type:   the format letter (e.g. 'A', 'I', 'R', 'b'),
        length: the field width, or None when unspecified,
        sign:   for binary ('b') formats, '1' (unsigned) or '2'
                (signed), otherwise None.
    A leading repetition count duplicates the entry that many times.
    """
    fmts = []
    # Strip the surrounding parentheses, then handle each
    # comma-separated format descriptor.
    for item in formats[1:-1].split(","):
        match = re.match(
            r"(?P<multiple>\d+)?(?P<type>[a-zA-Z])"
            r"(?P<bin>\d+)?(\((?P<length>\d+)\))?",
            item)
        fmt_dict = {
            "type": match.group("type"),
            "length": None,
            "sign": None,
        }
        if match.group("length"):
            fmt_dict["length"] = int(match.group("length"))
        if match.group("bin"):
            # Binary format 'bSW': first digit is the sign indicator
            # ('1': unsigned / '2': signed), the rest is the width.
            # [1:] also accepts multi-digit widths, where the
            # original [1] read exactly one digit (and raised
            # IndexError on a bare 'b1').
            fmt_dict["sign"] = match.group("bin")[0]
            fmt_dict["length"] = int(match.group("bin")[1:])
        if match.group("multiple"):
            # Repeat count: each repetition needs its own copy.
            for _ in range(int(match.group("multiple"))):
                fmts.append(fmt_dict.copy())
        else:
            fmts.append(fmt_dict)
    return fmts
class DR(object):
    """An ISO 8211 Data Record (leader + directory + field area)."""

    def __init__(self):
        self.data = None        # raw string currently being parsed
        self.entry_map = {}     # sizes of the directory sub-fields
        self.leader_map = {}    # decoded leader values
        self.fields = []        # directory entries with their data
        self.ddf = []           # data descriptive fields (DDR only)
        self.index = 0          # read position inside self.data
        self.record_length = 0  # total record length from the leader

    def leader(self, data):
        """Decode the 24-byte record leader.

        Returns the leader mapping, or False when the mandatory
        numeric fields cannot be parsed.
        """
        self.data = data
        try:
            self.entry_map = {
                'field_length_field': int(self.data[20]),
                'field_position_field': int(self.data[21]),
                'reserved': int(self.data[22]),
                'field_tag_field': int(self.data[23])
            }
            self.record_length = int(self.data[0:5])
            self.leader_map = {
                'record_length': self.record_length,
                'leader_identifier': self.data[6],
                'extension_indicator': self.data[7],
                'application_indicator': self.data[9],
                'base_address': int(self.data[12:17]),
                'set_indicator': self.data[17:20],
                'entry_map': self.entry_map
            }
        except ValueError as error:
            print("Leader not correct", error)
            return False
        # These three are numeric in the DDR but may be non-numeric
        # in a DR leader, so fall back to the raw characters.
        try:
            self.leader_map['interchange_level'] = int(self.data[5])
        except ValueError:
            self.leader_map['interchange_level'] = self.data[5]
        try:
            self.leader_map['version_number'] = int(self.data[8])
        except ValueError:
            self.leader_map['version_number'] = self.data[8]
        try:
            self.leader_map['field_control_length'] = int(self.data[10:12])
        except ValueError:
            self.leader_map['field_control_length'] = self.data[10:12]
        self.index += DDF_LEADER_SIZE
        return self.leader_map

    def directory(self, data):
        """Decode the directory and attach each entry's data.

        Returns True on success, False on a malformed directory.
        """
        self.data = data
        # Remainder of the record, starting after the leader.
        raw = self.data[self.index:self.index + self.leader_map['record_length']]
        if DDF_FIELD_TERMINATOR not in raw:
            # Truncated record: the original raised an uncaught
            # ValueError on raw.index() here.
            print("Directory not correct: no field terminator found.")
            return False
        i = raw.index(DDF_FIELD_TERMINATOR)
        directory = raw[:i]
        # The directory length must be a multiple of one entry's
        # size (tag + length + position, from the leader entry map).
        length_tag = self.entry_map['field_tag_field']
        length_len = self.entry_map['field_length_field']
        length_pos = self.entry_map['field_position_field']
        directory_field_length = length_tag + length_len + length_pos
        if (i % directory_field_length) != 0:
            print("Directory not correct: length is not a multiple of the entry_map sum.", self.index, i, directory_field_length)
            return False
        # Decode each entry and slice its data out of the field area.
        index = 0
        base = self.leader_map['base_address'] - DDF_LEADER_SIZE
        data = raw[base:]
        # '//' keeps the loop count an integer on Python 3 as well
        # (identical result on Python 2).
        for _ in range(i // directory_field_length):
            field = {}
            field['tag'] = directory[index:index + length_tag]
            index += length_tag
            try:
                field['length'] = int(directory[index:index + length_len])
                index += length_len
                field['position'] = int(directory[index:index + length_pos])
            except ValueError as error:
                print("Directory not correct", error)
                return False
            index += length_pos
            field['data'] = data[field['position']:field['position'] + field['length']].strip(DDF_FIELD_TERMINATOR)
            self.fields.append(field)
        self.index = self.leader_map['record_length']
        return True

    def ddr_field(self):
        """Decode the DDR field area (Field Control Field + DDFs).

        Returns a dict with the decoded 'field_control_field'
        mapping and the 'data_descriptive_fields' list.
        """
        # Assuming that we are in the same instance (leader() and
        # directory() have populated self.fields / self.leader_map).
        fcl = self.leader_map['field_control_length']
        # Parse the Field Control Field (first directory entry),
        # splitting on the unit terminator like the DDF loop below --
        # the original indexed single characters of the raw string.
        units = self.fields[0]['data'].split(DDF_UNIT_TERMINATOR)
        fcf = {}
        fcf['controls'] = units[0][:fcl]
        fcf['external_file'] = units[0][fcl:]
        # The tag-pairs unit may be absent.
        fcf['pairs'] = units[1] if len(units) > 1 else ''
        # Parse the Data Descriptive Fields (DDF).
        for field in self.fields[1:]:
            ddf = {}
            data = field['data'].split(DDF_UNIT_TERMINATOR)
            ddf['controls'] = data[0][:fcl]
            ddf['name'] = data[0][fcl:]
            ddf['tag'] = field['tag']
            ddf['array_descriptor'] = data[1]
            ddf['format_controls'] = parse_formats(data[2])
            self.ddf.append(ddf)
        # The original returned the bare length (fcl) here, silently
        # discarding the fcf mapping built above; return the mapping.
        return {'field_control_field': fcf, 'data_descriptive_fields': self.ddf}
class Decoder(object):
    """Decodes an ISO/IEC 8211 encoded file.

    Does not interpret the values (as per S-57 for instance).
    """

    def __init__(self, data):
        self.data = data    # whole file content
        self.index = 0      # read position in self.data
        self.ddr = {}       # decoded Data Descriptive Record
        self.records = []   # decoded Data Records
        self.ddf = None     # tag -> format description mapping

    def parse_ddr(self):
        """Extract and parse the DDR (first record of the file).

        Returns True on success, False otherwise.
        """
        ddr = DR()
        self.ddr['leader'] = ddr.leader(self.data[:DDF_LEADER_SIZE])
        if not self.ddr['leader']:
            # Unreadable leader: record_length is unusable.
            return False
        ddr.directory(self.data[:ddr.record_length])
        self.ddr['directory'] = ddr.fields
        self.ddr['fields'] = ddr.ddr_field()
        self.index += ddr.index
        self.ddf = self.parse_ddf()
        if not self.ddf:
            return False
        return True

    def parse_ddf(self):
        """Parse the DDF formats into a dict keyed by field tag.

        Each value is either a single format dict (scalar field) or
        a list of format dicts, one per '!'-separated sub-field name.
        Returns False on inconsistent DDR contents.
        """
        result = {}
        if (len(self.ddr['directory']) - 1) != len(self.ddr['fields']['data_descriptive_fields']):
            print("The DDR Directory and Fields lengths do not match.")
            return False
        for item in self.ddr['fields']['data_descriptive_fields']:
            field = []
            tag = item['tag']
            # Split the '!'-separated sub-field names, if any.
            if len(item['array_descriptor']) != 0:
                item['array_descriptor'] = item['array_descriptor'].split('!')
            else:
                item['array_descriptor'] = None
            if not item['array_descriptor']:
                # Scalar field: a single format describes it.
                result[tag] = item['format_controls'][0]
                continue
            if len(item['format_controls']) != len(item['array_descriptor']):
                print("Length of keys is not the same as the formats.")
                return False
            # Attach each sub-field name to its format.
            for i in range(len(item['format_controls'])):
                item['format_controls'][i]['name'] = item['array_descriptor'][i]
                field.append(item['format_controls'][i])
            result[tag] = field
        return result

    def parse_dr(self):
        """Extract and parse all the DRs following the DDR."""
        append = self.records.append
        while self.index < len(self.data):
            _dr = DR()
            # Load first the leader byte string (ie 24 bytes).
            if not _dr.leader(self.data[self.index:self.index + DDF_LEADER_SIZE]):
                # Unreadable leader (e.g. trailing garbage): the
                # original fell through with record_length == 0 and
                # crashed inside directory().
                break
            # Then, load the necessary length.
            # TODO: can strip the leader?
            if not _dr.directory(self.data[self.index:self.index + _dr.record_length]):
                break
            self.parse_field(_dr.fields)
            append(_dr.fields)
            self.index += _dr.index
        return True

    def parse_field(self, fields):
        """Parse each field's data according to the DDF formats."""
        for field in fields:
            # Get the matching sub-DDF for this tag.
            if field['tag'] not in self.ddf:
                print("This tag is not in the DDF.", field['tag'])
                return False
            ddf = self.ddf[field['tag']]
            data = field['data']
            if isinstance(ddf, dict):
                # Only one kind of data (scalar field).
                if ddf['length']:
                    data = data[:ddf['length']]
                field['values'] = data_parser(ddf['type'], data, ddf['sign'])
            else:
                # Multiple sub-fields: cut the data item by item.
                subfield = {}
                index = 0  # For cutting the data
                for item in ddf:
                    if item['length']:
                        end = index + item['length']
                        subfield[item['name']] = data_parser(item['type'], data[index:end], item['sign'])
                        index = end
                    else:
                        # Variable length: read up to the unit
                        # terminator (raises ValueError if absent,
                        # as before).
                        end = data[index:].index(DDF_UNIT_TERMINATOR)
                        if end == 0:
                            subfield[item['name']] = None
                        else:
                            subfield[item['name']] = data_parser(item['type'], data[index:index + end], item['sign'])
                        index += end + 1  # +1 for the terminator
                field['values'] = subfield
        return True

    def export(self):
        """Parse and export the data as a dict with only essential
        information ('description' and 'data' keys).
        Returns None when the file is not readable as ISO 8211.
        """
        if not (self.parse_ddr() and self.parse_dr()):
            print("File not ISO8211 readable.")
            return None
        result = {'description': [], 'data': []}
        # Pair each DDF entry (skipping the Field Control Field at
        # directory index 0) with its human-readable name.
        for i in range(len(self.ddr['directory']) - 1):
            desc_dict = {
                'tag': self.ddr['directory'][i + 1]['tag'],
                'description': self.ddr['fields']['data_descriptive_fields'][i]['name'],
            }
            result['description'].append(desc_dict)
        result['data'] = self.records
        return result
if __name__ == "__main__":
    try:
        # NOTE(review): opened in text mode; binary ISO 8211 data may
        # need 'rb' on some platforms -- confirm with real data.
        with open("CATALOG.031", 'r') as f:
            C = Decoder(f.read())
        # No explicit close: the `with` block already closed the file
        # (the original called f.close() redundantly inside it).
    except IOError:
        print("File not found.")
        exit()
    result = C.export()
    if result is None:
        # export() already reported the failure; len(None['data'])
        # would have crashed here in the original.
        exit()
    print("Number of records:", len(result['data']))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment