Skip to content

Instantly share code, notes, and snippets.

@bradmontgomery
Created May 13, 2019 13:56
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bradmontgomery/8f1de0e56fa86c29a7daadab1c370c56 to your computer and use it in GitHub Desktop.
Save bradmontgomery/8f1de0e56fa86c29a7daadab1c370c56 to your computer and use it in GitHub Desktop.
Simple example of MQTT publishers / subscriber using paho.mqtt
#!/usr/bin/env python
"""
Spawn a lot of publishers.
"""
import json
import os
import random
import sys
import paho.mqtt.client as mqtt
from datetime import datetime
from multiprocessing import Process
HOST = 'localhost'
PORT = 1883
KEEPALIVE = 60 # in seconds
def on_publish(client, userdata, mid):
    """Report that a message has been handed off to the broker.

    paho-mqtt calls this when a message that was to be sent using the
    ``publish()`` call has completed transmission to the broker.

    Args:
        client: The publishing client; it must carry the
            ``spawned_from_pid`` attribute that ``spawn_client`` attaches.
        userdata: User data registered on the client (unused here).
        mid: Message id of the message that was published.
    """
    print(f"Client: {client.spawned_from_pid} published message: {mid}")
def _get_data(pid):
    """Build one JSON-encoded sample for a publisher to send.

    Also prints a line describing the sample that is about to be published.

    Args:
        pid: Process id of the publisher; embedded in the payload as-is.

    Returns:
        A JSON string with the keys ``timestamp`` (UTC time formatted with
        ``%c``), ``value`` (a random float in ``[0.0, 1.0)``) and ``pid``.
    """
    # Imported locally because the module header only brings in ``datetime``.
    from datetime import timezone

    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
    # value. An aware UTC "now" avoids that; "%c" is not expected to include
    # the timezone, so the formatted string should stay the same.
    ts = datetime.now(timezone.utc).strftime("%c")
    value = random.random()
    print(f"Publishing [{ts}] {value} from {pid}")
    return json.dumps({'timestamp': ts, 'value': value, 'pid': pid})
def spawn_client(topic="default"):
    """Connect to the broker and publish samples to *topic* until interrupted.

    Intended to run as the target of a ``multiprocessing.Process``: it
    creates one MQTT client tagged with the current process id, starts the
    client's background network loop, and then publishes a fresh payload
    from ``_get_data`` in an endless loop with no delay between messages.

    Args:
        topic: MQTT topic to publish to.
    """
    pid = os.getpid()
    print(f"Creating client with PID: {pid}")

    client = mqtt.Client()
    # Tag the client so on_publish can report which process it belongs to.
    client.spawned_from_pid = pid
    # Register the callback before connecting so it is in place by the time
    # the first message goes out.
    client.on_publish = on_publish
    client.connect(HOST, PORT, KEEPALIVE)
    client.loop_start()
    try:
        while True:
            client.publish(topic, payload=_get_data(pid), qos=0, retain=False)
    except KeyboardInterrupt:
        print(f"Calling loop_stop() for pid: {pid}")
    finally:
        # Always release the connection and stop the background thread,
        # whatever ended the publish loop.
        client.disconnect()
        client.loop_stop()
def _parse_args(argv):
    """Extract ``(topic, n_procs)`` from a ``sys.argv``-style list.

    Args:
        argv: Argument list whose first item is the program name.

    Returns:
        ``None`` when no topic was given; otherwise a ``(topic, n_procs)``
        tuple, where ``n_procs`` falls back to 4 if the process count is
        missing or is not an integer.
    """
    if len(argv) < 2:
        return None
    topic = argv[1]
    try:
        n_procs = int(argv[2])
    except (ValueError, IndexError):
        # The process count is optional: absent or non-numeric means 4.
        n_procs = 4
    return topic, n_procs


if __name__ == "__main__":
    parsed = _parse_args(sys.argv)
    if parsed is None:
        print("USAGE: ./publishers <topic> [n_processes]")
    else:
        topic, n_procs = parsed
        procs = []
        for _ in range(n_procs):
            p = Process(target=spawn_client, args=(topic, ))
            p.start()
            procs.append(p)
        # join() blocks until each child exits, so the parent stays in the
        # foreground for as long as the publishers are running.
        for p in procs:
            p.join()
paho-mqtt==1.4.0
#!/usr/bin/env python
"""
A single-process subscriber.
"""
import json
import sys
import paho.mqtt.subscribe as subscribe
HOST = 'localhost'
PORT = 1883
KEEPALIVE = 60 # in seconds
def on_message(client, userdata, msg):
    """Print the timestamp and value carried by a received PUBLISH message.

    This is the callback for when a PUBLISH message is received from the
    server. The payload is expected to be a UTF-8 encoded JSON object with
    ``timestamp`` and ``value`` keys; a missing key is printed as ``None``.

    Args:
        client: The MQTT client that received the message (unused).
        userdata: User data registered on the client (unused).
        msg: The received message; its ``payload`` (bytes) and ``topic``
            attributes are read.
    """
    try:
        payload = json.loads(msg.payload.decode('utf8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        payload = None

    # Any client may publish arbitrary bytes to the topic. Report and skip
    # anything that is not a JSON object rather than let an exception
    # propagate out of the callback into paho's network loop.
    if not isinstance(payload, dict):
        print(f"Skipping non-JSON-object payload {msg.payload!r} -- {msg.topic}")
        return

    ts = payload.get('timestamp')
    value = payload.get('value')
    print(f"[{ts}] {value} -- {msg.topic}")
if __name__ == "__main__":
    # Exactly one argument is accepted: the topic to subscribe to.
    if len(sys.argv) == 2:
        topic = sys.argv[1]
        # for more details, see:
        # https://github.com/eclipse/paho.mqtt.python#subscribe-1
        try:
            subscribe.callback(
                on_message, topic, hostname=HOST, port=PORT, keepalive=KEEPALIVE
            )
        except KeyboardInterrupt:
            # Exit quietly on Ctrl-C instead of dumping a traceback, in line
            # with how the publisher script handles the same interrupt.
            print("Subscriber interrupted, exiting.")
    else:
        print("USAGE: ./subscriber.py <topic>")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment