Last active
July 29, 2021 17:51
-
-
Save miraculixx/0740efbc7362f5851a9a6471407d3997 to your computer and use it in GitHub Desktop.
a threaded mqtt client using the gmqtt python library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version: '3' | |
services: | |
rabbitmq: | |
image: rabbitmq:latest | |
ports: | |
# 5672 amqp | |
# 15672 mgmt ui | |
# 1883 mqtt | |
- "5672:5672" | |
- "15672:15672" | |
- "1883:1883" | |
volumes: | |
- "./enabled_plugins:/etc/rabbitmq/enabled_plugins" | |
- "./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# adopted from https://github.com/wialon/gmqtt | |
# original under MIT License, Copyright (c) 2018 Gurtam | |
from _socket import gethostname | |
import asyncio | |
from gmqtt import Client as MQTTClient | |
from gmqtt.mqtt.constants import MQTTv311 | |
from threading import Thread | |
class MQTTNode: | |
""" | |
A threaded mqtt client that can receive and send | |
Dependcies: | |
gmqtt | |
Usage: | |
node = MQTTNode(host, user, token) | |
node.consume(topic) | |
... | |
node.send(message) | |
""" | |
def __init__(self, host, user, token): | |
self.host = host | |
self.user = user | |
self.token = token | |
self.client = None | |
def on_connect(self, event): | |
print("\non_connect", event) | |
def on_message(self, message): | |
print("\non_message", message) | |
def on_disconnect(self, event): | |
print("\non_disconnect", event) | |
def on_subscribe(self, event): | |
print("\non_subscribe", event) | |
def _mqtt_on_connect(self, client, flags, rc, properties): | |
self.on_connect({ | |
'flags': flags, | |
'rc': rc, | |
'properties': properties | |
}) | |
def _mqtt_on_message(self, client, topic, payload, qos, properties): | |
self.on_message({ | |
'topic': topic, | |
'paypload': payload, | |
'qos': qos, | |
'properties': properties | |
}) | |
def _mqtt_on_disconnect(self, client, packet, exc=None): | |
self.on_disconnect({ | |
'package': packet, | |
'exc': exc, | |
}) | |
def _mqtt_on_subscribe(self, client, mid, qos, properties): | |
self.on_subscribe({ | |
'mid': mid, | |
'qos': qos, | |
'properties': properties | |
}) | |
async def mqtt_client(self, broker_host, user, token): | |
if self.client is not None: | |
return self.client | |
client = self.client = MQTTClient(gethostname()) | |
# set callbacks | |
client.on_connect = self._mqtt_on_connect | |
client.on_message = self._mqtt_on_message | |
client.on_disconnect = self._mqtt_on_disconnect | |
client.on_subscribe = self._mqtt_on_subscribe | |
# connect | |
client.set_auth_credentials(user, password=token) | |
await client.connect(broker_host, version=MQTTv311) | |
return client | |
async def run_mqtt_consumer(self, host, user, token, signal, topic): | |
client = await self.mqtt_client(host, user, token) | |
client.subscribe(topic, qos=0) | |
await signal.wait() | |
await client.disconnect() | |
self.client = None | |
async def send_mqtt_message(self, host, user, token, topic, message): | |
client = await self.mqtt_client(host, user, token) | |
client.publish(topic, message, qos=1) | |
def consume(self, topic): | |
def forever(): | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
self.signal = asyncio.Event() | |
loop.run_until_complete(self.run_mqtt_consumer(self.host, self.user, self.token, self.signal, topic)) | |
self.topic = topic | |
self.consumet = Thread(target=forever) | |
self.consumet.start() | |
def subscribe(self, topic): | |
self.topic = topic | |
self.client.subscribe(topic) | |
def stop(self): | |
self.signal.set() | |
def send(self, message, topic=None): | |
topic = topic or self.topic | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(self.send_mqtt_message(self.host, self.user, self.token, topic, message)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Thread | |
from mqttnode import MQTTNode | |
if __name__ == '__main__': | |
host = 'localhost' | |
user = 'rabbitmq' | |
token = 'rabbitmq' | |
client = MQTTNode(host, user, token) | |
client.consume('TEST/#') | |
client.on_message = lambda v: print("WE GOT A MESSAGE! {}".format(v)) | |
while True: | |
message = input('enter some message or exit: ') | |
if message == 'exit': | |
break | |
client.send(message, topic='TEST/OUTPUT') | |
print("Wait to shutdown...") | |
client.stop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I later figured that the paho-mqtt library does essentially the same thing for MQTT v3.x, which is what I needed. gmqtt supports MQTT v5 though, so may come handy if you need that. Also I just wanted to know how it works.