Skip to content

Instantly share code, notes, and snippets.

@wallyqs
Created May 12, 2021 18:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wallyqs/780730e840bfc6da11370c8b70dab0cf to your computer and use it in GitHub Desktop.
Save wallyqs/780730e840bfc6da11370c8b70dab0cf to your computer and use it in GitHub Desktop.
pull-subscriber.py
import asyncio
import nats
async def run():
# With stream and pull consumer created as follows via the NATS Cli:
#
# - Stream:
#
# $ nats stream create
# ? Stream Name foo
# ? Subjects to consume foo
# ? Storage backend file
# ? Retention Policy Limits
# ? Discard Policy Old
# ? Stream Messages Limit -1
# ? Message size limit -1
# ? Maximum message age limit -1
# ? Maximum individual message size -1
# ? Duplicate tracking time window 2m
# ? Replicas 1
# - Consumer
#
# nats consumer create bar
# ? Consumer name bar
# ? Delivery target (empty for Pull Consumers)
# X Sorry, your reply was invalid: Value is required
# ? Start policy (all, new, last, 1h, msg sequence) all
# ? Replay policy instant
# ? Filter Stream by subject (blank for all)
# ? Maximum Allowed Deliveries -1
# ? Maximum Acknowledgements Pending 0
# ? Select a Stream foo
# Information for Consumer foo > bar created 2021-05-12T11:00:08-07:00
#
# Configuration:
#
# Durable Name: bar
# Pull Mode: true
# Deliver All: true
# Ack Policy: Explicit
# Ack Wait: 30s
# Replay Policy: Instant
# Max Ack Pending: 20,000
#
# State:
#
# Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
# Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
# Outstanding Acks: 0 out of maximum 20000
# Redelivered Messages: 0
# Unprocessed Messages: 0
nc = await nats.connect(servers=["nats://localhost:4222"])
# Fetch next message
msg = await nc.timed_request('$JS.API.CONSUMER.MSG.NEXT.foo.bar', b'')
print("ACK Subject:", msg.reply)
print("MSG Data :", msg.data)
# ACK the message
#
# Use `nats consumer info bar` to confirm that messages are acked.
#
await nc.publish(msg.reply, b'')
await nc.close()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment