@vkuznet
Last active September 18, 2024 17:54

This document shows a complete example of the publish/subscribe (PubSub) model using a NATS server that you can download and run on your laptop. Organize your files as you see fit; for example, I created a PubSub directory to hold all the files and used three terminals (tabs or windows) to run the NATS server, the Python publisher, and the Python subscriber.

Step 1: download the NATS server into your working directory

curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@v2.10.20 | sh

# run the NATS server with JetStream enabled (-js)
./nats-server -js

Step 2: open a new terminal window/tab and create a virtual environment

python -m venv venv
source venv/bin/activate
pip install nats-py
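
Before running the publisher or subscriber, it can help to confirm that the server from Step 1 is actually listening. The sketch below uses only the Python standard library and assumes the server is on localhost port 4222, the address used in the code that follows; the helper name `nats_port_open` is hypothetical. It only checks that the TCP port accepts connections, not that JetStream is enabled.

```python
import socket


def nats_port_open(host="localhost", port=4222, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("NATS reachable:", nats_port_open())
```

If this prints False, check that ./nats-server -js is still running in the first terminal.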

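Step 3: create the publisher code (saved here as jetstream_pub3.py; run chmod +x jetstream_pub3.py so it can be executed directly):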

#!/usr/bin/env python

import time
import asyncio
import nats
from nats.js.api import StreamConfig

# Define the stream name and subject for JetStream
STREAM_NAME = "mystream"
SUBJECT = "test.subject"

async def setup_jetstream(nc):
    # Access JetStream context
    js = nc.jetstream()

    # Create a stream with the subject (if it doesn't already exist)
    try:
        await js.add_stream(name=STREAM_NAME, config=StreamConfig(subjects=[SUBJECT]))
        print(f"Stream {STREAM_NAME} created or already exists")
    except Exception as e:
        print(f"Error creating stream: {e}")

    return js

async def run_publisher():
    # Connect to the NATS server asynchronously
    nc = await nats.connect("nats://localhost:4222")

    # Setup JetStream and create the stream
    js = await setup_jetstream(nc)

    # Publish 2 messages to the subject
    for i in range(1, 3):
        tstamp = time.time()
        message = f"Message {i} at {tstamp}"
        ack = await js.publish(SUBJECT, message.encode())
        print(f"Published: {message}, sequence: {ack.seq}")

    # Close the connection after publishing
    await nc.close()

if __name__ == "__main__":
    # Run the publisher in an asyncio event loop
    asyncio.run(run_publisher())
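
The publisher above sends each payload as the text "Message {i} at {tstamp}" encoded to bytes. If the subscriber needs the index or the timestamp back (for example, to estimate delivery delay), the payload can be parsed again. The following is a minimal sketch that assumes exactly that text format; the helper names `make_payload` and `parse_payload` are hypothetical and not part of nats-py.

```python
import re
import time

# Matches the text format used by the publisher: "Message <index> at <timestamp>"
_PAYLOAD_RE = re.compile(r"^Message (\d+) at (\d+(?:\.\d+)?)$")


def make_payload(index, tstamp=None):
    """Build the bytes payload in the same format the publisher uses."""
    if tstamp is None:
        tstamp = time.time()
    return f"Message {index} at {tstamp}".encode()


def parse_payload(data):
    """Return (index, timestamp) from a payload, or raise ValueError if it does not match."""
    match = _PAYLOAD_RE.match(data.decode())
    if match is None:
        raise ValueError(f"unexpected payload: {data!r}")
    return int(match.group(1)), float(match.group(2))


if __name__ == "__main__":
    index, sent_at = parse_payload(make_payload(1))
    print(f"index={index}, age={time.time() - sent_at:.6f}s")
```

A structured encoding such as JSON would be a more robust choice for anything beyond a demo; the regular expression is used here only because it matches the plain-text messages in this example.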

Step 4: create the subscriber code (saved here as jetstream_sub3.py; make it executable in the same way):

#!/usr/bin/env python

import asyncio
import nats
from nats.js.api import StreamConfig

# Define the stream name and subject for JetStream
STREAM_NAME = "mystream"
SUBJECT = "test.subject"

async def setup_jetstream(nc):
    # Access JetStream context
    js = nc.jetstream()

    # Ensure the stream exists (create it if necessary)
    try:
        await js.add_stream(name=STREAM_NAME, config=StreamConfig(subjects=[SUBJECT]))
        print(f"Stream '{STREAM_NAME}' created or already exists")
    except Exception as e:
        print(f"Error creating stream: {e}")

    return js

async def run_subscriber():
    # Connect to the NATS server asynchronously
    nc = await nats.connect("nats://localhost:4222")

    # Setup JetStream and create the stream
    js = await setup_jetstream(nc)

    # Subscribe to the JetStream subject with a durable consumer, so the
    # server remembers which messages were already acknowledged
    try:
        sub = await js.subscribe(SUBJECT, durable="durable_sub")
        print(f"Subscribed to JetStream subject: {SUBJECT}")

        async def message_handler(msg):
            print(f"Received message: {msg.data.decode()}")
            await msg.ack()

        # Process messages as they arrive; this loop runs until interrupted (Ctrl-C)
        async for msg in sub.messages:
            await message_handler(msg)
    except Exception as e:
        print(f"Error subscribing or receiving messages: {e}")
    finally:
        # Close the connection on exit, including when the loop is interrupted
        await nc.close()

if __name__ == "__main__":
    # Run the subscriber in an asyncio event loop
    asyncio.run(run_subscriber())
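
The subscriber loop above waits for messages indefinitely. If you would rather have the script exit once no message has arrived for a while, one option is to wrap each read in a timeout. The sketch below is written against any async iterator so it can be tried without a server; the function name `consume_until_idle` and the `idle_timeout` parameter are hypothetical. Passing `sub.messages` from the subscriber as the `messages` argument is an assumption that has not been verified against nats-py here.

```python
import asyncio


async def consume_until_idle(messages, handler, idle_timeout=2.0):
    """Feed items from an async iterator to handler.

    Stops when no item arrives within idle_timeout seconds or when the
    iterator is exhausted, and returns the number of items handled.
    """
    iterator = messages.__aiter__()
    count = 0
    while True:
        try:
            # Wait for the next item, but give up after idle_timeout seconds
            msg = await asyncio.wait_for(iterator.__anext__(), timeout=idle_timeout)
        except (asyncio.TimeoutError, StopAsyncIteration):
            break
        await handler(msg)
        count += 1
    return count


async def _demo():
    # Stand-in source: two items, then silence
    async def fake_messages():
        yield "first"
        yield "second"
        await asyncio.sleep(3600)

    async def show(msg):
        print(f"Received message: {msg}")

    handled = await consume_until_idle(fake_messages(), show, idle_timeout=0.1)
    print(f"Handled {handled} messages, exiting")


if __name__ == "__main__":
    asyncio.run(_demo())
```

The timeout trades responsiveness for a clean exit: a short value ends the script quickly after the last message, while a long value tolerates gaps between publisher runs.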

Step 5: arrange the three tabs/windows so that you can see all the actions

  • tab 1: run nats server
./nats-server -js
  • tab 2: run subscriber
./jetstream_sub3.py
Stream 'mystream' created or already exists
Subscribed to JetStream subject: test.subject
  • tab 3: run the publisher (the sequence numbers increase each time you run the publisher; the ones shown here are from my current state):
./jetstream_pub3.py

Stream mystream created or already exists
Published: Message 1 at 1726680150.989121, sequence: 21
Published: Message 2 at 1726680150.990439, sequence: 22

Now go back to tab 2 and you will see messages like this:

Received message: Message 1 at 1726680150.989121
Received message: Message 2 at 1726680150.990439
vkuznet commented Sep 18, 2024

Now, when you restart either the subscriber or the publisher, it resumes from the same state, because the stream and the durable consumer are tracked by the NATS server: the subscriber is ready to read any "unread" messages, while the publisher continues to publish new messages and its sequence numbers keep increasing.
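
To make the restart behaviour concrete, here is a toy in-memory model of it. It is only an illustration of the idea and not how NATS is implemented: the class `ToyStream` and its methods are hypothetical, and it simplifies acknowledgements to a single "highest acknowledged sequence" per durable name.

```python
class ToyStream:
    """In-memory stand-in for a stream with durable consumers."""

    def __init__(self):
        self.messages = {}   # sequence number -> payload
        self.last_seq = 0    # last sequence number assigned by the stream
        self.ack_floor = {}  # durable name -> highest acknowledged sequence

    def publish(self, payload):
        """Store a payload and return its sequence number."""
        self.last_seq += 1
        self.messages[self.last_seq] = payload
        return self.last_seq

    def fetch_unread(self, durable):
        """Return (sequence, payload) pairs not yet acknowledged by this durable."""
        floor = self.ack_floor.get(durable, 0)
        return [(seq, self.messages[seq]) for seq in range(floor + 1, self.last_seq + 1)]

    def ack(self, durable, seq):
        """Record that this durable has processed everything up to seq."""
        self.ack_floor[durable] = max(self.ack_floor.get(durable, 0), seq)


def demo():
    stream = ToyStream()

    # First publisher run: two messages
    stream.publish("Message 1")
    stream.publish("Message 2")

    # First subscriber run: read and acknowledge everything available
    for seq, _payload in stream.fetch_unread("durable_sub"):
        stream.ack("durable_sub", seq)

    # Publisher "restarts": the sequence continues because the stream holds the counter
    next_seq = stream.publish("Message 1")

    # Subscriber "restarts": only the message it has not acknowledged is delivered
    return next_seq, stream.fetch_unread("durable_sub")


if __name__ == "__main__":
    print(demo())
```

The point of the model is that neither script keeps any state of its own: the sequence counter belongs to the stream, and the read position belongs to the durable name ("durable_sub" in the subscriber above).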
