Last active
May 25, 2018 17:18
-
-
Save danbradham/69f70a88fd5ce790003fac4c1561392d to your computer and use it in GitHub Desktop.
Tinyrpc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import absolute_import, division, print_function | |
from tinyrpc import spawn, init_maya | |
# Simple rpc example | |
# Opens rpc on localhost at port 8000 using the current python interpreter | |
rpc = spawn() | |
result = rpc.submit(str.upper, 'hello') | |
assert result == 'HELLO' | |
result = rpc.submit('"hello".upper()') | |
assert result == 'HELLO' | |
result = rpc.submit(lambda x: x.upper(), 'hello') | |
assert result == 'HELLO' | |
result = rpc.submit("x = 10") | |
assert result is None | |
result = rpc.submit("x") | |
assert result == 10 | |
rpc.kill() | |
# Maya rpc example | |
# Opens rpc using mayapy interpreter and sends init_maya as an initializer | |
maya_rpc = spawn( | |
python="C:/Program Files/Autodesk/Maya2017/bin/mayapy.exe", | |
initializer=init_maya | |
) | |
result = maya_rpc.submit('cmds.polySphere()') | |
assert result == [u'pSphere1', u'polySphere1'] | |
result = maya_rpc.submit('cmds.objExists("pSphere1")') | |
assert result == True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import absolute_import, division, print_function | |
import atexit | |
import logging | |
import os | |
import signal | |
import socket | |
import subprocess | |
import sys | |
import traceback | |
from base64 import urlsafe_b64encode, urlsafe_b64decode | |
from Queue import Queue, Empty | |
from select import select | |
LOCAL_PORT_RANGE = range(8000, 9000) | |
CHILDREN = [] | |
OLD_TERM = signal.getsignal(signal.SIGTERM) | |
OLD_INT = signal.getsignal(signal.SIGINT) | |
logging.basicConfig( | |
format='<%(name)s pid=%(process)d> %(message)s', | |
level=logging.DEBUG, | |
) | |
log = logging.getLogger('rpc') | |
def on_exit(): | |
'''Send kill signal to all children on exit''' | |
for child in CHILDREN: | |
if child.alive(): | |
child.kill() | |
def exit_signal(sig): | |
'''Calls on_exit when the sig is received''' | |
old_sig = signal.getsignal(sig) | |
def signal_handler(*args): | |
on_exit() | |
old_sig() | |
signal.signal(sig, signal_handler) | |
atexit.register(on_exit) | |
exit_signal(signal.SIGTERM) | |
exit_signal(signal.SIGINT) | |
def run(command, **kwargs): | |
'''Start a subprocess... | |
See also: | |
https://stackoverflow.com/a/13256908 | |
''' | |
kwargs.setdefault('stdin', subprocess.PIPE) | |
kwargs.setdefault('stdout', subprocess.PIPE) | |
kwargs.setdefault('stderr', subprocess.PIPE) | |
proc = subprocess.Popen(command, **kwargs) | |
return proc | |
def address_available(addr): | |
'''Check if the host and port are available''' | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
code = sock.connect_ex(addr) | |
if code == 0: | |
return False | |
else: | |
return True | |
def spawn(host=None, port=None, python=None, initializer=None, **kwargs): | |
'''Spawn an RPCExecutor in another process | |
Arguments: | |
host (str): Hostname defaults to 'localhost' | |
port (int): Port to use | |
python (str): Path to python interpreter | |
initializer (callable): Function to execute in the RPCExecutor | |
''' | |
host = host or 'localhost' | |
if port: | |
port = int(port) | |
if not address_available((host, port)): | |
raise RuntimeError('Unavailable Port: %d' % port) | |
else: | |
for port in LOCAL_PORT_RANGE: | |
if address_available((host, port)): | |
break | |
else: | |
raise RuntimeError( | |
'No available ports in range %d-%d' | |
% (LOCAL_PORT_RANGE[0], LOCAL_PORT_RANGE[-1]) | |
) | |
if python is None: | |
python = sys.executable | |
# Start Worker | |
kwargs.setdefault('stdout', sys.stdout) | |
kwargs.setdefault('stderr', sys.stdout) | |
cmd = [python, __file__, host, str(port)] | |
proc = run(cmd, **kwargs) | |
rpc = RPC(host, port, proc) | |
CHILDREN.append(rpc) | |
# Send initializer as Task | |
if initializer: | |
try: | |
rpc.submit(initializer) | |
except Exception: | |
rpc.kill() | |
raise | |
return rpc | |
class Task(object): | |
def __init__(self, fn, args, kwargs): | |
self.fn = fn | |
self.args = args | |
self.kwargs = kwargs | |
self.result = None | |
self.exc = None | |
self.formatted_exc = None | |
def __str__(self): | |
return ( | |
'Task(fn={}, args={}, kwargs={})' | |
).format( | |
self.fn.__name__, | |
self.args, | |
self.kwargs | |
) | |
def __call__(self): | |
try: | |
self.result = self.fn(*self.args, **self.kwargs) | |
except Exception as e: | |
self.exc = e | |
self.formatted_exc = traceback.format_exc() | |
def get_result(self, propagate=True): | |
if self.exc: | |
raise self.exc | |
return self.result | |
class Eval(Task): | |
def __init__(self, expression): | |
self.expression = expression | |
super(Eval, self).__init__(eval, (expression,), {}) | |
def __str__(self): | |
return 'Eval({!r})'.format(self.expression) | |
class Exec(Task): | |
def __init__(self, command): | |
self.command = command | |
super(Exec, self).__init__(exec_, (command,), {}) | |
def __str__(self): | |
return 'Exec({!r})'.format(self.command) | |
def exec_(command): | |
namespace = globals() | |
exec(command, namespace, namespace) | |
def evaluable(expression): | |
try: | |
compile(expression, '', 'eval') | |
except SyntaxError: | |
return False | |
else: | |
return True | |
def encode(obj): | |
try: | |
import cloudpickle as pickle | |
except ImportError: | |
import pickle | |
return urlsafe_b64encode(pickle.dumps(obj)) | |
def decode(obj): | |
import pickle | |
return pickle.loads(urlsafe_b64decode(obj)) | |
class Transport(object): | |
def __init__(self, sock): | |
self.sock = sock | |
def send(self, msg): | |
self.sock.send(encode(msg)) | |
def recv(self): | |
data = '' | |
while True: | |
part = self.sock.recv(1024) | |
data += part | |
if len(part) < 1024: | |
break | |
return decode(data) | |
def close(self): | |
self.sock.close() | |
class RPC(object): | |
'''Object used to submit tasks to an RPCExecutor.''' | |
def __init__(self, host='localhost', port=8888, proc=None): | |
self.host = host | |
self.port = int(port) | |
self.addr = (self.host, self.port) | |
self.proc = proc | |
def _transport(self): | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.connect(self.addr) | |
return Transport(sock) | |
def alive(self): | |
return not address_available(self.addr) | |
def kill(self): | |
if self.proc: | |
os.kill(self.proc.pid, signal.SIGBREAK) | |
def submit(self, task, *args, **kwargs): | |
if isinstance(task, Task): | |
task = task | |
elif callable(task): | |
task = Task(task, args, kwargs) | |
elif evaluable(task): | |
task = Eval(task) | |
else: | |
task = Exec(task) | |
transport = self._transport() | |
transport.send(task) | |
response = transport.recv() | |
transport.close() | |
return response.get_result() | |
class RPCExecutor(object): | |
'''Executes tasks submitted by RPC clients and sends the completed task | |
back. Each client connection is discarded after a single request.''' | |
def __init__(self, host='localhost', port=8888): | |
self.host = host | |
self.port = int(port) | |
self.addr = (self.host, self.port) | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.sock.setblocking(0) | |
self.sock.bind(self.addr) | |
self.sock.listen(5) | |
log.debug('Listening on %s:%s', *self.addr) | |
def listen(self): | |
rset, wset, xset = set([self.sock]), set(), set() | |
queues = {} | |
transports = {} | |
addresses = {} | |
log.debug('Awaiting tasks') | |
while True: | |
readable, writeable, exceptional = select( | |
list(rset), list(wset), list(xset), 0.1 | |
) | |
for sock in readable: | |
if sock is self.sock: | |
client, address = sock.accept() | |
client.setblocking(0) | |
address = '%s:%s' % address | |
log.debug('%s connected', address) | |
rset.add(client) | |
transports[client] = Transport(client) | |
queues[client] = Queue() | |
addresses[client] = address | |
else: | |
transport = transports.get(sock) | |
queue = queues.get(sock) | |
address = addresses.get(sock) | |
task = transport.recv() | |
if task: | |
log.debug('%s received: %s', address, task) | |
log.debug('%s calling: %s', address, task) | |
task() | |
if task.exc: | |
log.debug('%s failed: %s', address, task.exc) | |
else: | |
log.debug('%s result: %s', address, task.result) | |
queue.put(task) | |
rset.discard(sock) | |
wset.add(sock) | |
else: | |
log.debug('%s eof', address) | |
transport.close() | |
rset.discard(sock) | |
wset.discard(sock) | |
for sock in writeable: | |
transport = transports.get(sock) | |
queue = queues.get(sock) | |
address = addresses.get(sock) | |
try: | |
result = queue.get_nowait() | |
except Empty: | |
log.debug('%s result queue empty', address) | |
log.debug('%s closing connection', address) | |
wset.discard(sock) | |
sock.close() | |
else: | |
log.debug('%s sending result', address) | |
transport.send(result) | |
wset.discard(sock) | |
sock.close() | |
for sock in exceptional: | |
log.debug('%s in error, closing connection', address) | |
sock.close() | |
transport = transports.pop(sock, None) | |
queue = queues.pop(sock, None) | |
address = addresses.pop(sock, None) | |
rset.discard(sock) | |
wset.discard(sock) | |
def init_maya(): | |
'''Used to initialize Maya in an RPCExecutor spawned using mayapy. | |
Examples: | |
>>> rpc = spawn(python='path/to/mayapy.exe', initializer=init_maya) | |
>>> rpc.submit('cmds.polySphere()') | |
[u'polySphere1', u'pSphere1'] | |
''' | |
global cmds | |
global standalone | |
global utils | |
from maya import cmds | |
from maya import standalone | |
from maya import utils | |
# Store root log handlers | |
rhandler = log.root.handlers[0] | |
# Initialize maya standalone | |
standalone.initialize() | |
# Remove non-root log handlers | |
for handler in list(log.root.handlers): | |
if handler is not rhandler: | |
log.root.handlers.remove(handler) | |
# Set log level | |
log.setLevel(logging.DEBUG) | |
if __name__ == '__main__': | |
args = sys.argv[1:] | |
if len(args) == 2: | |
RPCExecutor(args[0], args[1]).listen() | |
else: | |
RPCExecutor().listen() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment