Skip to content

Instantly share code, notes, and snippets.

@UserUnknownFactor
Last active March 30, 2024 15:49
Show Gist options
  • Save UserUnknownFactor/7a399bfbf3969e48711e290bbfa0b71d to your computer and use it in GitHub Desktop.
Bakin resource extractor
import os, re, argparse, glob
from unicodedata import category
from filetranslate.service_fn import write_csv_list, read_csv_list, read_csv_dict
from filetranslate.language_fn import tag_hash
# NOTE: This should work for both Bakin RPG Maker and Smile Game Builder files
def write_7bit_encoded_int(stream, value=None):
    """Encode a non-negative integer as a 7-bit variable-length quantity.

    Bug fix: the original took an unused ``stream`` parameter, yet the only
    call site in this file passes a single argument (the value), which
    raised TypeError. The first positional now doubles as the value when
    ``value`` is omitted; nothing is ever written to a stream — the encoded
    bytes are returned.
    """
    if value is None:
        # Single-argument call: the first positional is actually the value.
        value = stream
    result = bytearray()
    while True:
        b = value & 0x7F
        value >>= 7
        if value == 0:
            result.append(b)
            break
        # More septets follow: set the continuation bit.
        result.append(b | 0x80)
    return bytes(result)
def read_7bit_encoded_int(stream):
    """Decode a 7-bit variable-length integer from *stream*.

    Raises ValueError if more than five septets are seen (a 32-bit value
    needs at most 35 bits).
    """
    value, shift = 0, 0
    byte = 0x80  # sentinel with the continuation bit set
    while byte & 0x80:
        if shift == 35:
            raise ValueError("Bad 7-bit encoded integer")
        byte = stream.read(1)[0]
        value |= (byte & 0x7F) << shift
        shift += 7
    return value
# Unicode general categories considered "special": controls, format chars,
# surrogates, private use, unassigned, modifier symbols and combining marks.
_SPECIAL_CATEGORIES = frozenset(("Cc", "Cf", "Cs", "Co", "Cn", "Sk", "Mn", "Mc", "Me"))  # "Sm"?
# Soft hyphen, LF and CR are tolerated.  Include tab 0x9?
_IGNORED_CODEPOINTS = (0xAD, 0x0A, 0x0D)

def has_special_chars(string):
    """Return True if *string* contains characters from the special
    Unicode categories (soft hyphen, LF and CR are ignored)."""
    for ch in string:
        if ord(ch) in _IGNORED_CODEPOINTS:
            continue
        if category(ch) in _SPECIAL_CATEGORIES:
            return True
    return False
# Strings made entirely of ASCII letters, digits and punctuation are noise.
_ASCII_NOISE_RE = re.compile(r'^[ a-zA-Z\r\n\t\-\d_<>#\$%@\^&*()\[\]{}:`\'\"/\\.\|,+=!?]+$')

def is_not_skippable(string):
    """Return truthy for strings worth extracting: non-empty, not pure
    ASCII noise, not shader source, and containing no underscore."""
    if not string:
        return False
    if _ASCII_NOISE_RE.match(string):
        return False
    if string.startswith("shader "):
        return False
    return "_" not in string
def has_english(string):
    """Return True if any character's code point is <= 128 (the original's
    slightly loose ASCII test, kept as-is)."""
    for ch in string:
        if ord(ch) <= 128:
            return True
    return False
def read_string(stream, maximum):
    """Read a 7-bit-length-prefixed UTF-8 string from *stream*.

    Returns None when the decoded size is non-positive or would run past
    *maximum* (the file size).  Raises when the text contains characters
    flagged by has_special_chars, or on invalid UTF-8.
    """
    position = stream.tell()
    length = read_7bit_encoded_int(stream)
    if length <= 0 or length >= maximum - position:
        return None
    text = stream.read(length).decode('utf-8')
    if has_special_chars(text):
        raise Exception("Has special characters")
    return text
def search_strings_in_binary_file(file, file_size):
    """Scan an open binary *file* for length-prefixed UTF-8 strings.

    Bug fix: the parameter used to be named ``file_path`` while the body
    relied on a global ``file`` object (it only worked by accident when the
    caller's variable happened to be named ``file``); the function now uses
    its own argument.

    Returns a list of ``[text, '', offset, byte_length]`` rows suitable for
    CSV export (the empty column is the translation slot).
    """
    strings = []
    position = file.tell()
    while position <= file_size:
        try:
            string = read_string(file, file_size)
            if string:
                length = file.tell() - position
                if is_not_skippable(string):
                    strings.append([string.replace('\r', ''), '', position, length])  # JIK
                has_en = has_english(string)
                i = 1
                # An ASCII byte may have been mistaken for a length prefix;
                # re-scan from every offset inside this candidate string.
                while has_en and i < length:
                    start = position + i
                    file.seek(start)
                    try:
                        new_string = read_string(file, file_size)
                        length = file.tell() - start
                        has_en = has_english(new_string)
                        if is_not_skippable(new_string):
                            strings.append([new_string.replace('\r', ''), '', start, file.tell() - start])
                    except Exception:
                        # Invalid candidate (decode error, special chars, or
                        # a None result): restore position, stop re-scanning.
                        file.seek(start - 1 + length)
                        break
                    i += 1
            else:
                file.seek(position + 1)
        except EOFError:
            break
        except Exception:
            # Best-effort scan: skip one byte and retry on any parse failure.
            file.seek(position + 1)
        position = file.tell()
    return strings
def replace_raw_strings(bytes_var, replacements):
    # Splice translated strings back into the raw file bytes.
    #
    # replacements rows are [old_string, new_string, offset, length] as
    # produced by search_strings_in_binary_file; rows whose old_string is
    # prefixed with "//" are commented out and skipped.
    #
    # Phase 1: split the buffer into alternating gap/string-span chunks
    # using each row's recorded offset and length.
    split_bytes_var = []
    old_offset = 0
    old_length = 0
    for old_string, _, offset, length in replacements:
        if old_string.startswith("//"): continue
        split_bytes_var.append(bytes_var[old_offset+old_length:offset])
        split_bytes_var.append(bytes_var[offset:offset+length])
        old_offset = offset
        old_length = length
    split_bytes_var.append(bytes_var[old_offset+old_length:])
    # Phase 2: replace chunks that match a translation with the new string,
    # re-prefixed with its 7-bit-encoded length.
    for i, byte_val in enumerate(split_bytes_var):
        for old_string, new_string, offset, length in replacements:
            if old_string.startswith("//"): continue
            new_bytes = new_string.encode('utf-8')
            # NOTE(review): each string chunk contains the 7-bit length
            # prefix plus the ORIGINAL string bytes, yet it is compared
            # against the NEW string's bytes — this looks like it should
            # compare against the old string (prefix included); verify
            # against a real pack round-trip before trusting this path.
            # NOTE(review): write_7bit_encoded_int is defined with a
            # (stream, value) signature but called here with one argument.
            # NOTE(review): offsets/lengths read back from CSV may be
            # strings rather than ints — TODO confirm read_csv_list types.
            if byte_val == new_bytes or byte_val == new_string.replace('\n', '\r\n').encode('utf-8'):
                new_len = write_7bit_encoded_int(len(new_bytes))
                split_bytes_var[i] = new_len + new_bytes
                break
    return b''.join(split_bytes_var)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Unpack or pack a file")
    # Bug fix: nargs=1 was removed — it made args.mask a one-element list
    # when -m was passed, while the default is a plain string, so
    # glob.glob() failed on the list form.
    parser.add_argument("-m", "--mask", type=str, default=".\\**\\*.rbr", help="Mask of the files (default: \".\\**\\*.rbr\")")
    parser.add_argument("-u", "--unpack", action="store_true", help="Extract the strings")
    parser.add_argument("-p", "--pack", action="store_true", help="Replace the strings")
    args = parser.parse_args()
    # Matches Bakin control tags like \c[1], Japanese corner brackets and <tags>.
    tag_pattern = re.compile(r'\\[a-zA-Z](?:\[[^\]]+\])?|[「」]|\<\/?[^>\[\]]+\>')
    if args.unpack:
        tags = set()
        for fn in glob.glob(args.mask, recursive=True):
            print(f"Searching strings of {fn} ...")
            result = None
            with open(fn, "rb") as file:
                file.seek(8)  # skip the 8-byte file header
                result = search_strings_in_binary_file(file, os.path.getsize(fn))
            if result:
                file_name, file_extension = os.path.splitext(fn)
                file_name += "_attributes.csv"
                translations = read_csv_dict(file_name)
                if translations:
                    # Retain old translations on rescan; rows prefixed with
                    # "//" are commented-out entries.
                    for i, r in enumerate(result):
                        if r[0] in translations:
                            result[i][1] = translations[r[0]]
                        elif ("//" + r[0]) in translations:
                            result[i][1] = translations["//" + r[0]]
                            result[i][0] = "//" + r[0]
                write_csv_list(file_name, result)
                for r in result:
                    matches = tag_pattern.findall(r[0])
                    tags.update(matches)
        tags_array = []
        for t in tags:
            tags_array.append([t, tag_hash(t, lang='EN')])
        # Longest first so longer tags get replaced before their prefixes.
        tags_array = sorted(tags_array, key=lambda x: len(x[0]), reverse=True)
        write_csv_list("replacement_tags.csv", tags_array)
    elif args.pack:
        for fn in glob.glob(args.mask, recursive=True):
            print(f"Replacing strings of {fn} ...")
            directory, file_name = os.path.split(fn)
            # The CSV lives next to the source file (splitext keeps the dir).
            file_name, file_extension = os.path.splitext(fn)
            file_name += "_attributes.csv"
            with open(fn, "rb") as f:
                translations = read_csv_list(file_name)
                if translations:
                    new_data = replace_raw_strings(f.read(), translations)
                    out_dir = os.path.join(directory, "translation_out")
                    os.makedirs(out_dir, exist_ok=True)
                    with open(fn.replace(directory, out_dir), "wb") as o:
                        o.write(new_data)
    else:
        print("Please specify whether to unpack (-u) or pack (-p) the file(s) (by mask -m)")
import numpy as np
from io import BufferedReader
ZIP_HEADER = b"PK\x03\x04\x20\x00\x00\x00"
def decrypt(buff: bytes, KEY, start: int = 0) -> bytes:
    """Undo the additive byte cipher: subtract the rolled KEY from *buff*.

    *start* is the absolute position of the buffer in the stream; the key
    is phase-shifted by ``start % 16`` so decryption works from any offset.
    """
    phase = start % 16
    key = np.roll(KEY, -phase) if phase else KEY
    data = np.frombuffer(buff, dtype='uint8')
    n = data.shape[0]
    # Repeat the key to cover the whole buffer, then trim to length.
    keystream = np.tile(key, n // key.shape[0] + 1)[:n]
    return np.subtract(data, keystream).tobytes()
def enrypt(buff: bytes, KEY, start: int = 0) -> bytes:
    """Apply the additive byte cipher: add the phase-rolled KEY to *buff*.

    The name keeps its historical typo because this module's own defaults
    reference it; the correctly-spelled ``encrypt`` alias below is what
    other modules import.
    """
    start_pad = start % 16
    if start_pad:
        KEY = np.roll(KEY, -start_pad)
    data = np.frombuffer(buff, dtype='uint8')
    buff_len = data.shape[0]
    return np.add(data, np.tile(KEY, buff_len // KEY.shape[0] + 1)[:buff_len]).tobytes()

# Bug fix: other code does ``from encreader import encrypt`` which failed
# with ImportError because only the misspelled name existed.
encrypt = enrypt
class EncrBufferedReader(BufferedReader):
    """BufferedReader that decrypts on read and encrypts on write.

    The first ``cut_start`` bytes of the stream hold a custom on-disk
    header (``new_header``); reads at position 0 present ``old_header``
    (e.g. a standard ZIP local-file header) instead, so stdlib zipfile can
    parse the stream directly.
    """
    def __init__(self, raw, KEY=None, offset=8,
                 new_header=None, old_header=ZIP_HEADER,
                 decrypt_func=decrypt, encrypt_func=enrypt,
                 *args, **kwargs):
        super().__init__(raw, *args, **kwargs)
        self.KEY = KEY                         # numpy uint8 key vector
        self.decrypt_func = decrypt_func
        self.encrypt_func = encrypt_func
        self.block_offset = 0                  # stream position of the current block
        self.base_offset = offset              # added to positions when phasing the cipher
        self.original_header = old_header      # header bytes exposed to readers
        self.replacement_header = new_header   # custom on-disk header to hide
        self.cut_start = len(new_header) if new_header is not None else 0
    def _decrypt_block(self, block_data):
        # The key phase depends on the block's absolute stream position.
        if self.decrypt_func is None or self.KEY is None:
            raise ValueError("Decryption function or key not provided.")
        return self.decrypt_func(block_data, self.KEY, self.block_offset + self.base_offset)
    def _encrypt_block(self, block_data):
        if self.encrypt_func is None or self.KEY is None:
            raise ValueError("Encryption function or key not provided.")
        return self.encrypt_func(block_data, self.KEY, self.block_offset + self.base_offset)
    def _parse_first(self, data):
        # Swap the custom on-disk header for the standard one when the
        # block was read from position 0.
        if self.cut_start and self.block_offset == 0:
            if len(data) >= self.cut_start:
                return self.original_header + data[self.cut_start:]
            else:
                # NOTE(review): a short read returns only header bytes and
                # drops the decrypted data — presumably callers never read
                # fewer than cut_start bytes at offset 0; verify.
                return self.original_header[:len(data)]
        return data
    def read(self, size=-1):
        self.block_offset = self.tell()
        data = super().read() if size < 0 else super().read(size)
        if data:
            return self._parse_first(self._decrypt_block(data))
        return b''
    def read1(self, size=-1):
        # Same as read() but delegates to the single-raw-read variant.
        self.block_offset = self.tell()
        data = super().read1() if size < 0 else super().read1(size)
        if data:
            return self._parse_first(self._decrypt_block(data))
        return b''
    def write(self, data):
        self.block_offset = self.tell()
        encrypted_data = self._encrypt_block(data)
        # NOTE(review): the header substitution below mutates the local
        # ``data`` AFTER ``encrypted_data`` was computed from the original
        # bytes, and ``encrypted_data`` is what gets written — so the
        # replacement header never reaches the file. Looks like a bug;
        # confirm intended behavior before relying on write().
        if self.cut_start and self.block_offset == 0:
            if len(data) >= len(self.replacement_header):
                data = self.replacement_header + data[self.cut_start:]
            else:
                data = (self.replacement_header)[self.block_offset:len(data)]
        return super().write(encrypted_data)
import os, sys, io, zlib, re
from hashlib import md5
from io import BufferedReader
from zipfiledel import ZipFile, DeletableZipFile, ZIP_DEFLATED
from encreader import encrypt, decrypt, ZIP_HEADER
import numpy as np
# big thanks to HNIdesu for the format reversing
KEY1 = np.frombuffer(b"\x00\x08\x0e\t\x14<BFH\t\x14\x9a0\xa9T\xe1", dtype=np.uint8)
KEY2 = np.frombuffer(b"\x00\x0e\x08\x1e\x187\x12\x00H\x87F\x0b\x9ch\xa8K", dtype=np.uint8)
RBPACK_HEADER = b"BKNPAK"
DEFAULT_VERSION = b"\x00\x01"
class FileEntry:
    """One record of the rbpack resource table: name plus the compressed
    and uncompressed byte sizes."""
    name: str
    comp_size: int
    uncomp_size: int

    def __init__(self, file):
        # Layout: encrypted length-prefixed name, then two LE uint64 sizes.
        self.name = read_encrypted_string(file)
        self.comp_size = int.from_bytes(file.read(8), "little")
        self.uncomp_size = int.from_bytes(file.read(8), "little")

    def __repr__(self) -> str:
        return "{} ({} -> {})".format(self.name, self.comp_size, self.uncomp_size)
def equals(a1: bytes, a2: bytes) -> bool:
    """Byte-sequence equality (the explicit length check is redundant with
    ``==`` in Python but kept from the original C#-style port)."""
    return len(a1) == len(a2) and a1 == a2
def read_string(file: BufferedReader) -> str:
    """Read a string prefixed by a single length byte, decoded as UTF-8."""
    count = file.read(1)[0]
    return file.read(count).decode("utf-8")
def read_encrypted_string(file: BufferedReader) -> str:
    """Read a single-byte-length-prefixed string and decrypt it with KEY2,
    phased by the string's own position in the file."""
    count = file.read(1)[0]
    position = file.tell()
    raw = file.read(count)
    return decrypt(raw, KEY2, position).decode("utf-8")
def verify(verify_code: int, md5_hash: bytes) -> bool:
    """Check the archive verification code against its stored MD5 digest.

    Two salts (2525 and 5252) are accepted, matching the two known packer
    variants.  (The tiny ``equals`` helper is inlined here — it is plain
    byte equality.)
    """
    for salt in (2525, 5252):
        digest = md5(int.to_bytes(verify_code + salt, 4, byteorder="little")).digest()
        if digest == md5_hash:
            return True
    return False
# Extensions whose content is encrypted with the primary key (KEY1).
_PRIMARY_EXTS = (".cg", ".cgh", ".dlp_d", ".exe", ".dll", ".dlp", ".webm")

def is_primary_type(filepath: str) -> bool:
    """True when *filepath* has one of the primary-key file extensions."""
    return filepath.endswith(_PRIMARY_EXTS)
def extract_main_zip(file: BufferedReader, save_directory: str):
    """Extract the embedded, encrypted main ZIP from the archive stream.

    Reads an 8-byte length, decrypts the ZIP body with KEY1 (re-attaching
    a standard ZIP local-file header first), then extracts every entry,
    decrypting each file's content with KEY1 or KEY2 depending on its
    extension.  Extracted entry names are saved to main_zip_files.txt.
    """
    print("Extracting and decrypting main zip file...")
    zip_length = int.from_bytes(file.read(8),"little")
    offset = file.tell()
    zip_extract_path = save_directory #os.path.join(save_directory,"unpack.zip")
    if not os.path.exists(zip_extract_path):
        os.mkdir(zip_extract_path)
    main_zip = set()
    # The stored blob is a ZIP whose leading header bytes were stripped and
    # replaced; re-attach ZIP_HEADER after decrypting.  The -8 presumably
    # accounts for those header bytes — TODO confirm against the format.
    with ZipFile(io.BytesIO(ZIP_HEADER+decrypt(file.read(zip_length-8), KEY1, offset))) as zipfile:
        for entry_name in zipfile.namelist():
            save_path = os.path.join(zip_extract_path,entry_name.replace("/", os.path.sep))
            if(entry_name.endswith("/")):#directory entry
                os.makedirs(save_path,exist_ok = True)
                continue
            os.makedirs(os.path.dirname(save_path),exist_ok = True)
            if not os.path.isfile(save_path):
                print(f"Extracting main zip's file {entry_name}...")
                data = zipfile.read(entry_name)
                # Primary types (exe/dll/...) use KEY1, everything else KEY2.
                data = decrypt(data, KEY1 if is_primary_type(save_path) else KEY2)
                with open(save_path,"wb") as fs:
                    fs.write(data)
            else:
                print(f"Main zip's file {entry_name} already extracted")
            main_zip.add(entry_name)
    # Record which files came from the main zip (vs. the resource table).
    with open("main_zip_files.txt", "w", encoding="utf-8") as mzf:
        mzf.write("\n".join(main_zip))

# Stashed snippet kept as a module-level string literal (never executed).
"""
# Restore files from the same dir using main_zip_files.txt list
and the rest as File entries
main_zip = set()
with open("main_zip_files.txt", "r", encoding="utf-8") as f:
main_zip = set([line.strip() for line in f])
"""
def sanitize_path(path: str) -> str:
    """Normalize slashes to the OS separator and strip dot-runs that
    precede a separator, blocking "../" path traversal."""
    normalized = path.replace("/", os.path.sep)
    return re.sub("\.+" + re.escape(os.path.sep), '', normalized)
def extract_entries(file:BufferedReader, entry_list:list[FileEntry], save_dir:str):
    """Extract each resource-table entry from the current stream position.

    Entries are stored back-to-back; each is zlib-decompressed when
    comp_size != uncomp_size, then decrypted with KEY2 at phase 0.
    """
    print("Decrypting all entries...")
    for e in entry_list:
        offset = file.tell()
        dest_path = os.path.join(save_dir, sanitize_path(e.name))
        if not os.path.isfile(dest_path):
            print(f"Extracting {e.name} at {offset:02X} (size:{e.uncomp_size} B)...")
            data = file.read(e.comp_size)
            if(e.comp_size != e.uncomp_size):
                data = zlib.decompress(data)
            data = decrypt(data, KEY2, 0)
            if not os.path.exists(os.path.dirname(dest_path)):
                os.makedirs(os.path.dirname(dest_path), exist_ok = True)
            with open(dest_path,"wb") as fs:
                fs.write(data)
        else:
            # NOTE(review): when the file already exists the entry's bytes
            # are NOT consumed from the stream, so every following entry
            # would be read from the wrong offset — presumably reruns rely
            # on all files being skipped together; verify.
            print(f"File {e.name} already extracted")
def main():
    """CLI entry point: validate the rbpack archive header and verification
    code, extract the embedded main ZIP, then extract the loose resource
    entries listed in the trailing resource table."""
    print(f"Usage: {os.path.basename(sys.argv[0]).replace('.py', '')} [rbpack_archive] [output_directory]")
    if len(sys.argv) < 2:
        print(f"Example: {sys.argv[0]} data.rbpack data")
    archive_path = sys.argv[1] if len(sys.argv) > 1 else "data.rbpack"
    save_directory = sys.argv[2] if len(sys.argv) > 2 else "data"
    if not os.path.exists(save_directory):
        os.mkdir(save_directory)
    if not os.path.exists(archive_path):
        print(f"file {archive_path} not found!")
        exit()
    with open(archive_path,"rb") as file:
        if(file.read(len(RBPACK_HEADER)) != RBPACK_HEADER):
            print("Signature mismatch!")
            exit()
        HEADER_VERSION = file.read(2)
        print(f"Archive format version: {int.from_bytes(HEADER_VERSION, 'big')}")
        # The table offset is stored relative to the current position.
        resource_table_offset = int.from_bytes(file.read(8),"little")+file.tell()
        loading_form_title = read_string(file)
        verify_code = int.from_bytes(file.read(4), "little")
        md5_hash_length = file.read(1)[0]
        md5_hash = file.read(md5_hash_length)
        if not verify(verify_code, md5_hash):
            print("Archive verification failed")
            exit()
        extract_main_zip(file, save_directory)
        # The resource table holds per-entry names/sizes; data follows it.
        file.seek(resource_table_offset, 0)
        entry_count = int.from_bytes(file.read(4),"little")
        entry_list = list()
        for i in range(0,entry_count):
            entry_list.append(FileEntry(file))
        extract_entries(file, entry_list, save_directory)

if __name__ == '__main__':
    main()
import os, sys, re
from zipfiledel import ZipFile, EditableZipFile, ZIP_DEFLATED
from time import mktime
import numpy as np
from encreader import EncrBufferedReader, decrypt
KEY1 = np.frombuffer(b"H\t\x14\x9a0\xa9T\xe1\x00\x08\x0e\t\x14<BF", dtype=np.uint8)
KEY2 = np.frombuffer(b"\x00\x0e\x08\x1e\x187\x12\x00H\x87F\x0b\x9ch\xa8K", dtype=np.uint8)
SGBPACK_HEADER = b"SGBDAT"
def is_primary_type(filepath: str) -> bool:
    """True when *filepath* has one of the extensions encrypted with the
    primary key."""
    return any(filepath.endswith(ext)
               for ext in (".cg", ".cgh", ".dlp_d", ".exe", ".dll", ".dlp", ".webm"))
# Dot-runs immediately before a separator (e.g. "../") are stripped.
_TRAVERSAL_RE = re.compile("\.+" + re.escape(os.path.sep))

def sanitize_path(path: str) -> str:
    """Normalize slashes to the OS separator and remove path-traversal
    dot sequences."""
    return _TRAVERSAL_RE.sub('', path.replace("/", os.path.sep))
def main():
    """CLI entry point: verify the sgbpack signature, then stream-decrypt
    the archive as a ZIP via EncrBufferedReader and extract every entry
    (file contents get a second decryption pass with KEY2)."""
    print(f"Usage: {os.path.basename(sys.argv[0]).replace('.py', '')} [sgbpack_archive] [output_directory]")
    if len(sys.argv) < 2:
        print(f"Example: {sys.argv[0]} data.sgbpack data")
    archive_path = sys.argv[1] if len(sys.argv) > 1 else "data.sgbpack"
    save_directory = sys.argv[2] if len(sys.argv) > 2 else "data"
    if not os.path.exists(save_directory):
        os.mkdir(save_directory)
    if not os.path.exists(archive_path):
        print(f"file {archive_path} not found!")
        exit()
    with open(archive_path,"rb") as file:
        if(file.read(len(SGBPACK_HEADER)) != SGBPACK_HEADER):
            print("Signature mismatch!")
            exit()
        HEADER_VERSION = file.read(2)
        print(f"Archive format version: {int.from_bytes(HEADER_VERSION, 'big')}")
    # Re-open through the decrypting reader; the on-disk SGBDAT+version
    # header is swapped for a standard ZIP header on read.
    with EncrBufferedReader(open(archive_path, 'rb'), KEY=KEY1, new_header=SGBPACK_HEADER+HEADER_VERSION) as file:
        print("Extracting and decrypting the package...")
        dirs = {}
        with ZipFile(file) as zipfile:
            for entry in zipfile.infolist():
                # NOTE(review): _sanitize_windows_name is a private zipfile
                # API and may change between Python versions.
                name = ZipFile._sanitize_windows_name(entry.filename, os.path.sep)
                # ZIP date_time is a 6-tuple; pad to struct_time for mktime.
                mod_time = mktime(entry.date_time + (0, 0, -1))
                save_path = os.path.join(save_directory, name.replace("/", os.path.sep))
                if(name.endswith("/")): # directory entry
                    os.makedirs(save_path, exist_ok = True)
                    dirs[save_path[:-1]] = mod_time # last to do
                    continue
                os.makedirs(os.path.dirname(save_path),exist_ok = True)
                if not os.path.isfile(save_path):
                    print(f"Extracting {name}...")
                    data = zipfile.read(name)
                    data = decrypt(data, KEY2)
                    with open(save_path,"wb") as fs:
                        fs.write(data)
                    # set original modification time
                    os.utime(save_path, (mod_time, mod_time))
                else:
                    print(f"{name} already extracted")
    #for name in dirs:
    #mod_time = dirs[name]
    #os.utime(name, (mod_time, mod_time)) # need to be enabled on windows to work?
def update_files_in_zip(zip_path, translation_dir):
    """Refresh members of the archive at *zip_path* from files under
    *translation_dir*.

    A member is replaced when a file with the same archive-relative path
    exists on disk and is newer than the stored entry.

    Bug fixes vs. the original: the ``.repace`` typo (AttributeError on
    first use), comparing the ZIP's date_time tuple against a float mtime
    (TypeError; both sides are now epoch seconds), and ``getinfo`` being
    keyed by the bare filename while membership was tested with the full
    archive path.
    """
    with open(zip_path, 'r+b') as zip:
        with EditableZipFile(zip, 'a', ZIP_DEFLATED) as zfile:
            for folder, _, filenames in os.walk(translation_dir):
                for name in filenames:
                    disk_path = os.path.join(folder, name)
                    # Archive member paths always use forward slashes.
                    member_path = disk_path.replace(os.path.sep, '/')
                    if member_path in zfile.namelist():
                        info = zfile.getinfo(member_path)
                        # Convert the ZIP local date_time tuple to epoch
                        # seconds before comparing with the file's mtime.
                        if mktime(info.date_time + (0, 0, -1)) < os.path.getmtime(disk_path):
                            with open(disk_path, 'rb') as f:
                                zfile.replace(info, f.read())
if __name__ == '__main__':
    # Script entry point: extract the archive (the packing helper above is
    # invoked manually).
    main()
from __future__ import annotations
from zipfile import *
from zipfile import _get_compressor
from zlib import crc32
from collections import deque
class EditableZipFile(ZipFile):
    """ZipFile subclass that supports deleting and replacing members.

    Bug fixes vs. the original:
    - the in-place path of ``_replace_member`` assigned to an undefined
      ``new_member`` (NameError whenever it was taken);
    - ``_remove_members`` lost track of the read position for entries
      larger than one chunk, corrupting moved data;
    - ``compress_level`` is read defensively (the public attribute only
      exists on newer Pythons).
    """
    def remove(self, zinfo_or_name: ZipInfo | str):
        """Remove a member from the archive.
        ### Parameters
        zinfo_or_name : ZipInfo | str
            ZipInfo structure or name of the file.
        """
        self._check_mod_state()
        # Make sure we have an existing info object
        zinfo_or_name = self._return_zinfo_from_zinfo_or_name(zinfo_or_name)
        return self._remove_members({zinfo_or_name})

    def replace(self, zinfo_or_name: ZipInfo | str, data: bytes):
        """Replace a member's data in the archive.
        ### Parameters
        zinfo_or_name : ZipInfo | str
            ZipInfo structure or name of the file.
        data : bytes
            The new content for the member.
        """
        self._check_mod_state()
        # Make sure we have an existing info object
        zinfo_or_name = self._return_zinfo_from_zinfo_or_name(zinfo_or_name)
        return self._replace_member(zinfo_or_name, data)

    def _return_zinfo_from_zinfo_or_name(self, zinfo_or_name):
        # Normalize a ZipInfo-or-name argument into a validated ZipInfo.
        if isinstance(zinfo_or_name, ZipInfo):
            if zinfo_or_name not in self.filelist:
                raise KeyError(
                    f"There is no file {zinfo_or_name} in the archive")
        elif isinstance(zinfo_or_name, str):
            zinfo_or_name = self.getinfo(zinfo_or_name)
        else:
            raise KeyError(
                f"Key should be ZipInfo or string but got {type(zinfo_or_name)}")
        return zinfo_or_name

    def _check_mod_state(self):
        # Mirror ZipFile's own writability checks.
        if self.mode not in ('w', 'x', 'a'):
            raise ValueError("This function requires mode 'w', 'x', or 'a'")
        if not self.fp:
            raise ValueError(
                "Attempt to write to a closed ZIP archive")
        if self._writing:
            raise ValueError(
                "Can't write to ZIP archive while an open writing handle exists")

    def _remove_members(self, members: set[ZipInfo], *, scrub: bool = True, chunk_size: int = 2 ** 20):
        """Remove members in a zip file.
        All members (as ZipInfo) should exist in the zip;
        otherwise it'll end in an inconsistent state.
        ### Parameters
        members : set[ZipInfo]
            Input members set.
        scrub : bool
            Remove previously occupied space from the archive.
        chunk_size : int
            Chunk size for reading (default: `1048576`)
        """
        fp = self.fp
        entry_offset = 0    # cumulative bytes reclaimed so far
        member_seen = False
        # Walk entries in on-disk order so later data can be shifted left.
        filelist = deque(sorted(self.filelist, key=lambda x: x.header_offset))
        while filelist:
            info = filelist.popleft()
            is_member = info in members
            # Entries before the first removed member need no work.
            if not (member_seen or is_member):
                continue
            # Entry size = distance to the next header (or the central dir).
            next_header_offset = next((entry.header_offset for entry in filelist), self.start_dir)
            entry_size = next_header_offset - info.header_offset
            if is_member:
                member_seen = True
                entry_offset += entry_size
                # update caches
                self.filelist.remove(info)
                if info.filename in self.NameToInfo:
                    del self.NameToInfo[info.filename]
                continue
            if scrub:
                # update the header and move entry data to the new position
                old_header_offset = info.header_offset
                info.header_offset -= entry_offset
                read_size = 0
                while read_size < entry_size:
                    # Bug fix: re-seek to the source position on every
                    # iteration — the write below moves the file pointer, so
                    # multi-chunk entries used to re-read already-moved data.
                    fp.seek(old_header_offset + read_size)
                    data = fp.read(min(entry_size - read_size, chunk_size))
                    fp.seek(info.header_offset + read_size)
                    fp.write(data)
                    fp.flush()
                    read_size += len(data)
        # Avoid missing entry if entries have a duplicated name.
        # Reverse the order as NameToInfo normally stores the last added one.
        for info in reversed(self.filelist):
            self.NameToInfo.setdefault(info.filename, info)
        # update state
        if scrub:
            self.start_dir -= entry_offset
        self._didModify = True
        # seek to the start of the central dir
        fp.seek(self.start_dir)

    def _replace_member(self, member: ZipInfo, data: bytes, owerwrite_mode=True):
        """Replace a member in a zip file with new data.
        If the new data takes less space than the old data, it is written in place.
        If the new data takes more space, it is added after all the old data.
        ### Parameters
        member : ZipInfo
            The member to replace.
        data : bytes
            The new data to write.
        owerwrite_mode : Optional[bool]
            `False` = default replace with deleting;
            `True` = try to replace data in place.
        """
        if member not in self.filelist:
            raise ValueError("Member not found in zip file.")
        compress_type = member.compress_type
        # ZipInfo.compress_level is only public on newer Pythons; fall back
        # to the private attribute (or None → library default).
        compress_level = getattr(member, "compress_level",
                                 getattr(member, "_compresslevel", None))
        # Compress the new data
        _compressor = _get_compressor(compress_type, compress_level)
        compressed_data = _compressor.compress(data) + _compressor.flush()
        # Check if the new data takes less space than the old data
        compressed_len = len(compressed_data)
        if owerwrite_mode and compressed_len <= member.compress_size:
            # Write the new data in place of the old data (30 is the fixed
            # local-file-header size before filename + extra).
            self.fp.seek(member.header_offset + 30 + len(member.filename) + len(member.extra))
            self.fp.write(compressed_data)
            pad = member.compress_size - compressed_len
            if pad:
                # Zero-fill the space the old (larger) payload occupied.
                # NOTE(review): the local header's size fields on disk are
                # not rewritten here; readers that trust the central
                # directory still work, strict parsers may not — verify.
                self.fp.write(b'\0' * pad)
            self.fp.flush()
            # Update the member info.
            # Bug fix: this used to assign to an undefined ``new_member``,
            # raising NameError whenever the in-place path was taken.
            member.compress_size = compressed_len
            member.compress_type = compress_type
            member.file_size = len(data)
            member.CRC = crc32(data)
        else:
            # Create a new member with the updated data
            new_member = ZipInfo(member.filename, member.date_time)
            new_member.compress_type = compress_type
            new_member.file_size = len(data)
            new_member.CRC = crc32(data)
            # Remove the old member
            self._remove_members({member})
            self._writecheck(new_member)
            # Write the new member to the end of the archive
            self.fp.seek(self.start_dir)
            self.writestr(new_member, data)
            # Update the central directory
            self._didModify = True
            self.start_dir = self.fp.tell()
            self.fp.seek(self.start_dir)
            self._write_end_record()
if __name__ == '__main__':
    # Smoke test: replace this script inside a local test.zip with itself.
    with EditableZipFile(open("test.zip", "r+b"), "a") as zip:
        with open("zipfiledel.py", "rb") as f:
            zip.replace("zipfiledel.py", f.read())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment