Skip to content

Instantly share code, notes, and snippets.

@UserUnknownFactor
Created August 4, 2020 04:47
Show Gist options
  • Save UserUnknownFactor/eb639936d9496fc4d6e70ed5f9a99180 to your computer and use it in GitHub Desktop.
Save UserUnknownFactor/eb639936d9496fc4d6e70ed5f9a99180 to your computer and use it in GitHub Desktop.
import itertools
import sys
import struct
import zlib
from io import open, BytesIO, SEEK_CUR, SEEK_END # noqa
# True when the interpreter's major version is 2.
PY2 = sys.version_info[0] == 2
# Kaitai Struct runtime streaming API version, defined as per the PEP 396
# convention. It is used for two purposes:
#
# * .py files generated by ksc from .ksy sources check this version number
#   to make sure they import a suitable KS runtime library;
# * distribution utilities (setup.py) use it when packaging for PyPI.
#
__version__ = '0.9'
class KaitaiStruct(object):
    """Base class for structures bound to a stream.

    The instance keeps the stream it was given in ``_io`` and can be used
    as a context manager; leaving the ``with`` block closes that stream.

    The ``to_*`` constructors pass ``_mode='w'`` and ``_data`` keyword
    arguments to ``cls``, so they are only usable with subclasses whose
    ``__init__`` accepts those keywords.
    """

    def __init__(self, stream):
        self._io = stream

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def close(self):
        """Close the underlying stream."""
        self._io.close()

    @classmethod
    def from_file(cls, filename):
        """Open ``filename`` for binary reading and build ``cls`` from it."""
        f = open(filename, 'rb')
        try:
            return cls(KaitaiStream(f))
        except BaseException:
            # Close the file descriptor, then reraise. BaseException is
            # caught so the descriptor is also released for errors that do
            # not derive from Exception; the error is always re-raised.
            f.close()
            raise

    @classmethod
    def to_file(cls, filename, data=None):
        """Open ``filename`` for binary writing and serialize ``data`` to it.

        The file is created (or truncated) because the structure is written
        from scratch.
        """
        f = open(filename, 'wb')
        try:
            return cls(KaitaiStream(f), _mode='w', _data=data)
        except BaseException:
            # Close the file descriptor, then reraise the exception.
            f.close()
            raise

    @classmethod
    def from_bytes(cls, buf):
        """Build ``cls`` by reading from an in-memory copy of ``buf``."""
        return cls(KaitaiStream(BytesIO(buf)))

    @classmethod
    def to_bytes(cls, buf, data=None):
        """Serialize ``data`` into an in-memory stream initialized with ``buf``."""
        return cls(KaitaiStream(BytesIO(buf)), _mode='w', _data=data)

    @classmethod
    def from_io(cls, io):
        """Build ``cls`` by reading from the file-like object ``io``."""
        return cls(KaitaiStream(io))

    @classmethod
    def to_io(cls, io, data=None):
        """Serialize ``data`` into the file-like object ``io``."""
        return cls(KaitaiStream(io), _mode='w', _data=data)
class KaitaiStream(object):
    """Reader/writer of primitive binary values on top of a file-like object.

    ``io`` must support ``read``, ``write``, ``seek``, ``tell`` and
    ``close`` as far as the methods actually used require them. Bit-level
    reads keep their leftover bits in ``self.bits`` / ``self.bits_left``.
    """

    def __init__(self, io):
        self._io = io
        self.align_to_byte()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def close(self):
        """Close the underlying file-like object."""
        self._io.close()

    # ========================================================================
    # Stream positioning
    # ========================================================================

    def is_eof(self):
        """Return True when no buffered bits and no more bytes are available."""
        if self.bits_left > 0:
            return False

        io = self._io
        t = io.read(1)
        if t == b'':
            return True
        else:
            # Undo the one-byte probe so the position is unchanged.
            io.seek(-1, SEEK_CUR)
            return False

    def seek(self, n):
        """Move to absolute position ``n``."""
        self._io.seek(n)

    def pos(self):
        """Return the current absolute position."""
        return self._io.tell()

    def size(self):
        """Return the total size of the stream in bytes."""
        # Python has no internal File object API function to get
        # current file / StringIO size, thus we use the following
        # trick.
        io = self._io

        # Remember our current position
        cur_pos = io.tell()

        # Seek to the end of the File object
        io.seek(0, SEEK_END)

        # Remember position, which is equal to the full length
        full_size = io.tell()

        # Seek back to the current position
        io.seek(cur_pos)

        return full_size

    # ========================================================================
    # Integer numbers
    # ========================================================================

    packer_s1 = struct.Struct('b')
    packer_s2be = struct.Struct('>h')
    packer_s4be = struct.Struct('>i')
    packer_s8be = struct.Struct('>q')
    packer_s2le = struct.Struct('<h')
    packer_s4le = struct.Struct('<i')
    packer_s8le = struct.Struct('<q')

    packer_u1 = struct.Struct('B')
    packer_u2be = struct.Struct('>H')
    packer_u4be = struct.Struct('>I')
    packer_u8be = struct.Struct('>Q')
    packer_u2le = struct.Struct('<H')
    packer_u4le = struct.Struct('<I')
    packer_u8le = struct.Struct('<Q')

    # ------------------------------------------------------------------------
    # Signed
    # ------------------------------------------------------------------------

    def read_s1(self):
        return KaitaiStream.packer_s1.unpack(self.read_bytes(1))[0]

    def write_s1(self, data):
        return self.write_bytes(KaitaiStream.packer_s1.pack(data))

    # ........................................................................
    # Big-endian
    # ........................................................................

    def read_s2be(self):
        return KaitaiStream.packer_s2be.unpack(self.read_bytes(2))[0]

    def read_s4be(self):
        return KaitaiStream.packer_s4be.unpack(self.read_bytes(4))[0]

    def read_s8be(self):
        return KaitaiStream.packer_s8be.unpack(self.read_bytes(8))[0]

    def write_s2be(self, data):
        return self.write_bytes(KaitaiStream.packer_s2be.pack(data))

    def write_s4be(self, data):
        return self.write_bytes(KaitaiStream.packer_s4be.pack(data))

    def write_s8be(self, data):
        return self.write_bytes(KaitaiStream.packer_s8be.pack(data))

    # ........................................................................
    # Little-endian
    # ........................................................................

    def read_s2le(self):
        return KaitaiStream.packer_s2le.unpack(self.read_bytes(2))[0]

    def read_s4le(self):
        return KaitaiStream.packer_s4le.unpack(self.read_bytes(4))[0]

    def read_s8le(self):
        return KaitaiStream.packer_s8le.unpack(self.read_bytes(8))[0]

    def write_s2le(self, data):
        return self.write_bytes(KaitaiStream.packer_s2le.pack(data))

    def write_s4le(self, data):
        return self.write_bytes(KaitaiStream.packer_s4le.pack(data))

    def write_s8le(self, data):
        return self.write_bytes(KaitaiStream.packer_s8le.pack(data))

    # ------------------------------------------------------------------------
    # Unsigned
    # ------------------------------------------------------------------------

    def read_u1(self):
        return KaitaiStream.packer_u1.unpack(self.read_bytes(1))[0]

    def write_u1(self, data):
        return self.write_bytes(KaitaiStream.packer_u1.pack(data))

    # ........................................................................
    # Big-endian
    # ........................................................................

    def read_u2be(self):
        return KaitaiStream.packer_u2be.unpack(self.read_bytes(2))[0]

    def read_u4be(self):
        return KaitaiStream.packer_u4be.unpack(self.read_bytes(4))[0]

    def read_u8be(self):
        return KaitaiStream.packer_u8be.unpack(self.read_bytes(8))[0]

    def write_u2be(self, data):
        return self.write_bytes(KaitaiStream.packer_u2be.pack(data))

    def write_u4be(self, data):
        return self.write_bytes(KaitaiStream.packer_u4be.pack(data))

    def write_u8be(self, data):
        return self.write_bytes(KaitaiStream.packer_u8be.pack(data))

    # ........................................................................
    # Little-endian
    # ........................................................................

    def read_u2le(self):
        return KaitaiStream.packer_u2le.unpack(self.read_bytes(2))[0]

    def read_u4le(self):
        return KaitaiStream.packer_u4le.unpack(self.read_bytes(4))[0]

    def read_u8le(self):
        return KaitaiStream.packer_u8le.unpack(self.read_bytes(8))[0]

    def write_u2le(self, data):
        return self.write_bytes(KaitaiStream.packer_u2le.pack(data))

    def write_u4le(self, data):
        return self.write_bytes(KaitaiStream.packer_u4le.pack(data))

    def write_u8le(self, data):
        return self.write_bytes(KaitaiStream.packer_u8le.pack(data))

    # ========================================================================
    # Floating point numbers
    # ========================================================================

    packer_f4be = struct.Struct('>f')
    packer_f8be = struct.Struct('>d')
    packer_f4le = struct.Struct('<f')
    packer_f8le = struct.Struct('<d')

    # ........................................................................
    # Big-endian
    # ........................................................................

    def read_f4be(self):
        return KaitaiStream.packer_f4be.unpack(self.read_bytes(4))[0]

    def read_f8be(self):
        return KaitaiStream.packer_f8be.unpack(self.read_bytes(8))[0]

    def write_f4be(self, data):
        return self.write_bytes(KaitaiStream.packer_f4be.pack(data))

    def write_f8be(self, data):
        return self.write_bytes(KaitaiStream.packer_f8be.pack(data))

    # ........................................................................
    # Little-endian
    # ........................................................................

    def read_f4le(self):
        return KaitaiStream.packer_f4le.unpack(self.read_bytes(4))[0]

    def read_f8le(self):
        return KaitaiStream.packer_f8le.unpack(self.read_bytes(8))[0]

    def write_f4le(self, data):
        return self.write_bytes(KaitaiStream.packer_f4le.pack(data))

    def write_f8le(self, data):
        return self.write_bytes(KaitaiStream.packer_f8le.pack(data))

    # ........................................................................
    # Ruby VLB integer
    # ........................................................................

    def read_ruby_long(self):
        """Read a variable-length integer as produced by ``write_ruby_long``.

        The first signed byte is either 0 (value 0), a small value biased
        by +5 / -5, or the signed count of little-endian payload bytes that
        follow (negative count means a negative, two's complement value).
        """
        length = self.read_s1()
        if length == 0:
            return 0
        if 5 < length < 128:
            return length - 5
        elif -129 < length < -5:
            return length + 5
        result = 0
        factor = 1
        for s in range(abs(length)):
            result += self.read_u1() * factor
            factor *= 256
        if length < 0:
            result = result - factor
        return result

    def write_ruby_long(self, data):
        """Write integer ``data`` in the format read by ``read_ruby_long``.

        Raises:
            ValueError: if the value needs more than 5 payload bytes.
        """
        if data == 0:
            self.write_u1(0)
        elif 0 < data < 123:
            self.write_s1(data + 5)
        elif -124 < data < 0:
            self.write_s1(data - 5)
        else:
            # Bytes needed for the magnitude, rounded up. Pure integer
            # arithmetic: no dependency on the (never imported) math module.
            size = (data.bit_length() + 7) // 8
            if size > 5:
                raise ValueError("%d too long for serialization" % data)
            negative = data < 0
            factor = 256 ** size
            if negative and data == -factor:
                size -= 1
                # Floor division keeps the value an int; a float here would
                # make the byte packing below fail.
                data += factor // 256
            elif negative:
                # Two's complement representation within `size` bytes.
                data += factor
            # The leading byte carries the payload size with the value's sign.
            self.write_s1(-size if negative else size)
            for _ in range(size):
                self.write_u1(data % 256)
                data //= 256

    # ========================================================================
    # Unaligned bit values
    # ========================================================================

    def align_to_byte(self):
        """Drop any buffered bits so the next read starts on a byte boundary."""
        self.bits = 0
        self.bits_left = 0

    def read_bits_int_be(self, n):
        """Read ``n`` bits as an unsigned integer, most significant bit first."""
        bits_needed = n - self.bits_left
        if bits_needed > 0:
            # 1 bit  => 1 byte
            # 8 bits => 1 byte
            # 9 bits => 2 bytes
            bytes_needed = ((bits_needed - 1) // 8) + 1
            buf = self.read_bytes(bytes_needed)
            for byte in buf:
                byte = KaitaiStream.int_from_byte(byte)
                self.bits <<= 8
                self.bits |= byte
                self.bits_left += 8

        # raw mask with required number of 1s, starting from lowest bit
        mask = (1 << n) - 1
        # shift self.bits to align the highest bits with the mask & derive reading result
        shift_bits = self.bits_left - n
        res = (self.bits >> shift_bits) & mask
        # clear top bits that we've just read => AND with 1s
        self.bits_left -= n
        mask = (1 << self.bits_left) - 1
        self.bits &= mask

        return res

    # Unused since Kaitai Struct Compiler v0.9+ - compatibility with
    # older versions.
    def read_bits_int(self, n):
        return self.read_bits_int_be(n)

    def read_bits_int_le(self, n):
        """Read ``n`` bits as an unsigned integer, least significant bit first."""
        bits_needed = n - self.bits_left
        if bits_needed > 0:
            # 1 bit  => 1 byte
            # 8 bits => 1 byte
            # 9 bits => 2 bytes
            bytes_needed = ((bits_needed - 1) // 8) + 1
            buf = self.read_bytes(bytes_needed)
            for byte in buf:
                byte = KaitaiStream.int_from_byte(byte)
                self.bits |= (byte << self.bits_left)
                self.bits_left += 8

        # raw mask with required number of 1s, starting from lowest bit
        mask = (1 << n) - 1
        # derive reading result
        res = self.bits & mask
        # remove bottom bits that we've just read by shifting
        self.bits >>= n
        self.bits_left -= n

        return res

    # ========================================================================
    # Byte arrays
    # ========================================================================

    def alignment(self, a):
        """Return how many bytes separate the current position from the
        next multiple of ``a`` (0 when already aligned)."""
        return (a - self.pos()) % a

    def read_bytes(self, n, align=0):
        """Read exactly ``n`` bytes; with ``align`` > 1, then skip forward to
        the next multiple of ``align``.

        Raises:
            ValueError: if ``n`` is negative.
            EOFError: if fewer than ``n`` bytes are available.
        """
        if n < 0:
            raise ValueError(
                "requested invalid %d amount of bytes" %
                (n,)
            )
        r = self._io.read(n)
        if len(r) < n:
            raise EOFError(
                "requested %d bytes, but got only %d bytes" %
                (n, len(r))
            )
        if align > 1:
            self._io.seek(self.alignment(align), SEEK_CUR)
        return r

    def write_bytes(self, data, align=0, pad=0, padding=b'\0'):
        """Write ``data``, then ``pad`` copies of ``padding``, then as many
        copies of ``padding`` as needed to reach a multiple of ``align``.

        Nothing is written when ``data`` is None, or when it is empty and
        neither padding nor alignment was requested.
        """
        if data is None:
            return
        nb = len(data)
        if nb == 0 and align < 2 and pad < 1:
            return
        if self._io.write(data) != nb:
            raise Exception("not all bytes written")
        if pad > 0:
            self._io.write(padding * pad)
        if align > 1:
            # Evaluated after the writes above, so it pads from the new position.
            self._io.write(padding * self.alignment(align))
        return

    def read_bytes_full(self):
        """Read everything from the current position to the end."""
        return self._io.read()

    def read_bytes_term(self, term, include_term=False, consume_term=True, eos_error=True, elem_size=1):
        """Read elements of ``elem_size`` bytes until one equals ``term``.

        ``term`` is an integer compared against ``ord()`` of each element
        read, and ``ord()`` only accepts a single byte, so ``elem_size``
        values other than 1 raise TypeError on the first comparison.
        """
        r = b''
        while True:
            c = self._io.read(elem_size)
            if c == b'':
                if eos_error:
                    raise Exception(
                        "end of stream reached, but no terminator %d found" %
                        (term,)
                    )
                else:
                    return r
            elif ord(c) == term:
                if include_term:
                    r += c
                if not consume_term:
                    self._io.seek(-elem_size, SEEK_CUR)
                return r
            else:
                r += c

    def write_bytes_term(self, data, term=b'\0', align=0):
        """Write ``data`` followed by one copy of ``term``."""
        self.write_bytes(data, align=align, pad=1, padding=term)

    def ensure_fixed_contents(self, expected):
        """Read ``len(expected)`` bytes and raise unless they equal ``expected``."""
        actual = self._io.read(len(expected))
        if actual != expected:
            raise Exception(
                "unexpected fixed contents: got %r, was waiting for %r" %
                (actual, expected)
            )
        return actual

    @staticmethod
    def _byte_value(b):
        """Return a one-element bytes-like ``b`` as its integer value;
        anything else is returned unchanged."""
        if isinstance(b, (bytes, bytearray)) and len(b) == 1:
            return ord(b)
        return b

    @staticmethod
    def bytes_strip_right(data, pad_byte=b'\0'):
        """Return ``data`` without trailing ``pad_byte`` bytes.

        ``pad_byte`` may be an integer or a single byte. It is normalized
        to an integer because indexing the data yields integers, so the
        bytes default would otherwise never match.
        """
        pad_byte = KaitaiStream._byte_value(pad_byte)
        new_len = len(data)
        if PY2:
            # data[...] must yield an integer, to compare with integer pad_byte
            data = bytearray(data)

        while new_len > 0 and data[new_len - 1] == pad_byte:
            new_len -= 1

        return data[:new_len]

    @staticmethod
    def bytes_terminate(data, term, include_term=True, elem_size=1):
        """Return ``data`` cut at the first position (stepping by
        ``elem_size``) whose byte equals ``term``.

        ``term`` may be an integer or a single byte. The terminator itself
        is kept when ``include_term`` is true.
        """
        term = KaitaiStream._byte_value(term)
        new_len = 0
        max_len = len(data)
        if PY2:
            # data[...] must yield an integer, to compare with integer term
            data = bytearray(data)

        while new_len < max_len and data[new_len] != term:
            new_len += elem_size

        if include_term and new_len < max_len:
            new_len += elem_size

        return data[:new_len]

    # ========================================================================
    # Byte array processing
    # ========================================================================

    @staticmethod
    def process_xor_one(data, key):
        """XOR every byte of ``data`` with the integer ``key``."""
        if PY2:
            return bytes(bytearray(v ^ key for v in bytearray(data)))
        else:
            return bytes(v ^ key for v in data)

    @staticmethod
    def process_xor_many(data, key):
        """XOR ``data`` with the byte sequence ``key``, repeating the key."""
        if PY2:
            return bytes(bytearray(a ^ b for a, b in zip(bytearray(data), itertools.cycle(bytearray(key)))))
        else:
            return bytes(a ^ b for a, b in zip(data, itertools.cycle(key)))

    @staticmethod
    def process_rotate_left(data, amount, group_size):
        """Rotate every byte of ``data`` left by ``amount`` bits.

        ``amount`` is reduced modulo 8, so values above 7 wrap around and
        negative values rotate right.

        Raises:
            Exception: if ``group_size`` is not 1.
        """
        if group_size != 1:
            raise Exception(
                "unable to rotate group of %d bytes yet" %
                (group_size,)
            )

        mask = group_size * 8 - 1
        amount &= mask
        anti_amount = -amount & mask

        r = bytearray(data)
        for i in range(len(r)):
            r[i] = (r[i] << amount) & 0xff | (r[i] >> anti_amount)
        return bytes(r)

    @staticmethod
    def process_decompress(data, method="zlib"):
        """Decompress ``data``; only ``method="zlib"`` is supported."""
        if method == "zlib":
            return zlib.decompress(data)
        raise ValueError("unsupported unpacker: %s" % method)

    @staticmethod
    def process_compress(data, method="zlib"):
        """Compress ``data``; only ``method="zlib"`` is supported."""
        if method == "zlib":
            return zlib.compress(data)
        raise ValueError("unsupported packer: %s" % method)

    # ========================================================================
    # Misc
    # ========================================================================

    @staticmethod
    def int_from_byte(v):
        """Return the integer value of one element of a byte string."""
        if PY2:
            return ord(v)
        return v

    @staticmethod
    def byte_array_index(data, i):
        return KaitaiStream.int_from_byte(data[i])

    @staticmethod
    def byte_array_min(b):
        return KaitaiStream.int_from_byte(min(b))

    @staticmethod
    def byte_array_max(b):
        return KaitaiStream.int_from_byte(max(b))

    @staticmethod
    def resolve_enum(enum_obj, value):
        """Resolves value using enum: if the value is not found in the map,
        we'll just use literal value per se. Works around problem with Python
        enums throwing an exception when encountering unknown value.
        """
        try:
            return enum_obj(value)
        except ValueError:
            return value
class KaitaiStructError(Exception):
    """Common ancestor for all errors originating from Kaitai Struct usage.

    Stores the KSY source path pointing to the element supposedly guilty
    of the error. Derives from ``Exception`` (not ``BaseException``) so
    that ordinary ``except Exception`` handlers see these errors too.
    """

    def __init__(self, msg, src_path):
        super(KaitaiStructError, self).__init__("%s: %s" % (src_path, msg))
        self.src_path = src_path
class UndecidedEndiannessError(KaitaiStructError):
    """Error that occurs when default endianness should be decided with
    switch, but nothing matches (although using endianness expression
    implies that there should be some positive result).
    """

    def __init__(self, src_path):
        # Start the super() lookup from this class so that
        # KaitaiStructError.__init__ runs: it formats the message and
        # stores src_path on the instance.
        super(UndecidedEndiannessError, self).__init__(
            "unable to decide on endianness for a type", src_path)
class ValidationFailedError(KaitaiStructError):
    """Base class of every validation failure.

    Keeps a reference to the stream object involved in the error; the
    message is prefixed with the position reported by ``io.pos()``.
    """

    def __init__(self, msg, io, src_path):
        position = io.pos()
        full_msg = "at pos %d: validation failed: %s" % (position, msg)
        super(ValidationFailedError, self).__init__(full_msg, src_path)
        self.io = io
class ValidationNotEqualError(ValidationFailedError):
    """Validation failure: the "actual" value had to be equal to
    "expected", but it was not.
    """

    def __init__(self, expected, actual, io, src_path):
        details = "not equal, expected %s, but got %s" % (
            repr(expected),
            repr(actual),
        )
        super(ValidationNotEqualError, self).__init__(details, io, src_path)
        self.expected = expected
        self.actual = actual
class ValidationLessThanError(ValidationFailedError):
    """Validation failure: the "actual" value had to be greater than or
    equal to "min", but it was not.
    """

    def __init__(self, min, actual, io, src_path):
        details = "not in range, min %s, but got %s" % (
            repr(min),
            repr(actual),
        )
        super(ValidationLessThanError, self).__init__(details, io, src_path)
        self.min = min
        self.actual = actual
class ValidationGreaterThanError(ValidationFailedError):
    """Validation failure: the "actual" value had to be less than or
    equal to "max", but it was not.
    """

    def __init__(self, max, actual, io, src_path):
        details = "not in range, max %s, but got %s" % (
            repr(max),
            repr(actual),
        )
        super(ValidationGreaterThanError, self).__init__(details, io, src_path)
        self.max = max
        self.actual = actual
class ValidationNotAnyOfError(ValidationFailedError):
    """Validation failure: the "actual" value had to be one of the values
    from a list, but it was not.
    """

    def __init__(self, actual, io, src_path):
        details = "not any of the list, got %s" % (repr(actual),)
        super(ValidationNotAnyOfError, self).__init__(details, io, src_path)
        self.actual = actual
class ValidationExprError(ValidationFailedError):
    """Validation failure: the "actual" value had to match an expression,
    but it did not.
    """

    def __init__(self, actual, io, src_path):
        details = "not matching the expression, got %s" % (repr(actual),)
        super(ValidationExprError, self).__init__(details, io, src_path)
        self.actual = actual
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
import kaitaistruct
from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO
# Refuse to load against a Kaitai Struct runtime older than the one this
# generated module was built for.
VERSION_REQUIRED = '0.9'
if parse_version(kaitaistruct.__version__) < parse_version(VERSION_REQUIRED):
    raise Exception(
        "Incompatible KaitaiStruct Python API: %s or later is required, but you have %s"
        % (VERSION_REQUIRED, kaitaistruct.__version__)
    )
class UnityStrDat(KaitaiStruct):
    """String table file: a fixed-size raw header, a little-endian u4
    string count, then that many length-prefixed strings.

    With ``_mode="r"`` (default) the structure is parsed from ``_io``.
    With ``_mode="w"`` and a truthy ``_data`` it is serialized to ``_io``;
    ``_data`` is indexed as ``(header_bytes, rows)`` and the second item of
    each row is the text that gets written.
    """

    def __init__(self, _io, _parent=None, _root=None, _mode="r", _data=None):
        self._io = _io
        self._parent = _parent
        self._root = _root if _root else self
        if _mode == "r":
            self._read()
        elif _mode == "w" and _data:
            self._write(_data)

    def _read(self):
        self.header1 = UnityStrDat.HeaderRaw(self._io, self, self._root)
        self.num_strings = self._io.read_u4le()
        self.strings_arr = [
            UnityStrDat.StringStruct(self._io, self, self._root)
            for _ in range(self.num_strings)
        ]

    def _write(self, data):
        if not data:
            return
        header, rows = data[0], data[1]
        UnityStrDat.HeaderRaw(self._io, self, self._root, _mode="w", _data=header)
        # NOTE(review): the stored count is one less than the number of
        # rows, while StringStruct silently skips rows whose text is empty.
        # The count therefore only matches the written entries when exactly
        # one row is empty - confirm this is intended for the target files.
        self._io.write_u4le(len(rows) - 1)
        for row in rows:
            UnityStrDat.StringStruct(self._io, self, self._root, _mode="w", _data=row[1])

    class HeaderRaw(KaitaiStruct):
        """Opaque header of exactly ``_length`` (68) bytes, copied verbatim."""

        def __init__(self, _io, _parent=None, _root=None, _mode="r", _data=None):
            self._io = _io
            self._parent = _parent
            self._root = _root if _root else self
            self._length = 68
            if _mode == "r":
                self._read()
            elif _mode == "w" and _data:
                self._write(_data)

        def _read(self):
            self.header_raw_bytes = self._io.read_bytes(self._length)

        def _write(self, data=None):
            """Write ``data``, or the previously read header when omitted.

            Raises:
                Exception: if no header is available or its size is wrong.
            """
            if not data:
                # header_raw_bytes only exists after a successful _read().
                data = getattr(self, 'header_raw_bytes', None)
            actual_len = len(data) if data else 0
            if actual_len != self._length:
                raise Exception("No header provided or wrong size (%d, instead of %d)" % (actual_len, self._length))
            self._io.write_bytes(data)

    class StringStruct(KaitaiStruct):
        """One string: little-endian u4 byte length, then UTF-8 bytes,
        followed by padding up to a 4-byte stream position boundary."""

        def __init__(self, _io, _parent=None, _root=None, _mode="r", _data=None):
            self._io = _io
            self._parent = _parent
            self._root = _root if _root else self
            self._encoding = u"utf-8"
            if _mode == "r":
                self._read()
            elif _mode == "w" and _data:
                self._write(_data)

        def _read(self):
            self.s_size = self._io.read_u4le()
            self.string = self._io.read_bytes(self.s_size, align=4).decode(self._encoding)

        def _write(self, data):
            if not data:
                return
            data = data.encode(self._encoding)
            # The length prefix counts encoded bytes, not characters.
            self._io.write_u4le(len(data))
            self._io.write_bytes(data, align=4)
import csv, os, sys, argparse, kaitaistruct
from unity_str_dat import *
ZERO_WIDTH = '\u200C' #'\u200B'
SPACE_CHAR = ' '
ESCAPE_CHAR = '¶'
DELIMITER_CHAR = '→'
CSV_ENCODING = "utf-8-sig"
DIALECT_TRANSLATION = 'trans'
# Unquoted dialect: fields are separated by DELIMITER_CHAR and special
# characters inside a field are escaped with ESCAPE_CHAR.
# quotechar must be None rather than '': Python 3.11+ rejects an empty
# quotechar, and None is the accepted way to say "no quote character"
# when quoting is QUOTE_NONE.
csv.register_dialect(
    DIALECT_TRANSLATION,
    delimiter=DELIMITER_CHAR,
    quotechar=None,
    doublequote=False,
    quoting=csv.QUOTE_NONE,
    escapechar=ESCAPE_CHAR,
    lineterminator='\n',
)
def read_csv_list(fn, ftype=DIALECT_TRANSLATION):
    """Return the rows of CSV file ``fn`` as a list, parsed with dialect
    ``ftype``; an empty list when ``fn`` is not an existing file."""
    if not os.path.isfile(fn):
        return list()
    with open(fn, 'r', newline='', encoding=CSV_ENCODING) as f:
        rows = csv.reader(f, ftype)
        return [row for row in rows]
def write_csv_list(fn, lst, ftype=DIALECT_TRANSLATION):
    """Write the rows in ``lst`` to CSV file ``fn`` using dialect ``ftype``.

    Nothing is written (and no file is created) when ``lst`` is empty.
    """
    has_rows = bool(lst) and len(lst) != 0
    if not has_rows:
        return
    with open(fn, 'w', newline='', encoding=CSV_ENCODING) as f:
        csv.writer(f, ftype).writerows(lst)
def main():
    """Command-line entry point.

    Without ``-pack`` the strings of the given dat file are dumped to
    ``<name>_strings.csv`` (text in the first column, empty second column).
    With ``-pack`` the header of the dat file and the second column of that
    CSV are written to ``translation_out/<name>_trans<ext>``.
    """
    parser = argparse.ArgumentParser(description='Unity file raw tool')
    parser.add_argument('filename', metavar='file_name', help='Name of dat file with strings')
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument('-pack', help="pack", action="store_true")
    mode.add_argument("-unpack", help="unpack", action="store_true")
    if len(sys.argv) < 2:
        print("Unity hack tool v1")
        parser.print_help(sys.stderr)
        return
    app_args = parser.parse_args()
    file_name = app_args.filename
    if not file_name:
        return

    stem, ext = os.path.splitext(file_name)
    csv_name = stem + '_strings.csv'

    if app_args.pack:
        # Only the raw header is needed from the original file; the context
        # manager closes it even if parsing fails part-way.
        with UnityStrDat.from_file(file_name) as source:
            header = source.header1.header_raw_bytes
        data = read_csv_list(csv_name)
        if not data:
            # Without rows the writer would emit a negative string count.
            print("No strings found in %s, nothing to pack" % csv_name, file=sys.stderr)
            return
        # os.path.join picks the separator of the current platform instead
        # of a hard-coded backslash.
        new_name = os.path.join("translation_out", stem + "_trans" + ext)
        path_out = os.path.dirname(new_name)
        if path_out:
            os.makedirs(path_out, exist_ok=True)
        packed = UnityStrDat.to_file(new_name, (header, data))
        packed.close()
    else:
        with UnityStrDat.from_file(file_name) as a:
            data = [[entry.string, ''] for entry in a.strings_arr]
        write_csv_list(csv_name, data)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment