This document shows a full example of PubSub model using NATS server
which you can download and run on your laptop. Please organize your code accordingly, e.g.
I created PubSub
directory where I stored all the files and use 3 terminals (tabs
or windows) to run NATS server, python publisher and python subscriber.
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@v2.10.20 | sh
# run nats server
./nats-server -js
python -m venv venv
source venv/bin/activate
pip install nats-py
#!/usr/bin/env python
import time
import asyncio
import nats
from nats.js.api import StreamConfig
# Define the subject for JetStream
STREAM_NAME = "mystream"
SUBJECT = "test.subject"
async def setup_jetstream(nc):
# Access JetStream context
js = nc.jetstream()
# Create a stream with the subject (if it doesn't already exist)
try:
await js.add_stream(name=STREAM_NAME, config=StreamConfig(subjects=[SUBJECT]))
print(f"Stream {STREAM_NAME} created or already exists")
except Exception as e:
print(f"Error creating stream: {e}")
return js
async def run_publisher():
# Connect to the NATS server asynchronously
nc = await nats.connect("nats://localhost:4222")
# Setup JetStream and create the stream
js = await setup_jetstream(nc)
# Publish 2 messages to the subject
for i in range(1, 3):
tstamp = time.time()
message = f"Message {i} at {tstamp}"
ack = await js.publish(SUBJECT, message.encode())
print(f"Published: {message}, sequence: {ack.seq}")
# Close the connection after publishing
await nc.close()
if __name__ == "__main__":
# Run the publisher in an asyncio event loop
asyncio.run(run_publisher())
#!/usr/bin/env python
import asyncio
import nats
from nats.js.api import StreamConfig
# Define the subject for JetStream
SUBJECT = "test.subject"
async def setup_jetstream(nc):
# Access JetStream context
js = nc.jetstream()
# Ensure the stream exists (create it if necessary)
try:
await js.add_stream(name="mystream", config=StreamConfig(subjects=[SUBJECT]))
print("Stream 'mystream' created or already exists")
except Exception as e:
print(f"Error creating stream: {e}")
return js
async def run_subscriber():
# Connect to the NATS server asynchronously
nc = await nats.connect("nats://localhost:4222")
# Setup JetStream and create the stream
js = await setup_jetstream(nc)
# Subscribe to the JetStream subject asynchronously
try:
sub = await js.subscribe(SUBJECT, durable="durable_sub")
print(f"Subscribed to JetStream subject: {SUBJECT}")
async def message_handler(msg):
print(f"Received message: {msg.data.decode()}")
await msg.ack()
# Process messages (retrieving from JetStream)
async for msg in sub.messages:
await message_handler(msg)
except Exception as e:
print(f"Error subscribing or receiving messages: {e}")
# Close the connection after receiving messages
await nc.close()
if __name__ == "__main__":
# Run the subscriber in an asyncio event loop
asyncio.run(run_subscriber())
- tab 1: run nats server
./nats-server -js
- tab 2: run subscriber
./jetstream_sub3.py
Stream 'mystream' created or already exists
Subscribed to JetStream subject: test.subject
- tab 3: run piblisher (the sequence numbers will increase each time you run publisher and I show here the ones I currently have):
./jetstream_pub3.py
Stream mystream created or already exists
Published: Message 1 at 1726680150.989121, sequence: 21
Published: Message 2 at 1726680150.990439, sequence: 22
Now, come back to your tab 2 and you'll see messages like this:
Received message: Message 1 at 1726680150.989121
Received message: Message 2 at 1726680150.990439
Now, when you'll restart either subscriber or publisher they will restart from the same state, e.g. subscriber will be ready to read "unread" messages while publisher will continue publish new messages and its sequence will increasing.