Skip to content

Instantly share code, notes, and snippets.

@mildsunrise
Created March 28, 2023 19:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mildsunrise/80be53e3f25aedc6e868c2c69ed692fe to your computer and use it in GitHub Desktop.
Save mildsunrise/80be53e3f25aedc6e868c2c69ed692fe to your computer and use it in GitHub Desktop.
🕵️‍♀️ JPEG parser / dissector for the command line
#!/usr/bin/env python3
'''
Portable* JPEG dissector / parser.
Usage: ./jpegparser.py <file name>
(*) Needs Python 3.8+
'''
import sys
import struct
import mmap
import io
import itertools
import re
# Command-line handling: exactly one positional argument (the JPEG file name)
# is accepted; anything else prints the module docstring as usage text.
args = sys.argv[1:]
if len(args) != 1:
    print(__doc__.strip(), file=sys.stderr)
    # sys.exit() rather than the site-provided exit(), which is not available
    # when the interpreter runs without the site module (python -S, frozen apps)
    sys.exit(10)
fname, = args
# opened in binary mode; the handle is kept in a module global and read by main()
jpegfile = open(fname, 'rb')
# --- display configuration (module-level settings read by the functions below) ---
# number of bytes shown on each row of a hex dump
bytes_per_line = 16
# maximum number of hex dump rows printed before the dump is truncated
max_rows = 6
# maximum number of bytes ever hex-dumped; a falsy value disables the
# fallback hex dump in parse_marker
max_dump = bytes_per_line * max_rows
# print the payload length next to each marker / entropy-coded segment
show_lengths = True
# print the file offset next to each marker / entropy-coded segment
show_offsets = True
# NOTE(review): not referenced anywhere in the visible source
show_defaults = False
# print the human readable description of each marker
show_descriptions = True
# consulted by parse_marker when deciding whether to stop at the EOI marker
keep_parsing = False
# emit ANSI escape sequences only when stdout is a terminal
colorize = sys.stdout.isatty()
def mask(n):
    '''Return an integer whose lowest `n` bits are set (0 for n == 0).'''
    return ~((~0) << n)


def get_bits(x, end, start):
    '''Return bits [start, end) of `x`, shifted down so that bit `start` becomes bit 0.'''
    return (x & mask(end)) >> start


def split_bits(x, *bits):
    '''
    Split `x` into consecutive bit fields.
    `bits` lists the field boundaries from most to least significant, so
    split_bits(0xAB, 8, 4, 0) yields 0xA then 0xB. Returns a generator.
    '''
    # zip(bits, bits[1:]) produces the same consecutive pairs as
    # itertools.pairwise, which only exists on Python 3.10+ while this
    # script advertises support for 3.8+
    return (get_bits(x, a, b) for a, b in zip(bits, bits[1:]))
def main():
    '''Dissect the opened JPEG file marker by marker until parse_marker asks to stop.'''
    finished = False
    while not finished:
        # parse_marker returns a truthy value once there is nothing left to parse
        finished = parse_marker(jpegfile)
# UTILITIES
def unique_dict(x):
    '''
    Build a dict from an iterable of (key, value) pairs,
    asserting that no key appears more than once.
    '''
    mapping = {}
    for key, value in x:
        assert key not in mapping, f'duplicate key {key!r}: existing {mapping[key]!r}, got {value!r}'
        mapping[key] = value
    return mapping
# FIXME: give it a proper CLI interface
# FIXME: display errors more nicely (last two frames, type name, you know)
def read_string(stream, optional=False):
    '''
    Read a NUL-terminated UTF-8 string from `stream`, one byte at a time.
    The terminator is consumed but not included in the result.
    If `optional` is set and the stream is already at EOF, returns None;
    any other EOF before the terminator raises EOFError.
    '''
    collected = bytearray()
    while True:
        chunk = stream.read(1)
        if not chunk:
            if optional and not collected:
                return None
            raise EOFError('EOF while reading string')
        if chunk == b'\x00':
            break
        collected += chunk
    return collected.decode('utf-8')
def unpack(stream, struct_fmt: str) -> tuple:
    '''
    Read exactly as many bytes from `stream` as the big-endian struct format
    `struct_fmt` needs and return the unpacked fields as a tuple.
    Raises struct.error if the format is invalid or the stream runs short.
    '''
    fmt = '>' + struct_fmt
    # the module-level struct functions keep their own cache of compiled formats
    needed = struct.calcsize(fmt)
    return struct.unpack(fmt, stream.read(needed))
def pad_iter(iterable, size, default=None):
    '''
    Yield exactly `size` items: the leading items of `iterable`,
    followed by `default` as often as needed once it is exhausted.
    '''
    source = iter(iterable)
    yield from (next(source, default) for _ in range(size))
def split_in_groups(iterable, size):
    '''
    Yield successive lists of up to `size` items taken from `iterable`.
    Only the last list can be shorter; nothing is yielded for empty input.
    '''
    source = iter(iterable)
    while True:
        chunk = list(itertools.islice(source, size))
        if not chunk:
            return
        yield chunk
def ansi_sgr(p: str, content: str):
    '''
    Wrap `content` (converted with str()) in the ANSI SGR sequence with
    parameters `p`, followed by a reset sequence unless the text already
    ends with one. When `colorize` is off the plain text is returned.
    '''
    text = str(content)
    if not colorize:
        return text
    reset = '\x1b[m'
    suffix = '' if text.endswith(reset) else reset
    return f'\x1b[{p}m{text}{suffix}'
# Shorthands for the SGR attributes used by the printers below:
# bold (1), dim (2) and the eight foreground colors (30-37).
def ansi_bold(x):
    return ansi_sgr('1', x)


def ansi_dim(x):
    return ansi_sgr('2', x)


def ansi_fg0(x):
    return ansi_sgr('30', x)


def ansi_fg1(x):
    return ansi_sgr('31', x)


def ansi_fg2(x):
    return ansi_sgr('32', x)


def ansi_fg3(x):
    return ansi_sgr('33', x)


def ansi_fg4(x):
    return ansi_sgr('34', x)


def ansi_fg5(x):
    return ansi_sgr('35', x)


def ansi_fg6(x):
    return ansi_sgr('36', x)


def ansi_fg7(x):
    return ansi_sgr('37', x)
def print_hex_dump(data: bytes, prefix: str):
    '''
    Print a hex + ASCII dump of `data`, every row preceded by `prefix`.
    Rows hold `bytes_per_line` bytes; at most `max_dump` bytes are shown,
    and a final '...' row signals that the data was truncated.
    '''
    def colorize_byte(text, value):
        # zero bytes are dimmed, printable ASCII gets its own color
        if value == 0:
            return ansi_dim(ansi_fg2(text))
        char = chr(value)
        if char.isascii() and char.isprintable():
            return ansi_fg3(text)
        return ansi_fg2(text)

    def format_hex(value):
        if value is None:
            # padding for a short last row: must be as wide as a real
            # two-digit byte, or the character column no longer lines up
            return '  '
        return colorize_byte(f'{value:02x}', value)

    def format_char(char):
        # the space character already counts as printable
        shown = char if char.isascii() and char.isprintable() else '.'
        return colorize_byte(shown, ord(char))

    def format_line(line):
        groups = split_in_groups(pad_iter(line, bytes_per_line), 4)
        hex_part = ' '.join(' '.join(map(format_hex, group)) for group in groups)
        char_part = ''.join(format_char(chr(value)) for value in line)
        return hex_part + ' ' + char_part

    for line in split_in_groups(data[:max_dump], bytes_per_line):
        print(prefix + format_line(line))
    if len(data) > max_dump:
        print(prefix + '...')
def print_error(exc, prefix: str):
    '''Print `exc` as a red "ERROR:" line after `prefix`, followed by a blank line.'''
    label = ansi_bold(ansi_fg1("ERROR:"))
    message = ansi_fg1(exc)
    print(f'{prefix}{label} {message}\n')
# CORE PARSING
def parse_segment(st):
    '''
    Generator over the entropy-coded data (if any) in front of the next marker.
    Yields the destuffed data bytes (an FF 00 pair is yielded as a single FF).
    The generator's return value is the marker byte that ended the data,
    or None when EOF was reached before any marker.
    Raises ValueError on EOF in the middle of a marker and Exception
    when FF fill bytes are followed by 00.
    '''
    def next_byte(eof_message: str) -> int:
        chunk = st.read(1)
        if chunk:
            return chunk[0]
        raise ValueError(f'{st.tell():#x}: {eof_message}')

    # pass data bytes through until an FF that is not followed by 00 shows up
    while True:
        chunk = st.read(1)
        if not chunk:
            return None
        value = chunk[0]
        if value != 0xFF:
            yield value
            continue
        value = next_byte('unexpected EOF after FF')
        if value != 0x00:
            break
        # FF 00: the 00 is a stuffing byte, the FF is data
        yield 0xFF

    # a marker starts here; skip any additional FF bytes in front of its code
    while value == 0xFF:
        value = next_byte('unexpected EOF when expecting marker')
    if value == 0x00:
        raise Exception(f'{st.tell():#x}: invalid FF00 marker')
    return value
def parse_and_print_segment(st, prefix='') -> int:
    '''
    Consume the entropy-coded data in front of the next marker and, if there
    was any, print a header line plus a hex dump of its first bytes.
    Returns whatever parse_segment returns: the marker byte, or None at EOF.
    The summary is printed even when parse_segment raises.
    '''
    # only keep what the hex dump can show (one extra byte so that the dump
    # notices the truncation); everything beyond that is merely counted
    keep_limit = max_dump + 1
    start = st.tell()
    head = bytearray()
    skipped = 0
    segment = parse_segment(st)
    try:
        while len(head) < keep_limit:
            head.append(next(segment))
        while True:
            next(segment)
            skipped += 1
    except StopIteration as stop:
        # the generator's return value travels on the StopIteration
        return stop.value
    finally:
        if head:
            title = ansi_fg3(ansi_bold('Entropy-coded segment'))
            where = ansi_fg4(f' @ {start:#x}') if show_offsets else ''
            size = ansi_fg4(f' ({len(head) + skipped})') if show_lengths else ''
            print(prefix + title + where + size)
            print_hex_dump(head, prefix + ' ')
# CORE MARKER PARSING
# Table of known markers. Each entry is
#   (marker: int | (int, int), code: str | None, description: str)
# - `marker` is the byte following FF, or an inclusive (first, last) range.
# - a `code` ending in '*' is turned into has_segment=False by parse_asterisk
#   below, so parse_marker reads no length/payload for that marker.
# - entries with code None refine a range entry: get_marker_info derives their
#   code from the range code plus the index within the range, and prepends
#   the range description to theirs.
marker_info = [
(0x01, 'TEM*', 'For temporary private use in arithmetic coding'),
((0xC0, 0xCF), 'SOF', 'Start of frame'),
(0xC0, None, 'Huffman coding, sequential DCT (baseline)'),
(0xC1, None, 'Huffman coding, extended sequential DCT'),
(0xC2, None, 'Huffman coding, progressive DCT'),
(0xC3, None, 'Huffman coding, lossless (sequential)'),
(0xC5, None, 'Huffman coding, sequential DCT, differential'),
(0xC6, None, 'Huffman coding, progressive DCT, differential'),
(0xC7, None, 'Huffman coding, lossless (sequential), differential'),
(0xC9, None, 'arithmetic coding, extended sequential DCT'),
(0xCA, None, 'arithmetic coding, progressive DCT'),
(0xCB, None, 'arithmetic coding, lossless (sequential)'),
(0xCD, None, 'arithmetic coding, sequential DCT, differential'),
(0xCE, None, 'arithmetic coding, progressive DCT, differential'),
(0xCF, None, 'arithmetic coding, lossless (sequential), differential'),
(0xC4, 'DHT', 'Define Huffman table(s)'),
(0xC8, 'JPG', 'Reserved for additional JPEG extensions'),
(0xCC, 'DAC', 'Define arithmetic coding conditioning(s)'),
((0xD0, 0xD7), 'RST*', 'Restart interval termination'),
(0xD8, 'SOI*', '[Other markers] Start of image'),
(0xD9, 'EOI*', '[Other markers] End of image'),
(0xDA, 'SOS', '[Other markers] Start of scan'),
(0xDB, 'DQT', '[Other markers] Define quantization table(s)'),
(0xDC, 'DNL', '[Other markers] Define number of lines'),
(0xDD, 'DRI', '[Other markers] Define restart interval'),
(0xDE, 'DHP', '[Other markers] Define hierarchical progression'),
(0xDF, 'EXP', '[Other markers] Expand reference component(s)'),
((0xE0, 0xEF), 'APP', 'Application segment'),
(0xE0, None, 'usually JFIF marker segment'),
(0xF0, 'VER', '[Version 1 extension] Version'),
(0xF1, 'DTI', '[Version 1 extension] Define tiled image'),
(0xF2, 'DTT', '[Version 1 extension] Define tile'),
(0xF3, 'SRF', '[Version 1 extension] Selectively refined frame'),
(0xF4, 'SRS', '[Version 1 extension] Selectively refined scan'),
(0xF5, 'DCR', '[Version 1 extension] Define component registration'),
(0xF6, 'DQS', '[Version 1 extension] Define quantizer scale selection'),
((0xF7, 0xFD), 'JPG', 'Reserved for additional JPEG extensions'),
(0xFE, 'COM', '[Other markers] Comment'),
]
def parse_asterisk(code):
    '''
    Split a table code into (code, has_segment): a trailing '*' is removed
    and reported as has_segment=False; any other code (including None)
    is returned unchanged with has_segment=True.
    '''
    if code and code.endswith('*'):
        return code[:-1], False
    return code, True


# normalized entries: (marker or range, code, has_segment, description)
marker_info = [
    (key, *parse_asterisk(code), *rest)
    for key, code, *rest in marker_info
]
# entries for one specific marker byte, keyed by that byte;
# values are lists [code, has_segment, description]
single_markers = {
    key: fields
    for key, *fields in marker_info
    if type(key) is int
}
# entries that cover an inclusive (first, last) range of marker bytes
marker_ranges = [
    info for info in marker_info
    if type(info[0]) is not int
]
def get_marker_info(marker: int):
    '''
    Look up the (code, has_segment, description) triple for a marker byte.
    A marker-specific entry wins over the range it falls in; when that entry
    carries no code of its own, the code becomes the range code plus the
    index inside the range and the range description is prepended.
    Returns None for markers missing from both tables and raises
    Exception when `marker` is not an int strictly between 0 and 0xFF.
    '''
    if type(marker) is not int or not (0 < marker < 0xFF):
        raise Exception(f'illegal marker value {marker:#02X}')

    entry = single_markers.get(marker)

    # first range (if any) that contains the marker
    range_entry = None
    for candidate in marker_ranges:
        if candidate[0][0] <= marker <= candidate[0][1]:
            range_entry = candidate
            break

    if not range_entry:
        return entry
    if not entry:
        return range_entry[1:]
    if entry[0]:
        return entry

    # specific entry without a code: work on a copy so the table stays intact
    refined = list(entry)
    refined[0] = range_entry[1] + str(marker - range_entry[0][0])
    refined[2] = f'{range_entry[3]}: {refined[2]}'
    return refined
def parse_marker(st, prefix=''):
    '''
    Parse and print the next marker of the stream, together with any
    entropy-coded data in front of it.

    The marker header line is always printed. The payload (if the marker has
    one) is handed to the `parse_<code>_marker` function when such a function
    exists at module level; if there is none, or if it raises, the payload is
    hex-dumped instead.

    Returns True when parsing should stop: at EOF, or after the EOI marker
    unless `keep_parsing` is set. Otherwise returns the handler's result
    or None (the visible handlers all return None).
    Raises Exception on a truncated or invalid segment length.
    '''
    marker = parse_and_print_segment(st, prefix)
    if not marker:
        return True
    # position of the FF byte, assuming the marker took exactly two bytes
    offset = st.tell() - 2

    entry = get_marker_info(marker)
    if not entry:
        entry = 'RES', True, 'Reserved (unknown)' # assume there's a segment...
    code, has_segment, description = entry

    data = None
    if has_segment:
        size = st.read(2)
        if len(size) < 2:
            raise Exception(f'{offset+2:#x}: unexpected EOF when reading length')
        size, = struct.unpack('>H', size)
        # the length field counts its own two bytes
        if size < 2:
            raise Exception(f'{offset+2:#x}: invalid segment length {size}')
        data = st.read(size - 2)
        if len(data) < size - 2:
            raise Exception(f'{offset+4:#x}: unexpected EOF when reading data (expected {size - 2}, got {len(data)})')

    name_text = ansi_bold(f'[{marker:02X}] {code}')
    if code.startswith('SOF') or code == 'SOI' or code == 'EOI':
        name_text = ansi_fg5(name_text)
    if code == 'SOS':
        name_text = ansi_fg3(name_text)
    description_text = f' - {description}' if show_descriptions else ''
    offset_text = ansi_fg4(f' @ {offset:#x}') if show_offsets else ''
    length_text = ansi_fg4(f' ({len(data)})') if has_segment and show_lengths else ''
    print(prefix + name_text + offset_text + length_text + description_text)

    prefix += ' '
    try:
        # every SOFn marker shares a single handler
        if code.startswith('SOF'): code = 'SOF'
        if (handler := globals().get(f'parse_{code.lower()}_marker')):
            return handler(offset, data, prefix)
    except Exception as e:
        print_error(e, prefix)

    # as fall back (or if error), print hex dump
    if max_dump and data:
        print_hex_dump(data, prefix)

    # `marker` is the integer marker code (the byte after FF), so EOI is 0xD9;
    # comparing it with the bytes b'\xFF\xD9' could never be true
    if not keep_parsing and marker == 0xD9:
        return True
# MARKERS
def zigzag_to_index(x, y):
    '''
    Return the position (0..63) that the coefficient at column `x`, row `y`
    of an 8x8 block occupies in zigzag order.
    '''
    def upper_half(col, line):
        # elements before this anti-diagonal, plus the step along it;
        # the walking direction alternates with the diagonal's parity
        diagonal = col + line
        before = diagonal * (diagonal + 1) // 2
        return before + (line if diagonal % 2 else col)

    if x + y < 8:
        return upper_half(x, y)
    # the lower-right half mirrors the upper-left one
    return 63 - upper_half(7 - x, 7 - y)
def parse_app0_marker(offset, data, prefix):
    '''
    Handle an APP0 payload: read its NUL-terminated identifier and pass the
    rest on to the matching parser. Only 'JFIF' is supported;
    any other identifier raises Exception.
    '''
    st = io.BytesIO(data)
    app_id = read_string(st)
    if app_id != 'JFIF':
        # TODO: 'JFXX' extension segments (parse_app0_jfxx) are not handled yet
        raise Exception(f'unrecognized APP0 marker with ID: {repr(app_id)}')
    parse_app0_jfif(offset, st, prefix)
def parse_app0_jfif(offset, st, prefix):
    '''
    Print the contents of a JFIF APP0 segment.

    `offset` is the file offset of the segment's marker (as passed by
    parse_marker); `st` is a stream over the segment payload, positioned
    right after the 'JFIF\\0' identifier.
    Prints the version, the density or pixel aspect ratio and, if present,
    the embedded thumbnail. Raises Exception when the thumbnail data is
    truncated or when bytes are left over at the end of the segment.
    '''
    major, minor = unpack(st, '2B')
    # JFIF versions are written with a two-digit minor part (1.01, 1.02, ...)
    print(prefix + ansi_bold(f'JFIF v{major}.{minor:02d} file'))

    unit, dx, dy = unpack(st, 'B 2H')
    if unit == 0:
        print(prefix + f'pixel aspect ratio = {dy}:{dx}')
    else:
        unit_desc = ['ppi', 'pixels per cm'][unit-1] if unit-1 < 2 else f'(unknown unit {unit})'
        print(prefix + f'density = {dx} x {dy} {unit_desc}')

    tw, th = unpack(st, '2B')
    # `st` positions are relative to the payload, which starts 4 bytes after
    # the marker offset (2 marker bytes + 2 length bytes)
    data_offset = offset + 4 + st.tell()
    data = st.read(3 * tw * th)
    if len(data) < 3 * tw * th:
        raise Exception('unexpected EOF when reading thumbnail data')
    if tw or th:
        name_text = f'{tw} x {th} thumbnail'
        offset_text = ansi_fg4(f' @ {data_offset:#x}') if show_offsets else ''
        # only show_lengths decides here; there is no has_segment in this scope
        length_text = ansi_fg4(f' ({len(data)})') if show_lengths else ''
        print(prefix + name_text + offset_text + length_text)
        print_hex_dump(data, prefix + ' ')

    if extra := st.read():
        raise Exception(f'found {len(extra)} extra bytes')
def parse_sof_marker(offset, data, prefix):
    '''
    Print the contents of a start-of-frame payload: image size and sample
    precision, then one line per component. Raises Exception when bytes are
    left over after the last component.
    '''
    st = io.BytesIO(data)
    precision, height, width, component_count = unpack(st, 'B 2H B')
    print(prefix + f'{width} x {height}, {precision} bits')

    for _ in range(component_count):
        component_id, sampling, table = unpack(st, '3B')
        # high nibble / low nibble of the sampling byte
        horizontal, vertical = divmod(sampling, 16)
        heading = ansi_bold(f'component {component_id}:')
        print(prefix + heading + f' sampling factor = {vertical}:{horizontal}, quantization table = {table}')

    leftover = st.read()
    if leftover:
        raise Exception(f'found {len(leftover)} extra bytes')
def parse_sos_marker(offset, data, prefix):
    '''
    Print the contents of a start-of-scan payload: the table selectors of every
    scan component, then the Ss / Se / Ah / Al parameters when present.
    Raises Exception when bytes are left over at the end.
    '''
    st = io.BytesIO(data)
    (component_count,) = unpack(st, 'B')

    for _ in range(component_count):
        selector, tables = unpack(st, '2B')
        # high nibble = DC table, low nibble = AC table
        dc_table, ac_table = divmod(tables, 16)
        print(prefix + ansi_bold(f'component {selector}:') + f' DC table = {dc_table}, AC table = {ac_table}')

    spectral = st.read(3)
    if spectral:
        # unpacking fails with ValueError if fewer than 3 bytes were left
        first, last, approximation = spectral
        high, low = divmod(approximation, 16)
        print(prefix + f'progressive scan: Ss = {first}, Se = {last}, Ah = {high}, Al = {low}')

    leftover = st.read()
    if leftover:
        raise Exception(f'found {len(leftover)} extra bytes')
def parse_dqt_marker(offset, data, prefix):
    '''
    Print every quantization table of a DQT payload as an 8x8 grid.
    The 64 stored values are placed with zigzag_to_index; cells equal to the
    table minimum are shown bold, cells equal to the maximum dimmed.
    Raises Exception for a precision nibble other than 0 or 1.
    '''
    st = io.BytesIO(data)
    while True:
        spec = st.read(1)
        if not spec:
            break
        # high nibble = precision (Pq), low nibble = table slot (Tq)
        precision, slot = divmod(spec[0], 16)
        if precision >= 2:
            raise Exception(f'invalid Pq = {precision}')
        print(prefix + f'table slot {ansi_bold(str(slot))} ({[8, 16][precision]} bit precision):')

        coeffs = unpack(st, '64' + 'BH'[precision])
        lowest, highest = min(coeffs), max(coeffs)
        width = [3, 5][precision]
        for y in range(8):
            cells = []
            for x in range(8):
                value = coeffs[zigzag_to_index(x, y)]
                text = str(value).rjust(width)
                if value == lowest:
                    text = ansi_bold(text)
                elif value == highest:
                    text = ansi_dim(text)
                cells.append(ansi_fg2(text))
            print(prefix + ' ' + ' '.join(cells))
def parse_dht_marker(offset, data, prefix):
    '''
    Print every Huffman table of a DHT payload: a header with slot and class,
    then the values of each of the 16 length groups in hex, separated by
    commas (an empty group is shown as a dimmed placeholder).
    Raises Exception when the payload ends in the middle of a table.
    '''
    st = io.BytesIO(data)

    def safe_read(n):
        chunk = st.read(n)
        if len(chunk) < n:
            raise Exception(f'expected {n} bytes, found {len(chunk)}')
        return chunk

    def format_val(symbols):
        return ansi_fg2(symbols.hex()) if symbols else ansi_dim('o')

    while True:
        spec = st.read(1)
        if not spec:
            break
        # high nibble = table class (Tc), low nibble = table slot (Th)
        table_class, slot = divmod(spec[0], 16)
        kind = ["DC", "AC"][table_class] if table_class < 2 else f'unknown class {table_class}'
        print(prefix + f'table slot {ansi_bold(str(slot))} ({kind}):')

        # 16 counts, each followed (after all counts) by that many values
        counts = safe_read(16)
        groups = [safe_read(count) for count in counts]
        print(prefix + ' ' + ansi_dim(', ').join(format_val(group) for group in groups))
# run the dissector only when executed as a script, not when imported
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment