Last active
September 7, 2021 08:33
-
-
Save smurfix/204fcc459149faa1b29e51daf22ce5d1 to your computer and use it in GitHub Desktop.
example code for a multiplexing client/server protocol
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
""" | |
This is example code for a multiplexing client/server protocol.

Missing:

* server capacity management
* actual testcases for error propagation and cancellation
* handling badly-formatted messages without crashing the server
* sending more than one request or reply per interaction
* most likely a few other edge cases

Message format:

Messages are wrapped with `msgpack` because it's self-delimiting and very
easy to work with. Messages are dicts with these fields:

* id: sequence number from the client, returned by the server.
  The server does not reply to messages whose id is 0 (or missing);
  the client sends its kill requests with id 0.
* msg: the actual data of the request / reply
* err: server-side error, forwarded to the client
* kill: sequence number of a to-be-cancelled request, sent to the server
""" | |
import trio | |
try: | |
from contextlib import asynccontextmanager | |
except ImportError: | |
from async_generator import asynccontextmanager | |
import msgpack | |
from functools import partial | |
from random import random,randint | |
from outcome import Error,Value | |
import logging | |
# Module-wide logger; the server and client code below log their traffic here.
logger = logging.getLogger(__name__)

# TCP port the sample client connects to.
PORT = 50123
class RemoteError(RuntimeError):
    """Raised on the client when the server's reply carries an ``err`` field."""
########### Server ############### | |
class ServerHandler:
    """
    Server side of one client connection.

    ``serve`` reads messages from ``sock`` and starts one task per message.
    Each task runs ``process`` on the request payload and sends the result
    (or the error) back to the client, so requests are handled in parallel.

    Subclasses override ``process``.

    TODO: no capacity management yet.
    """

    def __init__(self, sock):
        self.lock = trio.Lock()  # serializes writing replies to the socket
        self.jobs = {}  # request id -> CancelScope of the running job
        self.sock = sock

    async def process(self, msg):
        """
        The main method of this class: process one incoming message to
        generate one reply.

        ``msg`` is the payload of the request's ``msg`` field; the return
        value becomes the payload of the reply.
        """
        raise RuntimeError("Override me!")

    def _kill(self, msg):
        """
        Cancel the job named by the ``kill`` field of ``msg``.

        A job that is unknown or already finished is ignored. A message
        without a ``kill`` field raises ``KeyError``.
        """
        job = self.jobs.get(msg['kill'])
        if job is not None:
            job.cancel()

    async def _do_reply(self, msg):
        """
        Create and send the reply for one message.

        No reply is sent when the message's ``id`` is missing or 0.
        Exceptions raised by ``process`` are reported to the client in the
        reply's ``err`` field. A ``BaseException`` that is not an
        ``Exception`` (e.g. cancellation) is re-raised after the attempt to
        report it.
        """
        logger.debug("S> %s", msg)
        seq = msg.get('id')
        ex = None
        try:
            if 'msg' in msg:
                # Remember this request's scope so that a later kill message
                # can cancel it.
                with trio.CancelScope() as cs:
                    if seq:
                        self.jobs[seq] = cs
                    try:
                        reply = {'msg': await self.process(msg['msg'])}
                    finally:
                        # Only drop our own entry: a later request that
                        # re-used the id may have replaced it.
                        if seq and self.jobs.get(seq) is cs:
                            del self.jobs[seq]
                if cs.cancelled_caught:
                    # Killed by the client: report that instead of
                    # pretending the request succeeded.
                    reply = {'err': 'cancelled'}
            else:
                # Control message. If it carries a non-zero id, it is
                # acknowledged by echoing it back.
                self._kill(msg)
                reply = {'msg': msg}
        except Exception as exc:
            reply = {'err': repr(exc)}
        except BaseException as exc:
            reply = {'err': repr(exc)}
            ex = exc

        if seq:
            reply['id'] = seq
            logger.debug("S< %s", reply)
            async with self.lock:
                try:
                    await self.sock.send_all(msgpack.packb(reply))
                except (trio.BrokenResourceError, trio.ClosedResourceError):
                    pass  # client died. Oh well
        if ex is not None:
            # Never swallow cancellation and friends, reply or no reply.
            raise ex

    async def serve(self, task_status=trio.TASK_STATUS_IGNORED):
        """
        Read messages from the socket and run ``_do_reply`` for each one.

        Returns when the client closes the connection or the connection
        breaks; jobs that are still running at that point are cancelled.
        """
        task_status.started()
        unpacker = msgpack.Unpacker(raw=False)
        async with trio.open_nursery() as n:
            while True:
                try:
                    data = await self.sock.receive_some(4096)
                except trio.BrokenResourceError:
                    data = b""
                if not data:
                    # Broken connection or EOF (receive_some returns an
                    # empty bytes object): the client went away, stop work.
                    # Without the EOF check this loop would spin forever.
                    n.cancel_scope.cancel()
                    return
                unpacker.feed(data)
                for msg in unpacker:
                    if not isinstance(msg, dict):
                        logger.warning("S? ignoring malformed message: %r", msg)
                        continue
                    # TODO: add capacity management
                    n.start_soon(self._do_reply, msg)
########### Client ############### | |
class Client:
    """
    Client side of the multiplexing protocol.

    Use ``connect`` as an async context manager, then call ``ask`` from any
    number of tasks. Each request gets its own sequence number, so replies
    may arrive in any order.
    """

    def __init__(self):
        self._seq = 0  # last sequence number handed out
        self._reply = {}  # seq -> trio.Event while waiting, then an Outcome
        self._send_w, self._send_r = trio.open_memory_channel(0)
        # The sender task and the cancellation path of ``ask`` both write to
        # the stream; this lock keeps their writes from overlapping.
        self._send_lock = trio.Lock()

    @asynccontextmanager
    async def connect(self):
        """
        Async context manager that connects us to a server.

        Starts the receiver and sender tasks; both are cancelled and the
        stream is closed when the context is left.
        """
        async with trio.open_nursery() as n:
            async with await trio.open_tcp_stream("127.0.0.1", PORT) as s:
                self.s = s
                await n.start(self._recv)
                await n.start(self._send)
                try:
                    yield self
                finally:
                    del self.s
                    n.cancel_scope.cancel()

    async def _write(self, msg):
        """Pack one message and write it to the stream, one writer at a time."""
        async with self._send_lock:
            await self.s.send_all(msgpack.packb(msg))

    async def _send(self, task_status=trio.TASK_STATUS_IGNORED):
        """
        Packet sender loop: forwards queued requests to the server.
        """
        task_status.started()
        async for msg in self._send_r:
            await self._write(msg)

    def _fail_pending(self, reason):
        """Wake every task that still waits for a reply with a RemoteError."""
        for seq, slot in list(self._reply.items()):
            if isinstance(slot, trio.Event):
                self._reply[seq] = Error(RemoteError(reason))
                slot.set()

    async def _recv(self, task_status=trio.TASK_STATUS_IGNORED):
        """
        Packet receiver loop.

        Ends when the server closes the connection. Whenever this loop ends,
        requests that are still waiting fail with ``RemoteError`` instead of
        hanging, since no reply can arrive any more.
        """
        task_status.started()
        unpacker = msgpack.Unpacker(raw=False)
        try:
            while hasattr(self, 's'):
                data = await self.s.receive_some(4096)
                if not data:
                    # EOF. Without this check the loop would spin forever,
                    # as receive_some keeps returning an empty bytes object.
                    break
                unpacker.feed(data)
                for msg in unpacker:
                    logger.debug("C> %s", msg)
                    try:
                        self._dispatch(msg)
                    except Exception:
                        # One bad message must not stop the other requests.
                        logger.exception("C?")
        finally:
            self._fail_pending("connection closed")

    def _dispatch(self, msg):
        """
        This code takes a reply and forwards it to the correct client task.

        A reply with a ``msg`` field is stored as a ``Value``, anything else
        as an ``Error`` wrapping a ``RemoteError``. Replies for requests that
        are unknown, cancelled or already answered are dropped.
        """
        seq = msg['id']
        evt = self._reply.get(seq)
        if not isinstance(evt, trio.Event):
            logger.debug("C? dropping reply for unknown request %r", seq)
            return
        if 'msg' in msg:
            self._reply[seq] = Value(msg['msg'])
        else:
            err = msg.get('err', 'malformed reply: %r' % (msg,))
            self._reply[seq] = Error(RemoteError(err))
        evt.set()

    async def ask(self, msg):
        """
        Our main entry point. Sends a request to the server and waits for a
        reply. If cancelled, sends a Kill request to the server.

        Returns the payload of the reply; raises ``RemoteError`` if the
        server reported an error.
        """
        self._seq += 1
        seq = self._seq
        # Wrap the message, add seq number
        msg = {'id': seq, 'msg': msg}
        self._reply[seq] = evt = trio.Event()
        try:
            logger.debug("C< %s", msg)
            await self._send_w.send(msg)
            await evt.wait()
        except trio.Cancelled:
            # Try to tell the server to kill the request. The shield is set
            # on the scope object rather than passed as a keyword, so this
            # also works on Trio versions whose move_on_after() does not
            # accept ``shield``.
            with trio.move_on_after(2) as scope:
                scope.shield = True
                try:
                    await self._write({'id': 0, 'kill': seq})
                except (trio.BrokenResourceError, trio.ClosedResourceError, AttributeError):
                    pass  # the connection is already dead
            raise  # Never swallow trio.Cancelled!
        else:
            return self._reply[seq].unwrap()
        finally:
            del self._reply[seq]
########### Sample code: client ############### | |
async def asker(c, x, y):
    """
    Sample client task.

    Sends between two and five requests through ``c``, one after the other,
    and asserts that each reply echoes the request with " back" appended to
    its ``data`` field. Sleeps for a random fraction of a second after each
    exchange.
    """
    rounds = randint(2, 5)
    for z in range(rounds):
        request = {'x': x, 'y': y, 'z': z, 'data': 'Hello'}
        res = await c.ask(request)
        expected = {'data': "Hello back", 'x': x, 'y': y, 'z': z}
        for key, want in expected.items():
            assert res[key] == want
        await trio.sleep(random())
async def client(x):
    """
    A simple client: opens one connection and runs three independent
    ``asker`` tasks over it, numbered 1 to 3.
    """
    async with Client().connect() as c, trio.open_nursery() as n:
        for y in (1, 2, 3):
            n.start_soon(asker, c, x, y)
########### Sample code: server ############### | |
class MyHandler(ServerHandler):
    """Sample server: echoes each request after a short random delay."""

    async def process(self, msg):
        """
        Append ' back' to the request's 'data' element, wait up to a third
        of a second, and return the (modified) request as the reply.
        """
        msg['data'] += ' back'
        pause = random() / 3
        await trio.sleep(pause)
        return msg
########### Sample code: tying it all together ############### | |
async def main():
    """
    Main code. Fairly simplistic.

    Starts the sample server on ``PORT``, runs three clients against it in
    parallel, and stops the server once all clients have finished.
    """
    async def server_handler(sock):
        # One handler instance per incoming connection.
        await MyHandler(sock).serve()

    async with trio.open_nursery() as n:
        # Start a server (and wait for it to be established).
        # Listen on PORT, the same constant the client connects to.
        await n.start(partial(trio.serve_tcp, server_handler, PORT, host="127.0.0.1"))

        # Start some clients, in parallel.
        # This part could (obviously) run in another process, or on a different host.
        async with trio.open_nursery() as nn:
            for x in (1, 2, 3):
                nn.start_soon(client, x)

        # All clients have finished: kill the server.
        n.cancel_scope.cancel()
if __name__ == "__main__":
    # Show the full message traffic of the demo, then run it.
    logging.basicConfig(level=logging.DEBUG)
    trio.run(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment