Skip to content

Instantly share code, notes, and snippets.

@econeale
Created March 11, 2022 15:47
Show Gist options
  • Save econeale/00902a070c9126d84397b752a0bc5cce to your computer and use it in GitHub Desktop.
Save econeale/00902a070c9126d84397b752a0bc5cce to your computer and use it in GitHub Desktop.
A wrapper for the Paho MQTT Python client to interact with an MQTT cluster.
import random
from paho.mqtt.client import *
class ClusterClient(Client):
    """A paho.mqtt Client that can connect to a cluster of MQTT brokers.

    connect() and connect_async() additionally accept a list of hostnames
    or (host, port) tuples:

        connect("host1")
        connect(["host1", "host2", "host3"])  # use default port 1883
        connect(["host1", ("host2", 8883), ("host3", 8883)])

    Hosts to connect to are chosen randomly. If a host disappears the client
    automatically connects to another host from the list.

    Modified for support with version 1.6.1 of the Paho Python library from
    code first described at
    https://github.com/eclipse/paho.mqtt.python/issues/308#issuecomment-406990125
    """

    def __init__(self, client_id="", clean_session=None, userdata=None,
                 protocol=MQTTv311, transport="tcp",
                 reconnect_on_failure=True):
        """Create the client; all arguments are forwarded to Client."""
        super().__init__(client_id, clean_session, userdata, protocol,
                         transport, reconnect_on_failure)
        # Candidate brokers as (host, port) pairs; filled in by connect*().
        self._hosts = []
        # True only while connect() is delegating to Client.connect(); see
        # connect_async() for why this guard is needed.
        self._connect_in_progress = False

    @staticmethod
    def _normalize_hosts(host, port):
        """Return a validated list of (host, port) pairs.

        ``host`` is either a single hostname or a list/tuple whose entries
        are hostnames (paired with ``port``) or (host, port) pairs.

        Raises:
            ValueError: if no host is given, a hostname is None/empty, or a
                port is not positive.
        """
        if isinstance(host, (list, tuple)):
            pairs = [
                (entry, port) if isinstance(entry, str) else entry
                for entry in host
            ]
        else:
            pairs = [(host, port)]

        if not pairs:
            raise ValueError('Invalid host.')
        for name, number in pairs:
            if name is None or len(name) == 0:
                raise ValueError('Invalid host.')
            if number <= 0:
                raise ValueError('Invalid port number.')
        return pairs

    def connect(self, host, port=1883, keepalive=60, bind_address="",
                bind_port=0, clean_start=MQTT_CLEAN_START_FIRST_ONLY,
                properties=None):
        """Connect (blocking) to one of the given brokers.

        Returns whatever Client.connect() returns.

        Raises:
            ValueError: if the host list is empty or contains an invalid
                host or port.
        """
        # Validate before storing so a rejected call leaves the previous
        # host list untouched.
        self._hosts = self._normalize_hosts(host, port)
        chosen_host, chosen_port = random.choice(self._hosts)

        self._connect_in_progress = True
        try:
            return super().connect(chosen_host, chosen_port, keepalive,
                                   bind_address, bind_port, clean_start,
                                   properties)
        finally:
            self._connect_in_progress = False

    def connect_async(self, host, port=1883, keepalive=60, bind_address="",
                      bind_port=0, clean_start=MQTT_CLEAN_START_FIRST_ONLY,
                      properties=None):
        """Register the brokers to use; the connection is made later.

        Raises:
            ValueError: if the host list is empty or contains an invalid
                host or port.
        """
        if self._connect_in_progress:
            # Client.connect() in paho 1.6.1 calls self.connect_async() with
            # the single host chosen by connect(). Re-normalising here would
            # shrink self._hosts to that one broker and disable failover, so
            # keep the list connect() already stored.
            super().connect_async(host, port, keepalive, bind_address,
                                  bind_port, clean_start, properties)
            return

        self._hosts = self._normalize_hosts(host, port)
        chosen_host, chosen_port = random.choice(self._hosts)
        super().connect_async(chosen_host, chosen_port, keepalive,
                              bind_address, bind_port, clean_start,
                              properties)

    def reconnect(self):
        """Reconnect, trying the known brokers in random order.

        Each broker is tried once. An OSError from a broker moves on to the
        next one; the error from the last candidate is re-raised.
        """
        if not self._hosts:
            # No cluster configured yet (reconnect() called before
            # connect*()): defer to the base class instead of failing on an
            # empty candidate list.
            return super().reconnect()

        candidates = self._hosts[:]
        random.shuffle(candidates)
        while True:
            # The order is already random, so popping from the end is as
            # good as from the front.
            self._host, self._port = candidates.pop()
            try:
                return super().reconnect()
            except OSError:  # socket.error is an alias of OSError
                if not candidates:
                    raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment