Skip to content

Instantly share code, notes, and snippets.

@lucj
Created November 11, 2019 17:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lucj/84e511003907b9a4d3270e63d896ec1b to your computer and use it in GitHub Desktop.
Save lucj/84e511003907b9a4d3270e63d896ec1b to your computer and use it in GitHub Desktop.
NATS subscriber
import asyncio
import json
import ssl
import sys
from nats.aio.client import Client as NATS
async def error_cb(e):
print("Error:", e)
async def run(loop):
# Connection to NATS
nc = NATS()
# Use server's CA
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('./certs/ca.pem')
options = {
"servers": ["nats://messaging.techwhale.io:4222"],
"io_loop": loop,
"tls": ssl_ctx,
"error_cb": error_cb
}
await nc.connect(**options)
print("Connected to NATS: {}".format(nc.connected_url.netloc))
# Handling incoming messages
async def message_handler(msg):
data = json.loads(msg.data.decode())
print(data)
# Subscription to all messages published to the provided subject (if authorized to)
subject = "nats.demo"
await nc.subscribe(subject, cb=message_handler)
print('Subscribed to [%s]' % subject);
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
try:
loop.run_forever()
finally:
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment