Skip to content

Instantly share code, notes, and snippets.

@niespodd
Created July 3, 2024 23:13
Show Gist options
  • Save niespodd/ca788b0225323016a1ae603f45d6b267 to your computer and use it in GitHub Desktop.
Save niespodd/ca788b0225323016a1ae603f45d6b267 to your computer and use it in GitHub Desktop.
import re
import zlib
from datetime import datetime
from decimal import Decimal
def get_value_from_line(fields, line_no):
    """Return the text of the first field located on the given line.

    Args:
        fields: Iterable of field dicts, each carrying at least the keys
            ``'field_line'`` and ``'field_text'``.
        line_no: Line number to look for.

    Returns:
        The ``'field_text'`` value of the first field whose ``'field_line'``
        equals ``line_no``, or ``None`` when no field matches.
    """
    return next(
        (field['field_text'] for field in fields if field['field_line'] == line_no),
        None,
    )
def decode_bytes(raw_data: bytes):
    """Decode a ticket barcode bit string into a structured ticket dict.

    The input is a sequence of ASCII ``0``/``1`` characters. It is packed into
    bytes, the first 70 bytes are skipped, and the remainder is inflated as a
    raw deflate stream. The ``U_TLAY`` record supplies the printed ticket
    fields (looked up by line number) and the ``U_IC`` record supplies the
    passenger name and the year used for the journey timestamps.

    Args:
        raw_data: Bit string, eight characters per encoded byte. A trailing
            group shorter than eight characters is ignored.

    Returns:
        A dict with the keys ``source`` (always ``'pkp'``), ``train_no``
        (raw bytes, or ``None`` when the layout has no field on line 18),
        ``passenger_name``, ``stations`` (departure and arrival entries, each
        with ``name`` and ``time``), ``distance``, ``price`` (the field value
        divided by 100), ``ticket_no`` and ``meta`` (``layout`` and
        ``version`` as raw bytes).

    Raises:
        ValueError: If a record marker, a mandatory layout field or the
            U_IC date is missing, or a numeric field cannot be parsed.
        zlib.error: If the payload is not a valid deflate stream.
    """
    decoded = _inflate_bit_string(raw_data)
    record_version, layout_standard, text_by_line = _parse_tlay(decoded)
    uic_year, passenger_name = _parse_uic(decoded)

    departure = _journey_time(text_by_line, date_line=11, time_line=10, year=uic_year)
    arrival = _journey_time(text_by_line, date_line=15, time_line=14, year=uic_year)
    if arrival < departure:
        # The layout carries only day and month, so a trip that crosses
        # New Year would otherwise arrive before it departs.
        arrival = _journey_time(text_by_line, date_line=15, time_line=14, year=uic_year + 1)

    return dict(
        source='pkp',
        # Train number is passed through undecoded and may legitimately be None.
        train_no=text_by_line.get(18),
        passenger_name=passenger_name,
        stations=[
            dict(name=_required_text(text_by_line, 41), time=departure),
            dict(name=_required_text(text_by_line, 42), time=arrival),
        ],
        distance=int(_required_text(text_by_line, 24)),
        price=Decimal(_required_text(text_by_line, 28)) / 100,
        ticket_no=_required_text(text_by_line, 35),
        meta=dict(
            layout=layout_standard,
            version=record_version,
        ),
    )


def _inflate_bit_string(raw_data):
    """Pack a ``0``/``1`` character string into bytes and inflate the payload."""
    usable = len(raw_data) // 8 * 8  # drop any trailing partial byte
    packed = bytes(int(raw_data[pos:pos + 8], 2) for pos in range(0, usable, 8))
    # NOTE(review): the 70 skipped bytes are presumably the barcode header plus
    # the two-byte zlib header - confirm against the barcode specification.
    # wbits=-15 tells zlib to expect a raw deflate stream without header.
    return zlib.decompress(packed[70:], -15)


def _parse_tlay(decoded):
    """Parse the U_TLAY record into (version, layout standard, text by line)."""
    offset = decoded.index(b'U_TLAY') + 6  # U_TLAY (6A)

    def take(width):
        # Consume `width` bytes from the record and advance the cursor.
        nonlocal offset
        chunk = decoded[offset:offset + width]
        offset += width
        return chunk

    record_version = take(2)  # 2N
    take(4)  # record length (4N), not needed for parsing
    layout_standard = take(4)  # 4A
    number_of_fields = int(take(4))  # 4N

    text_by_line = {}
    for _ in range(number_of_fields):
        line = int(take(2))
        # Column, height, width and formatting are not used by the caller but
        # are still parsed as numbers so a misaligned record fails right here.
        for width in (2, 2, 2, 1):
            int(take(width))
        text = take(int(take(4)))
        # When several fields share a line, the first one wins.
        text_by_line.setdefault(line, text)
    return record_version, layout_standard, text_by_line


def _parse_uic(decoded):
    """Extract (year, passenger name) from the U_IC record."""
    uic_text = decoded[decoded.index(b'U_IC') + 4:].decode('utf-8')
    # Sample record:
    # U_IC 010134 0007000434010001-00043701000100004380100083572541100043901001025.05.202200044001001025.05.2022000433010013Anna Budzy<5>000443010000
    uic_dates = re.findall(r'10([0-9]{2})\.([0-9]{2})\.([0-9]{4})', uic_text)
    name_at = uic_text.index('4330100') + len('4330100')  # likely field header
    if not uic_dates:
        raise ValueError("U_IC record contains no date")
    name_len = int(uic_text[name_at:name_at + 2])
    passenger = uic_text[name_at + 2:name_at + 2 + name_len]
    # Polish diacritics are escaped in the record.
    passenger = passenger.replace("<5>", "ń")
    return int(uic_dates[0][-1]), passenger


def _required_text(text_by_line, line_no):
    """Return the ASCII text of a mandatory layout line or raise ValueError."""
    try:
        raw_text = text_by_line[line_no]
    except KeyError:
        raise ValueError(
            "ticket layout has no field on line {}".format(line_no)
        ) from None
    return raw_text.decode('ascii')


def _journey_time(text_by_line, date_line, time_line, year):
    """Build a datetime from a ``DD.MM`` line, an ``HH:MM`` line and a year."""
    day_month = _required_text(text_by_line, date_line).split(".")
    hour_minute = _required_text(text_by_line, time_line).split(":")
    return datetime(
        year=year,
        month=int(day_month[1]),
        day=int(day_month[0]),
        hour=int(hour_minute[0]),
        minute=int(hour_minute[1]),
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment