Skip to content

Instantly share code, notes, and snippets.

@drego85
Last active November 20, 2024 12:21
Show Gist options
  • Save drego85/f3fa845b2f14b29e926417d697131134 to your computer and use it in GitHub Desktop.
Save drego85/f3fa845b2f14b29e926417d697131134 to your computer and use it in GitHub Desktop.
Fix APK (Android Package) files that have corrupt or non-standard ZIP headers
#!/usr/bin/env python3
#
# This Python script is designed to fix APK (Android Package)
# files that have corrupt or non-standard ZIP headers.
# Specifically, it corrects invalid compression methods, sets
# the compression method to “no compression” and updates the
# file sizes in the headers. This allows the APK to conform
# to the standard ZIP specification, facilitating analysis
# and decompilation with tools such as JADX.
#
# Usage: apk-patcher.py <file.apk> [--debug]
#
# Many thanks to @qfalconer for the initial reverse engineering:
# https://github.com/skylot/jadx/pull/2298
#
# Made with ♥ by Andrea Draghetti
#
# This file may be licensed under the terms of the
# GNU General Public License Version 3 (the ``GPL'').
#
import os
import sys
import shutil
import struct
import tempfile
import logging
from pathlib import Path
from typing import BinaryIO, Dict, List, Union
# Configure logging
logging.basicConfig(format='%(levelname)s: %(message)s')
logger = logging.getLogger('APKPatcher')
class APKPatcher:
    """
    APK file patcher that fixes corrupted ZIP headers commonly used to evade analysis.

    This patcher handles cases where:
      1. Compression method in headers is invalid
      2. Compressed size in headers is invalid
      3. Extra data in Local Headers is invalid

    The fix sets the compression method to 0 (stored), rewrites the compressed
    size with the uncompressed size, clears the extra-field length in Local
    Headers and shifts the file data back over the removed extra bytes.
    """

    # ZIP format signatures (little-endian DWORD values)
    END_OF_CENTRAL_DIR_SIGNATURE = 0x06054b50  # End of Central Directory Record
    CENTRAL_DIR_SIGNATURE = 0x02014b50  # Central Directory Header
    LOCAL_HEADER_SIGNATURE = 0x04034b50  # Local File Header

    # The EOCD record is 22 bytes and may be followed by a ZIP comment of at
    # most 0xFFFF bytes, so it can never start more than this many bytes
    # before end-of-file (ZIP APPNOTE spec).
    _MAX_EOCD_SEARCH = 22 + 0xFFFF

    def __init__(self, apk_path: Union[str, Path], debug: bool = False):
        """
        Initialize the patcher with the target APK file.

        Args:
            apk_path: Path to the APK file
            debug: Enable debug logging

        Raises:
            ValueError: If the file name does not end in .apk
        """
        self.apk_path = Path(apk_path)
        if not self.apk_path.suffix.lower() == '.apk':
            raise ValueError("File must be an APK")

        # Resolve the logger by name: this is the same instance as the
        # module-level 'APKPatcher' logger, but keeps the class usable when
        # imported on its own.
        self._logger = logging.getLogger('APKPatcher')
        # Set logging level based on debug flag
        self._logger.setLevel(logging.DEBUG if debug else logging.INFO)

    def patch(self) -> Path:
        """
        Apply necessary patches to the APK file.

        Returns:
            Path to the patched file, or original file if no patching was needed
        """
        self._logger.info(f"Starting analysis of {self.apk_path}")

        # Create a temporary copy of the file; the original is never modified.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.apk')
        temp_path = Path(temp_file.name)
        # Close the handle right away: keeping it open would leak it and, on
        # Windows, would prevent the 'rb+' reopen below.
        temp_file.close()
        shutil.copy2(self.apk_path, temp_path)
        self._logger.debug(f"Created temporary file: {temp_path}")

        try:
            with open(temp_path, 'rb+') as f:
                # Find End of Central Directory
                eocd_offset = self._find_end_of_central_dir(f)
                self._logger.debug(f"Found End of Central Directory at offset: 0x{eocd_offset:08X}")

                # Read Central Directory offset (EOCD+0x10) and number of
                # entries (EOCD+0x0A) from the EOCD record.
                f.seek(eocd_offset + 0x10)
                cdir_offset = struct.unpack('<I', f.read(4))[0]
                f.seek(eocd_offset + 0x0A)
                num_entries = struct.unpack('<H', f.read(2))[0]
                self._logger.debug(f"Central Directory starts at: 0x{cdir_offset:08X}")
                self._logger.debug(f"Number of entries: {num_entries}")

                # Lists to track headers that need fixing
                cdir_entries_to_fix: List[int] = []
                local_headers: List[int] = []

                # Analyze Central Directory
                offset = cdir_offset
                for entry_idx in range(num_entries):
                    self._logger.debug(f"\nAnalyzing Central Directory entry {entry_idx + 1}/{num_entries}")
                    info = self._read_header(f, offset)

                    if not self._is_valid_compression(info['compression']):
                        self._logger.debug(f"Found invalid compression method: 0x{info['compression']:04X}")
                        cdir_entries_to_fix.append(offset)

                    # Read local header offset (always collected, even for
                    # clean entries, so every Local Header gets analyzed)
                    f.seek(offset + 0x2A)
                    local_offset = struct.unpack('<I', f.read(4))[0]
                    local_headers.append(local_offset)
                    self._logger.debug(f"Local header offset: 0x{local_offset:08X}")

                    offset += info['data_offset']

                # Find local headers that need fixing
                local_headers_to_fix: List[int] = []
                for idx, offset in enumerate(local_headers):
                    self._logger.debug(f"\nAnalyzing Local Header {idx + 1}/{len(local_headers)}")
                    info = self._read_header(f, offset)
                    if not self._is_valid_compression(info['compression']):
                        self._logger.debug(f"Found invalid compression method: 0x{info['compression']:04X}")
                        local_headers_to_fix.append(offset)

                # If nothing needs fixing, use original file
                if not cdir_entries_to_fix and not local_headers_to_fix:
                    self._logger.info("No corrupted headers found, APK is clean")
                    os.unlink(temp_path)
                    return self.apk_path

                self._logger.info(f"Found {len(cdir_entries_to_fix)} Central Directory entries and "
                                  f"{len(local_headers_to_fix)} Local Headers to fix")

                # Fix Central Directory entries
                for offset in cdir_entries_to_fix:
                    self._logger.debug(f"\nFixing Central Directory entry at offset 0x{offset:08X}")
                    info = self._read_header(f, offset)

                    # Set compression method to 0 (stored)
                    f.seek(offset + 0x0A)
                    f.write(struct.pack('<H', 0))
                    self._logger.debug("Set compression method to 0 (stored)")

                    # The payload is actually stored, so the compressed size
                    # must equal the uncompressed size.
                    f.seek(offset + 0x14)
                    f.write(struct.pack('<I', info['uncompressed_size']))
                    self._logger.debug(f"Updated compressed size to: {info['uncompressed_size']}")

                # Fix Local headers
                for offset in local_headers_to_fix:
                    self._logger.debug(f"\nFixing Local Header at offset 0x{offset:08X}")
                    info = self._read_header(f, offset)

                    # Set compression method to 0 (stored)
                    f.seek(offset + 0x08)
                    f.write(struct.pack('<H', 0))
                    self._logger.debug("Set compression method to 0 (stored)")

                    # Update compressed size
                    f.seek(offset + 0x12)
                    f.write(struct.pack('<I', info['uncompressed_size']))
                    self._logger.debug(f"Updated compressed size to: {info['uncompressed_size']}")

                    # Clear extra length
                    f.seek(offset + 0x1C)
                    f.write(struct.pack('<H', 0))
                    self._logger.debug("Cleared extra field length")

                    # Shift the file data back over the removed extra field
                    # so it starts where a zero-length extra field implies.
                    if info['extra_len'] > 0:
                        self._logger.debug(f"Moving data block back by {info['extra_len']} bytes")
                        self._move_block_back(f,
                                              offset + info['data_offset'],
                                              info['uncompressed_size'],
                                              info['extra_len'])

                self._logger.info("Successfully patched all corrupted headers")
                return temp_path

        except Exception as e:
            self._logger.error(f"Error while patching: {e}")
            # Remove the partial temporary copy, then re-raise preserving the
            # original traceback (bare raise instead of `raise e`).
            os.unlink(temp_path)
            raise

    def _find_end_of_central_dir(self, f: BinaryIO) -> int:
        """
        Find the offset of End of Central Directory record.

        Args:
            f: File object (position is not preserved)

        Returns:
            Offset of the End of Central Directory record

        Raises:
            ValueError: If End of Central Directory record is not found
        """
        f.seek(0, os.SEEK_END)  # Go to end of file
        file_size = f.tell()
        self._logger.debug(f"Searching for End of Central Directory in file size: {file_size}")

        # Read only the tail that can legally contain the EOCD record and
        # locate the LAST occurrence of the signature — equivalent to the
        # byte-by-byte backwards scan, but with a single read instead of one
        # seek+read syscall pair per file byte.
        tail_len = min(file_size, self._MAX_EOCD_SEARCH)
        f.seek(file_size - tail_len)
        tail = f.read(tail_len)
        idx = tail.rfind(struct.pack('<I', self.END_OF_CENTRAL_DIR_SIGNATURE))
        if idx == -1:
            raise ValueError("Invalid ZIP file: End of Central Directory record not found")
        return file_size - tail_len + idx

    def _read_header(self, f: BinaryIO, offset: int) -> Dict:
        """
        Read ZIP header information.

        Args:
            f: File object
            offset: Offset where the header starts

        Returns:
            Dictionary with keys 'compression', 'uncompressed_size',
            'data_offset' (size of the header including name/extra/comment)
            and 'extra_len'

        Raises:
            ValueError: If header signature is invalid
        """
        f.seek(offset)
        signature = struct.unpack('<I', f.read(4))[0]
        if signature not in [self.CENTRAL_DIR_SIGNATURE, self.LOCAL_HEADER_SIGNATURE]:
            raise ValueError(f"Invalid ZIP header signature {hex(signature)} at offset {hex(offset)}")

        is_central = signature == self.CENTRAL_DIR_SIGNATURE
        # From the compression-method field onward, Local File Header fields
        # sit 2 bytes earlier than the Central Directory ones (the central
        # header has an extra leading "version made by" field).
        delta = 0 if is_central else -2

        # Read compression method
        f.seek(offset + 0x0A + delta)
        compression = struct.unpack('<H', f.read(2))[0]

        # Read uncompressed size
        f.seek(offset + 0x18 + delta)
        uncompressed_size = struct.unpack('<I', f.read(4))[0]

        # Read name, extra and comment lengths (three consecutive WORDs;
        # the comment length exists only in Central Directory headers)
        f.seek(offset + 0x1C + delta)
        name_len = struct.unpack('<H', f.read(2))[0]
        extra_len = struct.unpack('<H', f.read(2))[0]
        comment_len = struct.unpack('<H', f.read(2))[0] if is_central else 0

        return {
            'compression': compression,
            'uncompressed_size': uncompressed_size,
            # Fixed header size (46 central / 30 local) plus variable fields
            'data_offset': (0x2E if is_central else 0x1E) + name_len + extra_len + comment_len,
            'extra_len': extra_len
        }

    def _is_valid_compression(self, compression: int) -> bool:
        """
        Check if compression method is valid.

        Args:
            compression: Compression method value

        Returns:
            True if compression method is valid, False otherwise
        """
        return compression in [0x0, 0x8]  # 0 = stored (no compression), 8 = deflate

    def _move_block_back(self, f: BinaryIO, offset: int, size: int, delta: int) -> None:
        """
        Move a block of data backwards by delta bytes.

        Args:
            f: File object
            offset: Start offset of the block
            size: Size of the block to move
            delta: Number of bytes to move back

        Copying front-to-back in chunks is safe here because the destination
        always precedes the source by delta bytes.
        """
        buffer_size = 1024 * 1024  # 1MB buffer
        while size > 0:
            chunk_size = min(buffer_size, size)

            # Read chunk
            f.seek(offset)
            data = f.read(chunk_size)

            # Write chunk at new position
            f.seek(offset - delta)
            f.write(data)

            size -= chunk_size
            offset += chunk_size
            self._logger.debug(f"Moved {chunk_size} bytes, {size} remaining")
def main():
    """Command-line entry point.

    Usage: apk-patcher.py <file.apk> [--debug]
    """
    # Split the optional --debug flag from the positional arguments so the
    # flag is accepted in any position and is never mistaken for the APK path
    # (the original nested-ternary parsing treated a lone "--debug" as a
    # file name).
    positional = [arg for arg in sys.argv[1:] if arg != "--debug"]
    debug = len(positional) != len(sys.argv) - 1

    if len(positional) != 1:
        print(f"Usage: {sys.argv[0]} <file.apk> [--debug]")
        sys.exit(1)

    apk_path = positional[0]

    try:
        patcher = APKPatcher(apk_path, debug)
        patched_file = patcher.patch()

        if patched_file == Path(apk_path):
            # patch() returns the original path when no header was corrupt
            logger.info("APK file does not need patching.")
        else:
            # Move the temporary result into the working directory under a
            # predictable name derived from the input file.
            output_name = Path(apk_path).stem + "_patched.apk"
            shutil.move(patched_file, output_name)
            logger.info(f"Patched file saved as: {output_name}")
    except Exception as e:
        logger.error(f"Error during patching: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()
@ohyeah521
Copy link

Unable to repair the following application: APK with fake encryption.
2024-10-29 23_00_30__Chrome zip

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment