Created
May 12, 2021 18:14
-
-
Save wallyqs/780730e840bfc6da11370c8b70dab0cf to your computer and use it in GitHub Desktop.
pull-subscriber.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import nats | |
async def run():
    """Fetch one message from pull consumer ``bar`` on stream ``foo`` and ack it.

    Connects to the NATS server at ``nats://localhost:4222``, sends a request
    with an empty payload to ``$JS.API.CONSUMER.MSG.NEXT.foo.bar``, prints the
    reply subject and data of the response, and then publishes an empty
    payload to that reply subject to acknowledge the message.

    The connection is always closed before returning, including when the
    request or the ack publish raises; the exception then propagates to the
    caller.
    """
    # With stream and pull consumer created as follows via the NATS CLI:
    #
    # - Stream:
    #
    #   $ nats stream create
    #   ? Stream Name foo
    #   ? Subjects to consume foo
    #   ? Storage backend file
    #   ? Retention Policy Limits
    #   ? Discard Policy Old
    #   ? Stream Messages Limit -1
    #   ? Message size limit -1
    #   ? Maximum message age limit -1
    #   ? Maximum individual message size -1
    #   ? Duplicate tracking time window 2m
    #   ? Replicas 1
    #
    # - Consumer:
    #
    #   $ nats consumer create bar
    #   ? Consumer name bar
    #   ? Delivery target (empty for Pull Consumers)
    #   X Sorry, your reply was invalid: Value is required
    #   ? Start policy (all, new, last, 1h, msg sequence) all
    #   ? Replay policy instant
    #   ? Filter Stream by subject (blank for all)
    #   ? Maximum Allowed Deliveries -1
    #   ? Maximum Acknowledgements Pending 0
    #   ? Select a Stream foo
    #   Information for Consumer foo > bar created 2021-05-12T11:00:08-07:00
    #
    #   Configuration:
    #
    #     Durable Name: bar
    #     Pull Mode: true
    #     Deliver All: true
    #     Ack Policy: Explicit
    #     Ack Wait: 30s
    #     Replay Policy: Instant
    #     Max Ack Pending: 20,000
    #
    #   State:
    #
    #     Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    #     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
    #     Outstanding Acks: 0 out of maximum 20000
    #     Redelivered Messages: 0
    #     Unprocessed Messages: 0
    nc = await nats.connect(servers=["nats://localhost:4222"])
    try:
        # Fetch next message.
        #
        # NOTE(review): `timed_request` is kept as written; newer client
        # releases may only offer `nc.request(subject, payload, timeout=...)`
        # -- confirm against the installed nats client version.
        msg = await nc.timed_request('$JS.API.CONSUMER.MSG.NEXT.foo.bar', b'')
        print("ACK Subject:", msg.reply)
        print("MSG Data :", msg.data)

        # ACK the message by publishing an empty payload to its reply subject.
        #
        # Use `nats consumer info bar` to confirm that messages are acked.
        #
        await nc.publish(msg.reply, b'')
    finally:
        # Release the connection even if the request or the ack failed
        # (e.g. no message was available and the request raised).
        await nc.close()
if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion and closes the loop afterwards, replacing the manual
    # get_event_loop() / run_until_complete() / close() sequence.
    asyncio.run(run())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment