Skip to content

Instantly share code, notes, and snippets.

@lordmauve
Created May 28, 2021 16:12
Show Gist options
  • Save lordmauve/c338b85ec5f48ebbc06dd67d3018ae24 to your computer and use it in GitHub Desktop.
Save lordmauve/c338b85ec5f48ebbc06dd67d3018ae24 to your computer and use it in GitHub Desktop.
Convert a Python pickle to equivalent Python source code
# Copyright 2021 Two Sigma Open Source LLC.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import pickletools
import pickle
import warnings
from typing import Any
from dataclasses import dataclass
# Registry mapping pickle opcode names to the functions that handle them.
_handlers = {}


def handle(name):
    """Return a decorator that registers a function as the handler for `name`.

    The first function registered under a name wins: the decorator returns
    whatever is stored in the registry for that name, which lets several
    `@handle(...)` decorators be stacked on one function.
    """
    def register(f):
        return _handlers.setdefault(name, f)

    return register
@handle('PROTO')
def proto(arg, stack):
    """Print the pickle protocol number as a `__proto__` assignment."""
    print("__proto__ = {!r}".format(arg))
def is_interned(o):
    """Decide whether object o is treated as interned by Python.

    Values treated as interned do not need their referential integrity
    preserved. This is true for `str` and `int` instances, and for tuples
    whose elements all (recursively) satisfy the same test.
    """
    if isinstance(o, (str, int)):
        return True
    if not isinstance(o, tuple):
        return False
    return all(is_interned(item) for item in o)
@handle('BINPUT')
def memoize(arg, stack, memos):
    """Store the top of the stack in memo slot `arg`.

    The stack entry is replaced by the new memo object, except for values
    that `is_interned` accepts: those stay on the stack as plain values
    because later references do not need to go through the memo.
    """
    value = stack.pop()
    memo = memos.set(arg, value)
    stack.append(value if is_interned(value) else memo)
@handle('MEMOIZE')
def memoize(arg, stack, memos):
    """Store the top of the stack in the next free memo slot.

    Unlike the BINPUT handler, the stack entry itself is left untouched (it
    stays the raw value rather than being replaced by the memo object).
    """
    top = stack[-1]
    memos.append(top)
@dataclass
class Class:
    """A reference to a module-level name, rendered as `mod.cls`."""

    mod: str  # module the name is looked up in
    cls: str  # name within that module

    def __repr__(self):
        return "{}.{}".format(self.mod, self.cls)
@handle('GLOBAL')
def global_(arg, stack, imports):
    """Push a reference to the global named by `arg`.

    `arg` is split on whitespace into exactly two parts: the module name and
    the name inside that module.
    """
    module_name, attr_name = arg.split()
    _mkclass(module_name, attr_name, stack, imports)
def _mkclass(mod, cls, stack, imports):
    """Push a reference to `mod.cls` onto the stack.

    Names from `builtins` are pushed as a bare `Raw` name. For any other
    module an `import` line is printed the first time the module is seen
    (tracked in `imports`), and a `Class` reference is pushed.
    """
    if mod != 'builtins':
        if mod not in imports:
            print(f'import {mod}')
            imports.add(mod)
        ref = Class(mod, cls)
    else:
        ref = Raw(cls)
    stack.append(ref)
@handle('STACK_GLOBAL')
def stack_global(arg, stack, imports):
    """Push a reference to a global whose names are taken from the stack.

    The name is on top of the stack with the module beneath it; either may
    be wrapped in a memo, so both are unwrapped first.
    """
    attr_name = unwrap(stack.pop())
    module_name = unwrap(stack.pop())
    _mkclass(module_name, attr_name, stack, imports)
@handle('BINFLOAT')
@handle('BININT1')
@handle('BININT2')
@handle('BINUNICODE')
@handle('SHORT_BINUNICODE')
@handle('SHORT_BINBYTES')
def val(arg, stack):
    """Push the opcode's decoded argument onto the stack unchanged."""
    literal = arg
    stack.append(literal)
@handle('NONE')
def none(arg, stack):
    """Push `None` onto the stack."""
    stack.append(None)
@handle('EMPTY_TUPLE')
def empty_tuple(arg, stack):
    """Push an empty tuple onto the stack."""
    stack.append(tuple())
@handle('EMPTY_LIST')
def empty_list(arg, stack):
    """Push a new, empty list onto the stack."""
    stack.append(list())
def mktuple(n: int):
    """Build a handler that folds the top `n` stack items into one tuple.

    The returned handler takes only `stack`; it replaces the last `n`
    entries with a single tuple holding them in their original order.
    """
    def handler(stack):
        packed = tuple(stack[-n:])
        # Slice assignment removes the tail and pushes the tuple in one step.
        stack[-n:] = [packed]

    return handler
# Register the fixed-size tuple opcodes TUPLE1, TUPLE2 and TUPLE3.
for _arity in (1, 2, 3):
    handle(f'TUPLE{_arity}')(mktuple(_arity))
del _arity
@handle('STOP')
def stop(arg, stack):
    """Pop the final value and print it as a `return` statement."""
    result = stack.pop()
    print(f'return {result!r}')
@handle('EMPTY_DICT')
def empty_dict(arg, stack):
    """Push a new, empty dict onto the stack."""
    stack.append(dict())
@dataclass
class Raw:
    """A fragment of source text whose repr is the text itself, unquoted."""

    _repr: str  # the exact text to emit

    def __repr__(self) -> str:
        text = self._repr
        return text
@handle('BINGET')
def binget(arg, stack, memos):
    """Push the memo stored in slot `arg` onto the stack."""
    memo = memos.get(arg)
    stack.append(memo)
# Unique sentinel pushed by MARK; located by identity in pop_to_marker().
marker = object()


@handle('MARK')
def mark(arg, stack):
    """Push the marker sentinel onto the stack."""
    stack.append(marker)
def pop_to_marker(stack):
    """Remove and return everything above the topmost marker.

    The marker itself is removed from the stack too, but is not included in
    the returned list. Raises IndexError if the stack holds no marker.
    """
    idx = len(stack) - 1
    # Scan downwards from the top for the most recent marker.
    while idx >= 0 and stack[idx] is not marker:
        idx -= 1
    if idx < 0:
        raise IndexError(f"no marker found in stack: {stack}")
    items = stack[idx + 1:]
    del stack[idx:]
    return items
@handle('TUPLE')
def tuple_(arg, stack):
    """Replace the marker and everything above it with a single tuple."""
    items = pop_to_marker(stack)
    stack.append(tuple(items))
@handle('FROZENSET')
def frozenset_(arg, stack):
    """Replace the marker and everything above it with a single frozenset."""
    items = pop_to_marker(stack)
    stack.append(frozenset(items))
def unwrap(o):
    """Return the value held by a memo, or `o` itself if it is not a memo.

    Used to reach the underlying object so that it can be modified.
    """
    return o.value if isinstance(o, Memo) else o
@handle('SETITEMS')
def setitems(arg, stack):
    """Store the key/value pairs above the marker into the dict below it.

    The popped items alternate key, value, key, value, ...
    """
    items = pop_to_marker(stack)
    target = unwrap(stack[-1])
    # Drawing twice from one iterator yields consecutive (key, value) pairs.
    flat = iter(items)
    for key, value in zip(flat, flat):
        target[key] = value
@handle('APPEND')
def append(arg, stack):
    """Pop the top value and append it to the list now on top of the stack."""
    item = stack.pop()
    target = unwrap(stack[-1])
    target.append(item)
@handle('APPENDS')
def appends(arg, stack):
    """Extend the list below the marker with all items above the marker."""
    items = pop_to_marker(stack)
    target = unwrap(stack[-1])
    target.extend(items)
@dataclass
class Call:
    """A call expression: `callable` applied to `args`."""

    callable: Any  # the object being called
    args: tuple    # positional arguments; may also be a Raw or Memo

    def __repr__(self):
        if isinstance(self.args, (Raw, Memo)):
            # The arguments are not a literal tuple here, so splat them.
            return '{!r}(*{!r})'.format(self.callable, self.args)
        if len(self.args) == 1:
            # Avoid the trailing comma of a one-element tuple repr.
            return '{!r}({!r})'.format(self.callable, self.args[0])
        # The repr of any other tuple already reads as an argument list.
        return '{!r}{!r}'.format(self.callable, self.args)
@handle('FRAME')
def frame(arg, stack):
    """Ignore the opcode: it relates to the data stream and is not functional."""
@dataclass
class GetAttr:
    """An attribute access expression, rendered as `obj.name`."""

    obj: Any   # the object whose attribute is read (shown via its repr)
    name: str  # the attribute name (shown verbatim)

    def __repr__(self):
        return '{!r}.{}'.format(self.obj, self.name)
@handle('REDUCE')
def reduce(arg, stack):
    """Replace a callable and its arguments with the resulting expression.

    The arguments are on top of the stack with the callable beneath them. A
    two-argument `getattr` call whose second argument is a string becomes a
    `GetAttr`; anything else becomes a `Call`.
    """
    args = stack.pop()
    func = stack.pop()
    is_attr_lookup = (
        func == Raw('getattr')
        and len(args) == 2
        and isinstance(args[1], str)
    )
    stack.append(GetAttr(*args) if is_attr_lookup else Call(func, args))
@dataclass
class NewObj:
    """An object created with `cls.__new__(cls, *args)`."""

    cls: Any   # the class being instantiated
    args: Any  # arguments for __new__; may be wrapped in a memo

    def __repr__(self):
        if unwrap(self.args) == ():
            extra = ''
        else:
            # This must be an f-string: a plain string would put the literal
            # text "{self.args!r}" into the generated code.
            extra = f", *{self.args!r}"
        return f"{self.cls!r}.__new__({self.cls!r}{extra})"
@handle('NEWOBJ')
def newobj(stack):
    """Replace a class and its arguments with a `NewObj` expression.

    The arguments are on top of the stack with the class beneath them.
    """
    # Evaluated left to right: the first pop yields args, the second cls.
    args, cls = stack.pop(), stack.pop()
    # NewObj's repr mentions the class twice, so record one more reference.
    dup(cls)
    stack.append(NewObj(cls, args))
@dataclass
class Build:
    """A call to the emitted `build()` helper, applying state to an object."""

    obj: Any    # the object receiving the state
    param: Any  # the state passed to build()

    def __repr__(self):
        return 'build({!r}, {!r})'.format(self.obj, self.param)
@handle('BUILD')
def build(arg, stack, ctx):
    """Replace an object and its state with a `Build` expression.

    The state is on top of the stack with the object beneath it. The first
    time this runs for a given `ctx`, the source of the `build()` helper that
    the generated code calls is printed, and `ctx['have_build']` is set so
    that it is only printed once.
    """
    if 'have_build' not in ctx:
        # The helper body must be indented in the output, otherwise the
        # generated code is not valid Python.
        print("""\n
def build(obj, data):
    setstate = getattr(obj, '__setstate__', None) or obj.__dict__.update
    setstate(data)
    return obj
""")
        ctx['have_build'] = True
    data = stack.pop()
    obj = stack.pop()
    stack.append(Build(obj, data))
def dup(obj):
    """Record one more reference to `obj` and return it.

    Memos have their reference count incremented via `Memo.dup()`; any other
    object is returned unchanged.
    """
    return obj.dup() if isinstance(obj, Memo) else obj
@handle('SETITEM')
def setitem(arg, stack, ctx):
    """Pop a value and a key and store them in the dict left on the stack.

    The value is on top of the stack with the key beneath it; the dict below
    them stays on the stack.
    """
    value = stack.pop()
    key = stack.pop()
    # The dict may be wrapped in a memo (as the SETITEMS handler also
    # allows for), so unwrap it before assigning into it.
    unwrap(stack[-1])[key] = value
@handle('POP')
def pop(arg, stack):
    """Discard the top of the stack, printing its repr as a statement."""
    discarded = stack.pop()
    print(f'{discarded!r}')
@dataclass
class Memo:
    """A memoized value together with the variable name it may be bound to."""

    memos: 'Memos'         # the Memos collection this entry was created by
    name: str              # variable name used when the value is shared
    value: Any             # the memoized value
    refcount: int = 1      # number of references recorded so far
    printed: bool = False  # whether the `name = value` line has been printed

    def dup(self):
        """Record one more reference and return this memo."""
        self.refcount += 1
        return self

    def __hash__(self):
        # Hash by identity so that memos can be used as dict keys.
        return id(self)

    def __repr__(self):
        """Return the value's repr, or the variable name if it is shared.

        Values referenced only once, and str/int/bytes/float/Class values,
        are rendered in place. Anything else is rendered as its variable
        name; the first time that happens the assignment is printed.
        """
        inline = self.refcount == 1 or isinstance(
            self.value, (str, int, bytes, float, Class)
        )
        if inline:
            return repr(self.value)
        if not self.printed:
            print(f'{self.name} = {self.value!r}')
            self.printed = True
        return self.name
class Memos:
    """The table of memoized values, keyed by memo id."""

    def __init__(self):
        self.memos = {}

    def set(self, id, value) -> Memo:
        """Create the memo for `id` holding `value` and return it.

        The memo's variable name is `_memo` followed by `id + 1`.
        """
        assert id not in self.memos, \
            "Reassigning memos is not currently supported"
        memo = Memo(self, '_memo{}'.format(id + 1), value)
        self.memos[id] = memo
        return memo

    def append(self, value) -> Memo:
        """Memoise `value` under the next id (the current number of memos)."""
        next_id = len(self.memos)
        return self.set(next_id, value)

    def get(self, id):
        """Return the memo for `id`, recording one more reference to it."""
        return self.memos[id].dup()
def to_py(bs: bytes):
    """Print Python code corresponding to the given pickle bytes.

    Each opcode produced by `pickletools.genops` is dispatched to its
    registered handler. A handler receives the context entries named by its
    positional parameters (`arg`, `stack`, `memos`, `imports`, `ctx`).
    Opcodes without a handler produce a warning and are skipped.
    """
    ctx = {'stack': [], 'memos': Memos(), 'imports': set()}
    # Handlers can ask for the whole context by naming a `ctx` parameter.
    ctx['ctx'] = ctx

    for opcode, arg, _position in pickletools.genops(bs):
        handler = _handlers.get(opcode.name)
        if handler is None:
            warnings.warn(f"No handler for {opcode.name} (arg {arg}):\n{opcode.doc}", stacklevel=2)
            continue

        code = handler.__code__
        wanted = code.co_varnames[:code.co_argcount]
        ctx['arg'] = arg
        handler(*(ctx[param] for param in wanted))
def py_for_pickled(obj, protocol=-1):
    """Print the Python code for how obj should be pickled.

    `obj` is pickled with the given protocol and the resulting bytes are
    passed to `to_py`.
    """
    to_py(pickle.dumps(obj, protocol))
if __name__ == '__main__':
    # Usage: `script.py -f FILE` converts the pickle stored in FILE; with no
    # `-f` option a pickled timezone-aware datetime is converted as a demo.
    import sys

    if sys.argv[1:2] == ['-f']:
        # A context manager makes sure the file handle is closed after reading.
        with open(sys.argv[2], 'rb') as pickle_file:
            to_py(pickle_file.read())
    else:
        import datetime
        bs = pickle.dumps(datetime.datetime.now(datetime.timezone.utc), -1)
        to_py(bs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment