Skip to content

Instantly share code, notes, and snippets.

@decentral1se
Created January 1, 2020 22:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save decentral1se/329bdcd957e1a9501579f31126365cf7 to your computer and use it in GitHub Desktop.
Save decentral1se/329bdcd957e1a9501579f31126365cf7 to your computer and use it in GitHub Desktop.
testproto.py
import attr
import trio
import trio.abc
import trio.testing
@attr.s(auto_attribs=True)
class Proto:
    """Async-iterable wrapper that reads its stream one byte per step."""

    # Underlying stream used for both receiving and sending.
    stream: trio.abc.Stream

    def __aiter__(self):
        """Return this object; it acts as its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next chunk read from the stream (at most one byte).

        Raises:
            StopAsyncIteration: when the read comes back empty.
        """
        chunk = await self.stream.receive_some(1)
        if chunk:
            return chunk
        # An empty read ends the iteration.
        raise StopAsyncIteration

    async def send(self, msg: bytes):
        """Hand ``msg`` to the stream's ``send_all``."""
        await self.stream.send_all(msg)
async def main():
    """Exchange single-byte messages between two in-memory ``Proto`` peers.

    ``p1`` sends ``b'f'`` and ``b'h'``. ``p2`` reads them and answers every
    ``b'f'`` with ``b'f'`` followed by ``b'h'``. ``p1`` then reads the
    replies and prints a line for every ``b'f'`` it sees.

    Iterating a ``Proto`` only stops when a read comes back empty, so each
    direction is half-closed with ``send_eof()`` as soon as its sender is
    done. Without that, the first loop would wait forever for more data and
    the second loop would never run.
    """
    left, right = trio.testing.memory_stream_pair()
    p1 = Proto(stream=left)
    p2 = Proto(stream=right)

    await p1.send(b'f')
    await p1.send(b'h')
    # p1 has nothing more to send: signal EOF so p2's loop can terminate.
    # Only the sending half is closed; p1 can still receive replies.
    await left.send_eof()

    async for msg in p2:
        if msg == b'f':
            await p2.send(b'f')
            await p2.send(b'h')
    # p2 is done replying: signal EOF so p1's loop can terminate as well.
    await right.send_eof()

    async for msg in p1:
        if msg == b'f':
            print('p1: f received')
# Start the event loop only when executed as a script, so that importing
# this module does not run the demo as a side effect.
if __name__ == '__main__':
    trio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment