Skip to content

Instantly share code, notes, and snippets.

@tarruda
Created August 27, 2018 17:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tarruda/129c374d8f59a04d35a852e125ce77f0 to your computer and use it in GitHub Desktop.
Save tarruda/129c374d8f59a04d35a852e125ce77f0 to your computer and use it in GitHub Desktop.
msgpack-rpc implementation for python asyncio
import asyncio
import collections
import mpack
class ErrorResponse(BaseException):
pass
class MpackRpcSession(object):
def __init__(self, reader, writer, mpack_session=None):
self._reader = reader
self._writer = writer
self._session = mpack_session or mpack.Session()
self._polling = False
# FIXME _loop is a private member of StreamReader, but it also seems
# redundant to accept an extra loop parameter since reader/writer are
# already associated with loop. Maybe there's a cleaner way?
self._loop = reader._loop
self._message_queue = collections.deque()
self._eof = False
self._buf = None
def _poll_start(self):
if self._polling:
raise Exception('Already polling')
self._polling = True
def _poll_stop(self):
self._polling = False
async def _read(self):
if self._buf:
rv = self._buf
self._buf = None
else:
if self._reader.at_eof():
raise Exception('Connection was closed')
return await self._reader.read(0xfff)
async def _receive(self):
msg_type = None
while not msg_type:
chunk = await self._read()
if not chunk:
return
offs, msg_type, name_or_err, args_or_result, id_or_data = (
self._session.receive(chunk))
if not msg_type:
continue
chunk = chunk[offs:]
if chunk:
# received more than one message, save the extra chunk for
# later
self._buf = chunk
if msg_type == 'response':
# set the result of the saved future
assert isinstance(id_or_data, asyncio.Future)
if name_or_err:
id_or_data.set_exception(ErrorResponse(name_or_err[1]))
else:
id_or_data.set_result(args_or_result)
else:
assert msg_type in ['request', 'notification']
# enqueue the message for later processing
self._message_queue.append((msg_type, name_or_err,
args_or_result, id_or_data))
async def _wait_for(self, future):
self._poll_start()
while not future.done():
await self._receive()
self._poll_stop()
async def next_message(self):
self._poll_start()
while not self._message_queue:
await self._receive()
self._poll_stop()
return self._message_queue.popleft()
def request(self, method, *args):
future = asyncio.Future(loop=self._loop)
request_data = self._session.request(method, args, data=future)
self._writer.write(request_data)
if not self._polling:
self._loop.create_task(self._wait_for(future))
return future
def notify(self, method, *args):
notification_data = self._session.notify(method, args)
self._writer.write(notification_data)
return self._writer.drain()
def reply(self, request_id, result, error=False):
response_data = self._session.reply(request_id, result, error)
self._writer.write(response_data)
return self._writer.drain()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment