Skip to content

Instantly share code, notes, and snippets.

@wallyqs
Created September 24, 2020 15:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wallyqs/2e5d16094e400bceeaf3dd0eefa7144e to your computer and use it in GitHub Desktop.
Save wallyqs/2e5d16094e400bceeaf3dd0eefa7144e to your computer and use it in GitHub Desktop.
nats.py async for experiments
import asyncio
import nats
async def run():
nc = await nats.connect(servers=["nats://localhost:4222"])
# Using list comprehension like syntax with limited interest
sub1 = await nc.subscribe("foo")
await nc.publish("foo", b'First')
await nc.publish("foo", b'Second')
await nc.publish("foo", b'Third')
await sub1.unsubscribe(limit=2)
msgs = [msg async for msg in sub1.messages]
for msg in msgs:
print(f"Received ['{msg.subject}] : '{msg.data.decode()}'")
# Using list comprehension like syntax with limited interest
sub2 = await nc.subscribe("bar")
await nc.publish("bar", b'First')
await nc.publish("bar", b'Second')
await nc.publish("bar", b'Third')
async for msg in sub2.messages:
print(f"Received ['{msg.subject}] : '{msg.data.decode()}'")
# This signals the server to stop sending messages, though
# the iteration will stop once all messages that have already
# been consumed have been processed (kind of like drain).
await sub2.unsubscribe()
await nc.flush()
# Using async generators
sub3 = await nc.subscribe("quux")
async def stream():
async for msg in sub3.messages:
yield msg
# Use async generator and async def to be able to use await.
async def next_msg():
async for msg in stream():
return msg
for i in range(0, 2):
await nc.publish(f"quux", f'n:{i}'.encode())
async for msg in stream():
print(f"Received ['{msg.subject}] : '{msg.data.decode()}'")
# Stop after receiving first message.
break
msg = await next_msg()
print(f"Next Msg ['{msg.subject}] : '{msg.data.decode()}'")
# Stop receiving messages.
await sub3.unsubscribe()
await nc.flush()
# Next message Will not be received.
await nc.publish(f"quux", b'n:3')
msg = await next_msg()
if msg is None:
print("No more messages...")
# Issues with drain and iterators?
# await nc.drain()
await nc.close()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment