# -*- coding: utf-8 -*-
"""A very simple ISO8211 decoder."""
# Done without the ISO 8211 standard.
# Imports
import re
# Constants
DDF_LEADER_SIZE = 24
DDF_FIELD_TERMINATOR = chr(30)
DDF_UNIT_TERMINATOR = chr(31)
# Methods
def bin2int(data, sign=None):
    """Returns the integer value of a binary-coded field (unsigned or signed)."""
    try:
        raw = data.encode('hex')
        integer = int(raw, 16)
    except ValueError, error:
        print "b ERROR", error, raw, sign
        return False
    if sign == '1':
        # Unsigned integer
        return integer
    elif sign == '2':
        # Signed integer
        if ord(data[0]) > 127:
            # Negative number: convert from two's complement over the full byte width
            result = integer - (1 << (8 * len(data)))
        else:
            # Positive number
            result = integer
        return result
    else:
        print "b ERROR", sign, raw
        return None
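
# Illustrative sanity checks (not part of the original gist): with the
# two's-complement handling above, hand-checked values would be
#   bin2int('\x01\x00', '1')  ->  256   (2-byte unsigned)
#   bin2int('\xff\xff', '2')  ->  -1    (2-byte signed)
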
# Function for data parsing
DATA_TYPES = {
    "A": str,
    "I": int,
    "R": float,
    "B": str,
    "b": bin2int
}

def data_parser(function_type, data, sign=None):
    """Returns the parsed data according to the function type
    (integer, string, float or binary)."""
    if not data.strip():
        return None
    try:
        if sign:
            result = DATA_TYPES[function_type](data, sign)
        else:
            result = DATA_TYPES[function_type](data)
    except ValueError:
        result = False
    return result
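
# Illustrative examples (not part of the original gist): data_parser() simply
# dispatches on the ISO 8211 format letter, so typical calls would behave like
#   data_parser('I', '00042')          ->  42
#   data_parser('R', '12.5')           ->  12.5
#   data_parser('b', '\x00\x2a', '1')  ->  42
#   data_parser('A', '   ')            ->  None  (blank field)
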
# Earlier, more verbose version of data_parser(), kept commented out for reference.
#~ def data_parser(function_type, data, sign=None):
#~     """Returns the parsed data according to the function type
#~     (integer, string, float or binary).
#~     """
#~     if not data.strip():
#~         return None
#~
#~     if function_type == "A":
#~         try:
#~             result = str(data)
#~         except ValueError, error:
#~             print "A ERROR", error, data
#~             result = False
#~     elif function_type == "I":
#~         try:
#~             result = int(data)
#~         except ValueError, error:
#~             print "I ERROR", error, data
#~             result = False
#~     elif function_type == "R":
#~         try:
#~             result = float(data)
#~         except ValueError, error:
#~             print "R ERROR", error, data
#~             result = False
#~     elif function_type == "B":
#~         try:
#~             result = str(data)
#~         except ValueError, error:
#~             print "B ERROR", error, data
#~             result = False
#~     elif function_type == "b":
#~         try:
#~             raw = data.encode('hex')
#~             integer = int(raw, 16)
#~         except ValueError, error:
#~             print "b ERROR", error, raw, sign
#~             return False
#~
#~         if sign == '1':
#~             # Unsigned integer
#~             result = integer
#~         elif sign == '2':
#~             # Signed integer
#~             if ord(data[0]) > 127:
#~                 # Negative number
#~                 result = ~integer +1
#~             else:
#~                 # Positive number
#~                 result = integer
#~         else:
#~             print "b ERROR", sign, raw
#~             result = None
#~
#~     else:
#~         # No matches as per ISO 8211.
#~         result = False
#~
#~     return result

def parse_formats(formats):
    """Parses the descriptive string of the formats into a list of dictionaries."""
    fmts = []
    for item in formats[1:-1].split(","):
        regex = re.match(r"(?P<multiple>\d+)?(?P<type>[a-zA-Z])(?P<bin>\d+)?(\((?P<length>\d+)\))?", item)
        fmt_dict = {
            "type": regex.group("type"),
            "length": None,
            "sign": None
        }
        if regex.group("length"):
            fmt_dict["length"] = int(regex.group("length"))
        if regex.group("bin"):
            # '1': unsigned / '2': signed integer
            fmt_dict["sign"] = regex.group("bin")[0]
            # The second character defines the precision/width
            fmt_dict["length"] = int(regex.group("bin")[1])
        if regex.group("multiple"):
            for _ in range(int(regex.group("multiple"))):
                fmts.append(fmt_dict.copy())  # Needs to copy the dict!
        else:
            fmts.append(fmt_dict)
    return fmts
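
# Illustrative example (not part of the original gist): an S-57 style format
# string such as "(A(2),I(10),3R(4),b12)" would be expanded by parse_formats()
# into one dict per subfield:
#   [{'type': 'A', 'length': 2,  'sign': None},
#    {'type': 'I', 'length': 10, 'sign': None},
#    {'type': 'R', 'length': 4,  'sign': None},   # repeated three times
#    {'type': 'R', 'length': 4,  'sign': None},
#    {'type': 'R', 'length': 4,  'sign': None},
#    {'type': 'b', 'length': 2,  'sign': '1'}]    # 2-byte unsigned binary
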
class DR(object):
    """An ISO 8211 Directory Record."""

    def __init__(self):
        self.data = None
        self.entry_map = {}
        self.leader_map = {}
        self.fields = []
        self.ddf = []
        self.index = 0
        self.record_length = 0

    def leader(self, data):
        """Parses the DR leader (the first 24 bytes of the record)."""
        self.data = data
        try:
            self.entry_map = {
                'field_length_field': int(self.data[20]),
                'field_position_field': int(self.data[21]),
                'reserved': int(self.data[22]),
                'field_tag_field': int(self.data[23])
            }
            self.record_length = int(self.data[0:5])
            self.leader_map = {
                'record_length': self.record_length,
                #~ 'interchange_level': int(self.data[5]),
                'leader_identifier': self.data[6],
                'extension_indicator': self.data[7],
                #~ 'version_number': int(self.data[8]),
                'application_indicator': self.data[9],
                #~ 'field_control_length': int(self.data[10:12]),
                'base_address': int(self.data[12:17]),
                'set_indicator': self.data[17:20],
                'entry_map': self.entry_map
            }
        except ValueError, error:
            print "Leader not correct", error
            return False
        # Handling some exceptions: these entries may not be numeric in a DR
        try:
            self.leader_map['interchange_level'] = int(self.data[5])
        except ValueError:
            self.leader_map['interchange_level'] = self.data[5]
        try:
            self.leader_map['version_number'] = int(self.data[8])
        except ValueError:
            self.leader_map['version_number'] = self.data[8]
        try:
            self.leader_map['field_control_length'] = int(self.data[10:12])
        except ValueError:
            self.leader_map['field_control_length'] = self.data[10:12]
        self.index += DDF_LEADER_SIZE
        return self.leader_map

    def directory(self, data):
        """Parses the DR directory and extracts each field's raw data."""
        self.data = data
        # Starting index of the directory
        raw = self.data[self.index:self.index + self.leader_map['record_length']]
        i = raw.index(DDF_FIELD_TERMINATOR)
        directory = raw[:i]
        # Check that the length corresponds to the sum of the entry_map values
        length_tag = self.entry_map['field_tag_field']
        length_len = self.entry_map['field_length_field']
        length_pos = self.entry_map['field_position_field']
        directory_field_length = length_tag + length_len + length_pos
        if (i % directory_field_length) != 0:
            print "Directory not correct: length is not a multiple of the entry_map sum.", self.index, i, directory_field_length
            return False
        # Loop over the directory entries
        index = 0
        base = self.leader_map['base_address'] - DDF_LEADER_SIZE
        data = raw[base:]
        for _ in range(i // directory_field_length):
            field = {}
            field['tag'] = directory[index:index + length_tag]
            index += length_tag
            try:
                field['length'] = int(directory[index:index + length_len])
                index += length_len
                field['position'] = int(directory[index:index + length_pos])
            except ValueError, error:
                print "Directory not correct", error
                return False
            index += length_pos
            field['data'] = data[field['position']:field['position'] + field['length']].strip(DDF_FIELD_TERMINATOR)
            self.fields.append(field)
        self.index = self.leader_map['record_length']
        return True

    def ddr_field(self):
        """Reads the DDR field area (the Field Control Field and the DDFs)."""
        # Assuming that we are in the same instance.
        fcl = self.leader_map['field_control_length']
        # Parsing the Field Control Field (first field of the DDR). It is
        # assumed to have the same layout as a DDF header: controls and
        # external file title, then the tag pairs after a unit terminator.
        parts = self.fields[0]['data'].split(DDF_UNIT_TERMINATOR)
        fcf = {}
        fcf['controls'] = parts[0][:fcl]
        fcf['external_file'] = parts[0][fcl:]
        fcf['pairs'] = parts[1] if len(parts) > 1 else ''
        # Parsing the Data Descriptive Fields (DDF)
        for field in self.fields[1:]:
            ddf = {}
            data = field['data'].split(DDF_UNIT_TERMINATOR)
            ddf['controls'] = data[0][:fcl]
            ddf['name'] = data[0][fcl:]
            ddf['tag'] = field['tag']
            ddf['array_descriptor'] = data[1]
            ddf['format_controls'] = parse_formats(data[2])
            self.ddf.append(ddf)
        return {'field_control_field': fcf, 'data_descriptive_fields': self.ddf}

class Decoder(object):
    """Decodes an ISO/IEC 8211 encoded file.

    Does not interpret the values (as per S-57 for instance)."""

    def __init__(self, data):
        self.data = data
        self.index = 0
        self.ddr = {}
        self.records = []
        self.ddf = None

    def parse_ddr(self):
        """Extracts and parses the DDR."""
        ddr = DR()
        self.ddr['leader'] = ddr.leader(self.data[:DDF_LEADER_SIZE])
        ddr.directory(self.data[:ddr.record_length])
        self.ddr['directory'] = ddr.fields
        self.ddr['fields'] = ddr.ddr_field()
        self.index += ddr.index
        self.ddf = self.parse_ddf()
        if not self.ddf:
            return False
        return True

    def parse_ddf(self):
        """Parses the formats of the DDFs and returns a dictionary with
        all tags and their subkeys."""
        result = {}
        if (len(self.ddr['directory']) - 1) != len(self.ddr['fields']['data_descriptive_fields']):
            print "The DDR Directory and Fields lengths do not match."
            return False
        for item in self.ddr['fields']['data_descriptive_fields']:
            field = []
            tag = item['tag']
            if len(item['array_descriptor']) != 0:
                item['array_descriptor'] = item['array_descriptor'].split('!')
            else:
                item['array_descriptor'] = None
            if not item['array_descriptor']:
                result[tag] = item['format_controls'][0]
                continue
            if len(item['format_controls']) != len(item['array_descriptor']):
                print "Length of keys is not the same as the formats."
                return False
            for i in range(len(item['format_controls'])):
                item['format_controls'][i]['name'] = item['array_descriptor'][i]
                field.append(item['format_controls'][i])
            result[tag] = field
        return result
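
    # Illustrative shape (not part of the original gist) of the dictionary
    # returned by parse_ddf(), using S-57-like tags purely as an example:
    # a tag maps either to a single format dict or to a list of per-subfield
    # format dicts carrying a 'name' key, e.g.
    #   {'DSID': [{'name': 'RCNM', 'type': 'b', 'length': 1, 'sign': '1'}, ...],
    #    '0001': {'type': 'I', 'length': None, 'sign': None}}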

    def parse_dr(self):
        """Extracts and parses all the DRs."""
        append = self.records.append
        while self.index < len(self.data):
            _dr = DR()
            # Load first the leader byte string (i.e. 24 bytes)
            _dr.leader(self.data[self.index:self.index + DDF_LEADER_SIZE])
            # Then, load the necessary length
            # TODO: can strip the leader?
            if not _dr.directory(self.data[self.index:self.index + _dr.record_length]):
                break
            self.parse_field(_dr.fields)
            append(_dr.fields)
            self.index += _dr.index
        return True

    def parse_field(self, fields):
        """Parses a field area from its description (array_descriptor / format_controls)."""
        # Selecting the matching tags in the DDF
        for field in fields:
            # Get the matching sub-DDF
            if not field['tag'] in self.ddf:
                print "This tag is not in the DDF.", field['tag']
                return False
            ddf = self.ddf[field['tag']]
            data = field['data']
            # Only one kind of data
            if isinstance(ddf, dict):
                if ddf['length']:
                    data = data[:ddf['length']]
                field['values'] = data_parser(ddf['type'], data, ddf['sign'])
            # Multiple subkeys
            else:
                subfield = {}
                index = 0  # For cutting the data
                for item in ddf:
                    if item['length']:
                        end = index + item['length']
                        subfield[item['name']] = data_parser(item['type'], data[index:end], item['sign'])
                        index = end
                    else:
                        end = data[index:].index(DDF_UNIT_TERMINATOR)
                        if end == 0:
                            subfield[item['name']] = None
                        else:
                            subfield[item['name']] = data_parser(item['type'], data[index:index + end], item['sign'])
                        index += end + 1  # +1 for the terminator
                field['values'] = subfield
        return True

    def export(self):
        """Parses and exports the data into a dictionary of field descriptions
        and records, keeping only the essential information.
        """
        if not (self.parse_ddr() and self.parse_dr()):
            print "File not ISO8211 readable."
            return None
        result = {}
        result['description'] = []
        result['data'] = []
        for i in range(len(self.ddr['directory']) - 1):
            desc_dict = {}
            desc_dict['tag'] = self.ddr['directory'][i + 1]['tag']
            desc_dict['description'] = self.ddr['fields']['data_descriptive_fields'][i]['name']
            result['description'].append(desc_dict)
        result['data'] = self.records
        return result

if __name__ == "__main__":
    try:
        # Open in binary mode: ISO 8211 files contain binary field data
        with open("CATALOG.031", 'rb') as f:
            C = Decoder(f.read())
    except IOError:
        print "File not found."
        exit()
    test = C.export()
    print "Number of records:", len(test['data'])