Skip to content

Instantly share code, notes, and snippets.

@miraculixx
Last active July 29, 2021 17:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save miraculixx/0740efbc7362f5851a9a6471407d3997 to your computer and use it in GitHub Desktop.
Save miraculixx/0740efbc7362f5851a9a6471407d3997 to your computer and use it in GitHub Desktop.
a threaded mqtt client using the gmqtt python library
version: '3'
services:
rabbitmq:
image: rabbitmq:latest
ports:
# 5672 amqp
# 15672 mgmt ui
# 1883 mqtt
- "5672:5672"
- "15672:15672"
- "1883:1883"
volumes:
- "./enabled_plugins:/etc/rabbitmq/enabled_plugins"
- "./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro"
# adopted from https://github.com/wialon/gmqtt
# original under MIT License, Copyright (c) 2018 Gurtam
from _socket import gethostname
import asyncio
from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import MQTTv311
from threading import Thread
class MQTTNode:
"""
A threaded mqtt client that can receive and send
Dependcies:
gmqtt
Usage:
node = MQTTNode(host, user, token)
node.consume(topic)
...
node.send(message)
"""
def __init__(self, host, user, token):
self.host = host
self.user = user
self.token = token
self.client = None
def on_connect(self, event):
print("\non_connect", event)
def on_message(self, message):
print("\non_message", message)
def on_disconnect(self, event):
print("\non_disconnect", event)
def on_subscribe(self, event):
print("\non_subscribe", event)
def _mqtt_on_connect(self, client, flags, rc, properties):
self.on_connect({
'flags': flags,
'rc': rc,
'properties': properties
})
def _mqtt_on_message(self, client, topic, payload, qos, properties):
self.on_message({
'topic': topic,
'paypload': payload,
'qos': qos,
'properties': properties
})
def _mqtt_on_disconnect(self, client, packet, exc=None):
self.on_disconnect({
'package': packet,
'exc': exc,
})
def _mqtt_on_subscribe(self, client, mid, qos, properties):
self.on_subscribe({
'mid': mid,
'qos': qos,
'properties': properties
})
async def mqtt_client(self, broker_host, user, token):
if self.client is not None:
return self.client
client = self.client = MQTTClient(gethostname())
# set callbacks
client.on_connect = self._mqtt_on_connect
client.on_message = self._mqtt_on_message
client.on_disconnect = self._mqtt_on_disconnect
client.on_subscribe = self._mqtt_on_subscribe
# connect
client.set_auth_credentials(user, password=token)
await client.connect(broker_host, version=MQTTv311)
return client
async def run_mqtt_consumer(self, host, user, token, signal, topic):
client = await self.mqtt_client(host, user, token)
client.subscribe(topic, qos=0)
await signal.wait()
await client.disconnect()
self.client = None
async def send_mqtt_message(self, host, user, token, topic, message):
client = await self.mqtt_client(host, user, token)
client.publish(topic, message, qos=1)
def consume(self, topic):
def forever():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.signal = asyncio.Event()
loop.run_until_complete(self.run_mqtt_consumer(self.host, self.user, self.token, self.signal, topic))
self.topic = topic
self.consumet = Thread(target=forever)
self.consumet.start()
def subscribe(self, topic):
self.topic = topic
self.client.subscribe(topic)
def stop(self):
self.signal.set()
def send(self, message, topic=None):
topic = topic or self.topic
loop = asyncio.get_event_loop()
loop.run_until_complete(self.send_mqtt_message(self.host, self.user, self.token, topic, message))
from threading import Thread
from mqttnode import MQTTNode
if __name__ == '__main__':
host = 'localhost'
user = 'rabbitmq'
token = 'rabbitmq'
client = MQTTNode(host, user, token)
client.consume('TEST/#')
client.on_message = lambda v: print("WE GOT A MESSAGE! {}".format(v))
while True:
message = input('enter some message or exit: ')
if message == 'exit':
break
client.send(message, topic='TEST/OUTPUT')
print("Wait to shutdown...")
client.stop()
@miraculixx
Copy link
Author

miraculixx commented Feb 7, 2020

I later figured that the paho-mqtt library does essentially the same thing for MQTT v3.x, which is what I needed. gmqtt supports MQTT v5 though, so may come handy if you need that. Also I just wanted to know how it works.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment