Skip to content

Instantly share code, notes, and snippets.

@voith
Last active March 17, 2018 13:58
Show Gist options
  • Save voith/455a145f35edaa5ec9a3b6d50244fa26 to your computer and use it in GitHub Desktop.
Save voith/455a145f35edaa5ec9a3b6d50244fa26 to your computer and use it in GitHub Desktop.
Basic Mqtt test

** setup

  1. Install python2
  2. Install the paho-mqtt library: pip install paho-mqtt
  3. Download all three Python files in the gist (mqtt_client.py, mqtt_publisher.py, mqtt_subscriber.py) and store them in the same directory.
  4. To run the subscriber, run: python mqtt_subscriber.py
  5. To run the publisher, run: python mqtt_publisher.py
from uuid import uuid4
from paho.mqtt.client import Client
class MqttClient(object):
    """Small wrapper that pairs a paho ``Client`` with the broker address to use.

    :param client_id: identifier handed to the paho client; when it is
        falsy a random UUID string is generated instead.
    :param host: broker host name passed to ``connect``.
    :param port: broker port passed to ``connect``.
    """

    def __init__(self, client_id=None, host="iot.eclipse.org", port=1883):
        self.host = host
        self.port = port
        if not client_id:
            # No id supplied: fall back to a freshly generated UUID string.
            client_id = str(uuid4())
        self._client = Client(client_id=client_id)

    @property
    def client(self):
        """The underlying paho client, for callers that need its full API."""
        return self._client

    def connect(self):
        """Connect the wrapped client to ``host``/``port`` and print a confirmation."""
        self._client.connect(host=self.host, port=self.port)
        confirmation = 'established connection to client with host: {host}, port={port}'
        print(confirmation.format(host=self.host, port=self.port))

    def loop_forever(self):
        """Hand control to the wrapped client's ``loop_forever``."""
        self._client.loop_forever()
import json
from time import sleep
from mqtt_client import MqttClient
class MqttPublisher(object):
    """Publish serialized payloads to a single MQTT topic.

    Creating an instance builds an ``MqttClient`` with its default settings
    and connects it right away.

    :param topic: topic every message from this publisher is sent to.
    """

    def __init__(self, topic):
        self.topic = topic
        self.mqtt = MqttClient()
        self.mqtt.connect()

    def publish_message(self, payload, qos=0, retain=False, serializer=json.dumps):
        """Serialize ``payload`` and publish it to this publisher's topic.

        :param payload: object to send; it must be accepted by ``serializer``.
        :param qos: quality-of-service level forwarded to the client.
        :param retain: retain flag forwarded to the client.
        :param serializer: callable that turns ``payload`` into the wire
            format (``json.dumps`` by default).
        :returns: the object returned by the client's ``publish`` call, so
            callers can inspect its ``rc`` (earlier versions of this method
            returned ``None``; callers ignoring the result are unaffected).
        """
        print('publishing message: {message}'.format(message=payload))
        serialized_message = serializer(payload)
        result = self.mqtt.client.publish(
            topic=self.topic, payload=serialized_message, qos=qos, retain=retain
        )
        # A non-zero rc means the client did not accept the message for
        # sending (e.g. no connection), so do not claim it was published.
        if result.rc != 0:
            print('failed to publish message: {message} to topic: {topic} (rc={rc})'.format(
                message=payload, topic=self.topic, rc=result.rc))
        else:
            print('published message: {message} to topic: {topic}'.format(
                message=payload, topic=self.topic))
        return result
def publish_data(topic='voith/test', count=10, interval=1):
    """Publish a short series of sample coordinates, pausing between messages.

    Message number ``i`` (starting at 0) carries ``{'lat': i, 'long': -i}``.
    Called with no arguments this sends ten messages to ``voith/test``,
    one per second.

    :param topic: topic the sample messages are published to.
    :param count: number of messages to publish.
    :param interval: seconds to sleep after each message.
    """
    publisher = MqttPublisher(topic=topic)
    for i in range(count):
        publisher.publish_message(payload={'lat': i, 'long': -i})
        sleep(interval)
# Script entry point: publish the sample data only when run directly.
if __name__ == "__main__":
    publish_data()
import json
from mqtt_client import MqttClient
class MqttSubscriber(object):
    """Listen on one MQTT topic and print the coordinates in each message.

    :param topic: topic to subscribe to once the client is connected.
    :param deserializer: callable that decodes a raw message payload
        (``json.loads`` by default). The decoded value is expected to
        support ``['lat']`` and ``['long']`` lookups.
    """

    def __init__(self, topic, deserializer=json.loads):
        self.topic = topic
        self.deserializer = deserializer
        self.mqtt = MqttClient()
        # Register the callbacks on the underlying client before connecting.
        self.mqtt.client.on_connect = self.on_connect
        self.mqtt.client.on_message = self.message_arrived

    def on_connect(self, client, userdata, flags, rc):
        """Connection callback: subscribe to the topic if the connection succeeded.

        :param client: client instance that connected; used to subscribe.
        :param userdata: unused.
        :param flags: unused.
        :param rc: connection return code; ``0`` is treated as success.
        """
        print('connected to client with return code: {rc}'.format(rc=rc))
        if rc != 0:
            # The broker refused the connection, so a subscribe cannot succeed.
            print('connection refused, not subscribing to topic: {topic}'.format(topic=self.topic))
            return
        client.subscribe(self.topic)

    def message_arrived(self, client, userdata, msg):
        """Message callback: decode the payload and print its latitude/longitude.

        A payload that cannot be decoded, or that lacks the ``lat``/``long``
        entries, is reported and skipped so that one bad message does not
        raise out of the callback.

        :param client: unused.
        :param userdata: unused.
        :param msg: received message; its ``payload`` and ``topic`` are read.
        """
        # every time a message is published, this method will automatically be called
        print('received message: {msg} from topic: {topic}'.format(msg=msg.payload, topic=msg.topic))
        try:
            # this is how you decode the json.
            # the deserializer here points to json.loads by default
            deserialized_data = self.deserializer(msg.payload)
            # this is how you access the lat and long
            latitude = deserialized_data['lat']
            longitude = deserialized_data['long']
        except (ValueError, TypeError, KeyError, IndexError) as error:
            print('ignoring malformed message: {error!r}'.format(error=error))
            return
        # instead of the print put your lcd code here
        print('received latitude: {}'.format(latitude))
        print('received longitude: {}'.format(longitude))

    def listen_for_messages(self):
        """Connect to the broker and process incoming messages until the loop ends."""
        print('listening for messages from topic: {topic}'.format(topic=self.topic))
        self.mqtt.connect()
        # this is the event loop:
        # think of it as a loop that runs forever.
        # whenever a message is received it calls the on_message callback,
        # which is pointing to message_arrived in our case.
        self.mqtt.loop_forever()
def start_subscriber():
    """Create a subscriber for the ``voith/test`` topic and start listening."""
    MqttSubscriber(topic='voith/test').listen_for_messages()
# Script entry point: start the subscriber only when run directly.
if __name__ == "__main__":
    start_subscriber()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment