Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Pure python reimplementation of .cpio.xz content extraction from pbzx file payload for OS X packages
#!/usr/bin/env python
# Extract .cpio file from a pbzx Payload file.
# Based on,
# this version adds a command-line interface, improves efficiency (1 MiB chunks
# instead of a full copy in memory), adds Python 3 compatibility and
# automatically decompresses stuff (some blocks may not be compressed).
# Example usage (from Python):
# parse_pbzx(open('PayloadJava', 'rb'), open('PayloadJava.cpio', wb'))
# Example usage (from shell):
# # These are all equivalent
# ./ < PayloadJava > PayloadJava.cpio
# ./ PayloadJava > PayloadJava.cpio
# ./ PayloadJava PayloadJava.cpio
# Another example, extract Payload from a .pkg file, convert it to a cpio.xz
# archive (this script) and list contents (cpio -t):
# bsdtar -xOf some.pkg Payload | ./ Payload | cpio -t
from __future__ import print_function
import struct
import sys
from contextlib import contextmanager
import subprocess
def dbg_print(*args):
# Uncomment next line for debugging
#print(*args, file=sys.stderr)
def read_f(f, count):
"""Try to fully read data, raising EOFError on short reads."""
data =
read_bytes = len(data)
if read_bytes != count:
raise EOFError("Read %d, expected %d" % (read_bytes, count))
return data
def copy_data(f_in, f_out, count):
"""Copy in chunks of a megabyte to avoid excess memory waste."""
while count > 0:
sz = min(count, 1024**2)
f_out.write(read_f(f_in, sz))
count -= sz
def unxz(f_out):
proc = subprocess.Popen(["unxz"], stdin=subprocess.PIPE, stdout=f_out)
yield proc.stdin
ret = proc.wait()
if ret != 0:
raise OSError("Decompression failed with status code %d" % ret)
def parse_pbzx(pbzx_file, cpio_file):
magic = read_f(pbzx_file, 4)
if magic != b'pbzx':
raise RuntimeError("Error: Not a pbzx file")
# Read 8 bytes for initial flags
flags = read_f(pbzx_file, 8)
# Interpret the flags as a 64-bit big-endian unsigned int
flags = struct.unpack('>Q', flags)[0]
out_offset, in_offset = 0, 4 + 8
while (flags & (1 << 24)):
# Read in more flags
flags = read_f(pbzx_file, 8)
flags = struct.unpack('>Q', flags)[0]
# Read in length
f_length = read_f(pbzx_file, 8)
f_length = struct.unpack('>Q', f_length)[0]
if f_length == 0x1000000:
# Literal copy
copy_data(pbzx_file, cpio_file, f_length)
xzmagic = read_f(pbzx_file, 6)
dbg_print("Flags: %#018x Length: %r Magic: %r" % (flags, f_length, xzmagic))
if xzmagic != b'\xfd7zXZ\x00':
raise RuntimeError("Error: Header is not xar file header: offset %d, magic %r" % (offset, xzmagic))
with unxz(cpio_file) as unxz_f:
# Do not copy header magic again (-6)
copy_data(pbzx_file, unxz_f, -6 + f_length)
in_offset += 8 + 8 + f_length
out_offset += f_length
dbg_print("Read %d bytes, wrote %d bytes so far" % (in_offset, out_offset))
if __name__ == '__main__':
def open_file(argno, mode, f):
if len(sys.argv) > argno:
return open(sys.argv[argno], mode)
# Access binary stdin/stdout in Python 3
if hasattr(f, "buffer"):
return f.buffer;
return f
in_file = open_file(1, "rb", sys.stdin)
out_file = open_file(2, "wb", sys.stdout)
parse_pbzx(in_file, out_file)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.