Fix APK (Android Package) files that have corrupt or non-standard ZIP headers
#!/usr/bin/env python3
#
# This Python script is designed to fix APK (Android Package)
# files that have corrupt or non-standard ZIP headers.
# Specifically, it corrects invalid compression methods, sets
# the compression method to "no compression" and updates the
# file sizes in the headers. This allows the APK to conform
# to the standard ZIP specification, facilitating analysis
# and decompilation with tools such as JADX.
#
# Usage: apk-patcher.py <file.apk> [--debug]
#
# Many thanks to @qfalconer for the initial reverse engineering:
# https://github.com/skylot/jadx/pull/2298
#
# Made with ♥ by Andrea Draghetti
#
# This file may be licensed under the terms of the
# GNU General Public License Version 3 (the ``GPL'').
#
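# Illustrative example (hypothetical values): a corrupted entry may declare
# compression method 0x1234 even though its data is actually stored
# uncompressed. After patching, the header reports method 0x0000 ("stored"),
# the compressed size equals the uncompressed size, and any bogus extra field
# length in the Local File Header is cleared, so standard ZIP tools can read
# the entry again.
#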
import os
import sys
import shutil
import struct
import tempfile
import logging
from pathlib import Path
from typing import BinaryIO, Dict, List, Union

# Configure logging
logging.basicConfig(format='%(levelname)s: %(message)s')
logger = logging.getLogger('APKPatcher')

class APKPatcher:
    """
    APK file patcher that fixes corrupted ZIP headers commonly used to evade analysis.

    This patcher handles cases where:
    1. Compression method in headers is invalid
    2. Compressed size in headers is invalid
    3. Extra data in Local Headers is invalid
    """

    # ZIP format signatures
    END_OF_CENTRAL_DIR_SIGNATURE = 0x06054b50  # End of Central Directory Record
    CENTRAL_DIR_SIGNATURE = 0x02014b50         # Central Directory Header
    LOCAL_HEADER_SIGNATURE = 0x04034b50        # Local File Header
    def __init__(self, apk_path: Union[str, Path], debug: bool = False):
        """
        Initialize the patcher with the target APK file.

        Args:
            apk_path: Path to the APK file
            debug: Enable debug logging
        """
        self.apk_path = Path(apk_path)
        if self.apk_path.suffix.lower() != '.apk':
            raise ValueError("File must be an APK")

        # Set logging level based on debug flag
        logger.setLevel(logging.DEBUG if debug else logging.INFO)
    def patch(self) -> Path:
        """
        Apply necessary patches to the APK file.

        Returns:
            Path to the patched file, or original file if no patching was needed
        """
        logger.info(f"Starting analysis of {self.apk_path}")

        # Create a temporary copy of the file
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.apk')
        temp_path = Path(temp_file.name)
        temp_file.close()  # only the path is needed; the copy is reopened below
        shutil.copy2(self.apk_path, temp_path)
        logger.debug(f"Created temporary file: {temp_path}")
        try:
            with open(temp_path, 'rb+') as f:
                # Find End of Central Directory
                eocd_offset = self._find_end_of_central_dir(f)
                logger.debug(f"Found End of Central Directory at offset: 0x{eocd_offset:08X}")

                # Read Central Directory offset and number of entries
                f.seek(eocd_offset + 0x10)
                cdir_offset = struct.unpack('<I', f.read(4))[0]
                f.seek(eocd_offset + 0x0A)
                num_entries = struct.unpack('<H', f.read(2))[0]
                logger.debug(f"Central Directory starts at: 0x{cdir_offset:08X}")
                logger.debug(f"Number of entries: {num_entries}")

                # Lists to track headers that need fixing
                cdir_entries_to_fix: List[int] = []
                local_headers: List[int] = []

                # Analyze Central Directory
                offset = cdir_offset
                for entry_idx in range(num_entries):
                    logger.debug(f"\nAnalyzing Central Directory entry {entry_idx + 1}/{num_entries}")
                    info = self._read_header(f, offset)

                    if not self._is_valid_compression(info['compression']):
                        logger.debug(f"Found invalid compression method: 0x{info['compression']:04X}")
                        cdir_entries_to_fix.append(offset)

                    # Read the matching local header offset for every entry
                    f.seek(offset + 0x2A)
                    local_offset = struct.unpack('<I', f.read(4))[0]
                    local_headers.append(local_offset)
                    logger.debug(f"Local header offset: 0x{local_offset:08X}")

                    offset += info['data_offset']
                # Find local headers that need fixing
                local_headers_to_fix: List[int] = []
                for idx, offset in enumerate(local_headers):
                    logger.debug(f"\nAnalyzing Local Header {idx + 1}/{len(local_headers)}")
                    info = self._read_header(f, offset)

                    if not self._is_valid_compression(info['compression']):
                        logger.debug(f"Found invalid compression method: 0x{info['compression']:04X}")
                        local_headers_to_fix.append(offset)

                # If nothing needs fixing, use original file
                if not cdir_entries_to_fix and not local_headers_to_fix:
                    logger.info("No corrupted headers found, APK is clean")
                    os.unlink(temp_path)
                    return self.apk_path

                logger.info(f"Found {len(cdir_entries_to_fix)} Central Directory entries and "
                            f"{len(local_headers_to_fix)} Local Headers to fix")
                # Fix Central Directory entries
                for offset in cdir_entries_to_fix:
                    logger.debug(f"\nFixing Central Directory entry at offset 0x{offset:08X}")
                    info = self._read_header(f, offset)

                    # Set compression method to 0 (stored)
                    f.seek(offset + 0x0A)
                    f.write(struct.pack('<H', 0))
                    logger.debug("Set compression method to 0 (stored)")

                    # Update compressed size
                    f.seek(offset + 0x14)
                    f.write(struct.pack('<I', info['uncompressed_size']))
                    logger.debug(f"Updated compressed size to: {info['uncompressed_size']}")
                # Fix Local headers
                for offset in local_headers_to_fix:
                    logger.debug(f"\nFixing Local Header at offset 0x{offset:08X}")
                    info = self._read_header(f, offset)

                    # Set compression method to 0 (stored)
                    f.seek(offset + 0x08)
                    f.write(struct.pack('<H', 0))
                    logger.debug("Set compression method to 0 (stored)")

                    # Update compressed size
                    f.seek(offset + 0x12)
                    f.write(struct.pack('<I', info['uncompressed_size']))
                    logger.debug(f"Updated compressed size to: {info['uncompressed_size']}")

                    # Clear extra length
                    f.seek(offset + 0x1C)
                    f.write(struct.pack('<H', 0))
                    logger.debug("Cleared extra field length")
                    # Move data block if needed
                    if info['extra_len'] > 0:
                        logger.debug(f"Moving data block back by {info['extra_len']} bytes")
                        self._move_block_back(f,
                                              offset + info['data_offset'],
                                              info['uncompressed_size'],
                                              info['extra_len'])

            logger.info("Successfully patched all corrupted headers")
            return temp_path

        except Exception as e:
            logger.error(f"Error while patching: {e}")
            os.unlink(temp_path)
            raise e
    def _find_end_of_central_dir(self, f: BinaryIO) -> int:
        """
        Find the offset of the End of Central Directory record.

        Args:
            f: File object positioned at the start

        Returns:
            Offset of the End of Central Directory record

        Raises:
            ValueError: If the End of Central Directory record is not found
        """
        f.seek(0, 2)  # Go to end of file
        file_size = f.tell()
        logger.debug(f"Searching for End of Central Directory in file size: {file_size}")

        # Search for EOCD signature from the end
        for offset in range(file_size - 4, -1, -1):
            f.seek(offset)
            if struct.unpack('<I', f.read(4))[0] == self.END_OF_CENTRAL_DIR_SIGNATURE:
                return offset

        raise ValueError("Invalid ZIP file: End of Central Directory record not found")
    def _read_header(self, f: BinaryIO, offset: int) -> Dict:
        """
        Read ZIP header information.

        Args:
            f: File object
            offset: Offset where the header starts

        Returns:
            Dictionary containing header information

        Raises:
            ValueError: If the header signature is invalid
        """
        f.seek(offset)
        signature = struct.unpack('<I', f.read(4))[0]

        if signature not in [self.CENTRAL_DIR_SIGNATURE, self.LOCAL_HEADER_SIGNATURE]:
            raise ValueError(f"Invalid ZIP header signature {hex(signature)} at offset {hex(offset)}")

        is_central = signature == self.CENTRAL_DIR_SIGNATURE
        delta = 0 if is_central else -2

        # Read compression method
        f.seek(offset + 0x0A + delta)
        compression = struct.unpack('<H', f.read(2))[0]

        # Read uncompressed size
        f.seek(offset + 0x18 + delta)
        uncompressed_size = struct.unpack('<I', f.read(4))[0]

        # Read name, extra and comment lengths
        f.seek(offset + 0x1C + delta)
        name_len = struct.unpack('<H', f.read(2))[0]
        extra_len = struct.unpack('<H', f.read(2))[0]
        comment_len = struct.unpack('<H', f.read(2))[0] if is_central else 0

        return {
            'compression': compression,
            'uncompressed_size': uncompressed_size,
            'data_offset': (0x2E if is_central else 0x1E) + name_len + extra_len + comment_len,
            'extra_len': extra_len
        }
    def _is_valid_compression(self, compression: int) -> bool:
        """
        Check if the compression method is valid.

        Args:
            compression: Compression method value

        Returns:
            True if the compression method is valid, False otherwise
        """
        return compression in [0x0, 0x8]  # 0 = stored (no compression), 8 = deflate
    def _move_block_back(self, f: BinaryIO, offset: int, size: int, delta: int):
        """
        Move a block of data backwards by delta bytes.

        Args:
            f: File object
            offset: Start offset of the block
            size: Size of the block to move
            delta: Number of bytes to move back
        """
        buffer_size = 1024 * 1024  # 1MB buffer

        while size > 0:
            chunk_size = min(buffer_size, size)

            # Read chunk
            f.seek(offset)
            data = f.read(chunk_size)

            # Write chunk at new position
            f.seek(offset - delta)
            f.write(data)

            size -= chunk_size
            offset += chunk_size
            logger.debug(f"Moved {chunk_size} bytes, {size} remaining")

def main():
    if len(sys.argv) not in [2, 3]:
        print(f"Usage: {sys.argv[0]} <file.apk> [--debug]")
        sys.exit(1)

    debug = "--debug" in sys.argv
    apk_path = sys.argv[1] if not debug else (sys.argv[2] if sys.argv[1] == "--debug" else sys.argv[1])

    try:
        patcher = APKPatcher(apk_path, debug)
        patched_file = patcher.patch()

        if patched_file == Path(apk_path):
            logger.info("APK file does not need patching.")
        else:
            output_name = Path(apk_path).stem + "_patched.apk"
            shutil.move(patched_file, output_name)
            logger.info(f"Patched file saved as: {output_name}")

    except Exception as e:
        logger.error(f"Error during patching: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()
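
To sanity-check the result, the patched archive can be opened with Python's standard zipfile module. A minimal sketch, assuming a patched output named example_patched.apk (placeholder name):

import zipfile

with zipfile.ZipFile("example_patched.apk") as zf:   # placeholder path
    print(zf.namelist()[:5])   # entries should now list normally
    print(zf.testzip())        # None means every member's CRC verifies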
Comment: Unable to repair the following application (APK fake encryption).