Last active
April 23, 2023 17:31
-
-
Save anezih/104913ede563024f814a49fa42d8a0ad to your computer and use it in GitHub Desktop.
Read dictzip files without having to decompress them first. Useful for StarDict .dict.dz files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://github.com/anezih/StarDictNet/blob/main/StarDictNet/DictZip.cs | |
# https://framagit.org/tuxor1337/dictzip.js/-/blob/main/dictzip_sync.js | |
import os
import zlib
from dataclasses import dataclass, field
from struct import unpack
class SUBFIELD:
    """One gzip FEXTRA subfield (RFC 1952): a two-byte ID, a length, and a payload.

    Instances are populated field-by-field by DictZip.__read_header. The
    defaults below make a freshly constructed object safe to read (the
    original annotation-only __init__ created no attributes at all, so any
    access before assignment raised AttributeError).
    """

    def __init__(self) -> None:
        self.SI1: str = ""      # first subfield-ID byte, as a 1-char string
        self.SI2: str = ""      # second subfield-ID byte ("RA" marks dictzip)
        self.LEN: int = 0       # declared payload length in bytes
        self.DATA: bytes = b""  # raw payload (LEN bytes)
@dataclass
class FEXTRA:
    """Parsed gzip FEXTRA header area: its total byte length plus subfields.

    Defaults are backward compatible (existing FEXTRA(0, []) calls still
    work) and let FEXTRA() be built without arguments. SUBFIELDS uses
    default_factory so instances never share one mutable list; the
    annotation is a string to avoid evaluating the forward SUBFIELD
    reference at class-creation time.
    """

    XLEN: int = 0  # byte length of the whole subfield area (RFC 1952 XLEN)
    SUBFIELDS: "list[SUBFIELD]" = field(default_factory=list)
class DictZipHeader:
    """Decoded gzip member header fields (RFC 1952) for a dictzip file.

    All fields are filled in by DictZip.__read_header. Defaults make every
    attribute readable even when the corresponding optional header field
    (FNAME, FCOMMENT, FHCRC) is absent from the file — the original
    annotation-only form raised AttributeError in that case. Single header
    bytes are stored as ints, because indexing a bytes object yields int
    (the original `bytes` annotations were inaccurate).
    """

    def __init__(self) -> None:
        self.ID1: int = 0       # magic byte 1, must be 0x1F
        self.ID2: int = 0       # magic byte 2, must be 0x8B
        self.CM: int = 0        # compression method (8 = deflate)
        self.FLG: int = 0       # flag bits (FTEXT/FHCRC/FEXTRA/FNAME/FCOMMENT)
        self.MTIME: int = 0     # 32-bit little-endian modification time
        self.XFL: int = 0       # extra flags
        self.OS: int = 0        # originating operating system
        self.FEXTRA: FEXTRA = FEXTRA(0, [])  # parsed extra field (chunk table lives here)
        self.FNAME: str = ""      # original filename, only set when FNAME flag present
        self.FCOMMENT: str = ""   # comment, only set when FCOMMENT flag present
        self.FHCRC: int = 0       # header CRC16, only set when FHCRC flag present
        self.LENGTH: int = 0      # total header length in bytes = offset of compressed data
class DictZip:
    """Random-access reader for dictzip (.dz) files, e.g. StarDict .dict.dz.

    A dictzip file is a gzip member whose FEXTRA field carries an 'RA'
    subfield describing equally-sized uncompressed chunks, each deflated
    independently, so any byte range can be served by inflating only the
    chunks that cover it instead of the whole stream.
    """

    def __init__(self, filename: str) -> None:
        """Open *filename* and parse its gzip header and dictzip chunk table.

        Raises ValueError if the file is not gzip or lacks the 'RA' subfield.
        """
        self.file_obj = open(filename, "rb")
        self.size = os.stat(filename).st_size
        self.header = DictZipHeader()
        # gzip FLG bit masks (RFC 1952). These are ints, not bytes.
        self.FTEXT: int = 0x01
        self.FHCRC: int = 0x02
        self.FEXTRA: int = 0x04
        self.FNAME: int = 0x08
        self.FCOMMENT: int = 0x10
        # Contents of the dictzip 'RA' subfield, filled by __get_chunks.
        self.VER: int = 0       # dictzip format version
        self.CHLEN: int = 0     # uncompressed length of every chunk
        self.CHCOUNT: int = 0   # number of chunks
        self.CHUNKS: list[tuple[int, int]] = []  # (compressed offset, compressed length)
        self.__read_header()
        self.__get_chunks()

    def __read_header(self) -> None:
        """Parse the gzip member header into self.header (RFC 1952 layout)."""
        pos = 0
        first_10_bytes = self.file_obj.read(10)
        self.header.ID1 = first_10_bytes[0]
        self.header.ID2 = first_10_bytes[1]
        if self.header.ID1 != 0x1F or self.header.ID2 != 0x8B:
            raise ValueError("Not a valid gzip header.")
        self.header.CM = first_10_bytes[2]
        self.header.FLG = first_10_bytes[3]
        # MTIME is a 32-bit little-endian timestamp.
        self.header.MTIME = unpack("<I", first_10_bytes[4:8])[0]
        self.header.XFL = first_10_bytes[8]
        self.header.OS = first_10_bytes[9]
        pos += 10
        if (self.header.FLG & self.FEXTRA) != 0x00:
            self.file_obj.seek(pos, 0)
            self.header.FEXTRA.XLEN = unpack("<H", self.file_obj.read(2))[0]
            pos += 2
            self.file_obj.seek(pos, 0)
            remaining = self.file_obj.read(self.header.FEXTRA.XLEN)
            # Each subfield: SI1, SI2, 2-byte little-endian LEN, LEN payload
            # bytes. Guarding on the remaining length (instead of the
            # original `while True`) avoids an IndexError on an empty or
            # truncated FEXTRA area.
            while len(remaining) >= 4:
                _len = remaining[2] + 256 * remaining[3]
                s = SUBFIELD()
                s.SI1 = chr(remaining[0])
                s.SI2 = chr(remaining[1])
                s.LEN = _len
                s.DATA = remaining[4:4 + _len]
                self.header.FEXTRA.SUBFIELDS.append(s)
                remaining = remaining[4 + _len:]
            pos += self.header.FEXTRA.XLEN
        if (self.header.FLG & self.FNAME) != 0x00:
            self.file_obj.seek(pos, 0)
            fname_temp = self.file_obj.read(1024)
            # NUL-terminated Latin-1 name; assumes it fits in 1 KiB.
            idx = fname_temp.index(b"\x00")
            self.header.FNAME = fname_temp[:idx].decode("latin1")
            pos += idx + 1
        if (self.header.FLG & self.FCOMMENT) != 0x00:
            self.file_obj.seek(pos, 0)
            fcomment_temp = self.file_obj.read(1024)
            idx = fcomment_temp.index(b"\x00")
            self.header.FCOMMENT = fcomment_temp[:idx].decode("latin1")
            pos += idx + 1
        if (self.header.FLG & self.FHCRC) != 0x00:
            self.file_obj.seek(pos, 0)
            # unpack returns a tuple; the original stored the tuple itself.
            self.header.FHCRC = unpack("<H", self.file_obj.read(2))[0]
            pos += 2
        # Total header length == file offset where compressed data begins.
        self.header.LENGTH = pos

    def __get_chunks(self) -> None:
        """Find the 'RA' subfield and build the (offset, size) chunk table.

        Raises ValueError when no dictzip subfield is present.
        """
        sf = None
        for sub in self.header.FEXTRA.SUBFIELDS:
            # Both ID bytes must match: dictzip tags its subfield SI1='R',
            # SI2='A'. The original used `or`, accepting wrong subfields.
            if sub.SI1 == 'R' and sub.SI2 == 'A':
                sf = sub
                break
        if sf is None:
            raise ValueError("Not a dictzip file.")
        data = sf.DATA
        # All values are 16-bit little-endian.
        self.VER = data[0] + 256 * data[1]
        self.CHLEN = data[2] + 256 * data[3]
        self.CHCOUNT = data[4] + 256 * data[5]
        chpos = 0
        for j in range(self.CHCOUNT):
            # Bounds check covers the HIGH byte at 2*j+7 (the original
            # checked only 2*j+6, risking an IndexError on the last entry).
            if 2 * j + 7 >= len(data):
                break
            tmp_chlen = data[2 * j + 6] + 256 * data[2 * j + 7]
            self.CHUNKS.append((chpos, tmp_chlen))
            chpos += tmp_chlen

    def read_at(self, pos: int, length: int):
        """Return *length* bytes at uncompressed offset *pos*, decoded as UTF-8.

        Returns None for an invalid range or an undecodable slice.
        """
        if pos < 0 or length > self.size:
            return None
        first_chunk = min(pos // self.CHLEN, len(self.CHUNKS) - 1)
        # Parenthesized end offset: the original `pos + length//CHLEN`
        # precedence bug pinned last_chunk to the file's final chunk,
        # inflating every chunk to EOF on every call.
        last_chunk = min((pos + length) // self.CHLEN, len(self.CHUNKS) - 1)
        offset = pos - first_chunk * self.CHLEN
        finish = offset + length
        self.file_obj.seek(self.header.LENGTH + self.CHUNKS[first_chunk][0], 0)
        # Compressed span covering first_chunk..last_chunk. The original
        # added header.LENGTH again and measured from the file start,
        # over-reading well past the needed bytes.
        span = (self.CHUNKS[last_chunk][0] + self.CHUNKS[last_chunk][1]
                - self.CHUNKS[first_chunk][0])
        in_bytes = self.file_obj.read(span)
        parts = bytearray()
        z = 0
        for f in range(first_chunk, last_chunk + 1):
            chunk = in_bytes[z:z + self.CHUNKS[f][1]]
            # https://docs.python.org/3/library/zlib.html#zlib.decompress
            # wbits=-15: each dictzip chunk is an independent raw deflate
            # stream, so a fresh decompressor is needed per chunk.
            parts += zlib.decompressobj(wbits=-15).decompress(chunk)
            z += self.CHUNKS[f][1]
        try:
            return parts[offset:finish].decode()
        except UnicodeDecodeError:
            print("Couldn't decode requested slice.")
            return None

    def __del__(self) -> None:
        """Best-effort close; tolerates a failed __init__ (no file_obj yet)."""
        fobj = getattr(self, "file_obj", None)
        if fobj is not None:
            fobj.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment