Skip to content

Instantly share code, notes, and snippets.

@ozomer
Created March 10, 2022 20:55
Show Gist options
  • Save ozomer/7d215e44036eb13f8027043c130be007 to your computer and use it in GitHub Desktop.
Save ozomer/7d215e44036eb13f8027043c130be007 to your computer and use it in GitHub Desktop.
Embla Time Fixer
#!/usr/bin/env python3
# Inspired from https://github.com/bwrc/embla-r
from datetime import datetime
from glob import glob
import os.path
import re
import struct
try:
from gooey import Gooey as main_wrapper, GooeyParser as ArgumentParser
gooey_installed = True
except ImportError:
from argparse import ArgumentParser
main_wrapper = lambda f: f
gooey_installed = False
SUFFIXES = [".ebm", ".esrc"]
EBM_END_OF_SIGNATURE = b"\x1a"
EBM_R_TIME = 0x84
DATE_STRUCT_FORMAT = "HBBBBBB"
ESRC_DATETIME_PREFIX = b"d\x00a\x00t\x00e\x00t\x00i\x00m\x00e\x00\"\x00>"
ESRC_DATETIME_PATTERN = re.compile(
"^" +
"\x00".join([*"datetime\">"] +
[r"\d"] * 4 + [r"-"] + [r"\d"] * 2 + [r"-"] + [r"\d"] * 2 + [r"T"] +
[r"\d"] * 2 + [r":"] + [r"\d"] * 2 + [r":"] + [r"\d"] * 2 + [r"."] +
[r"\d"] * 6 + [r"<"]) + "$"
)
class ParsedEsrcFile:
def __init__(self, filename):
self.filename = filename
with open(filename, "rb") as stream:
self.buffer = stream.read()
@staticmethod
def parse_time(record_payload):
digits = re.findall(r"\d", record_payload)
return datetime(*(
int("".join(sub_digits)) for sub_digits in [
digits[0:4], digits[4:6], digits[6:8],
digits[8:10], digits[10:12], digits[12:14], digits[14:20]
]
))
def get_time_records(self):
records = []
offset = -1
while True:
offset = self.buffer.find(ESRC_DATETIME_PREFIX, offset + 1)
if offset < 0:
break
record_bytes = self.buffer[offset:][:len("datetime\">yyyy-mm-ddTHH:MM:SS.ZZZZZZ<") * 2 - 1]
record_payload = "".join(chr(c) for c in record_bytes)
if not ESRC_DATETIME_PATTERN.match(record_payload):
continue
records.append(self.parse_time(record_payload))
return records
def add_time(self, time_delta):
offset = -1
while True:
offset = self.buffer.find(ESRC_DATETIME_PREFIX, offset + 1)
if offset < 0:
break
record_bytes = self.buffer[offset:][:len("datetime\">yyyy-mm-ddTHH:MM:SS.ZZZZZZ<") * 2 - 1]
record_payload = "".join(chr(c) for c in record_bytes)
if not ESRC_DATETIME_PATTERN.match(record_payload):
continue
new_time = self.parse_time(record_payload) + time_delta
digits = [ord(digit) for digit in new_time.strftime("%Y%m%d%H%M%S%f")]
new_record_bytes = bytes(
digits.pop(0) if ord("0") <= c <= ord("9") else c
for c in record_bytes
)
self.buffer = self.buffer[:offset] + new_record_bytes + self.buffer[offset + len(new_record_bytes):]
def write(self):
with open(self.filename, "wb") as stream:
stream.write(self.buffer)
class ParsedEbmFile:
def __init__(self, filename):
self.filename = filename
with open(filename, "rb") as stream:
file_buffer = stream.read()
signature_end_offset = file_buffer.find(EBM_END_OF_SIGNATURE)
if signature_end_offset < 0:
raise Exception("Could not find end of signature")
self.signature = file_buffer[0:signature_end_offset]
self.endian_buffer = file_buffer[signature_end_offset + 1:][:32]
if len(self.endian_buffer) < 32:
raise Exception("Endian buffer is too short")
if self.endian_buffer[0] == 0xff:
# Big Endian
self.endian_format = ">"
elif self.endian_buffer[0] == 0x00:
# Little Endian
self.endian_format = "<"
else:
raise Exception(f"Could not parse endian {self.endian_buffer[0:1]}")
self.ebmvertmp = self.endian_buffer[1:][:5]
self.id_record_format = "L" if self.ebmvertmp == b"\xff" * 5 else "B"
record_header_struct = struct.Struct(f"{self.endian_format}{self.id_record_format}L")
self.records = []
offset = signature_end_offset + 1 + 32
while offset < len(file_buffer):
record_header_buffer = file_buffer[offset:][:record_header_struct.size]
if len(record_header_buffer) < record_header_struct.size:
raise Exception("Not enough bytes for record header")
(record_id, record_size) = record_header_struct.unpack(record_header_buffer)
record_payload = file_buffer[offset + record_header_struct.size:][:record_size]
if len(record_payload) < record_size:
raise Exception("Not enough bytes for record payload")
self.records.append((record_id, record_payload))
offset += record_header_struct.size + record_size
def write(self):
with open(self.filename, "wb") as stream:
stream.write(self.signature)
stream.write(EBM_END_OF_SIGNATURE)
stream.write(self.endian_buffer)
record_header_struct = struct.Struct(f"{self.endian_format}{self.id_record_format}L")
for (record_id, record_payload) in self.records:
stream.write(record_header_struct.pack(record_id, len(record_payload)))
stream.write(record_payload)
def parse_time(self, record_payload):
(year, month, day, hour, minute, second, sec100) = struct.unpack(f"{self.endian_format}{DATE_STRUCT_FORMAT}", record_payload)
return datetime(year, month, day, hour, minute, second, sec100 * 10000)
def add_time_to_record_payload(self, record_payload, delta):
new_time = self.parse_time(record_payload) + delta
return struct.pack(f"{self.endian_format}{DATE_STRUCT_FORMAT}", new_time.year, new_time.month, new_time.day, new_time.hour, new_time.minute, new_time.second, new_time.microsecond // 10000)
def get_time_records(self):
return [self.parse_time(record_payload) for (record_id, record_payload) in self.records if record_id == EBM_R_TIME]
def add_time(self, delta):
self.records = [
(record_id, self.add_time_to_record_payload(record_payload, delta) if record_id == EBM_R_TIME else record_payload)
for (record_id, record_payload) in self.records
]
@main_wrapper
def main():
parser = ArgumentParser(description="Embla Time Fixer")
parser.add_argument_group("Input").add_argument(
"-i", "--input", help="path of filename or dirname with embla files", dest="input", required=True,
**{"widget": "DirChooser"} if gooey_installed else {}
)
target_datetime_group = parser.add_argument_group("Change Date and Time (optional)")
target_datetime_group.add_argument(
"-d", "--date", help="target start date", dest="date",
**{"widget": "DateChooser"} if gooey_installed else {}
)
target_datetime_group.add_argument(
"-t", "--time", help="target start time", dest="time",
**{"widget": "TimeChooser"} if gooey_installed else {}
)
args = parser.parse_args()
if any(args.input.lower().endswith(suffix) for suffix in SUFFIXES):
filenames = [args.input]
else:
filenames = sum([glob(os.path.join(args.input, f"*{suffix}")) for suffix in SUFFIXES], [])
target_date = None
if args.time and not args.date:
raise Exception("Time can be defined only when date is defined")
if args.date:
print(f"Parsing target date: {args.date}")
date_parts = [int(part) for part in re.split(r"\D", args.date) if part != ""]
if len(date_parts) != 3:
raise Exception("Date must specify year, month, day")
if args.time:
print(f"Parsing target time: {args.time}")
time_parts = [int(part) for part in re.split(r"\D", args.time) if part != ""]
if len(time_parts) > 3:
raise Exception("Time must must have up to 3 numbers: hour, minute, second")
else:
time_parts = []
target_date = datetime(*date_parts, *time_parts)
print(f"Parsed date: {target_date}")
if len(filenames) == 0:
raise Exception("Could not find any .ebm or .esrc files")
parsed_files = []
min_time_total = None
for filename in filenames:
print(f"Parsing {filename}")
if filename.lower().endswith(".ebm"):
parsed_file = ParsedEbmFile(filename)
elif filename.lower().endswith(".esrc"):
parsed_file = ParsedEsrcFile(filename)
else:
print(" Skipping file, unknown filename extension")
time_records = parsed_file.get_time_records()
min_time_record = min(time_records) if len(time_records) > 0 else None
print(f" {len(time_records)} time-records, min: {min_time_record} ")
if min_time_record is not None:
if min_time_total is None:
min_time_total = min_time_record
min_time_total = min([min_time_total, min_time_record])
parsed_files.append(parsed_file)
print(f"min_time_total: {min_time_total}")
if target_date:
time_delta = target_date - min_time_total
print(f"time delta: {time_delta}")
for parsed_file in parsed_files:
print(f"adding time to {parsed_file.filename}")
parsed_file.add_time(time_delta)
for parsed_file in parsed_files:
print(f"writing {parsed_file.filename}")
parsed_file.write()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment