Skip to content

Instantly share code, notes, and snippets.

@wallyqs
Created June 1, 2022 18:12
Show Gist options
  • Save wallyqs/f4260a64fcba167cebbb888964f725e4 to your computer and use it in GitHub Desktop.
Save wallyqs/f4260a64fcba167cebbb888964f725e4 to your computer and use it in GitHub Desktop.
full wildcard stream
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
    """Publish to a wildcard JetStream stream and read back via a pull consumer.

    Connects to the NATS server at ``localhost``, adds the ``segments``
    stream bound to ``segments.>``, publishes ten messages that each carry a
    ``Nats-Msg-Id`` header, fetches messages through the ``psub8`` pull
    subscription on ``segments.*.*.1``, and finally publishes ten more
    messages to ``msg.id`` reusing the same message ids.

    The connection is closed even when one of the JetStream calls raises.
    """
    nc = await nats.connect("localhost")
    try:
        # Create JetStream context.
        js = nc.jetstream()

        # Persist messages published on any subject under 'segments.'.
        await js.add_stream(name="segments", subjects=["segments.>"])

        # Publish with an explicit message id and report whether the server
        # flagged the publish as a duplicate.
        for i in range(10):
            ack = await js.publish(
                "segments.one.A.1",
                f"msg: {i}".encode(),
                headers={"Nats-Msg-Id": str(i)},
            )
            print("Is duplicate?", ack.duplicate)

        # Create a pull based consumer filtered with single-token wildcards.
        psub = await js.pull_subscribe("segments.*.*.1", "psub8")

        # Fetch messages from the consumer one at a time and print them.
        # NOTE(review): the messages are never acked here, although the
        # original comment said "fetch and ack" -- confirm whether
        # `await msg.ack()` is intended.
        for _ in range(10):
            msgs = await psub.fetch(1)
            for msg in msgs:
                print(msg)

        # Publish again with the same message ids on a different subject.
        # NOTE(review): 'msg.id' is not matched by 'segments.>', so this
        # presumably relies on another stream capturing it -- confirm.
        for i in range(10):
            ack = await js.publish(
                "msg.id",
                f"msg: {i}".encode(),
                headers={"Nats-Msg-Id": str(i)},
            )
            print("Is duplicate?", ack.duplicate)
    finally:
        # Always release the connection, including on publish/fetch errors.
        await nc.close()
if __name__ == "__main__":
    # Script entry point: drive the demo coroutine to completion.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment