Skip to content

Instantly share code, notes, and snippets.

@stephenlb
Created June 19, 2024 23:45
Show Gist options
  • Save stephenlb/f5981ffdec9390c7ed8b34eb427de3b2 to your computer and use it in GitHub Desktop.
Save stephenlb/f5981ffdec9390c7ed8b34eb427de3b2 to your computer and use it in GitHub Desktop.
AsyncIO PubNub Subscribe Example

AsyncIO PubNub Subscribe Example

pubnub-asyncio-simple-example

Usage example:

pip install asyncio pubnub
export PUBNUB_PUBLISH_KEY=demo
export PUBNUB_SUBSCRIBE_KEY=demo
python main.py

Output:

Listening for messages...
Connected
Received message: Hello World on channel: my_channel
Received message: Hello World on channel: my_channel
Received message: Hello World on channel: my_channel
Received message: Hello World on channel: my_channel
Received message: Hello World on channel: my_channel

In another terminal:

export PUBNUB_PUBLISH_KEY=demo
export PUBNUB_SUBSCRIBE_KEY=demo
curl "https://ps.pndsn.com/publish/${PUBNUB_PUBLISH_KEY}/${PUBNUB_SUBSCRIBE_KEY}/0/my_channel/0/%22Hello%20World%22"

Output:

[1,"Sent","17183967137027574"]
import os
import asyncio
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_asyncio import PubNubAsyncio, SubscribeCallback
from pubnub.enums import PNStatusCategory
class MySubscribeCallback(SubscribeCallback):
def status(self, pubnub, status):
if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory:
print("Disconnected")
elif status.category == PNStatusCategory.PNConnectedCategory:
print("Connected")
elif status.category == PNStatusCategory.PNReconnectedCategory:
print("Reconnected")
elif status.category == PNStatusCategory.PNDecryptionErrorCategory:
print("Decryption error")
def message(self, pubnub, message):
print(f"Received message: {message.message} on channel: {message.channel}")
def presence(self, pubnub, presence):
print(f"Presence event: {presence.event}")
async def main(pubnub):
pubnub.subscribe().channels('my_channel').execute()
print("Listening for messages...")
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
pnconfig = PNConfiguration()
pnconfig.subscribe_key = os.getenv('PUBNUB_SUBSCRIBE_KEY') or 'demo'
pnconfig.publish_key = os.getenv('PUBNUB_PUBLISH_KEY') or 'demo'
pnconfig.user_id = "my_unique_user_id" # Set a unique user ID
pubnub = PubNubAsyncio(pnconfig)
callback = MySubscribeCallback()
pubnub.add_listener(callback)
try:
loop.run_until_complete(main(pubnub))
except KeyboardInterrupt:
print("Interrupted by user. Exiting...")
finally:
loop.run_until_complete(pubnub.stop()) # Assuming 'pubnub' is in scope
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment