Skip to content

Instantly share code, notes, and snippets.

@f0t0n
Last active April 6, 2016 19:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save f0t0n/7a36e7c31df8b2322e59facfdcae4689 to your computer and use it in GitHub Desktop.
Save f0t0n/7a36e7c31df8b2322e59facfdcae4689 to your computer and use it in GitHub Desktop.
from aiohttp import web
def rpc_method(method):
method.is_rpc_method = True
return method
def raise_method_not_found(method):
raise RpcError('Method `{}` not found'.format(method),
RpcErrorCode.METHOD_NOT_FOUND)
class JsonRpcService(object):
def __init__(self):
self.reset_context()
def set_context(self, ctx):
self._ctx = ctx
def reset_context(self):
self._ctx = None
async def __call__(self, method, **params):
# exposes something like this:
try:
fn = getattr(self, method)
if not fn.is_rpc_method:
raise_method_not_found()
return (await fn(**params)
if asyncio.iscoroutinefunction(fn)
else fn(**params))
except AttributeError as e:
raise_method_not_found()
class PrinterService(JsonRpcService):
@rpc_method
async def print_page(self, page=''):
# do something with data in self._ctx.headers (request headers)
return 'Print Task completed'
@rpc_method
def shutdown(self):
return 'ok'
def get_firmware_version(self):
return '1.0.3'
class ScanerService(JsonRpcService):
pass
class ServiceContextManager(object):
def __init__(self, service, ctx):
self._service = service
self._ctx = ctx
async def __aenter__(self):
self._service.set_context(self._ctx)
return self._service
async def __aexit__(self, exc_type, exc, tb):
self._service.reset_context()
class RpcWebsocketHandler(object):
def __init__(self, services):
self._services = {}
self.register_services(services)
def parse_msg(self, msg):
""" Parses JSON-RPC message.
Returns tuple (method, params, id)
"""
pass
def create_msg(self, rpc_result, id):
""" Constructs JSON-RPC response.
returns dict response
"""
pass
def __call__(self, request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
# ...
method, params, id = self.parse_msg(msg.data)
service, method = method.split('.')
async with ServiceContextManager(self._services[service], request) as srvc:
res = await srvc(method, **params)
ws.send_str(json.dumps(self.create_msg(res, id))
return ws
# create application wide service instances
printer_service = PrinterService()
scaner_service = ScanerService()
# create websocket request handler
websocket_handler = RpcWebsocketHandler([
printer_service,
scaner_service,
# other services...
])
# register handler in app
resource = app.router.add_resource('/ws/rpc/, name='json_rpc')
route = resource.add_route('GET', websocket_handler)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment