Skip to content

Instantly share code, notes, and snippets.

@imLinguin
Last active September 6, 2023 13:02
Show Gist options
  • Save imLinguin/603c2d879c3db29eb8fff604216adfc4 to your computer and use it in GitHub Desktop.
Save imLinguin/603c2d879c3db29eb8fff604216adfc4 to your computer and use it in GitHub Desktop.
Demo showcasing a way to parse a Linux native installer without access to the whole file
# MIT LICENSE
# Copyright © 2023 Paweł Lidwin
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software
# and associated documentation files (the “Software”), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish, distribute,
# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import sys
import zlib
import requests
import logging
# =============== API =============
# Four-byte record signatures searched for in the downloaded data.
# Every signature is the ASCII pair "PK" (0x50 0x4b) followed by two type bytes.
END_OF_CENTRAL_DIRECTORY = b"PK\x05\x06"
CENTRAL_DIRECTORY = b"PK\x01\x02"
LOCAL_FILE_HEADER = b"PK\x03\x04"
# Signatures of the ZIP64 variants of the end-of-central-directory records.
ZIP_64_END_OF_CD_LOCATOR = b"PK\x06\x07"
ZIP_64_END_OF_CD = b"PK\x06\x06"
class LocalFile:
    """Local file header of one archive entry, read piecewise through a handler.

    Instances are normally created with :meth:`from_bytes`. The parser leaves
    ``relative_local_file_offset`` set to ``None``; the caller has to assign
    it before :meth:`load_data` can be used.
    """

    def load_data(self, handler):
        """Request the entry's payload from *handler*.

        The payload starts ``last_byte`` bytes after
        ``relative_local_file_offset`` and is ``compressed_size`` bytes long.
        The handler is called with ``raw_response=True`` and its return value
        is passed back unchanged.
        """
        return handler.get_bytes_from_file(
            from_b=self.last_byte + self.relative_local_file_offset,
            size=self.compressed_size,
            raw_response=True,
        )

    @classmethod
    def from_bytes(cls, data, offset, handler):
        """Parse a local file header.

        Args:
            data: The first 30 bytes of the header (signature plus fixed
                fields).
            offset: Position of the header, in the coordinate system
                *handler* expects for ``from_b``. It is used to fetch the
                variable-length part that follows the fixed fields.
            handler: Object providing ``get_bytes_from_file(from_b=, size=)``.

        Returns:
            The populated ``LocalFile``.
        """
        local_file = cls()
        local_file.relative_local_file_offset = None
        local_file.version_needed = data[4:6]
        local_file.general_purpose_bit_flag = data[6:8]
        local_file.compression_method = int.from_bytes(data[8:10], "little")
        local_file.last_modification_time = data[10:12]
        local_file.last_modification_date = data[12:14]
        local_file.crc32 = data[14:18]
        local_file.compressed_size = int.from_bytes(data[18:22], "little")
        local_file.uncompressed_size = int.from_bytes(data[22:26], "little")
        local_file.file_name_length = int.from_bytes(data[26:28], "little")
        local_file.extra_field_length = int.from_bytes(data[28:30], "little")

        # File name and extra field directly follow the 30 fixed bytes.
        variable_length = (
            local_file.file_name_length + local_file.extra_field_length
        )
        extra_data = handler.get_bytes_from_file(
            from_b=30 + offset,
            size=variable_length,
        )
        local_file.file_name = bytes(
            extra_data[0: local_file.file_name_length]
        ).decode()
        # The extra field lives in the freshly fetched bytes, right after the
        # file name. Slicing the fixed header here would yield unrelated
        # header bytes.
        local_file.extra_field = bytes(
            extra_data[local_file.file_name_length: variable_length]
        )

        # Total header length, i.e. the distance from the header start to the
        # first payload byte.
        local_file.last_byte = variable_length + 30
        return local_file

    def __str__(self):
        return f"\nCompressionMethod: {self.compression_method} \nFileNameLen: {self.file_name_length} \nFileName: {self.file_name} \nCompressedSize: {self.compressed_size} \nUncompressedSize: {self.uncompressed_size}"
class CentralDirectoryFile:
    """One file record of the ZIP central directory.

    Raw two- and four-byte fields that are not needed numerically are kept as
    byte slices; sizes, lengths and the local header offset are decoded as
    little-endian integers.
    """

    def __init__(self):
        # Annotations only: the attributes are assigned by ``from_bytes``.
        self.version_made_by: bytes
        self.version_needed_to_extract: bytes
        self.general_purpose_bit_flag: bytes
        self.compression_method: bytes
        self.last_modification_time: bytes
        self.last_modification_date: bytes
        self.crc32: bytes
        self.compressed_size: int
        self.uncompressed_size: int
        self.file_name_length: int
        self.extra_field_length: int
        self.file_comment_length: int
        self.disk_number_start: bytes
        self.int_file_attrs: bytes
        self.ext_file_attrs: bytes
        self.relative_local_file_offset: int
        self.file_name: str
        self.extra_field: bytes
        self.comment: bytes
        self.last_byte: int

    @classmethod
    def from_bytes(cls, data):
        """Parse the record that starts at the beginning of *data*.

        Args:
            data: Bytes starting with a central directory file record; any
                trailing bytes are ignored.

        Returns:
            A ``(record, length)`` tuple where ``length`` is the number of
            bytes the record occupies (fixed part, name, extra field and
            comment).
        """
        cd_file = cls()
        cd_file.version_made_by = data[4:6]
        cd_file.version_needed_to_extract = data[6:8]
        cd_file.general_purpose_bit_flag = data[8:10]
        cd_file.compression_method = data[10:12]
        cd_file.last_modification_time = data[12:14]
        cd_file.last_modification_date = data[14:16]
        cd_file.crc32 = data[16:20]
        cd_file.compressed_size = int.from_bytes(data[20:24], "little")
        cd_file.uncompressed_size = int.from_bytes(data[24:28], "little")
        cd_file.file_name_length = int.from_bytes(data[28:30], "little")
        cd_file.extra_field_length = int.from_bytes(data[30:32], "little")
        cd_file.file_comment_length = int.from_bytes(data[32:34], "little")
        cd_file.disk_number_start = data[34:36]
        cd_file.int_file_attrs = data[36:38]
        cd_file.ext_file_attrs = data[38:42]
        cd_file.relative_local_file_offset = int.from_bytes(data[42:46], "little")

        # Variable-length tail: file name, extra field, comment (in that order).
        extra_field_start = 46 + cd_file.file_name_length
        comment_start = extra_field_start + cd_file.extra_field_length
        record_end = comment_start + cd_file.file_comment_length

        cd_file.file_name = bytes(data[46:extra_field_start]).decode("iso-8859-15")
        cd_file.extra_field = data[extra_field_start:comment_start]
        cd_file.comment = data[comment_start:record_end]
        cd_file.last_byte = record_end

        cd_file._apply_zip64_extra_field()
        return cd_file, record_end

    def _apply_zip64_extra_field(self):
        """Replace 0xFFFFFFFF placeholders with the values from the ZIP64 extra field.

        The ZIP64 extended information block (header id 0x0001) stores 8-byte
        values only for the fields whose 32-bit slot holds the placeholder, in
        the order uncompressed size, compressed size, local header offset.
        Missing or truncated blocks leave the placeholders untouched.
        """
        placeholder = 0xFFFFFFFF
        ordered_fields = (
            "uncompressed_size",
            "compressed_size",
            "relative_local_file_offset",
        )
        pending = [
            field for field in ordered_fields if getattr(self, field) == placeholder
        ]
        if not pending:
            return

        extra = bytes(self.extra_field)
        position = 0
        while position + 4 <= len(extra):
            header_id = int.from_bytes(extra[position:position + 2], "little")
            block_size = int.from_bytes(extra[position + 2:position + 4], "little")
            block = extra[position + 4:position + 4 + block_size]
            position += 4 + block_size
            if header_id != 0x0001:
                continue
            cursor = 0
            for field in pending:
                if cursor + 8 > len(block):
                    break
                setattr(self, field, int.from_bytes(block[cursor:cursor + 8], "little"))
                cursor += 8
            return

    def __str__(self):
        # ``file_name`` is already a decoded string; it must not be passed
        # through ``bytes()`` again.
        return f"\nCompressionMethod: {self.compression_method} \nFileNameLen: {self.file_name_length} \nFileName: {self.file_name} \nStartDisk: {self.disk_number_start} \nCompressedSize: {self.compressed_size} \nUncompressedSize: {self.uncompressed_size}"

    def __repr__(self):
        return self.file_name
class CentralDirectory:
    """Ordered list of the file records parsed from a central directory."""

    def __init__(self):
        # Parsed records, in the order they appear in the directory.
        self.files = []

    @staticmethod
    def create_central_dir_file(data):
        """Parse one record; returns ``(record, record_length)``."""
        return CentralDirectoryFile.from_bytes(data)

    @classmethod
    def from_bytes(cls, data, n):
        """Parse up to *n* consecutive records from *data*.

        Parsing stops early when fewer than 46 bytes (the fixed record size)
        remain or when the next record does not start with the central
        directory signature, so a too-large *n* or truncated data cannot
        produce bogus entries.

        Args:
            data: Bytes-like object holding the central directory.
            n: Maximum number of records to read.

        Returns:
            A ``CentralDirectory`` whose ``files`` holds the parsed records.
        """
        central_dir = cls()
        # Walk the buffer through a view so that advancing to the next record
        # does not copy the whole remainder each time.
        view = memoryview(data)
        position = 0
        for _ in range(n):
            header = view[position:position + 46]
            if len(header) < 46 or bytes(header[:4]) != CENTRAL_DIRECTORY:
                break
            # Name, extra field and comment lengths sit at offsets 28, 30, 32.
            variable_length = sum(
                int.from_bytes(header[start:start + 2], "little")
                for start in (28, 30, 32)
            )
            record = bytes(view[position:position + 46 + variable_length])
            cd_file, consumed = central_dir.create_central_dir_file(record)
            central_dir.files.append(cd_file)
            position += consumed
        return central_dir
class Zip64EndOfCentralDirLocator:
    """Parsed ZIP64 end-of-central-directory locator record."""

    # (attribute, start, stop) of each little-endian integer field.
    _LAYOUT = (
        ("number_of_disk", 4, 8),
        ("zip64_end_of_cd_offset", 8, 16),
        ("total_number_of_disks", 16, 20),
    )

    def __init__(self):
        for attribute, _, _ in self._LAYOUT:
            setattr(self, attribute, None)

    @classmethod
    def from_bytes(cls, data):
        """Build a locator from *data*, which starts at the record signature."""
        locator = cls()
        for attribute, start, stop in cls._LAYOUT:
            setattr(locator, attribute, int.from_bytes(data[start:stop], "little"))
        return locator

    def __str__(self):
        return (
            "\nZIP64EOCDLocator"
            f"\nDisk Number: {self.number_of_disk}"
            f"\nZ64_EOCD Offset: {self.zip64_end_of_cd_offset}"
            f"\nNumber of disks: {self.total_number_of_disks}"
        )
class Zip64EndOfCentralDir:
    """Parsed ZIP64 end-of-central-directory record.

    Version and disk-number fields are kept as raw byte slices; sizes, entry
    counts and the directory offset are little-endian integers.
    ``extensible_data`` is never filled in by the parser.
    """

    def __init__(self):
        self.size = self.version_made_by = self.version_needed = None
        self.number_of_disk = self.central_directory_start_disk = None
        self.number_of_entries_on_this_disk = None
        self.number_of_entries_total = None
        self.size_of_central_directory = None
        self.central_directory_offset = None
        self.extensible_data = None

    @classmethod
    def from_bytes(cls, data):
        """Build a record from *data*, which starts at the record signature."""
        record = cls()

        def read_int(start, stop):
            return int.from_bytes(data[start:stop], "little")

        # Fields kept as raw bytes.
        record.version_made_by, record.version_needed = data[12:14], data[14:16]
        record.number_of_disk = data[16:20]
        record.central_directory_start_disk = data[20:24]

        # Numeric fields, all 8 bytes wide.
        record.size = read_int(4, 12)
        record.number_of_entries_on_this_disk = read_int(24, 32)
        record.number_of_entries_total = read_int(32, 40)
        record.size_of_central_directory = read_int(40, 48)
        record.central_directory_offset = read_int(48, 56)
        return record

    def __str__(self) -> str:
        lines = (
            "",
            "Z64 EndOfCD",
            f"Size: {self.size}",
            f"Number of disk: {self.number_of_disk}",
            f"Entries on this disk: {self.number_of_entries_on_this_disk}",
            f"Entries total: {self.number_of_entries_total}",
            f"CD offset: {self.central_directory_offset}",
        )
        return "\n".join(lines)
class EndOfCentralDir:
    """Parsed (non-ZIP64) end-of-central-directory record.

    Disk numbers and the comment length are kept as raw byte slices; the
    record count, directory size and directory offset are little-endian
    integers.
    """

    def __init__(self):
        self.number_of_disk = self.central_directory_disk = None
        self.central_directory_records = None
        self.size_of_central_directory = None
        self.central_directory_offset = None
        self.comment_length = self.comment = None

    @classmethod
    def from_bytes(cls, data):
        """Build a record from *data*, which starts at the record signature."""
        record = cls()
        record.number_of_disk, record.central_directory_disk = data[4:6], data[6:8]

        # The record count is taken from bytes 8-10; bytes 10-12 are not read.
        numeric_fields = (
            ("central_directory_records", slice(8, 10)),
            ("size_of_central_directory", slice(12, 16)),
            ("central_directory_offset", slice(16, 20)),
        )
        for attribute, window in numeric_fields:
            setattr(record, attribute, int.from_bytes(data[window], "little"))

        record.comment_length = data[20:22]
        comment_end = 22 + int.from_bytes(record.comment_length, "little")
        record.comment = data[22:comment_end]
        return record

    def __str__(self):
        pieces = (
            f"\nDiskNumber: {self.number_of_disk} ",
            f"\nCentralDirDisk: {self.central_directory_disk} ",
            f"\nCentralDirRecords: {self.central_directory_records} ",
            f"\nCentralDirSize: {self.size_of_central_directory} ",
            f"\nCentralDirOffset: {self.central_directory_offset}\n\n",
        )
        return "".join(pieces)
class InstallerHandler:
    """Read byte ranges of a remote installer file through HTTP range requests.

    On construction the first 2 MiB of the file are downloaded and searched
    for the first local file header signature. Its position is stored in
    ``start_of_archive_index`` (``-1`` when not found) and is added to every
    later request unless ``add_archive_index=False`` is passed.

    Attributes:
        url: Current download URL; replaced whenever a redirect is followed.
        bytes_read: Number of bytes returned by non-raw requests.
        file_size: Total size taken from the first ``Content-Range`` header.
    """

    USER_AGENT = 'Heroic-Gogdl Linux Downloads PoC (1.0.0)'
    REDIRECT_STATUS_CODES = (301, 302, 303, 307, 308)
    MAX_REDIRECTS = 10

    def __init__(self, url):
        self.url = url
        self.bytes_read = 0
        self.session = requests.session()
        self.file_size = None
        beginning_of_file = self.get_bytes_from_file(
            from_b=0, size=1024 * 1024 * 2, add_archive_index=False
        )
        self.start_of_archive_index = beginning_of_file.find(LOCAL_FILE_HEADER)

    def get_bytes_from_file(self, from_b=-1, size=None, add_archive_index=True, raw_response=False):
        """Fetch a byte range of the remote file.

        Args:
            from_b: First byte to fetch. A negative value leaves the start of
                the range header empty.
            size: Number of bytes to fetch. A falsy value (``None`` or ``0``)
                leaves the end of the range open.
            add_archive_index: Shift ``from_b`` by ``start_of_archive_index``.
            raw_response: Return the streamed response object instead of the
                body bytes.

        Returns:
            The body as ``bytes``, or the response object when
            ``raw_response`` is true.

        Raises:
            RuntimeError: On a redirect without ``Location`` or when more than
                ``MAX_REDIRECTS`` redirects are chained.
            Exception: Whatever ``response.raise_for_status()`` raises for an
                error status.
        """
        from urllib.parse import urljoin

        # The offset is applied exactly once, before any redirect handling, so
        # that a retried request asks for the same range.
        if add_archive_index:
            from_b += self.start_of_archive_index
        from_b_repr = str(from_b) if from_b > -1 else ""
        end_b = from_b + size - 1 if size else ""
        request_headers = {
            'Range': self.get_range_header(from_b_repr, end_b),
            'User-Agent': self.USER_AGENT,
        }

        # Redirects are followed manually so the final URL can be remembered
        # and reused by subsequent requests.
        for _ in range(self.MAX_REDIRECTS + 1):
            response = self.session.get(
                self.url,
                headers=request_headers,
                allow_redirects=False,
                stream=raw_response,
            )
            if response.status_code not in self.REDIRECT_STATUS_CODES:
                break
            location = response.headers.get("Location")
            response.close()
            if not location:
                raise RuntimeError("Redirect response without a Location header")
            self.url = urljoin(self.url, location)
        else:
            raise RuntimeError("Too many redirects while requesting a byte range")

        response.raise_for_status()

        if not self.file_size:
            # Content-Range looks like "bytes 0-99/12345"; the part after the
            # slash is the total size ("*" when unknown).
            content_range = response.headers.get("Content-Range") or ""
            total = content_range.rsplit("/", 1)[-1]
            if total.isdigit():
                self.file_size = int(total)

        if raw_response:
            return response
        data = response.content
        self.bytes_read += len(data)
        return data

    def get_range_header(self, from_b="", to_b=""):
        """Format the value of an HTTP ``Range`` header."""
        return f"bytes={from_b}-{to_b}"
# ========================= Functionality ============================
def download(url):
    """Extract the game files of a remote installer into ``testdir/``.

    Only the parts of the file that are needed are requested: the tail with
    the end-of-central-directory records, the central directory itself and,
    for every entry below ``data/noarch/``, its local header and payload.

    Args:
        url: Download URL of the installer.

    Raises:
        Exception: When the archive start or the end-of-central-directory
            record cannot be located.
        zlib.error: When an entry's payload cannot be inflated.
    """
    handler = InstallerHandler(url)
    if handler.start_of_archive_index < 0:
        raise Exception("Start of the archive couldnt be calculated")
    print("Start of archive index:", handler.start_of_archive_index)

    # The end-of-central-directory records are looked for in the last 100 bytes.
    end_of_cd_data = handler.get_bytes_from_file(
        from_b=max(handler.file_size - 100, 0), add_archive_index=False
    )
    end_of_cd_header_data_index = end_of_cd_data.find(END_OF_CENTRAL_DIRECTORY)
    if end_of_cd_header_data_index < 0:
        raise Exception("End of central directory record was not found")
    zip64_end_of_cd_locator_index = end_of_cd_data.find(ZIP_64_END_OF_CD_LOCATOR)

    end_of_cd = EndOfCentralDir.from_bytes(end_of_cd_data[end_of_cd_header_data_index:])
    central_directory_offset = end_of_cd.central_directory_offset
    size_of_central_directory = end_of_cd.size_of_central_directory
    number_of_entries = end_of_cd.central_directory_records

    # A saturated field means the real value lives in the ZIP64 record. This
    # includes the 16-bit entry count, which overflows long before the offsets.
    needs_zip64 = (
        central_directory_offset == 0xFFFFFFFF
        or size_of_central_directory == 0xFFFFFFFF
        or number_of_entries == 0xFFFF
    )
    if needs_zip64 and zip64_end_of_cd_locator_index > -1:
        zip64_end_of_cd_locator = Zip64EndOfCentralDirLocator.from_bytes(
            end_of_cd_data[zip64_end_of_cd_locator_index:]
        )
        zip64_end_of_cd_data = handler.get_bytes_from_file(
            from_b=zip64_end_of_cd_locator.zip64_end_of_cd_offset, size=200
        )
        zip64_end_of_cd = Zip64EndOfCentralDir.from_bytes(zip64_end_of_cd_data)
        central_directory_offset = zip64_end_of_cd.central_directory_offset
        size_of_central_directory = zip64_end_of_cd.size_of_central_directory
        number_of_entries = zip64_end_of_cd.number_of_entries_total

    print("Central directory offset", central_directory_offset)
    print("Central directory size", size_of_central_directory / 1024 / 1024)
    central_directory_data = handler.get_bytes_from_file(
        from_b=central_directory_offset,
        size=size_of_central_directory
    )
    print("Parsing central directory")
    central_directory = CentralDirectory.from_bytes(
        central_directory_data, number_of_entries
    )
    print("\nCentral Directory files:", len(central_directory.files))

    # Keep game files only: skip folders and everything outside data/noarch/.
    filtered_files = [
        file
        for file in central_directory.files
        if file.file_name.startswith("data/noarch/")
        and not file.file_name.endswith("/")
    ]
    print("Filtered files:", len(filtered_files))

    output_root = os.path.abspath("testdir")
    for file in filtered_files:
        # Load local file header
        file_data = handler.get_bytes_from_file(
            from_b=file.relative_local_file_offset,
            size=30,
        )
        local_file = LocalFile.from_bytes(
            file_data,
            file.relative_local_file_offset,
            handler,  # Passing in handler to be able to pull more data
        )
        local_file.relative_local_file_offset = file.relative_local_file_offset
        # The central directory size is authoritative; the local header may
        # carry 0 when the sizes are stored in a trailing data descriptor.
        local_file.compressed_size = file.compressed_size

        if local_file.compression_method not in (0, 8):
            print("Unsupported compression method", local_file.compression_method)
            continue

        # For our purposes unpack whole thing into testdir into current working directory
        path = local_file.file_name.replace("data/noarch/", "testdir/", 1)
        # Refuse names that would escape the output directory (e.g. via "..").
        if not os.path.abspath(path).startswith(output_root + os.sep):
            print("Skipping entry outside of testdir:", local_file.file_name)
            continue
        print("Downloading", path)
        os.makedirs(os.path.dirname(path), exist_ok=True)

        if local_file.compressed_size == 0:
            # Nothing to fetch. A zero-sized range request would be turned
            # into an open-ended one and pull the rest of the archive.
            with open(path, "wb"):
                pass
        else:
            response = local_file.load_data(handler)
            try:
                _write_entry(response, local_file.compression_method, path)
            except zlib.error:
                print(local_file)
                raise
            finally:
                response.close()

        # Set file permissions (skipped when the record carries none)
        file_permissions = _permission_bits(file.ext_file_attrs)
        if file_permissions:
            os.chmod(path, file_permissions)
    print("\n\nTotal bytes read:", handler.bytes_read)


def _permission_bits(ext_file_attrs):
    """Return the nine rwx permission bits held in the upper half of *ext_file_attrs*."""
    return (int.from_bytes(ext_file_attrs, "little") >> 16) & 0o777


def _write_entry(response, compression_method, path):
    """Stream *response* into *path*, inflating raw deflate data for method 8."""
    decompressor = zlib.decompressobj(-15) if compression_method == 8 else None
    with open(path, "wb") as f:
        for data in response.iter_content(1024 * 1024):
            f.write(decompressor.decompress(data) if decompressor else data)
        if decompressor:
            # Emit whatever the decompressor still holds back.
            f.write(decompressor.flush())
def filter_linux_installers(installers):
    """Return the installers whose ``"os"`` entry equals ``"linux"``, in input order."""
    return [candidate for candidate in installers if candidate["os"] == "linux"]
if __name__ == "__main__":
    # Usage: <script> <product_id> <token>
    if len(sys.argv) < 3:
        sys.exit(f"Usage: {sys.argv[0]} <product_id> <token>")
    product_id, token = sys.argv[1], sys.argv[2]
    logging.basicConfig(level=logging.DEBUG)
    user_agent = "Heroic-Gogdl Linux Downloads PoC (1.0.0)"

    # Look up the product and pick its Linux installers.
    game_data = requests.get(
        f"https://api.gog.com/products/{product_id}?expand=downloads,expanded_dlcs",
        headers={"User-Agent": user_agent},
    )
    game_data.raise_for_status()
    json_game_data = game_data.json()
    installers = filter_linux_installers(json_game_data["downloads"]["installers"])
    if not installers:
        print("No installers to download")
        sys.exit()

    # Resolve the first file of the first installer to its download link.
    downlink_response = requests.get(
        installers[0]["files"][0]["downlink"],
        headers={"User-Agent": user_agent, "Authorization": f"Bearer {token}"},
        allow_redirects=True,
    )
    downlink_response.raise_for_status()
    downlink = downlink_response.json()["downlink"]
    download(downlink)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment