Last active
March 30, 2024 15:49
-
-
Save UserUnknownFactor/7a399bfbf3969e48711e290bbfa0b71d to your computer and use it in GitHub Desktop.
Bakin resource extractor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, re, argparse, glob | |
from unicodedata import category | |
from filetranslate.service_fn import write_csv_list, read_csv_list, read_csv_dict | |
from filetranslate.language_fn import tag_hash | |
# NOTE: This should work for both Bakin RPG Maker and Smile Game Builder files | |
def write_7bit_encoded_int(stream, value=None):
    """Encode *value* as a 7-bit variable-length integer and return the bytes.

    Nothing is written to *stream*; the parameter is kept only for symmetry
    with read_7bit_encoded_int and backward compatibility.  The function may
    also be called with the integer as the sole argument (some call sites do
    exactly that, which used to raise a TypeError).

    Raises ValueError for negative values (the original looped forever).
    """
    if value is None:
        # tolerate the historical single-argument call style
        stream, value = None, stream
    if value < 0:
        raise ValueError("Cannot encode a negative integer")
    encoded = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            encoded.append(low7 | 0x80)  # continuation bit set
        else:
            encoded.append(low7)
            return bytes(encoded)
def read_7bit_encoded_int(stream):
    """Decode a 7-bit variable-length integer from *stream*.

    Reads at most five bytes (35 payload bits); a fifth continuation bit
    means the encoding is malformed.
    """
    value = 0
    for shift in range(0, 35, 7):
        byte = stream.read(1)[0]
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear -> last byte
            return value
    raise ValueError("Bad 7-bit encoded integer")
def has_special_chars(string):
    """Return True if *string* contains control/format/combining characters.

    Soft hyphen (0xad), LF and CR are tolerated; everything else in the
    listed Unicode categories disqualifies the string.
    """
    # include tab 0x9? "Sm",
    for ch in string:
        if ord(ch) in (0xad, 0xa, 0xd):
            continue
        if category(ch) in ("Cc", "Cf", "Cs", "Co", "Cn", "Sk", "Mn", "Mc", "Me"):
            return True
    return False
def is_not_skippable(string):
    """Truthy when *string* looks worth extracting for translation.

    Skipped: empty strings, pure ASCII punctuation/latin/digit runs,
    shader source, and identifier-like strings containing underscores.
    """
    if not string:
        return string
    if re.match(r'^[ a-zA-Z\r\n\t\-\d_<>#\$%@\^&*()\[\]{}:`\'\"/\\.\|,+=!?]+$', string):
        return False
    if string.startswith("shader "):
        return False
    return "_" not in string
def has_english(string):
    """Return True if any character is plain ASCII (a proxy for latin text).

    Fixed off-by-one: the original tested ``ord(c) <= 128``, which also
    counted U+0080 (a C1 control character) as ASCII; ASCII is 0-127.
    """
    return any(ord(c) < 128 for c in string)
def read_string(stream, maximum):
    """Read one 7-bit-length-prefixed UTF-8 string from *stream*.

    Returns None when the decoded length cannot fit before *maximum*
    (i.e. the bytes at the cursor are not a plausible string); raises if
    the decoded text contains disallowed special characters.
    """
    start = stream.tell()
    size = read_7bit_encoded_int(stream)
    if not 0 < size < maximum - start:
        return None
    text = stream.read(size).decode('utf-8')
    if has_special_chars(text):
        raise Exception("Has special characters")
    return text
def search_strings_in_binary_file(file, file_size):
    """Scan an open binary *file* for length-prefixed UTF-8 strings.

    Returns a list of ``[text, '', offset, byte_length]`` rows (the empty
    column is the translation slot); *file* must already be positioned past
    the header.  ``offset`` points at the 7-bit length prefix and
    ``byte_length`` covers prefix plus payload.

    BUG FIX: the parameter used to be named ``file_path`` while the body
    read a global ``file``; it only worked because the caller happened to
    bind its handle under that exact name.  Bare ``except:`` clauses were
    also narrowed to ``except Exception:``.
    """
    strings = []
    position = file.tell()
    while position <= file_size:
        try:
            string = read_string(file, file_size)
            if string:
                length = file.tell() - position
                if is_not_skippable(string):
                    strings.append([string.replace('\r', ''), '', position, length])  # JIK
                has_en = has_english(string)
                i = 1
                # ASCII bytes inside the span may actually be the length
                # prefix of a nested string; probe every inner offset.
                while has_en and i < length:
                    start = position + i
                    file.seek(start)
                    try:
                        # mistaken a char for a length?
                        new_string = read_string(file, file_size)
                        length = file.tell() - start
                        has_en = has_english(new_string)
                        if is_not_skippable(new_string):
                            strings.append([new_string.replace('\r', ''), '', start, file.tell() - start])
                    except Exception:
                        file.seek(start - 1 + length)
                        break
                    i += 1
            else:
                file.seek(position + 1)
        except EOFError:
            # NOTE(review): read() at EOF raises IndexError, not EOFError,
            # so this branch appears dead; kept for safety.
            break
        except Exception:
            file.seek(position + 1)
        position = file.tell()
    return strings
def replace_raw_strings(bytes_var, replacements):
    """Rebuild a binary blob with translated strings spliced in.

    *replacements* rows are ``[old, new, offset, length]`` as produced by
    search_strings_in_binary_file, sorted by offset: ``offset`` points at the
    7-bit length prefix and ``length`` covers prefix + UTF-8 payload.  Rows
    whose old string starts with "//" are commented out and left untouched,
    as are rows with an empty translation.

    BUG FIX: the original compared each original span against the *new*
    string's encoding (so nothing ever matched) and called
    write_7bit_encoded_int with a single argument (a TypeError).
    """
    parts = []
    cursor = 0
    for old_string, new_string, offset, length in replacements:
        if old_string.startswith("//"):
            continue
        # CSV readers hand offsets/lengths back as strings
        offset, length = int(offset), int(length)
        parts.append(bytes_var[cursor:offset])
        if new_string:
            # '\r' was stripped on extraction; restore CRLF line endings
            new_bytes = new_string.replace('\r', '').replace('\n', '\r\n').encode('utf-8')
            parts.append(write_7bit_encoded_int(None, len(new_bytes)) + new_bytes)
        else:
            parts.append(bytes_var[offset:offset + length])
        cursor = offset + length
    parts.append(bytes_var[cursor:])
    return b''.join(parts)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Unpack or pack a file")
    # BUG FIX: nargs=1 made args.mask a *list* when the option was given but
    # a plain string when defaulted; glob.glob then failed on the list form.
    parser.add_argument("-m", "--mask", type=str, default=".\\**\\*.rbr", help="Mask of the files (default: \".\\**\\*.rbr\")")
    parser.add_argument("-u", "--unpack", action="store_true", help="Extract the strings")
    parser.add_argument("-p", "--pack", action="store_true", help="Replace the strings")
    args = parser.parse_args()

    # \X or \X[...] control codes, Japanese corner brackets, and <...> tags
    tag_pattern = re.compile(r'\\[a-zA-Z](?:\[[^\]]+\])?|[「」]|\<\/?[^>\[\]]+\>')
    if args.unpack:
        tags = set()
        for fn in glob.glob(args.mask, recursive=True):
            print(f"Searching strings of {fn} ...")
            result = None
            with open(fn, "rb") as file:
                file.seek(8)  # header
                result = search_strings_in_binary_file(file, os.path.getsize(fn))
            if result:
                csv_name = os.path.splitext(fn)[0] + "_attributes.csv"
                translations = read_csv_dict(csv_name)
                if translations:
                    # retain old translations on rescan
                    for i, r in enumerate(result):
                        if r[0] in translations:
                            result[i][1] = translations[r[0]]
                        elif ("//" + r[0]) in translations:
                            result[i][1] = translations["//" + r[0]]
                            result[i][0] = "//" + r[0]
                write_csv_list(csv_name, result)
                for r in result:
                    tags.update(tag_pattern.findall(r[0]))
        # longest tags first so longer tags are replaced before their prefixes
        tags_array = sorted(([t, tag_hash(t, lang='EN')] for t in tags),
                            key=lambda x: len(x[0]), reverse=True)
        write_csv_list("replacement_tags.csv", tags_array)
    elif args.pack:
        for fn in glob.glob(args.mask, recursive=True):
            print(f"Replacing strings of {fn} ...")
            directory = os.path.dirname(fn)
            csv_name = os.path.splitext(fn)[0] + "_attributes.csv"
            with open(fn, "rb") as f:
                translations = read_csv_list(csv_name)
                if translations:
                    new_data = replace_raw_strings(f.read(), translations)
                    out_dir = os.path.join(directory, "translation_out")
                    os.makedirs(out_dir, exist_ok=True)
                    # BUG FIX: os.path.join instead of fn.replace(directory, ...);
                    # str.replace with directory == "" (file in cwd) inserted
                    # the output dir at every position of the name.
                    with open(os.path.join(out_dir, os.path.basename(fn)), "wb") as o:
                        o.write(new_data)
    else:
        print("Please specify whether to unpack (-u) or pack (-p) the file(s) (by mask -m)")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from io import BufferedReader | |
# Standard zip local-file-header magic (PK\x03\x04) padded to 8 bytes; it is
# swapped in for the archive's custom 8-byte header when reading (see
# EncrBufferedReader._parse_first) so zipfile can parse the stream.
ZIP_HEADER = b"PK\x03\x04\x20\x00\x00\x00"
def decrypt(buff: bytes, KEY, start: int = 0) -> bytes:
    """Undo the additive byte cipher: subtract the repeating key stream.

    *start* is the absolute offset of *buff* in the file; the key stream is
    phase-shifted accordingly (the key is assumed 16 bytes long).
    """
    misalign = start % 16
    key = np.roll(KEY, -misalign) if misalign else KEY
    data = np.frombuffer(buff, dtype='uint8')
    n = data.shape[0]
    keystream = np.tile(key, n // key.shape[0] + 1)[:n]
    return np.subtract(data, keystream).tobytes()
def enrypt(buff: bytes, KEY, start: int = 0) -> bytes:
    """Inverse of decrypt(): add the repeating key stream to the buffer.

    The name carries a historical typo; sibling modules import it as
    ``encrypt`` (which used to raise ImportError), hence the alias below.
    """
    start_pad = start % 16
    if start_pad:
        KEY = np.roll(KEY, -start_pad)
    buff = np.frombuffer(buff, dtype='uint8')
    buff_len = buff.shape[0]
    return np.add(buff, np.tile(KEY, buff_len // KEY.shape[0] + 1)[:buff_len]).tobytes()

# Backward-compatible alias: other files do `from encreader import encrypt`.
encrypt = enrypt
class EncrBufferedReader(BufferedReader):
    """BufferedReader that transparently de/encrypts an additive-cipher stream
    and swaps the archive's custom header for a standard zip header on the fly.
    """

    def __init__(self, raw, KEY=None, offset=8,
                 new_header=None, old_header=ZIP_HEADER,
                 decrypt_func=decrypt, encrypt_func=enrypt,
                 *args, **kwargs):
        super().__init__(raw, *args, **kwargs)
        self.KEY = KEY                        # uint8 key bytes for the cipher
        self.decrypt_func = decrypt_func
        self.encrypt_func = encrypt_func
        self.block_offset = 0                 # position of the last read/write
        self.base_offset = offset             # size of the skipped file header
        self.original_header = old_header     # header zip readers expect
        self.replacement_header = new_header  # custom header stored on disk
        self.cut_start = len(new_header) if new_header is not None else 0

    def _decrypt_block(self, block_data):
        """Decrypt *block_data* as read from stream position block_offset."""
        if self.decrypt_func is None or self.KEY is None:
            raise ValueError("Decryption function or key not provided.")
        return self.decrypt_func(block_data, self.KEY, self.block_offset + self.base_offset)

    def _encrypt_block(self, block_data):
        """Encrypt *block_data* as written at stream position block_offset."""
        if self.encrypt_func is None or self.KEY is None:
            raise ValueError("Encryption function or key not provided.")
        return self.encrypt_func(block_data, self.KEY, self.block_offset + self.base_offset)

    def _parse_first(self, data):
        """Present the standard header in place of the custom one when the
        read started at position 0.

        NOTE(review): assumes old and new headers have equal length;
        differing lengths would shift every subsequent offset — confirm.
        """
        if self.cut_start and self.block_offset == 0:
            if len(data) >= self.cut_start:
                return self.original_header + data[self.cut_start:]
            else:
                return self.original_header[:len(data)]
        return data

    def read(self, size=-1):
        self.block_offset = self.tell()
        data = super().read() if size < 0 else super().read(size)
        if data:
            return self._parse_first(self._decrypt_block(data))
        return b''

    def read1(self, size=-1):
        self.block_offset = self.tell()
        data = super().read1() if size < 0 else super().read1(size)
        if data:
            return self._parse_first(self._decrypt_block(data))
        return b''

    def write(self, data):
        self.block_offset = self.tell()
        if self.cut_start and self.block_offset == 0:
            # BUG FIX: swap the plain header for the custom one BEFORE
            # encrypting.  The original encrypted first and then reassigned
            # `data`, so the header replacement never reached the file.
            if len(data) >= len(self.replacement_header):
                data = self.replacement_header + data[self.cut_start:]
            else:
                data = self.replacement_header[:len(data)]
        return super().write(self._encrypt_block(data))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, sys, io, zlib, re | |
from hashlib import md5 | |
from io import BufferedReader | |
from zipfiledel import ZipFile, DeletableZipFile, ZIP_DEFLATED | |
from encreader import encrypt, decrypt, ZIP_HEADER | |
import numpy as np | |
# big thanks to HNIdesu for the format reversing | |
# Additive-cipher key streams (16 bytes each): KEY1 is used for the main zip
# and "primary" file types, KEY2 for everything else (see extract_main_zip).
KEY1 = np.frombuffer(b"\x00\x08\x0e\t\x14<BFH\t\x14\x9a0\xa9T\xe1", dtype=np.uint8)
KEY2 = np.frombuffer(b"\x00\x0e\x08\x1e\x187\x12\x00H\x87F\x0b\x9ch\xa8K", dtype=np.uint8)
# Magic bytes opening every .rbpack archive, plus the default format version.
RBPACK_HEADER = b"BKNPAK"
DEFAULT_VERSION = b"\x00\x01"
class FileEntry:
    """One resource-table record: an encrypted name followed by the
    compressed and uncompressed sizes (8 bytes each, little-endian)."""
    name: str
    comp_size: int
    uncomp_size: int

    def __init__(self, file):
        self.name = read_encrypted_string(file)
        sizes = [int.from_bytes(file.read(8), "little") for _ in range(2)]
        self.comp_size, self.uncomp_size = sizes

    def __repr__(self) -> str:
        return f"{self.name} ({self.comp_size} -> {self.uncomp_size})"
def equals(a1:bytes, a2:bytes) -> bool:
    """Return True when the two byte strings are identical.

    The explicit length pre-check of the original was redundant: bytes
    equality already compares lengths.  Kept as a function for callers.
    """
    return a1 == a2
def read_string(file:BufferedReader) -> str:
    """Read a one-byte length followed by that many UTF-8 bytes."""
    length = file.read(1)[0]
    return file.read(length).decode("utf-8")
def read_encrypted_string(file:BufferedReader) -> str:
    """Read a one-byte length, then decrypt that many bytes with KEY2.

    The key stream phase is derived from the file offset of the payload.
    """
    length = file.read(1)[0]
    payload_offset = file.tell()
    payload = file.read(length)
    return decrypt(payload, KEY2, payload_offset).decode("utf-8")
def verify(verify_code:int, md5_hash:bytes) -> bool:
    """Check the archive's verification code against its stored MD5.

    The archive stores md5(verify_code + salt) for one of two known salts.
    """
    for salt in (2525, 5252):
        expected = md5(int.to_bytes(verify_code + salt, 4, byteorder="little")).digest()
        if expected == md5_hash:
            return True
    return False
def is_primary_type(filepath:str) -> bool:
    """True for extensions whose payloads are encrypted with the primary key."""
    return filepath.endswith((".cg", ".cgh", ".dlp_d", ".exe", ".dll", ".dlp", ".webm"))
def extract_main_zip(file:BufferedReader, save_directory:str):
    """Extract the embedded, encrypted zip at the current stream position.

    Reads an 8-byte length, decrypts the zip body with KEY1 (phase-shifted
    by its file offset), re-attaches the standard zip header, and extracts
    every entry into *save_directory*; each entry payload is additionally
    decrypted (KEY1 for primary types, KEY2 otherwise).  Writes the list of
    extracted names to main_zip_files.txt.
    """
    print("Extracting and decrypting main zip file...")
    zip_length = int.from_bytes(file.read(8),"little")
    offset = file.tell()
    zip_extract_path = save_directory #os.path.join(save_directory,"unpack.zip")
    if not os.path.exists(zip_extract_path):
        os.mkdir(zip_extract_path)
    main_zip = set()
    # The zip's first 8 bytes are not stored (ZIP_HEADER is prepended
    # instead) — hence zip_length-8. TODO confirm against the packer.
    with ZipFile(io.BytesIO(ZIP_HEADER+decrypt(file.read(zip_length-8), KEY1, offset))) as zipfile:
        for entry_name in zipfile.namelist():
            save_path = os.path.join(zip_extract_path,entry_name.replace("/", os.path.sep))
            if(entry_name.endswith("/")):#directory entry
                os.makedirs(save_path,exist_ok = True)
                continue
            os.makedirs(os.path.dirname(save_path),exist_ok = True)
            if not os.path.isfile(save_path):
                print(f"Extracting main zip's file {entry_name}...")
                data = zipfile.read(entry_name)
                # entry payloads carry a second encryption layer
                data = decrypt(data, KEY1 if is_primary_type(save_path) else KEY2)
                with open(save_path,"wb") as fs:
                    fs.write(data)
            else:
                print(f"Main zip's file {entry_name} already extracted")
            main_zip.add(entry_name)
    with open("main_zip_files.txt", "w", encoding="utf-8") as mzf:
        mzf.write("\n".join(main_zip))
"""
# Restore files from the same dir using main_zip_files.txt list
and the rest as File entries
main_zip = set()
with open("main_zip_files.txt", "r", encoding="utf-8") as f:
    main_zip = set([line.strip() for line in f])
"""
def sanitize_path(path:str) -> str:
    """Normalize separators and strip '..'-style traversal components so an
    archive entry cannot escape the extraction directory.

    FIX: raw string for the pattern — the original "\.+" is an invalid
    escape sequence (SyntaxWarning on Python 3.12+).
    """
    return re.sub(r"\.+" + re.escape(os.path.sep), '', path.replace("/", os.path.sep))
def extract_entries(file:BufferedReader, entry_list:list[FileEntry], save_dir:str):
    """Decrypt every FileEntry payload from *file* into *save_dir*.

    Entries are stored back-to-back, so the stream must advance by exactly
    comp_size per entry.  Entries whose comp_size differs from uncomp_size
    are zlib-compressed; all payloads are decrypted with KEY2.

    BUG FIX: when a destination file already existed, the original never
    consumed the entry's bytes, leaving the stream misaligned and corrupting
    every subsequent entry; we now seek past skipped payloads.
    """
    print("Decrypting all entries...")
    for e in entry_list:
        offset = file.tell()
        dest_path = os.path.join(save_dir, sanitize_path(e.name))
        if not os.path.isfile(dest_path):
            print(f"Extracting {e.name} at {offset:02X} (size:{e.uncomp_size} B)...")
            data = file.read(e.comp_size)
            if(e.comp_size != e.uncomp_size):
                data = zlib.decompress(data)
            data = decrypt(data, KEY2, 0)
            os.makedirs(os.path.dirname(dest_path), exist_ok = True)
            with open(dest_path,"wb") as fs:
                fs.write(data)
        else:
            print(f"File {e.name} already extracted")
            file.seek(e.comp_size, 1)  # keep later entries aligned
def main():
    """Command-line entry point: unpack an .rbpack archive to a directory."""
    prog = os.path.basename(sys.argv[0]).replace('.py', '')
    print(f"Usage: {prog} [rbpack_archive] [output_directory]")
    if len(sys.argv) < 2:
        print(f"Example: {sys.argv[0]} data.rbpack data")
    archive_path = sys.argv[1] if len(sys.argv) > 1 else "data.rbpack"
    save_directory = sys.argv[2] if len(sys.argv) > 2 else "data"
    if not os.path.exists(save_directory):
        os.mkdir(save_directory)
    if not os.path.exists(archive_path):
        print(f"file {archive_path} not found!")
        exit()
    with open(archive_path, "rb") as file:
        if file.read(len(RBPACK_HEADER)) != RBPACK_HEADER:
            print("Signature mismatch!")
            exit()
        HEADER_VERSION = file.read(2)
        print(f"Archive format version: {int.from_bytes(HEADER_VERSION, 'big')}")
        # the stored offset is relative to the position after this 8-byte field
        resource_table_offset = int.from_bytes(file.read(8), "little") + file.tell()
        loading_form_title = read_string(file)  # consumed to advance the stream
        verify_code = int.from_bytes(file.read(4), "little")
        md5_hash = file.read(file.read(1)[0])
        if not verify(verify_code, md5_hash):
            print("Archive verification failed")
            exit()
        extract_main_zip(file, save_directory)
        file.seek(resource_table_offset, 0)
        entry_count = int.from_bytes(file.read(4), "little")
        entry_list = [FileEntry(file) for _ in range(entry_count)]
        extract_entries(file, entry_list, save_directory)

if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, sys, re | |
from zipfiledel import ZipFile, EditableZipFile, ZIP_DEFLATED | |
from time import mktime | |
import numpy as np | |
from encreader import EncrBufferedReader, decrypt | |
# Additive-cipher key streams (16 bytes each): KEY1 decrypts the outer
# package stream, KEY2 the individual entry payloads (see main()).
KEY1 = np.frombuffer(b"H\t\x14\x9a0\xa9T\xe1\x00\x08\x0e\t\x14<BF", dtype=np.uint8)
KEY2 = np.frombuffer(b"\x00\x0e\x08\x1e\x187\x12\x00H\x87F\x0b\x9ch\xa8K", dtype=np.uint8)
# Magic bytes opening every .sgbpack archive.
SGBPACK_HEADER = b"SGBDAT"
def is_primary_type(filepath:str) -> bool:
    """True for extensions treated as "primary" resource files."""
    return filepath.endswith((".cg", ".cgh", ".dlp_d", ".exe", ".dll", ".dlp", ".webm"))
def sanitize_path(path:str) -> str:
    """Normalize separators and strip '..'-style traversal components so an
    archive entry cannot escape the extraction directory.

    FIX: raw string for the pattern — the original "\.+" is an invalid
    escape sequence (SyntaxWarning on Python 3.12+).
    """
    return re.sub(r"\.+" + re.escape(os.path.sep), '', path.replace("/", os.path.sep))
def main():
    """Unpack an .sgbpack (Smile Game Builder) archive.

    The archive is an additively-encrypted zip whose first 8 bytes are the
    custom SGBDAT header instead of the zip magic; EncrBufferedReader
    presents it as a plain zip, and each entry payload carries a second
    encryption layer (KEY2).
    """
    print(f"Usage: {os.path.basename(sys.argv[0]).replace('.py', '')} [sgbpack_archive] [output_directory]")
    if len(sys.argv) < 2:
        print(f"Example: {sys.argv[0]} data.sgbpack data")
    archive_path = sys.argv[1] if len(sys.argv) > 1 else "data.sgbpack"
    save_directory = sys.argv[2] if len(sys.argv) > 2 else "data"
    if not os.path.exists(save_directory):
        os.mkdir(save_directory)
    if not os.path.exists(archive_path):
        print(f"file {archive_path} not found!")
        exit()
    # First pass just validates the magic and captures the 2 version bytes.
    with open(archive_path,"rb") as file:
        if(file.read(len(SGBPACK_HEADER)) != SGBPACK_HEADER):
            print("Signature mismatch!")
            exit()
        HEADER_VERSION = file.read(2)
        print(f"Archive format version: {int.from_bytes(HEADER_VERSION, 'big')}")
    # Second pass: the reader decrypts with KEY1 and swaps the custom
    # 8-byte header (SGBDAT + version) back to the standard zip header.
    with EncrBufferedReader(open(archive_path, 'rb'), KEY=KEY1, new_header=SGBPACK_HEADER+HEADER_VERSION) as file:
        print("Extracting and decrypting the package...")
        dirs = {}
        with ZipFile(file) as zipfile:
            for entry in zipfile.infolist():
                # NOTE(review): _sanitize_windows_name is a private zipfile API
                name = ZipFile._sanitize_windows_name(entry.filename, os.path.sep)
                # zip stores a 6-tuple; pad to the 9 fields mktime expects
                mod_time = mktime(entry.date_time + (0, 0, -1))
                save_path = os.path.join(save_directory, name.replace("/", os.path.sep))
                if(name.endswith("/")): # directory entry
                    os.makedirs(save_path, exist_ok = True)
                    dirs[save_path[:-1]] = mod_time # last to do
                    continue
                os.makedirs(os.path.dirname(save_path),exist_ok = True)
                if not os.path.isfile(save_path):
                    print(f"Extracting {name}...")
                    data = zipfile.read(name)
                    # each entry payload has a second encryption layer
                    data = decrypt(data, KEY2)
                    with open(save_path,"wb") as fs:
                        fs.write(data)
                    # set original modification time
                    os.utime(save_path, (mod_time, mod_time))
                else:
                    print(f"{name} already extracted")
        #for name in dirs:
            #mod_time = dirs[name]
            #os.utime(name, (mod_time, mod_time)) # need to be enabled on windows to work?
def update_files_in_zip(zip_path, translation_dir):
    """Replace members of *zip_path* with newer files found under
    *translation_dir* (matching by archive-relative path).

    BUG FIXES versus the original: ``.repace`` typo (AttributeError);
    ``getinfo`` was called with the bare filename instead of the archive
    path; and a ``date_time`` tuple was compared directly against the float
    returned by ``os.path.getmtime`` (TypeError) — it is now converted via
    ``mktime`` first.
    """
    with open(zip_path, 'r+b') as zip:
        with EditableZipFile(zip, 'a', ZIP_DEFLATED) as zfile:
            member_names = set(zfile.namelist())
            for folder, _, filenames in os.walk(translation_dir):
                for name in filenames:
                    disk_path = os.path.join(folder, name)
                    arc_path = disk_path.replace(os.path.sep, '/')
                    if arc_path in member_names:
                        info = zfile.getinfo(arc_path)
                        # replace only when the on-disk copy is newer
                        if mktime(info.date_time + (0, 0, -1)) < os.path.getmtime(disk_path):
                            with open(disk_path, 'rb') as f:
                                zfile.replace(info, f.read())

if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from zipfile import * | |
from zipfile import _get_compressor | |
from zlib import crc32 | |
from collections import deque | |
class EditableZipFile(ZipFile): | |
"""Implements ZipFile version that supports file deleting and replacing""" | |
def remove(self, zinfo_or_name: ZipInfo | str): | |
"""Remove a member from the archive. | |
### Parameters | |
zinfo_or_name : set[ZipInfo] | str | |
ZipInfo structure or name of the file. | |
""" | |
self._check_mod_state() | |
# Make sure we have an existing info object | |
zinfo_or_name = self._return_zinfo_from_zinfo_or_name(zinfo_or_name) | |
return self._remove_members({zinfo_or_name}) | |
def replace(self, zinfo_or_name: ZipInfo | str, data: bytes): | |
"""Replaces a member from the archive. | |
### Parameters | |
zinfo_or_name : set[ZipInfo] | str | |
ZipInfo structure or name of the file. | |
""" | |
self._check_mod_state() | |
# Make sure we have an existing info object | |
zinfo_or_name = self._return_zinfo_from_zinfo_or_name(zinfo_or_name) | |
return self._replace_member(zinfo_or_name, data) | |
def _return_zinfo_from_zinfo_or_name(self, zinfo_or_name): | |
if isinstance(zinfo_or_name, ZipInfo): | |
if zinfo_or_name not in self.filelist: | |
raise KeyError( | |
f"There is no file {zinfo_or_name} in the archive") | |
elif isinstance(zinfo_or_name, str): | |
zinfo_or_name = self.getinfo(zinfo_or_name) | |
else: | |
raise KeyError( | |
f"Key should be ZipInfo or string but got {type(zinfo_or_name)}") | |
return zinfo_or_name | |
def _check_mod_state(self): | |
if self.mode not in ('w', 'x', 'a'): | |
raise ValueError("This function requires mode 'w', 'x', or 'a'") | |
if not self.fp: | |
raise ValueError( | |
"Attempt to write to a closed ZIP archive") | |
if self._writing: | |
raise ValueError( | |
"Can't write to ZIP archive while an open writing handle exists") | |
def _remove_members(self, members: set[ZipInfo], *, scrub: bool = True, chunk_size: int = 2 ** 20): | |
"""Remove members in a zip file. | |
All members (as ZipInfo) should exist in the zip; | |
otherwise it'll end in an inconsistent state. | |
### Parameters | |
members : set[ZipInfo] | |
Input members set. | |
scrub : bool | |
Remove previously occupied space from the archive. | |
chunk_size : int | |
Chunk size for reading (default: `1048576`) | |
""" | |
fp = self.fp | |
entry_offset = 0 | |
member_seen = False | |
# get a sorted filelist by header offset | |
filelist = deque(sorted(self.filelist, key=lambda x: x.header_offset)) | |
while filelist: | |
info = filelist.popleft() | |
is_member = info in members | |
if not (member_seen or is_member): | |
continue | |
# get the total size of the entry | |
next_header_offset = next((entry.header_offset for entry in filelist), self.start_dir) | |
entry_size = next_header_offset - info.header_offset | |
if is_member: | |
member_seen = True | |
entry_offset += entry_size | |
# update caches | |
self.filelist.remove(info) | |
if info.filename in self.NameToInfo: | |
del self.NameToInfo[info.filename] | |
continue | |
if scrub: | |
# update the header and move entry data to the new position | |
old_header_offset = info.header_offset | |
info.header_offset -= entry_offset | |
fp.seek(old_header_offset) | |
read_size = 0 | |
while read_size < entry_size: | |
data = fp.read(min(entry_size - read_size, chunk_size)) | |
fp.seek(info.header_offset + read_size) | |
fp.write(data) | |
fp.flush() | |
read_size += len(data) | |
# Avoid missing entry if entries have a duplicated name. | |
# Reverse the order as NameToInfo normally stores the last added one. | |
for info in reversed(self.filelist): | |
self.NameToInfo.setdefault(info.filename, info) | |
# update state | |
if scrub: | |
self.start_dir -= entry_offset | |
self._didModify = True | |
# seek to the start of the central dir | |
fp.seek(self.start_dir) | |
def _replace_member(self, member: ZipInfo, data: bytes, owerwrite_mode=True): | |
"""Replace a member in a zip file with new data. | |
If the new data takes less space than the old data, it is written in place. | |
If the new data takes more space, it is added after all the old data. | |
### Parameters | |
member : ZipInfo | |
The member to replace. | |
data : bytes | |
The new data to write. | |
owerwrite_mode : Optional[bool] | |
`False` = default replace with deleting; | |
`True` = try to replace data in place. | |
""" | |
if member not in self.filelist: | |
raise ValueError("Member not found in zip file.") | |
compress_type = member.compress_type | |
compress_level = member.compress_level | |
# Compress the new data | |
_compressor = _get_compressor(compress_type, compress_level) | |
compressed_data = _compressor.compress(data) + _compressor.flush() | |
# Check if the new data takes less space than the old data | |
compressed_len = len(compressed_data) | |
if owerwrite_mode and compressed_len <= member.compress_size: | |
# Write the new data in place of the old data | |
self.fp.seek(member.header_offset + 30 + len(member.filename) + len(member.extra)) | |
self.fp.write(compressed_data) | |
pad = member.compress_size - compressed_len | |
if pad: | |
self.fp.write(b'\0' * pad) | |
self.fp.flush() | |
# Update the member info | |
member.compress_size = compressed_len | |
new_member.compress_type = compress_type | |
member.file_size = len(data) | |
member.CRC = crc32(data) | |
else: | |
# Create a new member with the updated data | |
new_member = ZipInfo(member.filename, member.date_time) | |
new_member.compress_type = compress_type | |
new_member.file_size = len(data) | |
new_member.CRC = crc32(data) | |
# Remove the old member | |
self._remove_members({member}) | |
self._writecheck(new_member) | |
# Write the new member to the end of the archive | |
self.fp.seek(self.start_dir) | |
self.writestr(new_member, data) | |
# Update the central directory | |
self._didModify = True | |
self.start_dir = self.fp.tell() | |
self.fp.seek(self.start_dir) | |
self._write_end_record() | |
if __name__ == '__main__':
    # Smoke test: swap this module's entry inside test.zip for the on-disk copy.
    with EditableZipFile(open("test.zip", "r+b"), "a") as archive:
        with open("zipfiledel.py", "rb") as src:
            archive.replace("zipfiledel.py", src.read())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment