Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save llllllllll/8cb443125c6e6db97e8b1af7291547a3 to your computer and use it in GitHub Desktop.
Save llllllllll/8cb443125c6e6db97e8b1af7291547a3 to your computer and use it in GitHub Desktop.
"""Example usage:
In [1]: !cat ayy.s
.text
.global _start
.type _start, @function
_start:
movq $10, %rdx
pushw $10
pushw $28513
pushw $28012
pushw $8313
pushw $31073
movq %rsp, %rsi
movq $1, %rdi
movq $1, %rbx
movq $1, %rax
syscall
addq $10, %rsp
movq $0, %rax
retq
In [2]: !as -o ayy.o ayy.s
In [3]: with open('ayy.o', 'rb') as f:
...: f.seek(64)
...: payload = f.read()
...:
In [4]: from they_should_have_written_cpython_in_rust_lol import execute_binary_payload
In [5]: execute_binary_payload(payload)
ayy lmao
"""
import dis
import mmap
import sys
import types
# byte offsets for structures
#
# A PyDEBUG build is detected through ``sys.gettotalrefcount``. Every offset
# below is 16 bytes larger on such a build than on a normal build.
# NOTE(review): presumably the extra 16 bytes are the two additional pointers
# a debug build puts in every object header -- confirm against the target
# interpreter before relying on these numbers.
_debug_padding = 16 if hasattr(sys, 'gettotalrefcount') else 0

# added to ``id(frame)`` to find the frame's local variable slots
_f_localsplus_offset = 376 + _debug_padding
# added to ``id(tuple)`` to find the tuple's first element
_ob_item_offset = 24 + _debug_padding
# start of the 8 bytes read out of the mmap object
_sizeof_pyobject = 16 + _debug_padding
# added to ``id(memoryview)`` to find its managed buffer
_mbuf_offset = 24 + _debug_padding
# added to ``id(managed_buffer)`` when redirecting the buffer
_master_offset = 32 + _debug_padding
# added to ``id(memoryview)`` when redirecting the view
_view_offset = 56 + _debug_padding
# position of the 8 byte repr slot inside a class object
_tp_repr_offset = 88 + _debug_padding

# the maximum allowed offset from the address of the frame's locals
# to the target address measured in pointer counts.
_write_range = 2 ** 22
def _make_write_bytecode(range_):
    """Create bytecode that emits one branch for every index in
    ``(-range_, range_)`` like:

        if this_branch():
            _n = value

    where ``this_branch`` is a callable that checks if we should assign to
    the given index. ``value`` is constant 0. ``this_branch`` is not stored
    in the locals, instead we just store it on the stack.

    To do negative values, we overflow oparg with many EXTENDED_ARG calls.

    Parameters
    ----------
    range_ : int
        The exclusive bound on the absolute value of the indices to emit.
        Branches are emitted in the order ``0, -0, 1, -1, 2, -2, ...``.

    Returns
    -------
    bytecode : bytes
        The raw bytecode. The result is memoized in ``_cache`` and in a file
        named ``_make_write_bytecode.<range_>`` in the current working
        directory.

    Notes
    -----
    The on-disk cache is trusted as-is. A cache file produced by a generator
    that encoded indices differently must be deleted by hand.
    """
    try:
        return _make_write_bytecode._cache[range_]
    except KeyError:
        pass

    cached_file_name = f'_make_write_bytecode.{range_}'
    try:
        with open(cached_file_name, 'rb') as f:
            _make_write_bytecode._cache[range_] = ret = f.read()
        return ret
    except FileNotFoundError:
        pass

    # look the opcodes up once instead of once per emitted branch
    load_const = dis.opmap['LOAD_CONST']
    yield_value = dis.opmap['YIELD_VALUE']
    return_value = dis.opmap['RETURN_VALUE']
    dup_top = dis.opmap['DUP_TOP']
    call_function = dis.opmap['CALL_FUNCTION']
    extended_arg = dis.opmap['EXTENDED_ARG']
    pop_jump_if_false = dis.opmap['POP_JUMP_IF_FALSE']
    store_fast = dis.opmap['STORE_FAST']

    # prologue: yield the constant so the caller can send ``this_branch`` in
    code = bytearray((
        load_const, 0,
        yield_value, 0,
    ))

    def check(n):
        # split the index, wrapped to an unsigned 32 bit value, into three
        # prefix bytes and the final oparg byte
        encoded = (n % 2 ** 32).to_bytes(4, 'big')
        index = encoded[-1]
        # Only *leading* zero bytes may be dropped: every EXTENDED_ARG
        # shifts the accumulated oparg by one byte, so removing a zero that
        # follows a non-zero byte would encode a different index.
        extension = encoded[:-1].lstrip(b'\x00')

        code.extend((
            dup_top, 0,
            call_function, 0,
            # placeholder jump target; patched below once the end of the
            # branch is known
            extended_arg, 0,
            extended_arg, 0,
            extended_arg, 0,
            pop_jump_if_false, 0,
            load_const, 0,
        ))
        # position of the POP_JUMP_IF_FALSE argument
        ix = len(code) - 3

        for ext in extension:
            code.extend((extended_arg, ext))
        code.extend((store_fast, index))

        # The jump target is the offset just past this branch. Its four
        # big-endian bytes go into the three EXTENDED_ARG arguments and the
        # jump argument, which sit at every second position ending at ``ix``.
        code[ix - 6:ix + 1:2] = len(code).to_bytes(4, 'big')

    for n in range(range_):
        check(n)
        check(-n)

    code.extend((
        load_const, 0,
        return_value, 0,
    ))

    ret = _make_write_bytecode._cache[range_] = bytes(code)
    with open(cached_file_name, 'wb') as f:
        f.write(ret)
    return ret


_make_write_bytecode._cache = {}
def _make_write_codeobject(range_, value):
    """Wrap the write bytecode for ``range_`` in a code object.

    Parameters
    ----------
    range_ : int
        Forwarded to ``_make_write_bytecode``.
    value : any
        The only constant of the code object.

    Returns
    -------
    code : types.CodeType
        A code object named ``<write-target-helper>``.
    """
    bytecode = _make_write_bytecode(range_)
    # make co_consts[0] ``value`` to replace None
    constants = (value,)
    empty = ()
    # NOTE(review): the positional layout presumably follows the 13 argument
    # CPython 3.6/3.7 constructor (argcount, kwonlyargcount, nlocals,
    # stacksize, flags, code, consts, names, varnames, filename, name,
    # firstlineno, lnotab) -- confirm for the interpreter in use.
    return types.CodeType(
        0, 0, 0,
        2,
        99,
        bytecode,
        constants,
        empty,
        empty,
        '<string>',
        '<write-target-helper>',
        1,
        b'',
    )
def _write_target_address(target_address, value, *, verbose=False):
    """Store ``value`` in the pointer-sized slot at ``target_address``.

    Generators built from the write helper code object are created until one
    has a frame whose local variable slots are fewer than ``_write_range``
    pointers away from the target. That generator is then driven to the
    branch that assigns to the matching slot index.

    Parameters
    ----------
    target_address : int
        The address to write to.
    value : any
        The object to store at ``target_address``.
    verbose : bool, optional
        Print debugging information?

    Raises
    ------
    AssertionError
        If the freshly created generator does not yield ``value`` first.
    """
    # collection of frames to prevent deallocation
    frames = []
    while True:
        # spray the heap with generators looking for a location where the
        # frame's local variables are within ``_write_range * 8`` bytes of
        # the target address
        code = _make_write_codeobject(_write_range, value)
        generator = types.FunctionType(code, {})()
        locals_address = id(generator.gi_frame) + _f_localsplus_offset
        offset_bytes = target_address - locals_address
        offset_index = offset_bytes // 8
        # The bounds are exclusive: ``branch_selector`` below only ever
        # selects indices strictly between ``-_write_range`` and
        # ``_write_range``, so a frame exactly on the boundary would be
        # accepted and then silently never written through.
        if -_write_range < offset_index < _write_range:
            # we found a frame that is close enough to the target address;
            # clear the saved frames and continue
            frames.clear()
            if verbose:
                print(
                    f'found write frame within range:'
                    f' offset={offset_index};'
                    f' locals=0x{locals_address:x}'
                    f' target=0x{target_address:x}',
                )
            break
        else:
            if verbose:
                print(
                    f'write frame too far away:'
                    f' offset={offset_index};'
                    f' locals=0x{locals_address:x}'
                    f' target=0x{target_address:x}',
                )
            # this frame isn't close enough; save it in memory so we don't
            # try here again
            frames.append(generator.gi_frame)

    # Prime the generator and make sure it yields ``value``. The ``next``
    # call must not live inside an ``assert`` statement: ``python -O`` strips
    # asserts, which would skip the priming step entirely.
    primed = next(generator)
    if primed is not value:
        raise AssertionError('yielded the wrong value')

    def branch_selector():
        # answers the helper's questions in the order 0, -0, 1, -1, ...
        for n in range(_write_range):
            if n == offset_index:
                yield True
                break
            yield False
            if -n == offset_index:
                yield True
                break
            yield False

    try:
        # send the selector into the generator; this gets bound as
        # ``this_branch``
        generator.send(branch_selector().__next__)
    except StopIteration:
        # the branch after ``branch_selector`` returns True will raise a
        # ``StopIteration``
        pass
# How far, counted in pointer-sized slots, the target address may lie from
# the address of the frame's locals when loading.
_load_range = 1 << 22
def _make_load_bytecode(range_):
    """Create bytecode that emits one branch for every index in
    ``(-range_, range_)`` like:

        if this_branch():
            if False:
                _n = False  # create a new local
            yield _n

    where ``this_branch`` is a callable that checks if we should load from
    the given index. ``this_branch`` is not stored in the locals, instead we
    just store it on the stack.

    To do negative values, we overflow oparg with many EXTENDED_ARG calls.

    Parameters
    ----------
    range_ : int
        The exclusive bound on the absolute value of the indices to emit.
        Branches are emitted in the order ``0, -0, 1, -1, 2, -2, ...``.

    Returns
    -------
    bytecode : bytes
        The raw bytecode. The result is memoized in ``_cache`` and in a file
        named ``_make_load_bytecode.<range_>`` in the current working
        directory.

    Notes
    -----
    The on-disk cache is trusted as-is. A cache file produced by a generator
    that encoded indices differently must be deleted by hand.
    """
    try:
        return _make_load_bytecode._cache[range_]
    except KeyError:
        pass

    cached_file_name = f'_make_load_bytecode.{range_}'
    try:
        with open(cached_file_name, 'rb') as f:
            _make_load_bytecode._cache[range_] = ret = f.read()
        return ret
    except FileNotFoundError:
        pass

    # look the opcodes up once instead of once per emitted branch
    load_const = dis.opmap['LOAD_CONST']
    yield_value = dis.opmap['YIELD_VALUE']
    return_value = dis.opmap['RETURN_VALUE']
    dup_top = dis.opmap['DUP_TOP']
    call_function = dis.opmap['CALL_FUNCTION']
    extended_arg = dis.opmap['EXTENDED_ARG']
    pop_jump_if_false = dis.opmap['POP_JUMP_IF_FALSE']
    load_fast = dis.opmap['LOAD_FAST']

    # prologue: yield the constant so the caller can send ``this_branch`` in
    code = bytearray((
        load_const, 0,
        yield_value, 0,
    ))

    def check(n):
        # split the index, wrapped to an unsigned 32 bit value, into three
        # prefix bytes and the final oparg byte
        encoded = (n % 2 ** 32).to_bytes(4, 'big')
        index = encoded[-1]
        # Only *leading* zero bytes may be dropped: every EXTENDED_ARG
        # shifts the accumulated oparg by one byte, so removing a zero that
        # follows a non-zero byte would encode a different index.
        extension = encoded[:-1].lstrip(b'\x00')

        code.extend((
            dup_top, 0,
            call_function, 0,
            # placeholder jump target; patched below once the end of the
            # branch is known
            extended_arg, 0,
            extended_arg, 0,
            extended_arg, 0,
            pop_jump_if_false, 0,
        ))
        # position of the POP_JUMP_IF_FALSE argument
        ix = len(code) - 1

        for ext in extension:
            code.extend((extended_arg, ext))
        code.extend((
            load_fast, index,
            yield_value, 0,
        ))

        # The jump target is the offset just past this branch. Its four
        # big-endian bytes go into the three EXTENDED_ARG arguments and the
        # jump argument, which sit at every second position ending at ``ix``.
        code[ix - 6:ix + 1:2] = len(code).to_bytes(4, 'big')

    for n in range(range_):
        check(n)
        check(-n)

    code.extend((
        load_const, 0,
        return_value, 0,
    ))

    ret = _make_load_bytecode._cache[range_] = bytes(code)
    with open(cached_file_name, 'wb') as f:
        f.write(ret)
    return ret


_make_load_bytecode._cache = {}
def _make_load_codeobject(range_):
    """Wrap the load bytecode for ``range_`` in a code object.

    Parameters
    ----------
    range_ : int
        Forwarded to ``_make_load_bytecode``.

    Returns
    -------
    code : types.CodeType
        A code object named ``<load-target-helper>`` whose only constant is
        ``None``.
    """
    bytecode = _make_load_bytecode(range_)
    constants = (None,)
    empty = ()
    # NOTE(review): the positional layout presumably follows the 13 argument
    # CPython 3.6/3.7 constructor (argcount, kwonlyargcount, nlocals,
    # stacksize, flags, code, consts, names, varnames, filename, name,
    # firstlineno, lnotab) -- confirm for the interpreter in use.
    return types.CodeType(
        0, 0, 0,
        2,
        99,
        bytecode,
        constants,
        empty,
        empty,
        '<string>',
        '<load-target-helper>',
        1,
        b'',
    )
def _load_target_address(target_address, *, verbose=False):
    """Load the object referenced by the pointer-sized slot at
    ``target_address``.

    Generators built from the load helper code object are created until one
    has a frame whose local variable slots are fewer than ``_load_range``
    pointers away from the target. That generator is then driven to the
    branch that yields the matching slot index.

    Parameters
    ----------
    target_address : int
        The address to read from.
    verbose : bool, optional
        Print debugging information?

    Returns
    -------
    value : any
        Whatever the helper generator yields for the selected slot.

    Raises
    ------
    AssertionError
        If the freshly created generator does not yield ``None`` first.
    """
    # collection of frames to prevent deallocation
    frames = []
    while True:
        # spray the heap with generators looking for a location where the
        # frame's local variables are within ``_load_range * 8`` bytes of
        # the target address
        code = _make_load_codeobject(_load_range)
        generator = types.FunctionType(code, {})()
        locals_address = id(generator.gi_frame) + _f_localsplus_offset
        offset_bytes = target_address - locals_address
        offset_index = offset_bytes // 8
        # The bounds are exclusive: ``branch_selector`` below only ever
        # selects indices strictly between ``-_load_range`` and
        # ``_load_range``, so a frame exactly on the boundary could never
        # be read through.
        if -_load_range < offset_index < _load_range:
            # we found a frame that is close enough to the target address;
            # clear the saved frames and continue
            frames.clear()
            if verbose:
                print(
                    f'found load frame within range:'
                    f' offset={offset_index};'
                    f' locals=0x{locals_address:x}'
                    f' target=0x{target_address:x}',
                )
            break
        else:
            if verbose:
                print(
                    f'load frame too far away:'
                    f' offset={offset_index};'
                    f' locals=0x{locals_address:x}'
                    f' target=0x{target_address:x}',
                )
            # this frame isn't close enough; save it in memory so we don't
            # try here again
            frames.append(generator.gi_frame)

    # Prime the generator and make sure it yields ``None``. The ``next``
    # call must not live inside an ``assert`` statement: ``python -O`` strips
    # asserts, which would skip the priming step entirely.
    primed = next(generator)
    if primed is not None:
        raise AssertionError('yielded the wrong value')

    def branch_selector():
        # answers the helper's questions in the order 0, -0, 1, -1, ...;
        # iterate over the load range, matching the bytecode built above
        for n in range(_load_range):
            if n == offset_index:
                yield True
                break
            yield False
            if -n == offset_index:
                yield True
                break
            yield False

    # send the selector into the generator; this gets bound as
    # ``this_branch``
    return generator.send(branch_selector().__next__)
def tuple_setitem(t, ix, value, *, verbose=False):
    """Assign ``value`` to position ``ix`` of the tuple ``t`` in place.

    Parameters
    ----------
    t : tuple
        The tuple to assign into.
    ix : int
        The index to assign to (without bounds checking).
    value : any
        The value to store at index ``ix``.
    verbose : bool, optional
        Print debugging information?
    """
    # elements are addressed as 8 byte pointers starting at the first item
    pointer_size = 8
    first_item_address = id(t) + _ob_item_offset
    slot_address = first_item_address + ix * pointer_size
    _write_target_address(slot_address, value, verbose=verbose)
def access_memory(ob, size, *, verbose=False):
    """Expose the raw memory of an object through a mutable memory view.

    Parameters
    ----------
    ob : object
        The object to access
    size : int
        The number of bytes to access.
    verbose : bool, optional
        Print debugging information?

    Returns
    -------
    data : memoryview
        A view of ``size`` bytes starting at the address of ``ob``.
    """
    # start from an ordinary view over a scratch buffer of the wanted length
    view = memoryview(bytearray(size))
    view_address = id(view)

    # fetch the object referenced from the view's managed buffer slot
    managed_buffer = _load_target_address(
        view_address + _mbuf_offset,
        verbose=verbose,
    )

    # point both the managed buffer and the view itself at ``ob``; the
    # managed buffer is redirected first, then the view
    redirect_addresses = (
        id(managed_buffer) + _master_offset,
        view_address + _view_offset,
    )
    for address in redirect_addresses:
        _write_target_address(address, ob, verbose=verbose)

    return view
def execute_binary_payload(payload, *, verbose=False):
    """Run a compiled payload inside the current process.

    The payload is copied into an anonymous writable and executable mapping.
    Eight bytes read from the mapping object are then written into the repr
    slot of a throwaway class, and ``repr`` is called on an instance of it.

    Parameters
    ----------
    payload : bytes
        The compiled payload to execute.
    verbose : bool, optional
        Print debugging information?
    """
    protection = mmap.PROT_EXEC | mmap.PROT_WRITE
    buffer = mmap.mmap(-1, len(payload), prot=protection)
    buffer[:] = payload

    # the 8 bytes stored right after the object header of the mmap object
    # NOTE(review): presumably the address of the mapped data -- confirm
    # against the mmap object layout of the target interpreter
    pointer_field = slice(_sizeof_pyobject, _sizeof_pyobject + 8)
    target_address = access_memory(buffer, 64, verbose=verbose)[pointer_field]

    class C:
        pass

    # overwrite the class' repr slot with the bytes read above
    repr_slot = slice(_tp_repr_offset, _tp_repr_offset + 8)
    class_memory = access_memory(C, 124, verbose=verbose)
    class_memory[repr_slot] = target_address

    try:
        repr(C())
    except SystemError:
        # deliberately ignored, matching the example payload which returns 0
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment