Skip to content

Instantly share code, notes, and snippets.

@danbradham
Last active May 25, 2018 17:18
Show Gist options
  • Save danbradham/69f70a88fd5ce790003fac4c1561392d to your computer and use it in GitHub Desktop.
Save danbradham/69f70a88fd5ce790003fac4c1561392d to your computer and use it in GitHub Desktop.
Tinyrpc
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
from tinyrpc import spawn, init_maya
# Simple rpc example
# Opens rpc on localhost at port 8000 using the current python interpreter
rpc = spawn()

# Callables are sent together with their arguments
result = rpc.submit(str.upper, 'hello')
assert result == 'HELLO'

# Strings that compile as an expression are evaluated and their value returned
result = rpc.submit('"hello".upper()')
assert result == 'HELLO'

result = rpc.submit(lambda x: x.upper(), 'hello')
assert result == 'HELLO'

# Strings that are not expressions are executed as statements and return None
result = rpc.submit("x = 10")
assert result is None

# Names assigned by an earlier statement are visible to later submissions
result = rpc.submit("x")
assert result == 10

rpc.kill()

# Maya rpc example
# Opens rpc using mayapy interpreter and sends init_maya as an initializer
maya_rpc = spawn(
    python="C:/Program Files/Autodesk/Maya2017/bin/mayapy.exe",
    initializer=init_maya
)

result = maya_rpc.submit('cmds.polySphere()')
assert result == [u'pSphere1', u'polySphere1']

result = maya_rpc.submit('cmds.objExists("pSphere1")')
assert result is True

# Shut the Maya worker down explicitly, mirroring the first example
maya_rpc.kill()
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
import atexit
import logging
import os
import signal
import socket
import subprocess
import sys
import traceback
from base64 import urlsafe_b64encode, urlsafe_b64decode
from Queue import Queue, Empty
from select import select
# Ports scanned by spawn() when the caller does not ask for a specific one
LOCAL_PORT_RANGE = range(8000, 9000)

# RPC objects created by spawn(); on_exit() walks this list to kill workers
CHILDREN = []

# Handlers that were installed for SIGTERM / SIGINT when this module loaded
OLD_TERM = signal.getsignal(signal.SIGTERM)
OLD_INT = signal.getsignal(signal.SIGINT)

# Root logging is configured at import time; every record is prefixed with
# the logger name and the pid of the emitting process
logging.basicConfig(
    level=logging.DEBUG,
    format='<%(name)s pid=%(process)d> %(message)s',
)
log = logging.getLogger('rpc')
def on_exit():
    '''Kill every entry of CHILDREN that still reports itself alive.

    Each entry is expected to provide alive() and kill(), as the RPC
    objects appended by spawn() do.
    '''

    for rpc_child in CHILDREN:
        if not rpc_child.alive():
            continue
        rpc_child.kill()
def exit_signal(sig):
    '''Install a handler for sig that calls on_exit and then defers to
    whatever handler was installed before.

    Arguments:
        sig (int): Signal number, e.g. signal.SIGTERM
    '''

    old_sig = signal.getsignal(sig)

    def signal_handler(*args):
        on_exit()

        if callable(old_sig):
            # Python-level handlers are called with (signum, frame)
            old_sig(*args)
        elif old_sig == signal.SIG_DFL:
            # The previous disposition was the OS default: restore it and
            # re-deliver the signal so the default action still happens
            signal.signal(sig, signal.SIG_DFL)
            os.kill(os.getpid(), sig)
        # signal.SIG_IGN or None: there is nothing to chain to

    signal.signal(sig, signal_handler)
# Import-time side effects: run on_exit when the interpreter exits and when
# SIGTERM or SIGINT is received.
atexit.register(on_exit)
exit_signal(signal.SIGTERM)
exit_signal(signal.SIGINT)
def run(command, **kwargs):
    '''Start command with subprocess.Popen and return the Popen object.

    stdin, stdout and stderr default to subprocess.PIPE unless the caller
    supplies them. All keyword arguments are forwarded to Popen.

    See also:
        https://stackoverflow.com/a/13256908
    '''

    for stream in ('stdin', 'stdout', 'stderr'):
        kwargs.setdefault(stream, subprocess.PIPE)
    return subprocess.Popen(command, **kwargs)
def address_available(addr):
    '''Check if the host and port are available

    The address counts as available when a TCP connection attempt to it
    does not succeed, i.e. nothing is accepting connections there.

    Arguments:
        addr (tuple): (host, port) pair

    Returns:
        bool: False if the connection attempt succeeded, True otherwise
    '''

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex returns 0 on success and an error code otherwise
        return sock.connect_ex(addr) != 0
    finally:
        # Always release the probe socket, whether or not it connected
        sock.close()
def spawn(host=None, port=None, python=None, initializer=None, **kwargs):
    '''Spawn an RPCExecutor in another process

    Arguments:
        host (str): Hostname defaults to 'localhost'
        port (int): Port to use. When omitted the first available port in
            LOCAL_PORT_RANGE is used.
        python (str): Path to python interpreter, defaults to sys.executable
        initializer (callable): Function to execute in the RPCExecutor
        **kwargs: Forwarded to run(). stdout and stderr both default to
            sys.stdout.

    Returns:
        RPC: Client bound to the spawned process

    Raises:
        RuntimeError: If the requested port is unavailable or no port in
            LOCAL_PORT_RANGE is available
    '''

    host = host or 'localhost'

    if port:
        port = int(port)
        if not address_available((host, port)):
            raise RuntimeError('Unavailable Port: %d' % port)
    else:
        for port in LOCAL_PORT_RANGE:
            if address_available((host, port)):
                break
        else:
            raise RuntimeError(
                'No available ports in range %d-%d'
                % (LOCAL_PORT_RANGE[0], LOCAL_PORT_RANGE[-1])
            )

    if python is None:
        python = sys.executable

    # Resolve the script the worker runs. An absolute path keeps working when
    # the caller passes cwd=..., and the .py source is preferred over compiled
    # bytecode because a different interpreter may not be able to load it.
    script = os.path.abspath(__file__)
    if script.endswith(('.pyc', '.pyo')) and os.path.exists(script[:-1]):
        script = script[:-1]

    # Start Worker
    kwargs.setdefault('stdout', sys.stdout)
    kwargs.setdefault('stderr', sys.stdout)
    cmd = [python, script, host, str(port)]
    proc = run(cmd, **kwargs)
    rpc = RPC(host, port, proc)
    CHILDREN.append(rpc)

    # Send initializer as Task
    if initializer:
        try:
            rpc.submit(initializer)
        except Exception:
            # The worker is unusable: stop it and stop tracking it
            rpc.kill()
            if rpc in CHILDREN:
                CHILDREN.remove(rpc)
            raise

    return rpc
class Task(object):
    '''A callable bundled with its arguments, plus the outcome of calling it.

    Arguments:
        fn (callable): Function to call
        args (tuple): Positional arguments for fn
        kwargs (dict): Keyword arguments for fn

    Attributes:
        result: Return value of fn, None until the task has been called
        exc: Exception raised by fn, or None
        formatted_exc: Formatted traceback of exc, or None
    '''

    def __init__(self, fn, args, kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.result = None
        self.exc = None
        self.formatted_exc = None

    def __str__(self):
        # Not every callable has a __name__ (functools.partial objects and
        # callable instances do not), so fall back to its repr
        fn_name = getattr(self.fn, '__name__', None) or repr(self.fn)
        return (
            'Task(fn={}, args={}, kwargs={})'
        ).format(
            fn_name,
            self.args,
            self.kwargs
        )

    def __call__(self):
        '''Call fn, storing either its return value or the exception it
        raised. Exceptions are recorded rather than propagated.'''

        try:
            self.result = self.fn(*self.args, **self.kwargs)
        except Exception as e:
            self.exc = e
            self.formatted_exc = traceback.format_exc()

    def get_result(self, propagate=True):
        '''Return the stored result.

        Arguments:
            propagate (bool): When True (the default) re-raise the exception
                recorded by __call__, if any. When False return result
                regardless, which is None for a failed task.
        '''

        if propagate and self.exc is not None:
            raise self.exc
        return self.result
class Eval(Task):
    '''Task that passes an expression string to the builtin eval.'''

    def __init__(self, expression):
        Task.__init__(self, eval, (expression,), {})
        self.expression = expression

    def __str__(self):
        return 'Eval(%r)' % (self.expression,)
class Exec(Task):
    '''Task that passes a block of statements to exec_.'''

    def __init__(self, command):
        Task.__init__(self, exec_, (command,), {})
        self.command = command

    def __str__(self):
        return 'Exec(%r)' % (self.command,)
def exec_(command):
    '''Execute command with this module's globals as its namespace.

    Names assigned by command therefore become module globals and stay
    visible to later calls.
    '''

    # With no separate locals mapping, exec uses the globals dict for both
    exec(command, globals())
def evaluable(expression):
    '''Return True if expression compiles in 'eval' mode, False if compiling
    it raises SyntaxError.'''

    try:
        compile(expression, '', 'eval')
        return True
    except SyntaxError:
        return False
def encode(obj):
    '''Pickle obj and return the pickle as URL-safe base64.

    cloudpickle is used when it can be imported, otherwise the standard
    pickle module.
    '''

    try:
        import cloudpickle as serializer
    except ImportError:
        import pickle as serializer

    pickled = serializer.dumps(obj)
    return urlsafe_b64encode(pickled)
def decode(obj):
    '''Reverse encode: base64-decode obj and unpickle the result.

    NOTE(review): Transport.recv feeds data read from a socket into this
    function, and unpickling can execute arbitrary code. Only expose the
    port to trusted peers.
    '''

    import pickle

    pickled = urlsafe_b64decode(obj)
    return pickle.loads(pickled)
class Transport(object):
    '''Sends and receives encoded objects over a connected socket.

    Every message is the output of encode() followed by TERMINATOR. The
    encoded payload is URL-safe base64 and so never contains a newline,
    which makes the terminator an unambiguous end-of-message marker. Both
    ends of a connection must use this class.

    Arguments:
        sock (socket.socket): Connected socket, blocking or non-blocking
    '''

    TERMINATOR = b'\n'
    CHUNK_SIZE = 1024

    def __init__(self, sock):
        self.sock = sock

    def send(self, msg):
        '''Encode msg and write all of it to the socket.'''

        payload = encode(msg) + self.TERMINATOR
        while payload:
            # Wait until the socket accepts data so that non-blocking sockets
            # do not fail when their send buffer is full
            select([], [self.sock], [])
            sent = self.sock.send(payload)
            # send() may accept only part of the payload; keep the rest
            payload = payload[sent:]

    def recv(self):
        '''Read one message from the socket and return the decoded object.

        Returns:
            The decoded object, or None if the peer closed the connection
            without sending anything.
        '''

        chunks = []
        while True:
            # Wait for data so that non-blocking sockets are never read
            # before the next part of the message has arrived
            select([self.sock], [], [])
            part = self.sock.recv(self.CHUNK_SIZE)
            if not part:
                # Peer closed the connection
                break
            chunks.append(part)
            if part.endswith(self.TERMINATOR):
                break

        data = b''.join(chunks).rstrip(self.TERMINATOR)
        if not data:
            return None
        return decode(data)

    def close(self):
        self.sock.close()
class RPC(object):
    '''Object used to submit tasks to an RPCExecutor.

    Arguments:
        host (str): Hostname of the executor
        port (int): Port of the executor
        proc (subprocess.Popen): Process running the executor, when it was
            started locally (see spawn). May be None.
    '''

    # Seconds to keep retrying a refused connection while proc is starting up
    CONNECT_TIMEOUT = 10.0

    def __init__(self, host='localhost', port=8888, proc=None):
        self.host = host
        self.port = int(port)
        self.addr = (self.host, self.port)
        self.proc = proc

    def _transport(self):
        '''Open a new connection to the executor and wrap it in a Transport.

        When proc is set and still running, a failed connection attempt is
        retried for up to CONNECT_TIMEOUT seconds, since the process may not
        have started listening yet. Without a running proc the first failure
        is raised immediately.
        '''

        import time  # local import: not among this module's top-level imports

        deadline = time.time() + self.CONNECT_TIMEOUT
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect(self.addr)
            except socket.error:
                sock.close()
                starting = self.proc is not None and self.proc.poll() is None
                if not starting or time.time() >= deadline:
                    raise
                time.sleep(0.05)
            else:
                return Transport(sock)

    def alive(self):
        '''Return True if the executor appears to be running.

        With a proc this is whether the process has not exited; checking the
        process avoids opening a throwaway connection to the executor.
        Without a proc it falls back to probing the address.
        '''

        if self.proc is not None:
            return self.proc.poll() is None
        return not address_available(self.addr)

    def kill(self):
        '''Signal the executor process to stop. Does nothing when there is
        no proc or the process has already exited.'''

        if self.proc is None or self.proc.poll() is not None:
            return
        # signal.SIGBREAK only exists on Windows; use SIGTERM elsewhere
        sig = getattr(signal, 'SIGBREAK', signal.SIGTERM)
        os.kill(self.proc.pid, sig)

    def submit(self, task, *args, **kwargs):
        '''Run task in the executor and return its result.

        Arguments:
            task: A Task instance, a callable (called with *args and
                **kwargs), or a string. A string that compiles as an
                expression is evaluated, any other string is executed as
                statements.

        Returns:
            The value produced by the task

        Raises:
            RuntimeError: If the executor closed the connection without
                sending a response
            Exception: Whatever exception the task raised in the executor
        '''

        if not isinstance(task, Task):
            if callable(task):
                task = Task(task, args, kwargs)
            elif evaluable(task):
                task = Eval(task)
            else:
                task = Exec(task)

        transport = self._transport()
        try:
            transport.send(task)
            response = transport.recv()
        finally:
            # Release the socket even when sending or receiving fails
            transport.close()

        if response is None:
            raise RuntimeError('No response from %s:%s' % self.addr)
        return response.get_result()
class RPCExecutor(object):
    '''Executes tasks submitted by RPC clients and sends the completed task
    back. Each client connection is discarded after a single request.

    Arguments:
        host (str): Hostname to bind to
        port (int): Port to bind to
    '''

    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = int(port)
        self.addr = (self.host, self.port)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setblocking(0)
        self.sock.bind(self.addr)
        self.sock.listen(5)
        log.debug('Listening on %s:%s', *self.addr)

    def listen(self):
        '''Serve requests forever.

        Each connection goes through: accept, read one task, call it, send
        the called task back, close. Tasks are called in this thread, so
        requests are handled one at a time.
        '''

        rset, wset, xset = set([self.sock]), set(), set()
        clients = {}  # client socket -> (Transport, 'host:port')
        pending = {}  # client socket -> called task waiting to be sent back

        def drop(sock):
            '''Close a client socket and forget everything about it'''
            rset.discard(sock)
            wset.discard(sock)
            xset.discard(sock)
            pending.pop(sock, None)
            clients.pop(sock, None)
            sock.close()

        log.debug('Awaiting tasks')
        while True:

            readable, writeable, exceptional = select(
                list(rset), list(wset), list(xset), 0.1
            )

            for sock in readable:

                if sock is self.sock:
                    client, address = sock.accept()
                    client.setblocking(0)
                    address = '%s:%s' % address
                    log.debug('%s connected', address)
                    rset.add(client)
                    clients[client] = (Transport(client), address)
                    continue

                transport, address = clients[sock]
                try:
                    task = transport.recv()
                except Exception:
                    # One unreadable request must not take the server down
                    log.debug(
                        '%s unreadable request:\n%s',
                        address,
                        traceback.format_exc()
                    )
                    drop(sock)
                    continue

                if not task:
                    log.debug('%s eof', address)
                    drop(sock)
                    continue

                log.debug('%s received: %s', address, task)
                log.debug('%s calling: %s', address, task)
                task()
                if task.exc:
                    log.debug('%s failed: %s', address, task.exc)
                else:
                    log.debug('%s result: %s', address, task.result)

                # Reply once the socket is reported writeable
                pending[sock] = task
                rset.discard(sock)
                wset.add(sock)

            for sock in writeable:

                transport, address = clients[sock]
                task = pending.pop(sock, None)
                if task is None:
                    log.debug('%s result queue empty', address)
                    log.debug('%s closing connection', address)
                else:
                    log.debug('%s sending result', address)
                    try:
                        transport.send(task)
                    except Exception:
                        # e.g. a result that cannot be serialized, or a
                        # client that went away; keep serving other clients
                        log.debug(
                            '%s could not send result:\n%s',
                            address,
                            traceback.format_exc()
                        )
                drop(sock)

            for sock in exceptional:

                # Look the address up before the bookkeeping is dropped
                _, address = clients.get(sock, (None, None))
                log.debug('%s in error, closing connection', address)
                drop(sock)
def init_maya():
    '''Used to initialize Maya in an RPCExecutor spawned using mayapy.

    Imports maya.cmds, maya.standalone and maya.utils as module globals and
    initializes Maya standalone. Only the root log handler that was first in
    the list before initialization is kept afterwards.

    Examples:
        >>> rpc = spawn(python='path/to/mayapy.exe', initializer=init_maya)
        >>> rpc.submit('cmds.polySphere()')
        [u'pSphere1', u'polySphere1']
    '''

    global cmds, standalone, utils
    from maya import cmds, standalone, utils

    # Remember the root handler that exists before Maya starts up
    root_logger = log.root
    original_handler = root_logger.handlers[0]

    # Initialize maya standalone
    standalone.initialize()

    # Drop every root handler except the remembered one
    # (presumably standalone.initialize() adds its own - TODO confirm)
    root_logger.handlers[:] = [
        handler for handler in root_logger.handlers
        if handler is original_handler
    ]

    # Set log level
    log.setLevel(logging.DEBUG)
if __name__ == '__main__':
    # Usage: <script> [host port]
    # Anything other than exactly two arguments falls back to the defaults
    cli_args = sys.argv[1:]
    if len(cli_args) == 2:
        executor = RPCExecutor(*cli_args)
    else:
        executor = RPCExecutor()
    executor.listen()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment