Created
March 11, 2022 15:47
-
-
Save econeale/00902a070c9126d84397b752a0bc5cce to your computer and use it in GitHub Desktop.
A wrapper for the Paho MQTT Python client to interact with an MQTT cluster.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
from paho.mqtt.client import * | |
class ClusterClient(Client):
    """A subclass of paho.mqtt.Client that supports connecting to a cluster of
    mqtt brokers.

    connect() and connect_async() additionally accept a list of hostnames or
    host/port tuples:

        connect("host1")
        connect(["host1", "host2", "host3"])  # use default port 1883
        connect(["host1", ("host2", 8883), ("host3", 8883)])

    Hosts to connect to are chosen randomly. If a host disappears the client
    automatically connects to another host from the list.

    Modified for support with version 1.6.1 of the Paho Python library from
    code first described at
    https://github.com/eclipse/paho.mqtt.python/issues/308#issuecomment-406990125
    """

    def __init__(self, client_id="", clean_session=None, userdata=None,
                 protocol=MQTTv311, transport="tcp",
                 reconnect_on_failure=True):
        """Initialise the base client and start with an empty host list.

        All arguments are forwarded unchanged to the base ``Client``.
        """
        super().__init__(client_id, clean_session, userdata, protocol,
                         transport, reconnect_on_failure)
        # (host, port) pairs that reconnect() may pick from.
        self._hosts = []

    def _set_hosts(self, host, port):
        """Normalise, validate and store the broker list; return one entry.

        ``host`` is either a single hostname or a list/tuple whose items are
        hostnames (paired with ``port``) or ready-made (host, port) pairs.

        Returns a randomly chosen (host, port) pair from the stored list.

        Raises ValueError when the list is empty, a hostname is None or
        empty, or a port is not positive.
        """
        if isinstance(host, (list, tuple)):
            candidates = [
                (entry, port) if isinstance(entry, str) else entry
                for entry in host
            ]
        else:
            candidates = [(host, port)]

        # An empty list would otherwise surface as an IndexError from
        # random.choice(); report it like any other invalid host.
        if not candidates:
            raise ValueError('Invalid host.')

        for name, number in candidates:
            if name is None or len(name) == 0:
                raise ValueError('Invalid host.')
            if number <= 0:
                raise ValueError('Invalid port number.')

        # Only replace the stored list once every entry has been validated,
        # so a rejected call cannot leave bad entries behind for reconnect().
        self._hosts = candidates
        return random.choice(candidates)

    def connect(self, host, port=1883, keepalive=60, bind_address="",
                bind_port=0, clean_start=MQTT_CLEAN_START_FIRST_ONLY,
                properties=None):
        """Connect to one randomly chosen broker from ``host``.

        ``host`` may be a hostname or a list of hostnames / (host, port)
        pairs; the remaining arguments are forwarded to the base class.

        Returns whatever the base class ``connect()`` returns.
        Raises ValueError for an invalid or empty host list.
        """
        chosen_host, chosen_port = self._set_hosts(host, port)
        return super().connect(chosen_host, chosen_port, keepalive,
                               bind_address, bind_port, clean_start,
                               properties)

    def connect_async(self, host, port=1883, keepalive=60, bind_address="",
                      bind_port=0, clean_start=MQTT_CLEAN_START_FIRST_ONLY,
                      properties=None):
        """Asynchronous counterpart of connect() with the same host handling.

        Raises ValueError for an invalid or empty host list.
        """
        chosen_host, chosen_port = self._set_hosts(host, port)
        super().connect_async(chosen_host, chosen_port, keepalive,
                              bind_address, bind_port, clean_start,
                              properties)

    def reconnect(self):
        """Try the known brokers in random order until one accepts.

        Returns the result of the first base-class ``reconnect()`` call that
        does not raise. If every broker fails with an OSError, the error from
        the last attempt propagates. When no host list has been configured
        the call is delegated to the base class unchanged.
        """
        if not self._hosts:
            # Nothing configured through connect()/connect_async(); let the
            # base class apply its own handling.
            return super().reconnect()

        candidates = self._hosts[:]
        random.shuffle(candidates)
        last_index = len(candidates) - 1
        for index, (host, port) in enumerate(candidates):
            # The base reconnect() reads its target from these attributes.
            self._host, self._port = host, port
            try:
                return super().reconnect()
            except OSError:  # socket.error is an alias of OSError on Python 3
                if index == last_index:
                    raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment