Created
March 28, 2023 19:14
-
-
Save mildsunrise/80be53e3f25aedc6e868c2c69ed692fe to your computer and use it in GitHub Desktop.
🕵️‍♀️ JPEG parser / dissector for the command line
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
'''
Portable* JPEG dissector / parser.
Usage: ./jpegparser.py <file name>
(*) Needs Python 3.10+
'''
# NOTE: itertools.pairwise (used below) was added in Python 3.10, so the
# original "3.8+" claim was wrong; the walrus operator alone only needs 3.8.
import sys
import struct
import mmap
import io
import itertools
import re
# --- command-line handling ---
args = sys.argv[1:]
if len(args) != 1:
    print(__doc__.strip(), file=sys.stderr)
    # use sys.exit instead of the site-installed exit() builtin, which can be
    # absent (e.g. under `python -S`) and is not meant for programs
    sys.exit(10)
fname, = args
jpegfile = open(fname, 'rb')  # NOTE(review): never closed; acceptable for a short-lived CLI tool
# --- output configuration ---
bytes_per_line = 16
max_rows = 6
max_dump = bytes_per_line * max_rows  # hex dumps are truncated past this many bytes
show_lengths = True
show_offsets = True
show_defaults = False
show_descriptions = True
keep_parsing = False  # if True, don't stop at the EOI marker
colorize = sys.stdout.isatty()
# --- bit-twiddling helpers ---
mask = lambda n: ~((~0) << n)  # integer with the n low bits set
get_bits = lambda x, end, start: (x & mask(end)) >> start  # bits [start, end) of x
split_bits = lambda x, *bits: (get_bits(x, a, b) for a, b in itertools.pairwise(bits))
def main():
    '''Drive the parser: consume markers until parse_marker signals EOF/EOI.'''
    done = False
    while not done:
        done = parse_marker(jpegfile)
# UTILITIES | |
def unique_dict(x):
    '''Build a dict from (key, value) pairs, asserting that no key appears twice.'''
    result = {}
    for key, value in x:
        assert key not in result, f'duplicate key {repr(key)}: existing {repr(result[key])}, got {repr(value)}'
        result[key] = value
    return result
# FIXME: give it a proper CLI interface | |
# FIXME: display errors more nicely (last two frames, type name, you know) | |
def read_string(stream, optional=False):
    '''
    Read a NUL-terminated UTF-8 string from `stream`.

    Returns None if `optional` is set and the stream is already at EOF;
    raises EOFError if EOF hits in the middle of a string.
    '''
    chars = bytearray()
    while (chunk := stream.read(1)):
        byte = chunk[0]
        if byte == 0:  # terminator found
            return chars.decode('utf-8')
        chars.append(byte)
    if optional and not chars:
        return None
    raise EOFError('EOF while reading string')
# cache of compiled Struct objects, keyed by format string (resolves the
# original "FIXME: caching" — Struct compilation is not free and the same
# handful of formats is used over and over)
_struct_cache = {}

def unpack(stream, struct_fmt: str) -> tuple:
    '''
    Read and unpack a big-endian struct of format `struct_fmt` from `stream`.

    Raises struct.error if the stream yields fewer bytes than the format needs.
    '''
    struct_obj = _struct_cache.get(struct_fmt)
    if struct_obj is None:
        struct_obj = _struct_cache[struct_fmt] = struct.Struct('>' + struct_fmt)
    return struct_obj.unpack(stream.read(struct_obj.size))
def pad_iter(iterable, size, default=None):
    '''Yield exactly `size` items from `iterable`, padding with `default` when it runs short.'''
    it = iter(iterable)
    yield from (next(it, default) for _ in range(size))
def split_in_groups(iterable, size):
    '''Yield consecutive lists of up to `size` items from `iterable`.'''
    it = iter(iterable)
    group = list(itertools.islice(it, size))
    while group:
        yield group
        group = list(itertools.islice(it, size))
def ansi_sgr(p: str, content: str):
    '''
    Wrap `content` in an ANSI SGR escape with parameter string `p`,
    if colorized output is enabled; otherwise return it unchanged.

    A reset (ESC[m) is appended unless content already ends with one,
    so nested wrappers don't pile up redundant resets.
    '''
    text = str(content)
    if not colorize:
        return text
    if not text.endswith('\x1b[m'):
        text += '\x1b[m'
    return f'\x1b[{p}m' + text

def _sgr(p):
    # factory for one-argument SGR wrappers bound to parameter string p
    return lambda content: ansi_sgr(p, content)

ansi_bold = _sgr('1')
ansi_dim = _sgr('2')
# foreground colors 0-7 (SGR parameters 30-37)
ansi_fg0, ansi_fg1, ansi_fg2, ansi_fg3, ansi_fg4, ansi_fg5, ansi_fg6, ansi_fg7 = (
    _sgr(str(30 + i)) for i in range(8))
def print_hex_dump(data: bytes, prefix: str):
    '''
    Print a colorized hex + ASCII dump of `data`, `bytes_per_line` per row,
    truncated (with a trailing '...') after `max_dump` bytes.
    '''
    def colorize_byte(text, raw):
        # NULs dim green, printable ASCII yellow, everything else green
        if raw == 0:
            return ansi_dim(ansi_fg2(text))
        if chr(raw).isascii() and chr(raw).isprintable():
            return ansi_fg3(text)
        return ansi_fg2(text)

    def format_hex(b):
        # None marks padding from pad_iter (short final line)
        return colorize_byte(f'{b:02x}', b) if b is not None else ' '

    def format_char(c):
        shown = c if c.isascii() and (c.isprintable() or c == ' ') else '.'
        return colorize_byte(shown, ord(c))

    def format_line(line):
        groups = split_in_groups(pad_iter(line, bytes_per_line), 4)
        hex_part = ' '.join(' '.join(map(format_hex, group)) for group in groups)
        char_part = ''.join(format_char(c) for c in map(chr, line))
        return hex_part + ' ' + char_part

    for line in split_in_groups(data[:max_dump], bytes_per_line):
        print(prefix + format_line(line))
    if len(data) > max_dump:
        print(prefix + '...')
def print_error(exc, prefix: str):
    '''Print `exc` as a bold red "ERROR:" line, followed by a blank line.'''
    message = f'{ansi_bold(ansi_fg1("ERROR:"))} {ansi_fg1(exc)}\n'
    print(prefix + message)
# CORE PARSING | |
def parse_segment(st):
    '''
    parses (if any) an entropy-coded data segment until either a marker or EOF is found.
    generates the destuffed bytes of the entropy-coded data segment,
    and returns the marker byte, or None if EOF was found.
    throws on parsing errors.
    '''
    def require_byte(msg: str) -> int:
        # read one byte or fail with position info
        data = st.read(1)
        if not data:
            raise ValueError(f'{st.tell():#x}: {msg}')
        return data[0]

    byte = None
    while True:
        data = st.read(1)
        if not data:
            return None  # clean EOF, no marker
        byte = data[0]
        if byte != 0xFF:
            yield byte
            continue
        byte = require_byte('unexpected EOF after FF')
        if byte != 0x00:
            break  # start of a marker
        yield 0xFF  # FF 00 encodes a stuffed data byte

    # skip optional FF fill bytes preceding the marker code
    while byte == 0xFF:
        byte = require_byte('unexpected EOF when expecting marker')
        if byte == 0x00:
            raise Exception(f'{st.tell():#x}: invalid FF00 marker')
    return byte
def parse_and_print_segment(st, prefix='') -> int:
    '''
    Consume one entropy-coded segment from `st`, printing a header line and a
    truncated hex dump if any data was present.

    Returns the terminating marker byte, or None on EOF (both surfaced from
    parse_segment's `return` via StopIteration.value).
    '''
    # this is a bit hacky... we only want to collect enough bytes for the hexdump
    max_data = max_dump + 1
    offset = st.tell()
    data = bytearray()
    extra = 0  # bytes beyond max_data: counted for the length display, not stored
    it = parse_segment(st)
    try:
        while len(data) < max_data:
            data.append(next(it))
        # keep draining (and counting) past the dump limit until the marker/EOF
        while True:
            next(it)
            extra += 1
    except StopIteration as err:
        # the generator's return value (marker byte or None) rides on StopIteration
        return err.value
    finally:
        # runs even while an error propagates, so partial data is still shown
        if data:
            name_text = ansi_fg3(ansi_bold(f'Entropy-coded segment'))
            offset_text = ansi_fg4(f' @ {offset:#x}') if show_offsets else ''
            length_text = ansi_fg4(f' ({len(data) + extra})') if show_lengths else ''
            print(prefix + name_text + offset_text + length_text)
            print_hex_dump(data, prefix + ' ')
# CORE MARKER PARSING | |
# (marker: int | (int, int), code: str, description: str) | |
marker_info = [
    (0x01, 'TEM*', 'For temporary private use in arithmetic coding'),
    ((0xC0, 0xCF), 'SOF', 'Start of frame'),
    (0xC0, None, 'Huffman coding, sequential DCT (baseline)'),
    (0xC1, None, 'Huffman coding, extended sequential DCT'),
    (0xC2, None, 'Huffman coding, progressive DCT'),
    (0xC3, None, 'Huffman coding, lossless (sequential)'),
    (0xC5, None, 'Huffman coding, sequential DCT, differential'),
    (0xC6, None, 'Huffman coding, progressive DCT, differential'),
    (0xC7, None, 'Huffman coding, lossless (sequential), differential'),
    (0xC9, None, 'arithmetic coding, extended sequential DCT'),
    (0xCA, None, 'arithmetic coding, progressive DCT'),
    (0xCB, None, 'arithmetic coding, lossless (sequential)'),
    (0xCD, None, 'arithmetic coding, sequential DCT, differential'),
    (0xCE, None, 'arithmetic coding, progressive DCT, differential'),
    (0xCF, None, 'arithmetic coding, lossless (sequential), differential'),
    (0xC4, 'DHT', 'Define Huffman table(s)'),
    (0xC8, 'JPG', 'Reserved for additional JPEG extensions'),
    (0xCC, 'DAC', 'Define arithmetic coding conditioning(s)'),
    ((0xD0, 0xD7), 'RST*', 'Restart interval termination'),
    (0xD8, 'SOI*', '[Other markers] Start of image'),
    (0xD9, 'EOI*', '[Other markers] End of image'),
    (0xDA, 'SOS', '[Other markers] Start of scan'),
    (0xDB, 'DQT', '[Other markers] Define quantization table(s)'),
    (0xDC, 'DNL', '[Other markers] Define number of lines'),
    (0xDD, 'DRI', '[Other markers] Define restart interval'),
    (0xDE, 'DHP', '[Other markers] Define hierarchical progression'),
    (0xDF, 'EXP', '[Other markers] Expand reference component(s)'),
    ((0xE0, 0xEF), 'APP', 'Application segment'),
    (0xE0, None, 'usually JFIF marker segment'),
    (0xF0, 'VER', '[Version 1 extension] Version'),
    (0xF1, 'DTI', '[Version 1 extension] Define tiled image'),
    (0xF2, 'DTT', '[Version 1 extension] Define tile'),
    (0xF3, 'SRF', '[Version 1 extension] Selectively refined frame'),
    (0xF4, 'SRS', '[Version 1 extension] Selectively refined scan'),
    (0xF5, 'DCR', '[Version 1 extension] Define component registration'),
    (0xF6, 'DQS', '[Version 1 extension] Define quantizer scale selection'),
    ((0xF7, 0xFD), 'JPG', 'Reserved for additional JPEG extensions'),
    (0xFE, 'COM', '[Other markers] Comment'),
]

def parse_asterisk(code):
    # a trailing '*' on a code means "standalone marker": no segment follows it
    if code and code.endswith('*'):
        return code[:-1], False
    return code, True

# normalize every entry to (key, code, has_segment, description)
marker_info = [(key, *parse_asterisk(code), *rest) for key, code, *rest in marker_info]
# exact-byte entries (values are [code, has_segment, description] lists) ...
single_markers = {key: rest for key, *rest in marker_info if type(key) is int}
# ... and (lo, hi) range entries, kept as full tuples
marker_ranges = [entry for entry in marker_info if type(entry[0]) is not int]
def get_marker_info(marker: int):
    '''
    Look up (code, has_segment, description) for a marker byte.

    Exact single-marker entries take precedence; range entries fill in a
    missing code (appending the offset inside the range, e.g. APP0..APP15)
    and prefix their description. Returns None for unknown markers.
    Raises on values outside the legal marker byte range (0x01-0xFE).
    '''
    if not (type(marker) is int and 0 < marker < 0xFF):
        # BUG FIX: the original formatted with {marker:#02X}; '#' counts the
        # '0X' prefix toward the width, so width 2 never padded anything.
        # '#04x' yields the intended two hex digits, e.g. 0x01.
        raise Exception(f'illegal marker value {marker:#04x}')
    entry = single_markers.get(marker)
    if range_entry := next((v for v in marker_ranges if v[0][0] <= marker <= v[0][1]), None):
        if not entry:
            entry = range_entry[1:]
        elif not entry[0]:
            # sub-entry with no code of its own: synthesize it from the range
            entry = list(entry)
            entry[0] = range_entry[1] + str(marker - range_entry[0][0])
            entry[2] = f'{range_entry[3]}: {entry[2]}'
    return entry
def parse_marker(st, prefix=''):
    '''
    Parse one entropy-coded segment (if any), then the marker that follows it
    and, when the marker carries one, its length-prefixed segment payload.

    Prints a summary line, dispatches to a `parse_<code>_marker` handler if
    one exists, and falls back to a hex dump otherwise (or on handler error).
    Returns True when parsing should stop (EOF, or EOI unless keep_parsing).
    '''
    marker = parse_and_print_segment(st, prefix)
    if not marker:
        return True  # EOF
    offset = st.tell() - 2  # rewind over the FF <marker> pair just consumed
    entry = get_marker_info(marker)
    if not entry:
        entry = 'RES', True, 'Reserved (unknown)' # assume there's a segment...
    code, has_segment, description = entry
    data = None
    if has_segment:
        # segment length is big-endian u16 and includes its own two bytes
        size = st.read(2)
        if len(size) < 2:
            raise Exception(f'{offset+2:#x}: unexpected EOF when reading length')
        size, = struct.unpack('>H', size)
        if size < 2:
            raise Exception(f'{offset+2:#x}: invalid segment length {size}')
        data = st.read(size - 2)
        if len(data) < size - 2:
            raise Exception(f'{offset+4:#x}: unexpected EOF when reading data (expected {size - 2}, got {len(data)})')
    name_text = ansi_bold(f'[{marker:02X}] {code}')
    if code.startswith('SOF') or code == 'SOI' or code == 'EOI':
        name_text = ansi_fg5(name_text)
    if code == 'SOS':
        name_text = ansi_fg3(name_text)
    description_text = f' - {description}' if show_descriptions else ''
    offset_text = ansi_fg4(f' @ {offset:#x}') if show_offsets else ''
    length_text = ansi_fg4(f' ({len(data)})') if has_segment and show_lengths else ''
    print(prefix + name_text + offset_text + length_text + description_text)
    prefix += ' '
    try:
        if code.startswith('SOF'): code = 'SOF'  # all SOFn share one handler
        if (handler := globals().get(f'parse_{code.lower()}_marker')):
            return handler(offset, data, prefix)
    except Exception as e:
        print_error(e, prefix)
    # as fall back (or if error), print hex dump
    if max_dump and data:
        print_hex_dump(data, prefix)
    # BUG FIX: `marker` is an int byte value, but the original compared it to
    # the bytes literal b'\xFF\xD9' — never equal, so parsing never stopped at
    # EOI. Compare against the EOI marker code 0xD9 instead.
    if not keep_parsing and marker == 0xD9:
        return True
# MARKERS | |
def zigzag_to_index(x, y):
    '''Map (x, y) coordinates in an 8x8 block to their zigzag scan index (0-63).'''
    def triangular(a, b):
        # scan index within the upper-left triangle (diagonals 0..7)
        diag = a + b
        offset = a if diag % 2 == 0 else b
        return (1 + diag) * diag // 2 + offset
    # lower-right triangle is the point-symmetric mirror of the upper-left one
    if x + y < 8:
        return triangular(x, y)
    return 63 - triangular(7 - x, 7 - y)
def parse_app0_marker(offset, data, prefix):
    '''Dispatch an APP0 segment by its NUL-terminated application ID.'''
    st = io.BytesIO(data)
    app_id = read_string(st)
    if app_id != 'JFIF':
        # elif app_id == 'JFXX': (TODO)
        #     parse_app0_jfxx(offset, st, prefix)
        raise Exception(f'unrecognized APP0 marker with ID: {repr(app_id)}')
    parse_app0_jfif(offset, st, prefix)
def parse_app0_jfif(offset, st, prefix):
    '''
    Parse the body of a JFIF APP0 segment: version, pixel density / aspect
    ratio, and the optional embedded RGB thumbnail.
    '''
    major, minor = unpack(st, '2B')
    print(prefix + ansi_bold(f'JFIF v{major}.{minor} file'))
    unit, dx, dy = unpack(st, 'B 2H')
    if unit == 0:
        # unit 0 means dx/dy only define an aspect ratio, not a density
        print(prefix + f'pixel aspect ratio = {dy}:{dx}')
    else:
        unit_desc = ['ppi', 'pixels per cm'][unit-1] if unit-1 < 2 else f'(unknown unit {unit})'
        print(prefix + f'density = {dx} x {dy} {unit_desc}')
    tw, th = unpack(st, '2B')
    data_offset = offset + st.tell()
    data = st.read(3 * tw * th)  # thumbnail is raw 24-bit RGB
    if len(data) < 3 * tw * th:
        raise Exception('unexpected EOF when reading thumbnail data')
    if tw or th:
        name_text = f'{tw} x {th} thumbnail'
        offset_text = ansi_fg4(f' @ {data_offset:#x}') if show_offsets else ''
        # BUG FIX: the original tested `has_segment`, a local variable of
        # parse_marker() that doesn't exist in this scope — any file with a
        # JFIF thumbnail crashed with NameError. Only show_lengths applies.
        length_text = ansi_fg4(f' ({len(data)})') if show_lengths else ''
        print(prefix + name_text + offset_text + length_text)
        print_hex_dump(data, prefix + ' ')
    if extra := st.read():
        raise Exception(f'found {len(extra)} extra bytes')
def parse_sof_marker(offset, data, prefix):
    '''Parse a start-of-frame segment: precision, dimensions, per-component info.'''
    st = io.BytesIO(data)
    precision, height, width, n_components = unpack(st, 'B 2H B')
    print(prefix + f'{width} x {height}, {precision} bits')
    for _ in range(n_components):
        comp_id, sampling, quant_table = unpack(st, '3B')
        # high nibble = horizontal sampling factor, low nibble = vertical
        horiz, vert = sampling >> 4, sampling & 0xF
        print(prefix + ansi_bold(f'component {comp_id}:') + f' sampling factor = {vert}:{horiz}, quantization table = {quant_table}')
    if leftover := st.read():
        raise Exception(f'found {len(leftover)} extra bytes')
def parse_sos_marker(offset, data, prefix):
    '''Parse a start-of-scan segment: component table selectors + scan parameters.'''
    st = io.BytesIO(data)
    n_scan_components, = unpack(st, 'B')
    for _ in range(n_scan_components):
        comp_id, tables = unpack(st, '2B')
        # high nibble = DC entropy table, low nibble = AC entropy table
        dc_table, ac_table = tables >> 4, tables & 0xF
        print(prefix + ansi_bold(f'component {comp_id}:') + f' DC table = {dc_table}, AC table = {ac_table}')
    if trailer := st.read(3):
        # spectral selection + successive approximation (progressive scans)
        Ss, Se, approx = trailer
        Ah, Al = approx >> 4, approx & 0xF
        print(prefix + f'progressive scan: Ss = {Ss}, Se = {Se}, Ah = {Ah}, Al = {Al}')
    if leftover := st.read():
        raise Exception(f'found {len(leftover)} extra bytes')
def parse_dqt_marker(offset, data, prefix):
    '''
    Parse a define-quantization-table segment: one or more tables, each a
    precision/slot byte followed by 64 coefficients in zigzag order, printed
    here as an 8x8 grid in natural (row-major) order.
    '''
    st = io.BytesIO(data)
    while spec := st.read(1):
        Pq = spec[0] >> 4   # precision: 0 = 8-bit, 1 = 16-bit coefficients
        Tq = spec[0] & 0xF  # destination table slot
        if not (Pq < 2):
            raise Exception(f'invalid Pq = {Pq}')
        print(prefix + f'table slot {ansi_bold(str(Tq))} ({[8, 16][Pq]} bit precision):')
        coeffs = unpack(st, '64' + 'BH'[Pq])
        min_coeff, max_coeff = min(coeffs), max(coeffs)
        for y in range(8):
            # de-zigzag one display row
            row = [ coeffs[zigzag_to_index(x, y)] for x in range(8) ]
            # tag each cell: (is_minimum, is_maximum, right-justified text)
            row = ( (x == min_coeff, x == max_coeff, str(x).rjust([3, 5][Pq])) for x in row)
            # NOTE(review): minimum coefficients render bold, maximums dim —
            # verify this isn't inverted from the intended emphasis
            row = ( ansi_fg2(ansi_bold(x) if bold else ansi_dim(x) if dim else x) for bold, dim, x in row )
            print(prefix + ' ' + ' '.join(row))
def parse_dht_marker(offset, data, prefix):
    '''
    Parse a define-Huffman-table segment: one or more tables, each a
    class/slot byte, 16 per-length code counts, then the code values.
    '''
    st = io.BytesIO(data)
    while spec := st.read(1):
        Tc = spec[0] >> 4   # table class: 0 = DC, 1 = AC
        Th = spec[0] & 0xF  # destination table slot
        kind = ["DC", "AC"][Tc] if Tc < 2 else f'unknown class {Tc}'
        print(prefix + f'table slot {ansi_bold(str(Th))} ({kind}):')

        def safe_read(n):
            chunk = st.read(n)
            if len(chunk) < n:
                raise Exception(f'expected {n} bytes, found {len(chunk)}')
            return chunk

        lens = safe_read(16)                          # codes per bit-length 1..16
        vals = [safe_read(count) for count in lens]   # values grouped by length
        format_val = lambda v: ansi_fg2(v.hex()) if v else ansi_dim('o')
        print(prefix + ' ' + ansi_dim(', ').join(format_val(v) for v in vals))
# script entry point: runs immediately (no __main__ guard by design — this is a CLI tool)
main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment