Skip to content

Instantly share code, notes, and snippets.

@allanlei
Created February 26, 2016 05:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save allanlei/6c7c3e14fe85ea5576e7 to your computer and use it in GitHub Desktop.
Save allanlei/6c7c3e14fe85ea5576e7 to your computer and use it in GitHub Desktop.
MQTT Example
import paho.mqtt.client as mqtt
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.connect('iot.eclipse.org', port=1883, keepalive=60)
client.publish('update', 'hello world')
from gevent import monkey
monkey.patch_all()
import paho.mqtt.client as mqtt
import blinker
import ssl
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('mqtt')
CODES = {
getattr(mqtt, err): err for err in (
'CONNACK_ACCEPTED',
'CONNACK_REFUSED_BAD_USERNAME_PASSWORD',
'CONNACK_REFUSED_IDENTIFIER_REJECTED',
'CONNACK_REFUSED_NOT_AUTHORIZED',
'CONNACK_REFUSED_PROTOCOL_VERSION',
'CONNACK_REFUSED_SERVER_UNAVAILABLE',
)
}
message_received = blinker.signal('message-received')
def on_connect(client, userdata, flags, return_code):
if return_code != mqtt.CONNACK_ACCEPTED:
raise Exception(CODES[return_code])
logger.info('Connected to %s:%d: %s',
client._host, client._port, CODES[return_code])
channels = [
# ("$SYS/#", 0),
# ("my/topic", 1),
('update', 1),
('update/accepted', 1),
('update/rejected', 1),
('update/delta', 1),
('get', 1),
('get/accepted', 1),
('get/rejected', 1),
('delete', 1),
('delete/accepted', 1),
('delete/rejected', 1),
]
client.subscribe(channels)
logger.info('Subscribed to: %r', channels)
def on_message(client, userdata, message):
message_received.send(
message.topic,
payload=message.payload, message=message, client=client, userdata=userdata)
@message_received.connect_via('my/topic')
@message_received.connect_via('update')
@message_received.connect_via('update/accepted')
@message_received.connect_via('update/rejected')
def debug(sender, payload, **kwargs):
print('{topic}: {message}'.format(topic=sender, message=payload))
if __name__ == '__main__':
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.on_message = on_message
# client.tls_set(
# 'ca.pem',
# certfile='client.crt', keyfile='client.key',
# cert_reqs=ssl.CERT_REQUIRED)
client.connect('iot.eclipse.org', port=1883, keepalive=60)
client.loop_forever() # loop_forever() will handle reconnecting for you. If you call disconnect() in a callback it will return.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment