Skip to content

Instantly share code, notes, and snippets.

@fabian57fabian
Last active September 22, 2023 12:58
Show Gist options
  • Save fabian57fabian/faf75d9e3327909f8c5b1f19202670ea to your computer and use it in GitHub Desktop.
Save fabian57fabian/faf75d9e3327909f8c5b1f19202670ea to your computer and use it in GitHub Desktop.
Test MQTT connection on python
import time
import paho.mqtt.client as mqtt
def test_connection(ip, port, _user, _pass, clientid, topic, test_mess="test", timeout=5.0) -> "tuple[bool, str]":
    """
    Tests a MQTT connection with a test message.

    Connects to the broker, waits until the broker answers the connection
    request (or until ``timeout`` expires), publishes ``test_mess`` on
    ``topic`` if the broker accepted the connection, then disconnects.

    @param ip: Hostname
    @param port: Port
    @param _user: Username
    @param _pass: Password
    @param clientid: ClientId
    @param topic: Topic to test
    @param test_mess: text to send
    @param timeout: Maximum number of seconds to wait for the broker's answer
    @return: Tuple ``(success, message)``. ``success`` is True only when the
        broker accepted the connection; ``message`` is a human readable
        description of the outcome (or the error text when connecting raised).
    """
    # Never echo the password itself: this line frequently ends up in logs.
    shown_pass = "***" if _pass else ""
    print(f"Testing broker {ip}:{port} with client {clientid} on {topic} using {_user}/{shown_pass} sending '{test_mess}'")

    # Trick to use this in on_connect instead of local var
    result = {"arrived_code": -1}

    def on_connect(client, userdata, flags, rc):
        result["arrived_code"] = rc

    # paho-mqtt >= 2.0 expects the callback API version as first argument.
    # VERSION1 keeps the (client, userdata, flags, rc) signature used above.
    api_version = getattr(mqtt, "CallbackAPIVersion", None)
    if api_version is None:
        client = mqtt.Client(clientid)
    else:
        client = mqtt.Client(api_version.VERSION1, clientid)
    client.username_pw_set(_user, _pass)
    client.on_connect = on_connect

    try:
        res = client.connect(ip, port)
    except Exception as e:
        # Socket / DNS / argument errors: report the error text to the caller.
        return False, str(e)
    if res != 0:
        return False, "Wrong ip / port"

    client.loop_start()
    try:
        # Wait for on_connect instead of sleeping a fixed amount of time, so a
        # slow broker is not reported as "no connection".
        deadline = time.monotonic() + timeout
        while result["arrived_code"] < 0 and time.monotonic() < deadline:
            time.sleep(0.05)
        if result["arrived_code"] == 0:
            client.publish(topic, test_mess)
            # Give the network thread a moment to push the message out.
            time.sleep(.2)
        connected = client.is_connected()
    finally:
        # Always release the network thread and the socket.
        client.loop_stop()
        client.disconnect()

    rc = result["arrived_code"]
    if rc > -1:
        return rc == 0, mqtt_connection_rc_tostring(rc)
    return connected, "ok" if connected else "no connection"
# Thanks to https://stackoverflow.com/questions/67722678/mqtt-bad-username-or-password
def mqtt_connection_rc_tostring(rc: int) -> str:
    """
    Translate the rc value received by the on_connect callback into text.

    Known codes:
    0: Connection successful
    1: Connection refused - incorrect protocol version
    2: Connection refused - invalid client identifier
    3: Connection refused - server unavailable
    4: Connection refused - bad username or password
    5: Connection refused - not authorised
    6-255: Currently unused.

    Any value outside 0-5 is reported as "Connect failed".
    """
    # The position in this tuple is the return code it describes.
    descriptions = (
        "Connect successful",
        "incorrect protocol version",
        "invalid client identifier",
        "server unavailable",
        "bad username or password",
        "not authorised",
    )
    for code, text in enumerate(descriptions):
        if rc == code:
            return text
    return "Connect failed"
if __name__ == "__main__":
    import argparse

    # One row per command-line option: (flag, type, default, help text).
    option_table = (
        ("-host", str, "broker.hivemq.com", "Broker host"),
        ("-port", int, 1883, "Broker port"),
        ("-topic", str, "/test", "Topic to connect"),
        ("-clientid", str, "client_test", "ClientId to use"),
        ("-username", str, "", "Username to use"),
        ("-password", str, "", "Password to use"),
        ("-message", str, "test", "Test message to send"),
    )

    cli = argparse.ArgumentParser()
    for flag, value_type, fallback, description in option_table:
        cli.add_argument(flag, type=value_type, default=fallback, help=description)
    options = cli.parse_args()

    # Run the check and show the (success, message) tuple it returns.
    outcome = test_connection(
        options.host,
        options.port,
        options.username,
        options.password,
        options.clientid,
        options.topic,
        options.message,
    )
    print(outcome)
@fabian57fabian
Copy link
Author

Did you ever need to check whether all your connection settings and credentials work?

Well, this test script exposes a test_connection method to test an MQTT broker with user/pass.
Useful to check the connection state and to spot wrong or misconfigured settings.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment