@spitz-dan-l
Last active March 16, 2019 17:07
Python Atrocity: Graph-Transformation-Oriented Programming via Pickle
import pickle
import importlib
from operator import attrgetter
import io
from contextlib import ExitStack
from collections import defaultdict
"""
Alright so just to sketch out this abomination, here's what we do:

1. We pickle an arbitrary python object.
    The object can have references to arbitrary other objects, forming a graph.
2. We "mock" unpickle it.
    We use a custom "Mocking" Unpickler to unpickle embedded objects as "mock" objects,
    allowing us to transform them arbitrarily anywhere inside the object graph.
3. We transform the mocked objects by pickling them.
    We register arbitrary custom reducers to transform all the mocked objects
    into anything we want. We apply these reducers by pickling the "mocked" object graph.
    This triggers the custom reducers to run on all the embedded objects while avoiding cycles.
4. Finally, use a normal unpickler to unpickle the migrated blob.
5. (Optional) Iteratively repeat 1-4 on the results until no reductions are left to apply.

CURRENT LIMITATIONS:
    - You cannot directly register transformations on primitive types. This can be worked around by wrapping
      values in a custom class and registering your transformation on that class.
"""
class MockClass:
    """
    Mock class that subs in for arbitrary source classes during unpickling.

    Records any positional arguments passed to __new__ and any state passed to __setstate__.

    Does not support keyword args passed to __new__. This is due to the limitations
    inherent in using __reduce__() for pickling.
    """
    _MockClass__transformer = None

    def __init_subclass__(cls):
        """
        Register the new subclass with the current transformer, if there is one.
        """
        if cls._MockClass__transformer is not None:
            cls._MockClass__transformer._mock_subclasses.add(cls)

    # Default values
    _MockClass__new_args = ()

    def __new__(cls, *args):
        """
        Simply record the positional arguments passed to __new__.
        """
        obj = super(MockClass, cls).__new__(cls, *args)
        obj.__new_args = args
        return obj

    def __setstate__(self, state):
        """
        Simply record the state passed to __setstate__.
        """
        self.__state = state
        s = super(MockClass, self)
        if hasattr(s, '__setstate__'):
            s.__setstate__(state)
        else:
            self.__dict__.update(state)

    def __reduce__(self):
        """
        If there is a transformer associated with the MockClass, invoke
        the reducer it has registered for this MockClass' source class.

        Otherwise, invoke the trivial/default reduction of producing a
        copy of this object.
        """
        if self._MockClass__transformer is not None:
            return self._MockClass__transformer._reduce_mock_component(self)
        return (
            self.__class__.__new__,
            (self.__class__, *self.__new_args),
            # __setstate__ may never have been called, so fall back to None
            # (pickle treats a None state as "no state").
            getattr(self, '_MockClass__state', None)
        )

    def reify_reducer(self):
        """
        Given a MockClass object representing the source,
        return an instance of the destination object.

        The default implementation simply calls __new__ and then
        __setstate__ on the destination class, simulating the
        process of unpickling.
        """
        # I haven't *proven* it to myself, but I have a strong belief that the correct
        # destination class will always be at index 2 (position 3) in the mock component's class' __mro__.
        # This is even despite the possibility of the destination class using multiple inheritance;
        # since the MockClass base class is a direct descendant of object, and it always appears first in
        # the list of bases for MockClass subclasses, the first two slots in the resulting __mro__
        # will always be populated with (the mock subclass, MockClass, ...).
        dest_class = self.__class__.__mro__[2]
        return (
            dest_class.__new__,
            (dest_class, *self._MockClass__new_args),
            getattr(self, '_MockClass__state', None)
        )


class MockUnpickler(pickle.Unpickler):
    """
    Unpickler subclass that yields MockClass subclasses in place of arbitrary
    source classes.

    If a transformer is passed in, pass it to the generated subclasses.
    """
    def __init__(self, f, transformer=None):
        super().__init__(f)
        self._transformer = transformer

    def find_class(self, module, name):
        if self._transformer is None or OGTransform._resolve_class(module + '.' + name) in self._transformer._get_transformable_classes():
            mock_class = type(
                'Mock_' + name,
                (MockClass, super().find_class(module, name)),
                {
                    '_MockClass__orig_module': module,
                    '_MockClass__orig_name': name,
                    '_MockClass__transformer': self._transformer
                })
            return mock_class
        else:
            return super().find_class(module, name)


class OGTransformError(Exception):
    """
    Exception class for things that go wrong while transforming.
    """


class OGTransform:
    """
    Object Graph Transform management class.

    Manages a collection of transformations on mock objects referencing
    arbitrary source classes.
    """
    def __init__(self):
        self._transformer_registry = {}
        self._mock_subclasses = set()
        self._active_transformations = []
        self._active_transformation_types = defaultdict(list)

    def _source_class_name(self, mock_component):
        return '{}.{}'.format(
            mock_component._MockClass__orig_module,
            mock_component._MockClass__orig_name)

    @classmethod
    def _resolve_class(cls, dest_class_name):
        # Try every split of the dotted name into (module path, attribute path),
        # longest module path first, until one of them imports and resolves.
        parts = dest_class_name.split('.')
        for i in range(len(parts) - 1, 0, -1):
            module_name_candidate = '.'.join(parts[:i])
            class_name_candidate = '.'.join(parts[i:])
            try:
                m = importlib.import_module(module_name_candidate)
                dest_class = attrgetter(class_name_candidate)(m)
            except (ImportError, AttributeError):
                continue
            else:
                break
        else:
            raise OGTransformError(
                'Could not resolve destination class {}'.format(dest_class_name))
        return dest_class

    def _reduce_mock_component(self, mock_component):
        source_class_name = self._source_class_name(mock_component)
        source_class = self._resolve_class(source_class_name)
        transformer = self._transformer_registry[source_class]
        if transformer is None:
            raise RuntimeError('No reducer found for {}'.format(source_class))
        active_transformation_tag = self._active_transformations[-1]
        self._active_transformation_types[active_transformation_tag].append(source_class_name)
        result = transformer(mock_component)
        if isinstance(result, MockClass):
            return result.reify_reducer()
        else:  # it's a standard python object, create a reducer for it
            return (_identity, (result,))

    def register_transformer(self, source_class):
        if source_class in self._transformer_registry:
            raise NameError(
                'Duplicate registration for source class {}.'.format(source_class))
        self._transformer_registry[source_class] = None

        def decorator(f):
            if self._transformer_registry[source_class] is not None:
                raise RuntimeError(
                    "Cannot register reducer for {}; there's another reducer here already.".format(source_class))
            self._transformer_registry[source_class] = f
            return f
        return decorator

    def transform(self, root, include_reduction_report=False):
        with io.BytesIO() as buf:
            pickle.dump(root, buf, protocol=-1)
            buf.flush()
            buf.seek(0)
            root_mock = self._mock_unpickle(buf)
            # Reuse the same buffer for the second (transforming) pickle pass.
            buf.seek(0)
            buf.truncate()
            result = self._transform_mock_object_graph(root_mock, buf, include_reduction_report=include_reduction_report)
        return result

    def transform_iterative(self, root, limit=None):
        step = None
        if limit is not None:
            step = 0
        while True:
            root, reduction_report = self.transform(root, True)
            if step is not None:
                step += 1
            # An empty report means no reducer fired during this pass, so we are done.
            if not reduction_report:
                return root
            if step is not None and step >= limit:
                raise OGTransformError('Hit the step limit of {} while transforming.'.format(limit))

    def _mock_unpickle(self, f):
        unpickler = MockUnpickler(f, self)
        mock_component = unpickler.load()
        return mock_component

    def _push_active_transformation(self):
        active_transformation_tag = object()
        self._active_transformations.append(active_transformation_tag)

    def _pop_active_transformation(self):
        tag = self._active_transformations.pop()
        result = self._active_transformation_types.pop(tag, [])
        return result

    def _transform_mock_object_graph(self, root_mock, buf=None, include_reduction_report=False):
        with ExitStack() as ctx:
            if buf is None:
                buf = ctx.enter_context(io.BytesIO())
            reduction_report = None
            self._push_active_transformation()

            @ctx.callback
            def grab_report():
                nonlocal reduction_report
                reduction_report = self._pop_active_transformation()

            # Pickling the mock graph is what triggers the registered reducers.
            pickle.dump(root_mock, buf, protocol=-1)
            buf.flush()
            buf.seek(0)
            result = pickle.load(buf)
        # The report is only populated once the ExitStack callback has run,
        # so the return has to happen after the with block.
        if include_reduction_report:
            return result, reduction_report
        else:
            return result

    def _get_transformable_classes(self):
        return list(self._transformer_registry.keys())


def _identity(x):
    """
    Identity function, must be defined globally so that it can be used as the callable in reduce-values.
    """
    return x


if __name__ == '__main__':
    t1 = OGTransform()

    @t1.register_transformer(int)
    def int2str(i):
        return str(i)

    # Demonstrates the limitation noted in the module docstring: ints are pickled
    # without a class lookup, so int2str never runs and this returns 23 unchanged.
    t1.transform(23)

    class Foo:
        class Foo2:
            def __init__(self, foo2):
                self.foo2 = foo2

        def __init__(self, foo):
            self.foo = foo

    class Bar:
        def __init__(self, bar):
            self.bar = bar

    # OGTransform for going from Foo -> Bar
    t = OGTransform()

    @t.register_transformer(Foo)
    def foo_to_bar(foo):
        return Bar(foo.foo)

    # just to show that inner classes work too
    @t.register_transformer(Foo.Foo2)
    def foo2_to_bar(foo2):
        return Bar(foo2.foo2)

    obj_foo = [Foo.Foo2(3), Foo('horse'), Foo([Foo.Foo2(1), Foo(3)])]
    obj_bar = t.transform(obj_foo)

    """
    Integer math expression parser/evaluator!!

    Mini expression language:
        "3 + 5" -> 8
        "(4 + 3 + 2)" -> 9
        "4 + (3 + 2 + (1))" -> 10

    We will tokenize, parse and evaluate these expressions
    using nothing but Pickle-based graph transformations!!!!
    """
    import re
    from pprint import pprint

    class ParseError(Exception):
        """
        Exception class for parse-time errors
        """

    class Text:
        """
        Contains raw text that will be parsed.
        """
        def __init__(self, text):
            self.text = text

        def __repr__(self):
            return 'Text({})'.format(repr(self.text))

    class Tokens:
        """
        Contains a sequence of tokens.
        """
        def __init__(self, tokens):
            self.tokens = tokens

        def __repr__(self):
            return 'Tokens({})'.format(repr(self.tokens))

    # OGTransform for going from Text -> Tokens
    tokenizer = OGTransform()

    @tokenizer.register_transformer(Text)
    def tokenize(text):
        txt = text.text
        # Raw string avoids invalid-escape warnings; parens need no escaping inside [...].
        tok_re = re.compile(r'[0-9]+|[+()]')
        toks = tok_re.findall(txt)
        return Tokens(toks)

    class Expression:
        """
        Base Expression class.

        Represents a particular subexpression in the expression being parsed.
        """

    class ConstExpression(Expression):
        def __init__(self, value):
            self.value = value

        def __repr__(self):
            return 'ConstExpression({})'.format(repr(self.value))

    class PlusExpression(Expression):
        def __init__(self, left, right):
            self.left = left
            self.right = right

        def __repr__(self):
            return 'PlusExpression({}, {})'.format(repr(self.left), repr(self.right))

    class ParenExpression(Expression):
        def __init__(self, contents):
            self.contents = contents

        def __repr__(self):
            return 'ParenExpression({})'.format(repr(self.contents))

    # OGTransform for going from Tokens -> Expression
    parser = OGTransform()

    @parser.register_transformer(Tokens)
    def parse1(tokens):
        toks = tokens.tokens[:]
        if len(toks) == 0:
            raise ParseError('Cannot parse empty token stream')
        if len(toks) == 1:
            if isinstance(toks[0], Expression):
                return toks[0]
            elif toks[0].isdecimal():
                return ConstExpression(toks[0])
            else:
                raise ParseError('Invalid lone token {}'.format(toks[0]))

        def _last_index(lst, value):
            return len(lst) - 1 - lst[::-1].index(value)

        # parens
        if '(' in toks:
            l_idx = _last_index(toks, '(')
            r_idx = toks.index(')', l_idx)
            toks[l_idx:r_idx+1] = [ParenExpression(Tokens(toks[l_idx+1:r_idx]))]
            return Tokens(toks)
        # operator is plus, find the last one to be evaluated
        if '+' in toks:
            idx = _last_index(toks, '+')
            return PlusExpression(Tokens(toks[:idx]), Tokens(toks[idx + 1:]))
        else:
            raise ParseError('Cannot further reduce expression {}'.format(toks))

    class Value:
        """
        Contains the value corresponding to a particular subexpression.
        """
        def __init__(self, value):
            self.value = value

        def __repr__(self):
            return 'Value({})'.format(repr(self.value))

    # OGTransform for going from Expression -> Value
    evaluator = OGTransform()

    @evaluator.register_transformer(ConstExpression)
    def eval_constant(constant):
        return Value(int(constant.value))

    @evaluator.register_transformer(PlusExpression)
    def eval_plus(plus):
        left = plus.left
        right = plus.right
        if isinstance(left, Value) and isinstance(right, Value):
            return Value(left.value + right.value)
        return plus

    @evaluator.register_transformer(ParenExpression)
    def eval_parens(parens):
        value = parens.contents
        if isinstance(value, Value):
            return Value(value.value)
        return parens

    # OGTransform for going from Value -> int
    reifier = OGTransform()

    @reifier.register_transformer(Value)
    def reify(value):
        return value.value

    def evaluate(text_obj_graph, step_limit=None):
        """
        Strings together tokenizer -> parser -> evaluator -> reifier transforms.
        """
        print('input:')
        pprint(text_obj_graph)
        token_obj_graph = tokenizer.transform(text_obj_graph)
        print('tokens:')
        pprint(token_obj_graph)
        expr_obj_graph = parser.transform_iterative(token_obj_graph, step_limit)
        print('expressions:')
        pprint(expr_obj_graph)
        value_obj_graph = evaluator.transform_iterative(expr_obj_graph, step_limit)
        print('values:')
        pprint(value_obj_graph)
        primitive_obj_graph = reifier.transform(value_obj_graph)
        print('primitives:')
        pprint(primitive_obj_graph)
        return primitive_obj_graph

    # Our input data containing various pieces of raw text to parse and evaluate.
    text_list = [
        Text('2 + 3'),
        Text('43+18'),
        Text('1+2+3'),
        Text('3 + ((4 + 5) + 6 + (7 + 8))'),
        Text('(3+1)'),
        Text('5'),
        Text('(((4)))')
    ]
    text_list.append(text_list)  # This is what makes the whole thing worth it!

    # The cool bit is that the Text objects can appear anywhere in the object graph.
    # Our transformations will find them and transform them,
    # while preserving the shape of the overall graph.
    result_list = evaluate(text_list)  # [5, 61, 6, 33, 4, 5, 4, ...]

@spitz-dan-l
Author

Py3.5+

The next in my series of Python Atrocities.

Pickle is a cool module. It lets you serialize arbitrary python objects (subject to certain limitations), including ones with reference cycles.
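
As a quick illustration of the cycle handling mentioned above, here is a minimal sketch using only the standard library. The variable names are made up for the example.

```python
import pickle

# A list that contains itself: about the simplest reference cycle there is.
cyclic = [1, 2]
cyclic.append(cyclic)

restored = pickle.loads(pickle.dumps(cyclic))

# The copy is a distinct object, and its last element points back at the copy
# itself rather than at the original list.
print(restored is not cyclic)
print(restored[2] is restored)
```

Both checks print `True`: pickle's memo table notices the list has already been visited and writes a back-reference instead of recursing forever.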

Pickle is also extremely configurable; you can override how objects are pickled and unpickled, while still getting the benefits of pickle's object graph traversal, cycle detection, and power.
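
One of those hooks is `Unpickler.find_class`, which the gist's `MockUnpickler` overrides. The sketch below is not the gist's code: `RecordingUnpickler` is a hypothetical name, and all it does is record which classes pickle asks for while loading.

```python
import io
import pickle
from collections import OrderedDict
from fractions import Fraction


class RecordingUnpickler(pickle.Unpickler):
    """Hypothetical helper: remembers every (module, name) pickle looks up."""

    def __init__(self, f):
        super().__init__(f)
        self.seen = []

    def find_class(self, module, name):
        # This is the spot where a different class could be substituted.
        self.seen.append((module, name))
        return super().find_class(module, name)


data = [OrderedDict(a=1), {"x": Fraction(1, 2)}, 42]
unpickler = RecordingUnpickler(io.BytesIO(pickle.dumps(data)))
restored = unpickler.load()

print(unpickler.seen)
```

The recorded names include `OrderedDict` and `Fraction`, but nothing for the list, the dict, or the int. Those are written with dedicated opcodes and never pass through `find_class`, which is why transformations cannot be registered on primitive types directly.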

So why not operationalize pickle to do more than serialize objects? We can effectively use it to transform objects in arbitrary ways. And those transformations will work on entire object graphs, without having to write any traversal logic of our own!
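
To make the idea concrete without the rest of the machinery, here is a stripped-down sketch. `Meters` is a hypothetical wrapper class invented for this example; its `__reduce__` tells pickle to rebuild it as a plain `float`, so a single round trip rewrites every instance wherever it sits in the graph.

```python
import pickle


class Meters:
    """Hypothetical wrapper class, used only for this illustration."""

    def __init__(self, value):
        self.value = value

    def __reduce__(self):
        # Ask pickle to rebuild this object by calling float(value),
        # so the wrapper is replaced by a plain float on load.
        return (float, (self.value,))


graph = {"a": Meters(1.5), "b": [Meters(2), Meters(3)]}

# No traversal code here: pickle walks the dict and the nested list for us.
result = pickle.loads(pickle.dumps(graph))
print(result)  # {'a': 1.5, 'b': [2.0, 3.0]}
```

The module above generalizes this trick: instead of hard-coding `__reduce__` on the source class, it swaps in mock classes at unpickle time and lets registered functions decide what each one reduces to.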

Here I introduce a module for applying arbitrary transformations to object graphs, based on the types of the embedded objects.

Then as a demo, I show how it can be used to implement a parser/evaluator for a toy expression language.

What we have here is a bold new programming paradigm! Graph-Transform-Oriented Programming, via Pickle!

Bear in mind that every time you apply a transformation to an object graph this way, you are incurring the compute cost of pickling and unpickling it twice. A small price to pay for not having to write traversal logic, if you ask me!
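
If you want a rough feel for that cost on your own data, a sketch like the following times one round trip against two. The data shape and size are arbitrary assumptions, and the numbers will vary by machine and Python version.

```python
import pickle
import time


def round_trip(obj):
    """Pickle and immediately unpickle obj, returning the copy."""
    return pickle.loads(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


# Arbitrary sample graph; substitute something shaped like your real data.
data = [{"id": i, "tags": ["a", "b", "c"]} for i in range(50000)]

start = time.perf_counter()
once = round_trip(data)
t_once = time.perf_counter() - start

start = time.perf_counter()
twice = round_trip(round_trip(data))
t_twice = time.perf_counter() - start

print("one round trip:  {:.3f}s".format(t_once))
print("two round trips: {:.3f}s".format(t_twice))
```

This only measures plain pickling; a real transform also pays for the mock classes and whatever the registered reducers do.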
