Skip to content

Instantly share code, notes, and snippets.

@stinos stinos/rpc.py
Created May 7, 2020

Embed
What would you like to do?
MicroPython Rpc
import re
import unittest
class RpcIterator:
@staticmethod
def IteratorName(name):
# Assumption here is this string is not an existing variable.
# Otherwise it gets replaced.
return name.replace('.', '_') + '_iter'
@staticmethod
def SentinelName(name):
# Assumption here is this string is not a value in the remote iterable.
# Otherwise iteration stops when this value is returned.
return name.replace('.', '_') + '_nextsentinelvalue'
""" Iteration over remote iterator, see RpcProxy.__iter__. """
def __init__(self, execute, evaluate, iterableName):
self._exec = execute
self._eval = evaluate
self._name = RpcIterator.IteratorName(iterableName)
self._sentinel = RpcIterator.SentinelName(iterableName)
# Setup an iterator remotely, represented by this object.
# Iteration then consists of calling next remotely until the result value equals a sentinel.
execute('{}=iter({})'.format(self._name, iterableName))
def __iter__(self):
return self
def __next__(self):
# Standard says: "Once an iterator’s __next__() method raises StopIteration,
# it must continue to do so on subsequent calls." so we can deal with this locally.
if not self._sentinel:
raise StopIteration()
result = self._eval("next({},'{}')".format(self._name, self._sentinel))
if result == self._sentinel:
self._sentinel = None
self._exec('del {}'.format(self._name))
raise StopIteration()
return result
class RpcProxy(object):
""" Translate Python code into string representation for (remote) execution.
Two modes are used:
- the default, where calling a function (i.e. like __call__ or __len__) or statement
immediately results in a remote call (and returns the value in case of an expression).
Getting an attribute's value is always done using an explicit call to Eval()
because the object returned from __getattr__ etc is RpcProxy itself otherwise
we cannot have nesting.
- a 'manual' mode (created using Manual() instead of the constructor) where the remote
call has to be made explicitly by calling Eval() or Exec(), meaning expressions can be chained.
The fetch function gets a string representation of an attribute and returns
the attribute's value. While this is essentially the same as what evaluate
does, it's separate because it can be implemented by lookup in globals() so
without having to parse/compile/execute the string as code.
The execute function gets a string and executes it, like builtin exec().
The evaluate function does the same but returns the result, like builtin eval().
Using RpcProxy as an argument for a call to RpcProxy substitutes it with
it's current string representation, to support this:
x = proxy.foo.bar
otherProxy.SomeCall(x) # Resulting string is SomeCall(foo.bar).
Some attributes are reserved meaning they won't be turned into a string:
basically everything which we set in the constructor and all methods defined here.
To workaround strings can be appended manually using Literal().
"""
def __init__(self, fetch, execute, evaluate, name=''):
self._SetAttr('_manual', False)
self._SetAttr('_fetch', fetch)
self._SetAttr('_exec', execute)
self._SetAttr('_eval', evaluate)
self._SetAttr('_name', name)
self._SetAttr('Eval', self._Fetch)
@staticmethod
def Manual(execute, evaluate, name=''):
# Eval is used to execute and return result so replace fetch
# with evaluate, and replace execute/execute with _Store so
# they don't actually do anything, and add a manual call to execute.
obj = RpcProxy(evaluate, None, None, name)
obj._SetAttr('Exec', obj._Exec)
obj._SetAttr('_manualexec', execute)
obj._SetAttr('_exec', obj._Store)
obj._SetAttr('_eval', obj._Store)
obj._SetAttr('_manual', True)
return obj
def _SetAttr(self, attr, value):
super().__setattr__(attr, value)
def _DottedName(self, attr):
self._SetAttr('_name', self._name + '.' + attr if self._name else attr)
def _ArgValue(self, arg):
# If an argument is an RpcProxy, substitute it by its remote name to allow
# x = rpc.foo.bar; rpc.SomeCall(x) # Will execute SomeCall(foo.bar)
return arg._name if type(arg) is RpcProxy else repr(arg)
def _ArgString(self, args, kwargs):
a = ','.join(self._ArgValue(arg) for arg in args)
if kwargs:
if a:
a += ','
for k, v in kwargs.items():
a += '{}={},'.format(k, self._ArgValue(v))
return a
def _Fetch(self):
return self._fetch(self._name)
def _Exec(self):
assert(self._manual)
self._manualexec(self._name)
def _Store(self, what):
assert(self._manual)
self._SetAttr('_name', what)
return self
def Literal(self, what):
self._SetAttr('_name', self._name + what)
# Since what could have a call we cannot use _fetch anymore.
self._SetAttr('_fetch', self._eval)
return self
def __getattr__(self, attr):
self._DottedName(attr)
return self
def __setattr__(self, attr, value):
self._DottedName(attr)
self._exec('{}={}'.format(self._name, self._ArgValue(value)))
def __getitem__(self, key):
self._SetAttr('_name', self._name + '[{}]'.format(self._ArgValue(key)))
if not self._manual:
# Since _name now has a call in it we cannot use _fetch anymore.
self._SetAttr('_fetch', self._eval)
return self
def __setitem__(self, key, value):
self.__getitem__(key)
self._exec('{}={}'.format(self._name, self._ArgValue(value)))
def __delitem__(self, key):
self.__getitem__(key)
self._exec('del {}'.format(self._name))
def __iter__(self):
if self._manual:
return self._eval("iter({})".format(self._name))
return RpcIterator(self._exec, self._eval, self._name)
def __next__(self):
if self._manual:
return self._eval("next({})".format(self._name))
sentinel = RpcIterator.SentinelName(self._name)
result = self._eval("next({},'{}')".format(self._name, sentinel))
if result == sentinel:
raise StopIteration()
return result
def __contains__(self, key):
return self._eval('{} in {}'.format(self._ArgValue(key), self._name))
def __len__(self):
return self._eval('len({})'.format(self._name))
def __call__(self, *args, **kwargs):
result = self._eval('{}({})'.format(self._name, self._ArgString(args, kwargs)))
if not self._manual:
# Simple partial reuse support: split last attribute; won't work for reuse with nested attribute calls.
self._SetAttr('_name', '.'.join(self._name.split('.')[0:-1]))
return result
class RpcHost:
def __init__(self, fetch, execute, evaluate):
super().__setattr__('fetch', fetch)
super().__setattr__('execute', execute)
super().__setattr__('evaluate', evaluate)
def Manual(self, name=''):
return RpcProxy.Manual(self.execute, self.evaluate, name)
def __getattr__(self, attr):
return RpcProxy(self.fetch, self.execute, self.evaluate, attr)
def __setattr__(self, attr, value):
return RpcProxy(self.fetch, self.execute, self.evaluate).__setattr__(attr, value)
class IteratorExec:
def __init__(self):
self.iterCount = 0
self.args = []
def __call__(self, arg):
self.args.append(arg)
try:
# For testing iteration we must return the sentinel value in
# order to be able to stop it.
sentinel = re.match(r"next\(\w+,'(\w+)'\)", arg).group(1)
self.iterCount += 1
if self.iterCount > 2:
return sentinel
except:
pass
return self.iterCount
class Test(unittest.TestCase):
def setUp(self):
self.value = 0
self.fe = None
self.ex = None
self.ev = None
def Host(self):
return RpcHost(self.Fetch, self.Exec, self.Eval)
def Fetch(self, attr):
self.fe = attr
return self.value
def Exec(self, what):
self.ex = what
return self.value
def Eval(self, what):
self.ev = what
return self.value
def AssertResult(self, fetched=None, executed=None, evaluated=None):
self.assertEqual(self.fe, fetched)
self.assertEqual(self.ex, executed)
self.assertEqual(self.ev, evaluated)
def test_AttributesAndEval(self):
rpc = self.Host()
self.assertIs(rpc.a.Eval(), self.value)
self.AssertResult(fetched='a')
x = rpc.a.b.c
self.assertIs(x.Eval(), self.value)
self.AssertResult(fetched='a.b.c')
self.setUp()
x = rpc.a.b[1].c
self.assertIs(x.Eval(), self.value)
self.AssertResult(evaluated='a.b[1].c')
def test_ProxyAttributeSetting(self):
rpc = self.Host()
proxy = rpc.a.b
proxy._SetAttr('foo', 'bar')
self.assertEqual(proxy.foo, 'bar')
self.AssertResult(None)
def test_AttributeSetting(self):
rpc = self.Host()
rpc.a = 2
self.AssertResult(executed='a=2')
rpc.a.b.c = 2
self.AssertResult(executed='a.b.c=2')
def test_Indexing(self):
rpc = self.Host()
rpc.a[0] = 2
self.AssertResult(executed='a[0]=2')
rpc.a.b.c['key'] = rpc.a
self.AssertResult(executed="a.b.c['key']=a")
self.setUp()
self.assertIs(rpc.a.b[2].Eval(), self.value)
self.AssertResult(evaluated='a.b[2]')
self.setUp()
self.assertIs(rpc.a.b[2].Foo(), self.value)
self.AssertResult(evaluated='a.b[2].Foo()')
self.setUp()
del rpc.a[0]
self.AssertResult(executed='del a[0]')
def test_Iteration(self):
iteratorExec = IteratorExec()
rpc = RpcHost(self.Fetch, iteratorExec, iteratorExec)
for index, i in enumerate(rpc.x.someIterable):
if index == 0:
self.assertEqual("x_someIterable_iter=iter(x.someIterable)", iteratorExec.args[0])
self.assertEqual("next(x_someIterable_iter,'x_someIterable_nextsentinelvalue')", iteratorExec.args[1])
else:
self.assertEqual("next(x_someIterable_iter,'x_someIterable_nextsentinelvalue')", iteratorExec.args[0])
iteratorExec.args.clear()
self.assertEqual(index + 1, iteratorExec.iterCount)
self.assertIs(i, iteratorExec.iterCount)
self.assertEqual(3, iteratorExec.iterCount)
def test_CollectionOperations(self):
rpc = self.Host()
self.assertIs(len(rpc.a.b), self.value)
self.AssertResult(evaluated='len(a.b)')
self.assertIs(len(rpc.a.b[0]), self.value)
self.AssertResult(evaluated='len(a.b[0])')
self.assertIs(rpc.x in rpc.a.b[0].l, self.value)
self.AssertResult(evaluated='x in a.b[0].l')
self.assertIs(next(rpc.x), self.value)
self.AssertResult(evaluated="next(x,'x_nextsentinelvalue')")
def test_Calls(self):
rpc = self.Host()
self.assertIs(rpc.a(), self.value)
self.AssertResult(evaluated='a()')
self.assertIs(rpc.a.b.c(), self.value)
self.AssertResult(evaluated='a.b.c()')
self.assertIs(rpc.a(rpc.b, '', x=0, y=rpc.b.c[1]), self.value)
self.AssertResult(evaluated="a(b,'',y=b.c[1],x=0,)") # Note the kwargs order.
def test_CallReuse(self):
rpc = self.Host()
x = rpc.foo.bar
self.assertIs(x.V(), self.value)
self.AssertResult(evaluated='foo.bar.V()')
self.assertIs(x.Y(), self.value)
self.AssertResult(evaluated='foo.bar.Y()')
def test_LiteralInsertion(self):
rpc = self.Host()
rpc.a.b.Literal(".__setattr__('a', 1)").Eval()
self.AssertResult(evaluated="a.b.__setattr__('a', 1)")
def test_ManualModeEvaluation(self):
rpc = self.Host()
x = rpc.Manual().a.Foo()[1].bar
self.assertIs(x.Eval(), self.value)
self.AssertResult(evaluated='a.Foo()[1].bar')
self.assertIs(rpc.a(x), self.value)
self.AssertResult(evaluated='a(a.Foo()[1].bar)')
self.assertIs(next(rpc.Manual().a).Eval(), self.value)
self.AssertResult(evaluated='next(a)')
self.assertIs(next(rpc.Manual().a).abc.Eval(), self.value)
self.AssertResult(evaluated='next(a).abc')
self.assertIs(iter(rpc.Manual().a[2]).Eval(), self.value)
self.AssertResult(evaluated='iter(a[2])')
self.assertIs(('a' in rpc.Manual().b).Eval(), self.value)
self.AssertResult(evaluated="'a' in b")
self.assertIs(len(rpc.Manual('xyz')).Eval(), self.value)
self.AssertResult(evaluated="len(xyz)")
def test_ManualModeExection(self):
rpc = self.Host()
x = rpc.Manual()
x.a = 5
x.Exec()
self.AssertResult(executed='a=5')
x = rpc.Manual('a.x')
x.y.z = 35
x.Exec()
self.AssertResult(executed='a.x.y.z=35')
x = rpc.Manual('x')
x.y[2].z()[1].a = 35
x.Exec()
self.AssertResult(executed='x.y[2].z()[1].a=35')
rpc.a = rpc.Manual().b
self.AssertResult(executed="a=b")
rpc.a = rpc.Manual().b()
self.AssertResult(executed="a=b()")
def test_LocalExample(self):
""" Show actual local code execution. """
rpc = RpcHost(eval, exec, eval)
rpc.a = 2
self.assertEqual(a, 2) # noqa
self.assertEqual(rpc.a.Eval(), 2)
rpc.a = [1, 2, 3]
del rpc.a[0]
self.assertEqual(rpc.a.Eval(), [2, 3])
x = iter(rpc.a)
self.assertEqual(next(x), 2)
self.assertEqual(next(x), 3)
self.assertRaises(StopIteration, lambda: next(x))
self.assertRaises(StopIteration, lambda: next(x))
if __name__ == '__main__':
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.