Created
May 28, 2021 16:12
-
-
Save lordmauve/c338b85ec5f48ebbc06dd67d3018ae24 to your computer and use it in GitHub Desktop.
Convert a Python pickle to equivalent Python source code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2021 Two Sigma Open Source LLC.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import pickletools | |
import pickle | |
import warnings | |
from typing import Any | |
from dataclasses import dataclass | |
# Registry mapping a pickle opcode name to the function that simulates it.
_handlers = {}


def handle(name):
    """Return a decorator that registers a function as the handler for *name*.

    The first function registered under a given name wins: the decorator
    returns whatever is stored in the registry for that name, which is the
    decorated function itself only when the name was not yet registered.
    """
    def register(func):
        return _handlers.setdefault(name, func)

    return register
@handle('PROTO')
def proto(arg, stack):
    """Emit an assignment recording the pickle protocol number *arg*."""
    print("__proto__ = {!r}".format(arg))
def is_interned(o):
    """Report whether *o* is treated as interned by Python.

    Strings and ints qualify, as do tuples made up exclusively of qualifying
    values (checked recursively). For such objects there is no need to
    preserve referential integrity in the generated code.
    """
    if isinstance(o, tuple):
        return all(is_interned(item) for item in o)
    return isinstance(o, (str, int))
@handle('BINPUT')
@handle('LONG_BINPUT')
def memoize(arg, stack, memos):
    """Store the top of the stack in memo slot *arg*.

    BINPUT and LONG_BINPUT differ only in the width of the encoded memo
    index, so both are served by this handler.

    The stack top is replaced by the Memo wrapper so that later references
    can share a name, except for values that ``is_interned`` accepts: those
    stay on the stack unwrapped because referring back to the memoized
    object is unnecessary for them.
    """
    top = stack.pop()
    m = memos.set(arg, top)
    stack.append(top if is_interned(top) else m)
@handle('MEMOIZE')
def memoize(arg, stack, memos):
    """Record the current stack top in the next free memo slot.

    The stack itself is left untouched.
    """
    newest = stack[-1]
    memos.append(newest)
@dataclass
class Class:
    """Reference to a module-level name, rendered as ``mod.cls``."""

    mod: str  # module the name lives in
    cls: str  # name within that module

    def __repr__(self):
        return "{}.{}".format(self.mod, self.cls)
@handle('GLOBAL')
def global_(arg, stack, imports):
    """Push a reference to the global named by *arg*.

    *arg* is split on whitespace into exactly two parts: the module name
    and the attribute name.
    """
    module_name, attr_name = arg.split()
    _mkclass(module_name, attr_name, stack, imports)
def _mkclass(mod, cls, stack, imports):
    """Push a reference to ``mod.cls`` onto the stack.

    Names from ``builtins`` are pushed as a bare ``Raw`` name and need no
    import. For any other module an ``import`` line is printed the first
    time the module is seen, and a ``Class`` reference is pushed.
    """
    if mod == 'builtins':
        ref = Raw(cls)
    else:
        if mod not in imports:
            print(f'import {mod}')
            imports.add(mod)
        ref = Class(mod, cls)
    stack.append(ref)
@handle('STACK_GLOBAL')
def stack_global(arg, stack, imports):
    """Push a global whose module and name are taken from the stack.

    The name is on top of the stack with the module beneath it; both are
    unwrapped from any Memo before use.
    """
    # Tuple elements evaluate left to right, so the name is popped first.
    cls, mod = unwrap(stack.pop()), unwrap(stack.pop())
    _mkclass(mod, cls, stack, imports)
@handle('BINFLOAT')
@handle('BININT')
@handle('BININT1')
@handle('BININT2')
@handle('LONG1')
@handle('BINUNICODE')
@handle('SHORT_BINUNICODE')
@handle('BINBYTES')
@handle('SHORT_BINBYTES')
def val(arg, stack):
    """Push the opcode's decoded argument onto the stack unchanged.

    Shared by the literal opcodes whose argument is already decoded to the
    Python value they push: floats, ints of each encoded width, strings and
    bytes.
    """
    stack.append(arg)
@handle('NONE')
def none(arg, stack):
    """Push ``None`` onto the stack."""
    value = None
    stack.append(value)
@handle('EMPTY_TUPLE')
def empty_tuple(arg, stack):
    """Push an empty tuple onto the stack."""
    stack.append(tuple())
@handle('EMPTY_LIST')
def empty_list(arg, stack):
    """Push a new empty list onto the stack."""
    stack.append(list())
def mktuple(n: int):
    """Build a handler that collapses the top *n* stack items into a tuple.

    The returned function takes the stack as its only argument, removes the
    last *n* items and pushes a tuple of them in their original order. If
    the stack holds fewer than *n* items, everything on it is collected.
    """
    def build_tuple(stack):
        # Use an explicit cut index: a slice of ``-n:`` with n == 0 would
        # select the whole stack rather than nothing.
        cut = max(len(stack) - n, 0)
        items = tuple(stack[cut:])
        del stack[cut:]
        stack.append(items)

    return build_tuple
# Register the fixed-size tuple opcodes TUPLE1, TUPLE2 and TUPLE3.
for _arity in (1, 2, 3):
    handle(f'TUPLE{_arity}')(mktuple(_arity))
del _arity
@handle('STOP')
def stop(arg, stack):
    """Pop the final value and print it as a ``return`` statement."""
    result = stack.pop()
    print('return {!r}'.format(result))
@handle('EMPTY_DICT')
def empty_dict(arg, stack):
    """Push a new empty dict onto the stack."""
    stack.append(dict())
@dataclass
class Raw:
    """A fragment of source text whose repr is the text itself, unquoted."""

    _repr: str  # the exact text to emit

    def __repr__(self):
        text = self._repr
        return text
@handle('BINGET')
@handle('LONG_BINGET')
def binget(arg, stack, memos):
    """Push the memo stored in slot *arg* onto the stack.

    BINGET and LONG_BINGET differ only in the width of the encoded memo
    index, so both are served by this handler. Looking the memo up through
    ``memos.get`` also counts the extra reference.
    """
    stack.append(memos.get(arg))
# Unique sentinel pushed by MARK; pop_to_marker() finds it by identity.
marker = object()


@handle('MARK')
def mark(arg, stack):
    """Push the marker sentinel onto the stack."""
    stack.append(marker)
def pop_to_marker(stack):
    """Remove and return everything above the topmost marker.

    The marker itself is removed from the stack too, but is not part of the
    returned list. Raises IndexError when the stack contains no marker.
    """
    # Scan down from the top for the nearest marker.
    idx = len(stack) - 1
    while idx >= 0 and stack[idx] is not marker:
        idx -= 1
    if idx < 0:
        raise IndexError(f"no marker found in stack: {stack}")
    items = stack[idx + 1:]
    del stack[idx:]
    return items
@handle('TUPLE')
def tuple_(arg, stack):
    """Replace the marker and the items above it with a tuple of the items."""
    items = pop_to_marker(stack)
    stack.append(tuple(items))
@handle('FROZENSET')
def frozenset_(arg, stack):
    """Replace the marker and the items above it with a frozenset of them."""
    items = pop_to_marker(stack)
    stack.append(frozenset(items))
def unwrap(o):
    """Return the value held by a Memo so it can be modified.

    Anything that is not a Memo is returned as-is.
    """
    return o.value if isinstance(o, Memo) else o
@handle('SETITEMS')
def setitems(arg, stack):
    """Store the key/value pairs above the marker into the dict beneath it.

    The popped items alternate key, value, key, value; a trailing unpaired
    item is ignored.
    """
    items = pop_to_marker(stack)
    target = unwrap(stack[-1])
    # Zipping an iterator with itself yields consecutive (key, value) pairs.
    pairs = iter(items)
    for key, value in zip(pairs, pairs):
        target[key] = value
@handle('APPEND')
def append(arg, stack):
    """Pop the top value and append it to the list beneath it."""
    item = stack.pop()
    target = unwrap(stack[-1])
    target.append(item)
@handle('APPENDS')
def appends(arg, stack):
    """Extend the list beneath the marker with the items above the marker."""
    items = pop_to_marker(stack)
    target = unwrap(stack[-1])
    target.extend(items)
@dataclass
class Call:
    """A call expression, rendered as source text by ``repr``."""

    callable: Any  # the object being called
    args: tuple    # the arguments, or a Raw/Memo standing in for them

    def __repr__(self):
        args = self.args
        if isinstance(args, (Raw, Memo)):
            # The arguments are only available by name: splat them.
            return '{!r}(*{!r})'.format(self.callable, args)
        if len(args) == 1:
            # Avoid the trailing comma a one-element tuple repr would have.
            return '{!r}({!r})'.format(self.callable, args[0])
        # A tuple's repr already reads as a parenthesised argument list.
        return '{!r}{!r}'.format(self.callable, args)
@handle('FRAME')
def frame(arg, stack):
    """Ignore FRAME: it relates to the data stream and is not functional."""
@dataclass
class GetAttr:
    """An attribute access, rendered as ``<obj repr>.<name>``."""

    obj: Any   # the object whose attribute is read
    name: str  # the attribute name, emitted without quotes

    def __repr__(self):
        return '{!r}.{}'.format(self.obj, self.name)
@handle('REDUCE')
def reduce(arg, stack):
    """Pop a callable and its argument tuple and push the resulting call.

    A call to the builtin ``getattr`` with exactly two arguments, the second
    being a string, is pushed as a ``GetAttr`` so it renders as ``obj.name``.
    Everything else is pushed as a ``Call``.
    """
    args = stack.pop()
    f = stack.pop()
    # Either operand may be wrapped in a Memo (BINPUT replaces the stack top
    # with one), so look through the wrapper before inspecting it.
    plain_args = unwrap(args)
    is_getattr = (
        unwrap(f) == Raw('getattr')
        and isinstance(plain_args, tuple)
        and len(plain_args) == 2
        and isinstance(unwrap(plain_args[1]), str)
    )
    if is_getattr:
        result = GetAttr(plain_args[0], unwrap(plain_args[1]))
    else:
        result = Call(f, args)
    stack.append(result)
@dataclass
class NewObj:
    """An object creation, rendered as ``cls.__new__(cls, *args)``."""

    cls: Any   # the class being instantiated
    args: Any  # the arguments for __new__, possibly wrapped in a Memo

    def __repr__(self):
        # Omit the splat entirely when there are no arguments. The f prefix
        # is required so the arguments are interpolated rather than the
        # placeholder text being emitted literally.
        extra = '' if unwrap(self.args) == () else f", *{self.args!r}"
        return f"{self.cls!r}.__new__({self.cls!r}{extra})"
@handle('NEWOBJ')
def newobj(stack):
    """Pop the arguments and the class and push a ``NewObj`` for them."""
    args = stack.pop()
    # The class is rendered twice by NewObj, so register the extra reference.
    cls = dup(stack.pop())
    stack.append(NewObj(cls, args))
@dataclass
class Build:
    """A state-restoring step, rendered as a call to ``build(obj, param)``."""

    obj: Any    # the object receiving the state
    param: Any  # the state to apply

    def __repr__(self):
        return 'build({!r}, {!r})'.format(self.obj, self.param)
@handle('BUILD')
def build(arg, stack, ctx):
    """Pop a state and an object and push a ``Build`` combining them.

    The first time this runs for a given context, the source of the
    ``build()`` helper that the generated code relies on is printed, and
    that fact is recorded in *ctx* under ``'have_build'``.
    """
    if 'have_build' not in ctx:
        helper_source = "\n".join([
            "\n",
            "def build(obj, data):",
            "    setstate = getattr(obj, '__setstate__', None) or obj.__dict__.update",
            "    setstate(data)",
            "    return obj",
            "",
        ])
        print(helper_source)
        ctx['have_build'] = True
    data = stack.pop()
    obj = stack.pop()
    stack.append(Build(obj, data))
def dup(obj):
    """Register one more reference to *obj* and return it.

    Only Memo objects track references; anything else is returned unchanged.
    """
    if not isinstance(obj, Memo):
        return obj
    return obj.dup()
@handle('SETITEM')
def setitem(arg, stack, ctx):
    """Pop a value and a key and store them in the dict beneath them."""
    value = stack.pop()
    key = stack.pop()
    # The dict may be wrapped in a Memo, which is not subscriptable, so look
    # through the wrapper just as SETITEMS does.
    unwrap(stack[-1])[key] = value
@handle('POP')
def pop(arg, stack):
    """Discard the top of the stack, printing its repr as a statement."""
    discarded = stack.pop()
    print(repr(discarded))
@dataclass
class Memo:
    """A memoized value that can be referred to by name in generated code."""

    memos: 'Memos'         # the table this memo belongs to
    name: str              # variable name used when emitted as an assignment
    value: Any             # the memoized object
    refcount: int = 1      # number of references seen so far
    printed: bool = False  # whether the assignment has been printed yet

    def dup(self):
        """Count one more reference and return this memo."""
        self.refcount += 1
        return self

    def __hash__(self):
        # Hash by identity so a memo can be used as a dict key or set member.
        return id(self)

    def __repr__(self):
        # A value referenced once, or one of the simple types listed here,
        # is written inline.
        inline = self.refcount == 1 or isinstance(
            self.value, (str, int, bytes, float, Class)
        )
        if inline:
            return repr(self.value)
        # Otherwise print the assignment the first time, then use the name.
        if not self.printed:
            print(f'{self.name} = {self.value!r}')
            self.printed = True
        return self.name
class Memos:
    """Table of Memo objects keyed by memo id."""

    def __init__(self):
        self.memos = {}

    def set(self, id, value) -> Memo:
        """Create, store and return the memo for *id*.

        The memo is named ``_memo<id + 1>``. Assigning an id that is already
        present trips an assertion.
        """
        assert id not in self.memos, \
            "Reassigning memos is not currently supported"
        memo = Memo(self, f'_memo{id + 1}', value)
        self.memos[id] = memo
        return memo

    def append(self, value) -> Memo:
        """Memoise *value* under the next id, which is the current count."""
        next_id = len(self.memos)
        return self.set(next_id, value)

    def get(self, id):
        """Return the memo for *id*, counting one more reference to it."""
        return self.memos[id].dup()
def to_py(bs: bytes):
    """Print Python code corresponding to the given pickle bytes.

    Each opcode yielded by ``pickletools.genops`` is dispatched to the
    handler registered under its name. A handler receives the context
    entries named by its positional parameters (``arg``, ``stack``,
    ``memos``, ``imports``, ``ctx``). Opcodes with no handler produce a
    warning and are skipped.
    """
    ctx = {'stack': [], 'memos': Memos(), 'imports': set()}
    # Let handlers ask for the whole context through a 'ctx' parameter.
    ctx['ctx'] = ctx
    for opcode, arg, position in pickletools.genops(bs):
        handler = _handlers.get(opcode.name)
        if handler is None:
            warnings.warn(f"No handler for {opcode.name} (arg {arg}):\n{opcode.doc}", stacklevel=2)
            continue
        # Inject arguments by name, based on the handler's own signature.
        code = handler.__code__
        wanted = code.co_varnames[:code.co_argcount]
        ctx['arg'] = arg
        handler(*(ctx[name] for name in wanted))
def py_for_pickled(obj, protocol=-1):
    """Pickle *obj* with *protocol* and print the equivalent Python code."""
    to_py(pickle.dumps(obj, protocol))
if __name__ == '__main__':
    import sys

    if sys.argv[1:2] == ['-f']:
        # "-f PATH": convert the pickle stored in the given file. Read it
        # inside a context manager so the file handle is always closed.
        with open(sys.argv[2], 'rb') as pickle_file:
            data = pickle_file.read()
        to_py(data)
    else:
        # No file given: demonstrate on a timezone-aware datetime.
        import datetime
        bs = pickle.dumps(datetime.datetime.now(datetime.timezone.utc), -1)
        to_py(bs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment