Skip to content

Instantly share code, notes, and snippets.

@bverhoeven
Created November 6, 2019 15:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bverhoeven/00b638487df9f0a1b24a58d6954ffaaa to your computer and use it in GitHub Desktop.
Save bverhoeven/00b638487df9f0a1b24a58d6954ffaaa to your computer and use it in GitHub Desktop.
import sys
import time
from ssl import CERT_NONE
from threading import Event
import paho.mqtt.client as mqtt
## Configuration
# Broker address and port handed to client.connect() in connect_client().
server_host = '127.0.0.1'
server_port = 8443
# When true, connect_client() calls tls_set(cert_reqs=CERT_NONE) before
# connecting, i.e. TLS is used but the server certificate is not verified.
server_use_tls = True
# Topic the consumers subscribe to (QoS 1) and the publisher publishes on.
topic = 'example/example1'
# MQTT client IDs. Note that every consumer launched by the main loop is
# given the same consumer_client_id.
publisher_client_id = 'example-publisher'
consumer_client_id = 'example-client'
# Seconds passed to Event.wait() while waiting for the published message.
consumer_timeout = 2
def connect_client(client_name, client_id=None):
    """Create an MQTT-over-websockets client, connect it and start its loop.

    Args:
        client_name: Label prefixed to every log line printed for this client.
        client_id: Client ID handed to ``mqtt.Client`` (``None`` by default).

    Returns:
        The ``mqtt.Client`` instance, after ``connect()`` and ``loop_start()``
        have been called on it.
    """
    def print_log_line(_client, _data, _level, buf):
        # Echo the library's log output, tagged with this client's label.
        print(f'{client_name}: {buf}')

    def disconnect_again(*_unused):
        # Respond to a disconnect by calling disconnect() explicitly.
        # NOTE(review): presumably this keeps the client from reconnecting
        # on its own -- confirm against the paho-mqtt documentation.
        mqtt_client.disconnect()

    mqtt_client = mqtt.Client(
        client_id,
        clean_session=True,
        transport='websockets',
    )

    if server_use_tls:
        # CERT_NONE: TLS is enabled without verifying the server certificate.
        mqtt_client.tls_set(cert_reqs=CERT_NONE)

    mqtt_client.on_log = print_log_line
    mqtt_client.connect(server_host, port=server_port, keepalive=60)
    mqtt_client.on_disconnect = disconnect_again
    mqtt_client.loop_start()

    return mqtt_client
def launch_consumer(client_name, client_id):
    """Connect a consumer that subscribes to ``topic`` and flags the first message.

    Once the subscription is acknowledged, the module-level ``publisher``
    client is used to publish ``'example'`` on ``topic``. Any message the
    consumer receives sets the ``got_message`` event attached to the client.

    Args:
        client_name: Label used in this consumer's printed output.
        client_id: MQTT client ID to connect with.

    Returns:
        The connected client, carrying an extra ``got_message`` attribute
        (a ``threading.Event``) that callers can wait on.
    """
    consumer = connect_client(client_name, client_id)
    consumer.got_message = Event()

    def subscribe_when_connected(_client: mqtt.Client, _userdata, _flags, rc):
        # Subscribe to the test topic with QoS 1.
        subscription = (topic, 1)
        consumer.subscribe(subscription)

    def publish_when_subscribed(*_args):
        print(f'{client_name}: Subscribed, publishing message with publisher..')
        # `publisher` is the module-level client created by the main script.
        publisher.publish(topic, 'example')

    def flag_message(_client, _userdata, _message):
        consumer.got_message.set()

    def report_disconnect(_client: mqtt.Client, _userdata, rc):
        print(f'{client_name}: Disconnected (rc={rc})')
        consumer.disconnect()

    # NOTE(review): connect_client() has already called connect() and
    # loop_start() by the time these handlers are attached; if the connection
    # acknowledgement were processed before on_connect is assigned, the
    # subscription would presumably never be requested -- confirm.
    handlers = {
        'on_connect': subscribe_when_connected,
        'on_subscribe': publish_when_subscribed,
        'on_message': flag_message,
        'on_disconnect': report_disconnect,
    }
    for attribute, handler in handlers.items():
        setattr(consumer, attribute, handler)

    return consumer
if __name__ == "__main__":
    # Test driver: connect one publisher, then launch two consumers in turn
    # and check that each receives the message published once it subscribes.
    #
    # `publisher` must remain a module-level name: the subscribe handler
    # inside launch_consumer() looks it up as a global.
    print('Connecting publisher...')
    publisher = connect_client('Publisher', publisher_client_id)

    # Poll until the publisher reports that it is connected.
    while not publisher.is_connected():
        time.sleep(0.5)

    print('Ready for tests.\n')

    # Exit status reported to the caller: 0 when every consumer received the
    # message, 1 as soon as one of them times out.
    exit_status = 0

    # Both consumers are launched with the same client ID, and the first one
    # is not explicitly disconnected before the second one is launched.
    for name in ['Client 1', 'Client 2']:
        print(f'## {name}')
        test_client = launch_consumer(name, consumer_client_id)

        if test_client.got_message.wait(consumer_timeout):
            print('\nOK: Client received published message.\n')
        else:
            print('\nFAIL: Client did not get published message.')
            exit_status = 1
            break

    sys.exit(exit_status)
# Broker-side configuration (mosquitto.conf syntax); not part of the Python script.
# NOTE(review): the directive descriptions below follow the mosquitto.conf
# manual -- confirm against the broker version in use.
# Log every message type, including client connect/disconnect events.
log_type all
connection_messages true
# Listener on port 8443 (the script's server_port) speaking websockets
# (the script's transport='websockets').
listener 8443
protocol websockets
# CA certificate, server certificate and private key for this listener.
cafile /mosquitto/certs/root.pem
certfile /mosquitto/certs/example.cert
keyfile /mosquitto/certs/example.key
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment