Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@skylord123
Last active July 30, 2017 23:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save skylord123/a54ce88aa79dfcd268a1cd92b499883d to your computer and use it in GitHub Desktop.
Save skylord123/a54ce88aa79dfcd268a1cd92b499883d to your computer and use it in GitHub Desktop.
Helper class for paho-mqtt to quickly integrate into existing sensor code (with reliable connection handling)
# Author: Skylar Sadlier
# Author URL: https://github.com/skylord123
# Script Link: https://gist.github.com/skylord123/a54ce88aa79dfcd268a1cd92b499883d
import paho.mqtt.client as mqtt_client
# - Class MQTT_Helper -
# Wraps a paho-mqtt client: tracks connection state and dispatches received messages to per-topic callbacks.
class MQTT_Helper(object):
    """Convenience wrapper around a paho-mqtt client.

    Keeps a mapping of topic filter -> callback, (re)subscribes to every
    registered topic whenever the broker accepts a connection, and lazily
    connects on the first publish().

    Attributes:
        active: True while a connection attempt has been started and no
            disconnect has been observed.
        connected: True only after the broker accepted the connection.
        topics: dict mapping each subscribed topic filter to its callback.
    """

    def __init__(self, client_id='', host='', port=1883, username='', password='', keepalive=30, debug=False):
        """Store the connection settings and build the underlying client.

        Args:
            client_id: MQTT client identifier handed to the paho client.
            host: Broker host name or address.
            port: Broker port.
            username: Optional user name; credentials are only sent when
                both username and password are non-empty.
            password: Optional password (see username).
            keepalive: Keepalive value passed to the client on connect.
            debug: When true, print a line for each MQTT event.
        """
        self.client_id = client_id
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.keepalive = keepalive
        self.active = False
        self.connected = False
        self.debug = debug
        self.topics = dict()

        # Init the client and route its callbacks to this helper.
        self.client = mqtt_client.Client(self.client_id)
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        if self.username != '' and self.password != '':
            self.client.username_pw_set(self.username, self.password)

    def subscribe(self, ch, cb):
        """Register callback cb for topic filter ch.

        Returns:
            False if ch is already registered (the existing callback is
            kept), True otherwise.
        """
        if ch in self.topics:
            return False
        self.topics[ch] = cb
        # When not connected yet, on_connect() subscribes to every
        # registered topic once the broker accepts the connection.
        if self.connected:
            self.client.subscribe(ch, 2)
        return True

    def on_message(self, client, data, msg):
        """Dispatch a received message to every matching callback.

        A registered filter matches when it equals the message topic or,
        if it contains an MQTT wildcard ('+' or '#'), when the wildcard
        filter matches the topic.
        """
        if self.debug:
            print("[MQTT RECEIVED] " + msg.topic + ": " + str(msg.payload))
        # Iterate over a snapshot so a callback may call subscribe().
        for pattern, callback in list(self.topics.items()):
            if pattern == msg.topic:
                callback(msg)
            elif ('+' in pattern or '#' in pattern) and \
                    mqtt_client.topic_matches_sub(pattern, msg.topic):
                callback(msg)

    def exception_handler(self, request, exception):
        """Print the attributes of a failed request and its exception (debug only)."""
        if self.debug:
            print("[MQTT ERROR]")
            print(vars(exception))
            print(vars(request))

    def on_publish(self, client, userdata, mid):
        """Client callback fired after a message has been published."""
        if self.debug:
            print("[MQTT] PUBLISHED")

    def on_connect(self, client, userdata, flags, rc):
        """Client callback for CONNACK; subscribes to all registered topics.

        A non-zero rc means the broker refused the connection, so the
        helper must not report itself as connected in that case.
        """
        if self.debug:
            print("[MQTT] CONNACK received with code %d." % (rc))
        self.connected = (rc == 0)
        if not self.connected:
            return
        # Snapshot the keys: subscribe() may add topics from another thread.
        for topic in list(self.topics):
            self.client.subscribe(topic, 2)

    def on_disconnect(self, client, userdata, rc):
        """Client callback for a disconnect; clears the connection flags."""
        if self.debug:
            print("[MQTT] DISCON received with code %d." % (rc))
        self.connected = False
        self.active = False

    def publish(self, channel, message):
        """Publish message on channel, connecting first if necessary.

        Returns:
            Whatever the underlying client's publish() returns.
        """
        if not self.connected and not self.active:
            self.connect()
        return self.client.publish(channel, message)

    def connect(self):
        """Connect to the broker and start the client's network loop.

        If the connection attempt raises, the active flag is reset so a
        later publish() retries, and the exception is re-raised.
        """
        self.active = True
        try:
            self.client.connect(self.host, self.port, self.keepalive)
        except Exception:
            self.active = False
            raise
        self.client.loop_start()

    def disconnect(self):
        """Disconnect from the broker and stop the client's network loop."""
        self.client.disconnect()
        self.client.loop_stop()
        # Clear the flags explicitly so a later publish() reconnects.
        self.connected = False
        self.active = False
@skylord123
Copy link
Author

Example:

from classes.mqtt_helper import MQTT_Helper
import sys
import time

# The broker port is an integer (the class default is 1883).
mqtt = MQTT_Helper('client_id', 'example.com', 1883)
mqtt.connect()


def test_event(msg):
    # Callback for messages on the subscribed topic; str() is used because
    # the payload is not guaranteed to be a text string.
    print(msg.topic + ": " + str(msg.payload))


mqtt.subscribe('test/test', test_event)

sup = 0
try:
    while True:
        sup += 1
        mqtt.publish("test/test2", "yo sup " + str(sup))
        # Pause between messages so the loop does not publish at full speed.
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl-C ends the example.
    print("Bye")
    sys.exit()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment