Skip to content

Instantly share code, notes, and snippets.

@decentral1se
Last active January 3, 2020 17:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save decentral1se/37493dbf58f367adc75b2ace8d6263ee to your computer and use it in GitHub Desktop.
Save decentral1se/37493dbf58f367adc75b2ace8d6263ee to your computer and use it in GitHub Desktop.
nproto.py
import trio
import trio.testing
class Protocol:
    """Tiny message protocol layered on top of a bidirectional stream.

    The wrapped ``stream`` must provide the awaitable methods ``send_all``,
    ``receive_some`` and ``aclose``.  Instances are async iterators: each
    step yields whatever a single ``receive_some(1)`` call returned, and
    iteration ends once the stream reports end-of-data or is no longer
    usable.
    """

    def __init__(self, stream):
        """Remember the stream that all traffic goes through."""
        self.stream = stream

    async def aclose(self):
        """Close the underlying stream."""
        await self.stream.aclose()

    async def send(self, messages):
        """Log and transmit each item of ``messages`` in order.

        Sending is best-effort with respect to the connection state: if the
        stream has been closed locally or broken by the peer, the remaining
        messages are dropped and the method returns quietly.  Any other
        error (for example a message of the wrong type) propagates so that
        programming mistakes are not hidden.
        """
        for message in messages:
            print(f'send: {message}')
            try:
                await self.stream.send_all(message)
            except (trio.ClosedResourceError, trio.BrokenResourceError):
                # The connection is gone; nothing further can be delivered.
                return

    def __aiter__(self):
        """Return ``self``; the protocol object is its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next chunk read from the stream.

        Raises:
            StopAsyncIteration: when the read returns no data (end of
                stream) or the stream is closed/broken.
        """
        try:
            # Ask for at most one byte so every message is a single byte.
            byte = await self.stream.receive_some(1)
        except (trio.ClosedResourceError, trio.BrokenResourceError):
            # A dead stream is treated the same as a clean end of stream.
            raise StopAsyncIteration from None
        if not byte:
            raise StopAsyncIteration
        print(f'recv: {byte}')
        return byte
async def main():
    """Drive a scripted exchange between two in-memory Protocol endpoints.

    One endpoint opens with ``a``, ``b``, ``c``; the other answers ``c``
    with ``d``; the first answers ``d`` with ``e``; on ``e`` the second
    endpoint prints ``all done`` and closes its side.
    """
    near_end, far_end = trio.testing.memory_stream_pair()
    initiator = Protocol(stream=near_end)
    responder = Protocol(stream=far_end)

    async def run_initiator():
        # Kick off the conversation, then react to whatever comes back.
        await initiator.send([b'a', b'b', b'c'])
        async for incoming in initiator:
            if incoming != b'd':
                # Anything other than the expected reply ends our side.
                await initiator.aclose()
                continue
            await initiator.send([b'e'])

    async def run_responder():
        async for incoming in responder:
            if incoming == b'e':
                print('all done')
                await responder.aclose()
            elif incoming == b'c':
                await responder.send([b'd'])

    # Both endpoints run concurrently until each of their loops finishes.
    async with trio.open_nursery() as nursery:
        for task in (run_initiator, run_responder):
            nursery.start_soon(task)
# Script entry point: hand main() to trio.run so it executes on Trio's event loop.
trio.run(main)
@decentral1se
Copy link
Author

decentral1se commented Jan 3, 2020

Output:

send: b'a'
send: b'b'
recv: bytearray(b'a')
send: b'c'
recv: bytearray(b'b')
recv: bytearray(b'c')
send: b'd'
recv: bytearray(b'd')
send: b'e'
recv: bytearray(b'e')
all done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment