Skip to content

Instantly share code, notes, and snippets.

@wallyqs
Created June 1, 2022 17:58
Show Gist options
  • Save wallyqs/d832911dce6a051584b1b2856b8a9e4a to your computer and use it in GitHub Desktop.
Save wallyqs/d832911dce6a051584b1b2856b8a9e4a to your computer and use it in GitHub Desktop.
Example: publishing JetStream messages with a Nats-Msg-Id header
import asyncio
import nats
from nats.errors import TimeoutError
async def _publish_batch(js, count=10):
    """Publish ``count`` messages to 'msg.id', each tagged with a Nats-Msg-Id.

    The header value is the message index, so publishing the same batch twice
    re-uses the same ids. For every publish the ``duplicate`` flag of the
    returned ack is printed.
    """
    for i in range(count):
        ack = await js.publish(
            "msg.id",
            f"msg: {i}".encode(),
            headers={"Nats-Msg-Id": str(i)},
        )
        print("Is duplicate?", ack.duplicate)


async def main():
    """Demonstrate publishing JetStream messages with a Nats-Msg-Id header.

    Steps:
      1. Connect to a NATS server on localhost and create the
         'msgs-with-id' stream bound to the 'msg.id' subject.
      2. Publish ten messages with ids "0".."9" and print whether each
         ack was flagged as a duplicate.
      3. Fetch, print and ack the messages through the durable pull
         consumer 'psub'.
      4. Publish the same ten ids again and print the duplicate flag
         (expected to be set while the stream's deduplication window is
         still open -- see the JetStream documentation).

    The connection is always closed, even if a step fails.
    """
    nc = await nats.connect("localhost")
    try:
        # Create JetStream context.
        js = nc.jetstream()

        # Persist messages published on the 'msg.id' subject.
        await js.add_stream(name="msgs-with-id", subjects=["msg.id"])

        await _publish_batch(js)

        # Create a durable pull-based consumer on 'msg.id'.
        psub = await js.pull_subscribe("msg.id", "psub")

        # Fetch and ack messages from the consumer, one at a time.
        for _ in range(10):
            try:
                msgs = await psub.fetch(1)
            except TimeoutError:
                # Nothing left to deliver (e.g. the script was re-run and
                # everything was already acked); stop fetching instead of
                # crashing.
                break
            for msg in msgs:
                print(msg)
                # Ack so the durable consumer does not redeliver it.
                await msg.ack()

        # Re-publish the same ids to observe the duplicate flag.
        await _publish_batch(js)
    finally:
        # Release the connection even when a publish/fetch raised.
        await nc.close()
if __name__ == "__main__":
    # Script entry point: drive the async example to completion.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment