Created
September 24, 2020 15:44
-
-
Save wallyqs/2e5d16094e400bceeaf3dd0eefa7144e to your computer and use it in GitHub Desktop.
nats.py async for experiments
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import nats | |
async def run(): | |
nc = await nats.connect(servers=["nats://localhost:4222"]) | |
# Using list comprehension like syntax with limited interest | |
sub1 = await nc.subscribe("foo") | |
await nc.publish("foo", b'First') | |
await nc.publish("foo", b'Second') | |
await nc.publish("foo", b'Third') | |
await sub1.unsubscribe(limit=2) | |
msgs = [msg async for msg in sub1.messages] | |
for msg in msgs: | |
print(f"Received ['{msg.subject}] : '{msg.data.decode()}'") | |
# Using list comprehension like syntax with limited interest | |
sub2 = await nc.subscribe("bar") | |
await nc.publish("bar", b'First') | |
await nc.publish("bar", b'Second') | |
await nc.publish("bar", b'Third') | |
async for msg in sub2.messages: | |
print(f"Received ['{msg.subject}] : '{msg.data.decode()}'") | |
# This signals the server to stop sending messages, though | |
# the iteration will stop once all messages that have already | |
# been consumed have been processed (kind of like drain). | |
await sub2.unsubscribe() | |
await nc.flush() | |
# Using async generators | |
sub3 = await nc.subscribe("quux") | |
async def stream(): | |
async for msg in sub3.messages: | |
yield msg | |
# Use async generator and async def to be able to use await. | |
async def next_msg(): | |
async for msg in stream(): | |
return msg | |
for i in range(0, 2): | |
await nc.publish(f"quux", f'n:{i}'.encode()) | |
async for msg in stream(): | |
print(f"Received ['{msg.subject}] : '{msg.data.decode()}'") | |
# Stop after receiving first message. | |
break | |
msg = await next_msg() | |
print(f"Next Msg ['{msg.subject}] : '{msg.data.decode()}'") | |
# Stop receiving messages. | |
await sub3.unsubscribe() | |
await nc.flush() | |
# Next message Will not be received. | |
await nc.publish(f"quux", b'n:3') | |
msg = await next_msg() | |
if msg is None: | |
print("No more messages...") | |
# Issues with drain and iterators? | |
# await nc.drain() | |
await nc.close() | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(run()) | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment