Skip to content

Instantly share code, notes, and snippets.

@imneonizer
Created September 13, 2021 08:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save imneonizer/574d3fe49eb8135e316730b9257d8ee2 to your computer and use it in GitHub Desktop.
Save imneonizer/574d3fe49eb8135e316730b9257d8ee2 to your computer and use it in GitHub Desktop.
HiveMQTT Custom Req-Resp with Auth
# pip install paho-mqtt
import paho.mqtt.client as paho
import paho.mqtt.subscribe as subscribe
import uuid
import contextlib
import threading
import json
import time
class MQTTClient:
    """Request/response client layered on top of MQTT publish/subscribe.

    Requests are published to ``req_topic`` as ``"<uid>;<token>;<message>"``.
    A background thread listens on ``res_topic`` for frames in the same
    format; a frame is accepted only when its token equals ``self.token``.
    The special frame ``"__reset__;__reset__;__reset__"`` tells the client
    that the server could not parse a request.
    """

    # Sentinel used in all three fields of a "reset" frame.
    RESET = "__reset__"

    def __init__(self, host="broker.hivemq.com", port=1883, req_topic="test", res_topic="test", reconnect_interval=2, token="bearer-token"):
        """Store the configuration and immediately connect via ``init()``.

        Args:
            host: Broker hostname.
            port: Broker TCP port.
            req_topic: Topic that requests are published to.
            res_topic: Topic that responses are read from.
            reconnect_interval: Seconds ``send`` waits for a reply before it
                reconnects and publishes the request again.
            token: Shared secret embedded in every frame.
        """
        self.host = host
        self.port = port
        self.uid = ''
        self.message = ''
        self.mid = None  # id of the last published message, set by on_publish
        self.token = token
        self.reconnect_interval = reconnect_interval
        self.req_topic = req_topic
        self.res_topic = res_topic
        self.client = None
        self.subscribe_thread = None
        self.subscribe_thread_kill = False
        self.init()

    def init(self):
        """(Re)create the publishing client and make sure a listener runs.

        Any previous publishing client is shut down first so that repeated
        reconnects do not accumulate network threads and sockets.
        """
        self._close_client()
        self.client = paho.Client()
        self.client.on_connect = self.on_connect
        self.client.on_publish = self.on_publish
        self.client.connect(self.host, self.port)
        self.client.loop_start()
        # Reuse a listener that is still alive instead of stacking a new
        # thread (and broker connection) on every reconnect.
        listener = self.subscribe_thread
        if listener is None or not listener.is_alive():
            # Daemon thread: it blocks inside the subscribe loop until the
            # next message arrives, so it must not keep the process alive.
            self.subscribe_thread = threading.Thread(target=self.subscribe, daemon=True)
            self.subscribe_thread.start()

    def _close_client(self):
        """Best-effort shutdown of the current publishing client, if any."""
        client = getattr(self, "client", None)
        if client is None:
            return
        # Cleanup must never prevent a reconnect or a shutdown, so errors
        # raised while closing an already-broken connection are ignored.
        with contextlib.suppress(Exception):
            client.disconnect()
            client.loop_stop()

    def on_connect(self, client, userdata, flags, rc):
        """paho connect callback; intentionally a no-op."""

    def on_publish(self, client, userdata, mid):
        """paho publish callback; remembers the id of the published message."""
        self.mid = mid

    def subscribe(self):
        """Listen on ``res_topic`` until ``on_message`` raises KeyboardInterrupt."""
        with contextlib.suppress(KeyboardInterrupt):
            subscribe.callback(
                self.on_message, self.res_topic, hostname=self.host, port=self.port, keepalive=60
            )

    def on_message(self, client, userdata, msg):
        """Handle one frame received on ``res_topic``.

        Malformed frames (not UTF-8, or fewer than three ``;``-separated
        fields) and frames carrying a different token are ignored. The
        message field may itself contain ``;``.

        Raises:
            KeyboardInterrupt: after ``disconnect()`` was requested, to break
                out of the blocking subscribe loop.
        """
        try:
            # maxsplit=2 keeps any ';' inside the message body intact.
            uid, token, message = msg.payload.decode('utf8').split(";", 2)
        except ValueError:
            # Covers UnicodeDecodeError (a ValueError subclass) and frames
            # with too few fields; the topic may carry unrelated traffic.
            pass
        else:
            if uid == token == message == self.RESET:
                self.uid = self.RESET
            elif token == self.token:
                # Store the message BEFORE the uid: send() polls the uid from
                # another thread and reads the message right after it matches.
                self.message = message
                self.uid = uid
        if self.subscribe_thread_kill:
            raise KeyboardInterrupt

    def disconnect(self):
        """Shut down the publishing client and ask the listener to stop.

        The listener thread only notices the request when the next message
        arrives on ``res_topic``.
        """
        self.subscribe_thread_kill = True
        self._close_client()

    def send(self, message):
        """Publish ``message`` and block until the matching reply arrives.

        If no reply arrives within ``reconnect_interval`` seconds the client
        reconnects and publishes the request again under a fresh uid; this
        repeats until a reply or a reset frame is received.

        Returns:
            The reply text, or ``None`` when the server answered with a
            reset frame.
        """
        while True:
            uid = str(uuid.uuid4())
            # Forget the outcome of any earlier request; a stale "__reset__"
            # would otherwise abort this request immediately.
            self.uid = ''
            self.client.publish(self.req_topic, f"{uid};{self.token};{message}", qos=1)
            deadline = time.monotonic() + self.reconnect_interval
            while time.monotonic() <= deadline:
                current = self.uid
                if current == uid:
                    return self.message
                if current == self.RESET:
                    return None
                time.sleep(0.03)
            # Timed out waiting for the reply: reconnect, then retry.
            self.init()
if __name__ == "__main__":
    # Demo: keep sending "hello" and printing each reply until Ctrl-C.
    con = MQTTClient(
        req_topic="imneonizer/request",
        res_topic="imneonizer/response",
        token="a586a816-1c7a-4ef9-abf0-8d290c4b7d58",
    )
    try:
        while True:
            print(con.send("hello"))
    except KeyboardInterrupt:
        # Ctrl-C ends the loop; ask the client to stop its listener.
        con.disconnect()
import json
import sys
import paho.mqtt.subscribe as subscribe
import time
class MQTTServer:
    """Request/response server layered on top of MQTT publish/subscribe.

    Listens on ``req_topic`` for frames of the form
    ``"<uid>;<token>;<message>"``, passes the message of every frame with a
    matching token to a user callback and publishes
    ``"<uid>;<token>;<response>"`` on ``res_topic``.
    """

    def __init__(self, host="broker.hivemq.com", port=1883, req_topic="test", res_topic="test", reconnect_interval=2, token="bearer-token"):
        """Store the configuration; no connection is made until ``start()``.

        Args:
            host: Broker hostname.
            port: Broker TCP port.
            req_topic: Topic that requests are read from.
            res_topic: Topic that responses are published to.
            reconnect_interval: Seconds to wait before re-subscribing after
                the subscription failed with an error.
            token: Shared secret a request must carry to be answered.
        """
        self.host = host
        self.port = port
        self.token = token
        self.req_topic = req_topic
        self.res_topic = res_topic
        self.reconnect_interval = reconnect_interval

    def start(self, callback=lambda x: x):
        """Serve requests forever, re-subscribing whenever the loop fails.

        Args:
            callback: Called with the message text of each authorised
                request; its return value is sent back as the response.
                The default echoes the message.
        """
        self.callback = callback
        while True:
            try:
                subscribe.callback(
                    self.on_message, self.req_topic, hostname=self.host, port=self.port, keepalive=60
                )
            except Exception as e:
                print(e)
                # Back off before retrying so an unreachable broker does not
                # turn this loop into a busy spin.
                time.sleep(max(0, self.reconnect_interval))

    def on_message(self, client, userdata, msg):
        """Answer one frame received on ``req_topic``.

        Frames with a non-matching token are ignored. A frame that cannot be
        parsed (not UTF-8, or fewer than three ``;``-separated fields) is
        answered with a reset frame, as is a ``ValueError`` raised by the
        callback. The message field may itself contain ``;``.
        """
        try:
            # maxsplit=2 keeps any ';' inside the message body intact.
            uid, token, message = msg.payload.decode('utf8').split(";", 2)
            if token == self.token:
                response = self.callback(message)
                client.publish(self.res_topic, f"{uid};{self.token};{response}", qos=1)
        except ValueError:
            client.publish(self.res_topic, "__reset__;__reset__;__reset__", qos=1)
if __name__ == "__main__":
    # Demo: print every authorised request and always answer "world".
    def callback(message):
        """Log the incoming request text and return the fixed reply."""
        print(message)
        return "world"

    con = MQTTServer(
        req_topic="imneonizer/request",
        res_topic="imneonizer/response",
        token="a586a816-1c7a-4ef9-abf0-8d290c4b7d58",
    )
    con.start(callback=callback)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment