This is an mqtt subscribe client for micropython
# Copyright (c) 2012-2014 Roger Light <roger@atchoo.org> | |
# | |
# This is a stripped down version of paho.mqtt intended for micropython | |
# All rights reserved. This program and the accompanying materials | |
# are made available under the terms of the Eclipse Public License v1.0 | |
# and Eclipse Distribution License v1.0 which accompany this distribution. | |
# | |
# The Eclipse Public License is available at | |
# http://www.eclipse.org/legal/epl-v10.html | |
# and the Eclipse Distribution License is available at | |
# http://www.eclipse.org/org/documents/edl-v10.php. | |
# | |
# Contributors: | |
# Roger Light - initial API and implementation | |
""" | |
This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging | |
protocol that is easy to implement and suitable for low powered devices. | |
""" | |
import errno | |
import sys | |
import select | |
import socket | |
import ustruct as struct | |
import utime as time | |
EAGAIN = errno.EAGAIN | |
MQTTv31 = 3 | |
MQTTv311 = 4 | |
PROTOCOL_NAMEv31 = b"MQIsdp" | |
PROTOCOL_NAMEv311 = b"MQTT" | |
PROTOCOL_VERSION = 3 | |
# Message types | |
CONNECT = 0x10 | |
CONNACK = 0x20 | |
PUBLISH = 0x30 | |
PUBACK = 0x40 | |
PUBREC = 0x50 | |
PUBREL = 0x60 | |
PUBCOMP = 0x70 | |
SUBSCRIBE = 0x80 | |
SUBACK = 0x90 | |
UNSUBSCRIBE = 0xA0 | |
UNSUBACK = 0xB0 | |
PINGREQ = 0xC0 | |
PINGRESP = 0xD0 | |
DISCONNECT = 0xE0 | |
# Log levels | |
MQTT_LOG_INFO = 0x01 | |
MQTT_LOG_NOTICE = 0x02 | |
MQTT_LOG_WARNING = 0x04 | |
MQTT_LOG_ERR = 0x08 | |
MQTT_LOG_DEBUG = 0x10 | |
# CONNACK codes | |
CONNACK_ACCEPTED = 0 | |
CONNACK_REFUSED_PROTOCOL_VERSION = 1 | |
CONNACK_REFUSED_IDENTIFIER_REJECTED = 2 | |
CONNACK_REFUSED_SERVER_UNAVAILABLE = 3 | |
CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4 | |
CONNACK_REFUSED_NOT_AUTHORIZED = 5 | |
# Connection state | |
mqtt_cs_new = 0 | |
mqtt_cs_connected = 1 | |
mqtt_cs_disconnecting = 2 | |
mqtt_cs_connect_async = 3 | |
# Message state | |
mqtt_ms_invalid = 0 | |
mqtt_ms_publish= 1 | |
mqtt_ms_wait_for_puback = 2 | |
mqtt_ms_wait_for_pubrec = 3 | |
mqtt_ms_resend_pubrel = 4 | |
mqtt_ms_wait_for_pubrel = 5 | |
mqtt_ms_resend_pubcomp = 6 | |
mqtt_ms_wait_for_pubcomp = 7 | |
mqtt_ms_send_pubrec = 8 | |
mqtt_ms_queued = 9 | |
# Error values | |
MQTT_ERR_AGAIN = -1 | |
MQTT_ERR_SUCCESS = 0 | |
MQTT_ERR_NOMEM = 1 | |
MQTT_ERR_PROTOCOL = 2 | |
MQTT_ERR_INVAL = 3 | |
MQTT_ERR_NO_CONN = 4 | |
MQTT_ERR_CONN_REFUSED = 5 | |
MQTT_ERR_NOT_FOUND = 6 | |
MQTT_ERR_CONN_LOST = 7 | |
MQTT_ERR_TLS = 8 | |
MQTT_ERR_PAYLOAD_SIZE = 9 | |
MQTT_ERR_NOT_SUPPORTED = 10 | |
MQTT_ERR_AUTH = 11 | |
MQTT_ERR_ACL_DENIED = 12 | |
MQTT_ERR_UNKNOWN = 13 | |
MQTT_ERR_ERRNO = 14 | |
sockpair_data = b"0" | |
def error_string(mqtt_errno): | |
"""Return the error string associated with an mqtt error number.""" | |
if mqtt_errno == MQTT_ERR_SUCCESS: | |
return "No error." | |
elif mqtt_errno == MQTT_ERR_NOMEM: | |
return "Out of memory." | |
elif mqtt_errno == MQTT_ERR_PROTOCOL: | |
return "A network protocol error occurred when communicating with the broker." | |
elif mqtt_errno == MQTT_ERR_INVAL: | |
return "Invalid function arguments provided." | |
elif mqtt_errno == MQTT_ERR_NO_CONN: | |
return "The client is not currently connected." | |
elif mqtt_errno == MQTT_ERR_CONN_REFUSED: | |
return "The connection was refused." | |
elif mqtt_errno == MQTT_ERR_NOT_FOUND: | |
return "Message not found (internal error)." | |
elif mqtt_errno == MQTT_ERR_CONN_LOST: | |
return "The connection was lost." | |
elif mqtt_errno == MQTT_ERR_TLS: | |
return "A TLS error occurred." | |
elif mqtt_errno == MQTT_ERR_PAYLOAD_SIZE: | |
return "Payload too large." | |
elif mqtt_errno == MQTT_ERR_NOT_SUPPORTED: | |
return "This feature is not supported." | |
elif mqtt_errno == MQTT_ERR_AUTH: | |
return "Authorisation failed." | |
elif mqtt_errno == MQTT_ERR_ACL_DENIED: | |
return "Access denied by ACL." | |
elif mqtt_errno == MQTT_ERR_UNKNOWN: | |
return "Unknown error." | |
elif mqtt_errno == MQTT_ERR_ERRNO: | |
return "Error defined by errno." | |
else: | |
return "Unknown error." | |
def connack_string(connack_code): | |
"""Return the string associated with a CONNACK result.""" | |
if connack_code == 0: | |
return "Connection Accepted." | |
elif connack_code == 1: | |
return "Connection Refused: unacceptable protocol version." | |
elif connack_code == 2: | |
return "Connection Refused: identifier rejected." | |
elif connack_code == 3: | |
return "Connection Refused: broker unavailable." | |
elif connack_code == 4: | |
return "Connection Refused: bad user name or password." | |
elif connack_code == 5: | |
return "Connection Refused: not authorised." | |
else: | |
return "Connection Refused: unknown reason." | |
def topic_matches_sub(sub, topic): | |
"""Check whether a topic matches a subscription. | |
For example: | |
foo/bar would match the subscription foo/# or +/bar | |
non/matching would not match the subscription non/+/+ | |
""" | |
result = True | |
multilevel_wildcard = False | |
slen = len(sub) | |
tlen = len(topic) | |
if slen > 0 and tlen > 0: | |
if (sub[0] == '$' and topic[0] != '$') or (topic[0] == '$' and sub[0] != '$'): | |
return False | |
spos = 0 | |
tpos = 0 | |
while spos < slen and tpos < tlen: | |
if sub[spos] == topic[tpos]: | |
if tpos == tlen-1: | |
# Check for e.g. foo matching foo/# | |
if spos == slen-3 and sub[spos+1] == '/' and sub[spos+2] == '#': | |
result = True | |
multilevel_wildcard = True | |
break | |
spos += 1 | |
tpos += 1 | |
if tpos == tlen and spos == slen-1 and sub[spos] == '+': | |
spos += 1 | |
result = True | |
break | |
else: | |
if sub[spos] == '+': | |
spos += 1 | |
while tpos < tlen and topic[tpos] != '/': | |
tpos += 1 | |
if tpos == tlen and spos == slen: | |
result = True | |
break | |
elif sub[spos] == '#': | |
multilevel_wildcard = True | |
if spos+1 != slen: | |
result = False | |
break | |
else: | |
result = True | |
break | |
else: | |
result = False | |
break | |
if not multilevel_wildcard and (tpos < tlen or spos < slen): | |
result = False | |
return result | |
class MQTTMessage: | |
""" This is a class that describes an incoming message. It is passed to the | |
on_message callback as the message parameter. | |
""" | |
def __init__(self): | |
self.timestamp = 0 | |
self.state = mqtt_ms_invalid | |
self.dup = False | |
self.mid = 0 | |
self.topic = "" | |
self.payload = None | |
self.qos = 0 | |
self.retain = False | |
class Client(object): | |
"""MQTT version 3.1/3.1.1 client class. | |
This is the main class for use communicating with an MQTT broker. | |
""" | |
def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQTTv31): | |
if not clean_session and (client_id == "" or client_id is None): | |
raise ValueError('A client id must be provided if clean session is False.') | |
self._protocol = protocol | |
self._userdata = userdata | |
self._sock = None | |
self._keepalive = 60 | |
self._message_retry = 20 | |
self._last_retry_check = 0 | |
self._clean_session = clean_session | |
if client_id == "" or client_id is None: | |
self._client_id = "umqtt" | |
else: | |
self._client_id = client_id | |
self._username = "" | |
self._password = "" | |
self._in_packet = { | |
"command": 0, | |
"have_remaining": 0, | |
"remaining_count": [], | |
"remaining_mult": 1, | |
"remaining_length": 0, | |
"packet": b"", | |
"to_process": 0, | |
"pos": 0} | |
self._out_packet = [] | |
self._current_out_packet = None | |
self._last_msg_in = time.time() | |
self._last_msg_out = time.time() | |
self._ping_t = 0 | |
self._last_mid = 0 | |
self._state = mqtt_cs_new | |
self._out_messages = [] | |
self._in_messages = [] | |
self._max_inflight_messages = 20 | |
self._inflight_messages = 0 | |
self._will = False | |
self._will_topic = "" | |
self._will_payload = None | |
self._will_qos = 0 | |
self._will_retain = False | |
self.on_disconnect = None | |
self.on_connect = None | |
self.on_publish = None | |
self.on_message = None | |
self.on_message_filtered = [] | |
self.on_subscribe = None | |
self.on_unsubscribe = None | |
self.on_log = None #None | |
self._host = "" | |
self._port = 1883 | |
self._bind_address = "" | |
self._in_callback = False | |
self._strict_protocol = False | |
def __del__(self): | |
pass | |
def reinitialise(self, client_id="", clean_session=True, userdata=None): | |
if self._sock: | |
self._sock.close() | |
self._sock = None | |
self.__init__(client_id, clean_session, userdata) | |
def connect(self, host, port=1883, keepalive=60, bind_address=""): | |
"""Connect to a remote broker. | |
""" | |
print("connect") | |
self.connect_async(host, port, keepalive, bind_address) | |
return self.reconnect() | |
def connect_async(self, host, port=1883, keepalive=60, bind_address=""): | |
"""Connect to a remote broker asynchronously. This is a non-blocking | |
connect call that can be used with loop_start() to provide very quick | |
start. | |
""" | |
print("connect_async") | |
if host is None or len(host) == 0: | |
raise ValueError('Invalid host.') | |
if port <= 0: | |
raise ValueError('Invalid port number.') | |
if keepalive < 0: | |
raise ValueError('Keepalive must be >=0.') | |
self._host = host | |
self._port = port | |
self._keepalive = keepalive | |
self._bind_address = bind_address | |
self._state = mqtt_cs_connect_async | |
def reconnect(self): | |
"""Reconnect the client after a disconnect. Can only be called after | |
connect()/connect_async().""" | |
print("reconnect") | |
if len(self._host) == 0: | |
raise ValueError('Invalid host.') | |
if self._port <= 0: | |
raise ValueError('Invalid port number.') | |
self._in_packet = { | |
"command": 0, | |
"have_remaining": 0, | |
"remaining_count": [], | |
"remaining_mult": 1, | |
"remaining_length": 0, | |
"packet": b"", | |
"to_process": 0, | |
"pos": 0} | |
self._out_packet = [] | |
self._current_out_packet = None | |
self._last_msg_in = time.time() | |
self._last_msg_out = time.time() | |
self._ping_t = 0 | |
self._state = mqtt_cs_new | |
if self._sock: | |
self._sock.close() | |
self._sock = None | |
print("self._sock == None") | |
# Put messages in progress in a valid state. | |
self._messages_reconnect_reset() | |
sock = socket.create_connection((self._host, self._port), source_address=(self._bind_address, 0)) | |
self._sock = sock | |
self._sock.setblocking(0) | |
self.ep = select.epoll() | |
self.fileno = self._sock.fileno() | |
self.ep.register(self.fileno) | |
print("self._sock =", self._sock) | |
return self._send_connect(self._keepalive, self._clean_session) | |
def loop(self, timeout=1.0, max_packets=1): | |
"""Process network events. | |
""" | |
print("loop") | |
if timeout < 0.0: | |
raise ValueError('Invalid timeout.') | |
print("self._current_out_packet =", self._current_out_packet) | |
print("self._out_packet =", self._out_packet) | |
if self._current_out_packet is None and len(self._out_packet) > 0: | |
self._current_out_packet = self._out_packet.pop(0) | |
print("self._current_out_packet =", self._current_out_packet) | |
events = self.ep.poll(1) | |
print("events = ", events) | |
for fileno, ev in events: | |
if ev & select.EPOLLIN: | |
rc = self.loop_read(max_packets) | |
if rc or (self._sock is None): | |
return rc | |
if ev & select.EPOLLOUT and self._current_out_packet: | |
rc = self.loop_write(max_packets) | |
if rc or (self._sock is None): | |
return rc | |
return self.loop_misc() | |
# def publish(self, topic, payload=None, qos=0, retain=False): | |
# """Publish a message on a topic. | |
# | |
# This causes a message to be sent to the broker and subsequently from | |
# the broker to any clients subscribing to matching topics. | |
# | |
# topic: The topic that the message should be published on. | |
# payload: The actual message to send. If not given, or set to None a | |
# zero length message will be used. Passing an int or float will result | |
# in the payload being converted to a string representing that number. If | |
# you wish to send a true int/float, use struct.pack() to create the | |
# payload you require. | |
# qos: The quality of service level to use. | |
# retain: If set to true, the message will be set as the "last known | |
# good"/retained message for the topic. | |
# | |
# Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to | |
# indicate success or MQTT_ERR_NO_CONN if the client is not currently | |
# connected. mid is the message ID for the publish request. The mid | |
# value can be used to track the publish request by checking against the | |
# mid argument in the on_publish() callback if it is defined. | |
# | |
# A ValueError will be raised if topic is None, has zero length or is | |
# invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if | |
# the length of the payload is greater than 268435455 bytes.""" | |
# if topic is None or len(topic) == 0: | |
# raise ValueError('Invalid topic.') | |
# if qos<0 or qos>2: | |
# raise ValueError('Invalid QoS level.') | |
# if isinstance(payload, str) or isinstance(payload, bytearray): | |
# local_payload = payload | |
# elif sys.version_info[0] < 3 and isinstance(payload, unicode): | |
# local_payload = payload | |
# elif isinstance(payload, int) or isinstance(payload, float): | |
# local_payload = str(payload) | |
# elif payload is None: | |
# local_payload = None | |
# else: | |
# raise TypeError('payload must be a string, bytearray, int, float or None.') | |
# | |
# if local_payload is not None and len(local_payload) > 268435455: | |
# raise ValueError('Payload too large.') | |
# | |
# if self._topic_wildcard_len_check(topic) != MQTT_ERR_SUCCESS: | |
# raise ValueError('Publish topic cannot contain wildcards.') | |
# | |
# local_mid = self._mid_generate() | |
# | |
# if qos == 0: | |
# rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False) | |
# return (rc, local_mid) | |
# else: | |
# message = MQTTMessage() | |
# message.timestamp = time.time() | |
# | |
# message.mid = local_mid | |
# message.topic = topic | |
# if local_payload is None or len(local_payload) == 0: | |
# message.payload = None | |
# else: | |
# message.payload = local_payload | |
# | |
# message.qos = qos | |
# message.retain = retain | |
# message.dup = False | |
# | |
# self._out_message_mutex.acquire() | |
# self._out_messages.append(message) | |
# if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: | |
# self._inflight_messages = self._inflight_messages+1 | |
# if qos == 1: | |
# message.state = mqtt_ms_wait_for_puback | |
# elif qos == 2: | |
# message.state = mqtt_ms_wait_for_pubrec | |
# self._out_message_mutex.release() | |
# | |
# rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup) | |
# | |
# # remove from inflight messages so it will be send after a connection is made | |
# if rc is MQTT_ERR_NO_CONN: | |
# with self._out_message_mutex: | |
# self._inflight_messages -= 1 | |
# message.state = mqtt_ms_publish | |
# | |
# return (rc, local_mid) | |
# else: | |
# message.state = mqtt_ms_queued; | |
# self._out_message_mutex.release() | |
# return (MQTT_ERR_SUCCESS, local_mid) | |
# | |
# def username_pw_set(self, username, password=None): | |
# """Set a username and optionally a password for broker authentication. | |
# | |
# Must be called before connect() to have any effect. | |
# Requires a broker that supports MQTT v3.1. | |
# | |
# username: The username to authenticate with. Need have no relationship to the client id. | |
# password: The password to authenticate with. Optional, set to None if not required. | |
# """ | |
# self._username = username.encode('utf-8') | |
# self._password = password | |
def disconnect(self): | |
"""Disconnect a connected client from the broker.""" | |
self._state = mqtt_cs_disconnecting | |
if self._sock is None: | |
return MQTT_ERR_NO_CONN | |
return self._send_disconnect() | |
def subscribe(self, topic, qos=0): | |
"""Subscribe the client to one or more topics.""" | |
print("subscribe") | |
topic_qos_list = None | |
if isinstance(topic, str): | |
if qos<0 or qos>2: | |
raise ValueError('Invalid QoS level.') | |
if topic is None or len(topic) == 0: | |
raise ValueError('Invalid topic.') | |
topic_qos_list = [(topic.encode('utf-8'), qos)] | |
elif isinstance(topic, tuple): | |
if topic[1]<0 or topic[1]>2: | |
raise ValueError('Invalid QoS level.') | |
if topic[0] is None or len(topic[0]) == 0 or not isinstance(topic[0], str): | |
raise ValueError('Invalid topic.') | |
topic_qos_list = [(topic[0].encode('utf-8'), topic[1])] | |
elif isinstance(topic, list): | |
topic_qos_list = [] | |
for t in topic: | |
if t[1]<0 or t[1]>2: | |
raise ValueError('Invalid QoS level.') | |
if t[0] is None or len(t[0]) == 0 or not isinstance(t[0], str): | |
raise ValueError('Invalid topic.') | |
topic_qos_list.append((t[0].encode('utf-8'), t[1])) | |
if topic_qos_list is None: | |
raise ValueError("No topic specified, or incorrect topic type.") | |
if self._sock is None: | |
return (MQTT_ERR_NO_CONN, None) | |
return self._send_subscribe(False, topic_qos_list) | |
def unsubscribe(self, topic): | |
"""Unsubscribe the client from one or more topics.""" | |
topic_list = None | |
if topic is None: | |
raise ValueError('Invalid topic.') | |
if isinstance(topic, str): | |
if len(topic) == 0: | |
raise ValueError('Invalid topic.') | |
topic_list = [topic.encode('utf-8')] | |
elif isinstance(topic, list): | |
topic_list = [] | |
for t in topic: | |
if len(t) == 0 or not isinstance(t, str): | |
raise ValueError('Invalid topic.') | |
topic_list.append(t.encode('utf-8')) | |
if topic_list is None: | |
raise ValueError("No topic specified, or incorrect topic type.") | |
if self._sock is None and self._ssl is None: | |
return (MQTT_ERR_NO_CONN, None) | |
return self._send_unsubscribe(False, topic_list) | |
def loop_read(self, max_packets=1): | |
"""Process read network events. """ | |
print("loop_read") | |
if self._sock is None and self._ssl is None: | |
return MQTT_ERR_NO_CONN | |
max_packets = len(self._out_messages) + len(self._in_messages) | |
if max_packets < 1: | |
max_packets = 1 | |
for i in range(0, max_packets): | |
rc = self._packet_read() | |
if rc > 0: | |
return self._loop_rc_handle(rc) | |
elif rc == MQTT_ERR_AGAIN: | |
return MQTT_ERR_SUCCESS | |
return MQTT_ERR_SUCCESS | |
def loop_write(self, max_packets=1): | |
"""Process write network events.""" | |
print("loop_write") | |
if self._sock is None and self._ssl is None: | |
return MQTT_ERR_NO_CONN | |
max_packets = len(self._out_packet) + 1 | |
if max_packets < 1: | |
max_packets = 1 | |
for i in range(0, max_packets): | |
rc = self._packet_write() | |
if rc > 0: | |
return self._loop_rc_handle(rc) | |
elif rc == MQTT_ERR_AGAIN: | |
return MQTT_ERR_SUCCESS | |
return MQTT_ERR_SUCCESS | |
def loop_misc(self): | |
"""Process miscellaneous network events.""" | |
print("loop_misc") | |
if self._sock is None: # and self._ssl is None: | |
return MQTT_ERR_NO_CONN | |
now = time.time() | |
self._check_keepalive() | |
if self._last_retry_check+1 < now: | |
# Only check once a second at most | |
self._message_retry_check() | |
self._last_retry_check = now | |
if self._ping_t > 0 and now - self._ping_t >= self._keepalive: | |
# client->ping_t != 0 means we are waiting for a pingresp. | |
# This hasn't happened in the keepalive time so we should disconnect. | |
if self._sock: | |
self._sock.close() | |
self._sock = None | |
if self._state == mqtt_cs_disconnecting: | |
rc = MQTT_ERR_SUCCESS | |
else: | |
rc = 1 | |
if self.on_disconnect: | |
self._in_callback = True | |
self.on_disconnect(self, self._userdata, rc) | |
self._in_callback = False | |
return MQTT_ERR_CONN_LOST | |
return MQTT_ERR_SUCCESS | |
def max_inflight_messages_set(self, inflight): | |
"""Set the maximum number of messages with QoS>0 that can be part way | |
through their network flow at once. Defaults to 20.""" | |
if inflight < 0: | |
raise ValueError('Invalid inflight.') | |
self._max_inflight_messages = inflight | |
def message_retry_set(self, retry): | |
"""Set the timeout in seconds before a message with QoS>0 is retried. | |
20 seconds by default.""" | |
if retry < 0: | |
raise ValueError('Invalid retry.') | |
self._message_retry = retry | |
def user_data_set(self, userdata): | |
"""Set the user data variable passed to callbacks. May be any data type.""" | |
self._userdata = userdata | |
def will_set(self, topic, payload=None, qos=0, retain=False): | |
"""Set a Will to be sent by the broker in case the client disconnects unexpectedly. | |
This must be called before connect() to have any effect. | |
topic: The topic that the will message should be published on. | |
payload: The message to send as a will. If not given, or set to None a | |
zero length message will be used as the will. Passing an int or float | |
will result in the payload being converted to a string representing | |
that number. If you wish to send a true int/float, use struct.pack() to | |
create the payload you require. | |
qos: The quality of service level to use for the will. | |
retain: If set to true, the will message will be set as the "last known | |
good"/retained message for the topic. | |
Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has | |
zero string length. | |
""" | |
if topic is None or len(topic) == 0: | |
raise ValueError('Invalid topic.') | |
if qos<0 or qos>2: | |
raise ValueError('Invalid QoS level.') | |
if isinstance(payload, str): | |
self._will_payload = payload.encode('utf-8') | |
elif isinstance(payload, bytearray): | |
self._will_payload = payload | |
elif isinstance(payload, int) or isinstance(payload, float): | |
self._will_payload = str(payload) | |
elif payload is None: | |
self._will_payload = None | |
else: | |
raise TypeError('payload must be a string, bytearray, int, float or None.') | |
self._will = True | |
self._will_topic = topic.encode('utf-8') | |
self._will_qos = qos | |
self._will_retain = retain | |
def will_clear(self): | |
""" Removes a will that was previously configured with will_set(). | |
Must be called before connect() to have any effect.""" | |
self._will = False | |
self._will_topic = "" | |
self._will_payload = None | |
self._will_qos = 0 | |
self._will_retain = False | |
def socket(self): | |
"""Return the socket or ssl object for this client.""" | |
return self._sock | |
def message_callback_add(self, sub, callback): | |
"""Register a message callback for a specific topic. | |
Messages that match 'sub' will be passed to 'callback'. Any | |
non-matching messages will be passed to the default on_message | |
callback. | |
Call multiple times with different 'sub' to define multiple topic | |
specific callbacks. | |
Topic specific callbacks may be removed with | |
message_callback_remove().""" | |
if callback is None or sub is None: | |
raise ValueError("sub and callback must both be defined.") | |
for i in range(0, len(self.on_message_filtered)): | |
if self.on_message_filtered[i][0] == sub: | |
self.on_message_filtered[i] = (sub, callback) | |
return | |
self.on_message_filtered.append((sub, callback)) | |
def message_callback_remove(self, sub): | |
"""Remove a message callback previously registered with | |
message_callback_add().""" | |
if sub is None: | |
raise ValueError("sub must defined.") | |
for i in range(0, len(self.on_message_filtered)): | |
if self.on_message_filtered[i][0] == sub: | |
self.on_message_filtered.pop(i) | |
return | |
# ============================================================ | |
# Private functions | |
# ============================================================ | |
def _loop_rc_handle(self, rc): | |
if rc: | |
if self._ssl: | |
self._ssl.close() | |
self._ssl = None | |
elif self._sock: | |
self._sock.close() | |
self._sock = None | |
if self._state == mqtt_cs_disconnecting: | |
rc = MQTT_ERR_SUCCESS | |
if self.on_disconnect: | |
self._in_callback = True | |
self.on_disconnect(self, self._userdata, rc) | |
self._in_callback = False | |
return rc | |
def _packet_read(self): | |
# This gets called if pselect() indicates that there is network data | |
# available - ie. at least one byte. What we do depends on what data we | |
# already have. | |
# If we've not got a command, attempt to read one and save it. This should | |
# always work because it's only a single byte. | |
# Then try to read the remaining length. This may fail because it is may | |
# be more than one byte - will need to save data pending next read if it | |
# does fail. | |
# Then try to read the remaining payload, where 'payload' here means the | |
# combined variable header and actual payload. This is the most likely to | |
# fail due to longer length, so save current data and current position. | |
# After all data is read, send to _mqtt_handle_packet() to deal with. | |
# Finally, free the memory and reset everything to starting conditions. | |
print("_packet_read") | |
if self._in_packet['command'] == 0: | |
try: | |
command = self._sock.recv(1) | |
except socket.error as err: | |
if err.errno == EAGAIN: | |
return MQTT_ERR_AGAIN | |
print(err) | |
return 1 | |
else: | |
if len(command) == 0: | |
return 1 | |
command = struct.unpack("!B", command) | |
self._in_packet['command'] = command[0] | |
if self._in_packet['have_remaining'] == 0: | |
# Read remaining | |
# Algorithm for decoding taken from pseudo code at | |
# http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm | |
while True: | |
try: | |
byte = self._sock.recv(1) | |
except socket.error as err: | |
if err.errno == EAGAIN: | |
return MQTT_ERR_AGAIN | |
print(err) | |
return 1 | |
else: | |
byte = struct.unpack("!B", byte) | |
byte = byte[0] | |
self._in_packet['remaining_count'].append(byte) | |
# Max 4 bytes length for remaining length as defined by protocol. | |
# Anything more likely means a broken/malicious client. | |
if len(self._in_packet['remaining_count']) > 4: | |
return MQTT_ERR_PROTOCOL | |
self._in_packet['remaining_length'] = self._in_packet['remaining_length'] + (byte & 127)*self._in_packet['remaining_mult'] | |
self._in_packet['remaining_mult'] = self._in_packet['remaining_mult'] * 128 | |
if (byte & 128) == 0: | |
break | |
self._in_packet['have_remaining'] = 1 | |
self._in_packet['to_process'] = self._in_packet['remaining_length'] | |
while self._in_packet['to_process'] > 0: | |
try: | |
data = self._sock.recv(self._in_packet['to_process']) | |
except socket.error as err: | |
if err.errno == EAGAIN: | |
return MQTT_ERR_AGAIN | |
print(err) | |
return 1 | |
else: | |
self._in_packet['to_process'] = self._in_packet['to_process'] - len(data) | |
self._in_packet['packet'] = self._in_packet['packet'] + data | |
# All data for this packet is read. | |
self._in_packet['pos'] = 0 | |
rc = self._packet_handle() | |
# Free data and reset values | |
self._in_packet = dict( | |
command=0, | |
have_remaining=0, | |
remaining_count=[], | |
remaining_mult=1, | |
remaining_length=0, | |
packet=b"", | |
to_process=0, | |
pos=0) | |
self._last_msg_in = time.time() | |
return rc | |
def _packet_write(self): | |
print("_packet_write") | |
while self._current_out_packet: | |
packet = self._current_out_packet | |
try: | |
write_length = self._sock.send(packet['packet'][packet['pos']:]) | |
except AttributeError: | |
return MQTT_ERR_SUCCESS | |
except socket.error as err: | |
if err.errno == EAGAIN: | |
return MQTT_ERR_AGAIN | |
print(err) | |
return 1 | |
if write_length > 0: | |
packet['to_process'] = packet['to_process'] - write_length | |
packet['pos'] = packet['pos'] + write_length | |
if packet['to_process'] == 0: | |
if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0: | |
if self.on_publish: | |
self._in_callback = True | |
self.on_publish(self, self._userdata, packet['mid']) | |
self._in_callback = False | |
if (packet['command'] & 0xF0) == DISCONNECT: | |
self._last_msg_out = time.time() | |
if self.on_disconnect: | |
self._in_callback = True | |
self.on_disconnect(self, self._userdata, 0) | |
self._in_callback = False | |
if self._sock: | |
self._sock.close() | |
self._sock = None | |
return MQTT_ERR_SUCCESS | |
if len(self._out_packet) > 0: | |
self._current_out_packet = self._out_packet.pop(0) | |
else: | |
self._current_out_packet = None | |
else: | |
break | |
self._last_msg_out = time.time() | |
print("self._current_out_packet =", self._current_out_packet) | |
return MQTT_ERR_SUCCESS | |
def _easy_log(self, level, buf): | |
if self.on_log: | |
self.on_log(self, self._userdata, level, buf) | |
def _check_keepalive(self): | |
now = time.time() | |
last_msg_out = self._last_msg_out | |
last_msg_in = self._last_msg_in | |
if (self._sock is not None or self._ssl is not None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive): | |
if self._state == mqtt_cs_connected and self._ping_t == 0: | |
self._send_pingreq() | |
self._last_msg_out = now | |
self._last_msg_in = now | |
else: | |
if self._sock: | |
self._sock.close() | |
self._sock = None | |
if self._state == mqtt_cs_disconnecting: | |
rc = MQTT_ERR_SUCCESS | |
else: | |
rc = 1 | |
if self.on_disconnect: | |
self._in_callback = True | |
self.on_disconnect(self, self._userdata, rc) | |
self._in_callback = False | |
def _mid_generate(self): | |
self._last_mid = self._last_mid + 1 | |
if self._last_mid == 65536: | |
self._last_mid = 1 | |
return self._last_mid | |
def _topic_wildcard_len_check(self, topic): | |
# Search for + or # in a topic. Return MQTT_ERR_INVAL if found. | |
# Also returns MQTT_ERR_INVAL if the topic string is too long. | |
# Returns MQTT_ERR_SUCCESS if everything is fine. | |
if '+' in topic or '#' in topic or len(topic) == 0 or len(topic) > 65535: | |
return MQTT_ERR_INVAL | |
else: | |
return MQTT_ERR_SUCCESS | |
def _send_pingreq(self): | |
self._easy_log(MQTT_LOG_DEBUG, "Sending PINGREQ") | |
rc = self._send_simple_command(PINGREQ) | |
if rc == MQTT_ERR_SUCCESS: | |
self._ping_t = time.time() | |
return rc | |
def _send_pingresp(self): | |
self._easy_log(MQTT_LOG_DEBUG, "Sending PINGRESP") | |
return self._send_simple_command(PINGRESP) | |
def _send_puback(self, mid): | |
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")") | |
return self._send_command_with_mid(PUBACK, mid, False) | |
def _send_pubcomp(self, mid): | |
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")") | |
return self._send_command_with_mid(PUBCOMP, mid, False) | |
def _pack_remaining_length(self, packet, remaining_length): | |
remaining_bytes = [] | |
while True: | |
byte = remaining_length % 128 | |
remaining_length = remaining_length // 128 | |
# If there are more digits to encode, set the top bit of this digit | |
if remaining_length > 0: | |
byte = byte | 0x80 | |
remaining_bytes.append(byte) | |
packet.extend(struct.pack("!B", byte)) | |
if remaining_length == 0: | |
# FIXME - this doesn't deal with incorrectly large payloads | |
return packet | |
def _pack_str16(self, packet, data): | |
if isinstance(data, bytearray) or isinstance(data, bytes): | |
packet.extend(struct.pack("!H", len(data))) | |
packet.extend(data) | |
elif isinstance(data, str): | |
udata = data.encode('utf-8') | |
pack_format = "!H" + str(len(udata)) + "s" | |
packet.extend(struct.pack(pack_format, len(udata), udata)) | |
else: | |
raise TypeError | |
def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False): | |
if self._sock is None and self._ssl is None: | |
return MQTT_ERR_NO_CONN | |
utopic = topic.encode('utf-8') | |
command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain | |
packet = bytearray() | |
packet.extend(struct.pack("!B", command)) | |
if payload is None: | |
remaining_length = 2+len(utopic) | |
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"' (NULL payload)") | |
else: | |
if isinstance(payload, str): | |
upayload = payload.encode('utf-8') | |
payloadlen = len(upayload) | |
elif isinstance(payload, bytearray): | |
payloadlen = len(payload) | |
elif isinstance(payload, unicode): | |
upayload = payload.encode('utf-8') | |
payloadlen = len(upayload) | |
remaining_length = 2+len(utopic) + payloadlen | |
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"', ... ("+str(payloadlen)+" bytes)") | |
if qos > 0: | |
# For message id | |
remaining_length = remaining_length + 2 | |
self._pack_remaining_length(packet, remaining_length) | |
self._pack_str16(packet, topic) | |
if qos > 0: | |
# For message id | |
packet.extend(struct.pack("!H", mid)) | |
if payload is not None: | |
if isinstance(payload, str): | |
pack_format = str(payloadlen) + "s" | |
packet.extend(struct.pack(pack_format, upayload)) | |
elif isinstance(payload, bytearray): | |
packet.extend(payload) | |
elif isinstance(payload, unicode): | |
pack_format = str(payloadlen) + "s" | |
packet.extend(struct.pack(pack_format, upayload)) | |
else: | |
raise TypeError('payload must be a string, unicode or a bytearray.') | |
return self._packet_queue(PUBLISH, packet, mid, qos) | |
def _send_pubrec(self, mid): | |
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")") | |
return self._send_command_with_mid(PUBREC, mid, False) | |
def _send_pubrel(self, mid, dup=False): | |
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")") | |
return self._send_command_with_mid(PUBREL|2, mid, dup) | |
def _send_command_with_mid(self, command, mid, dup): | |
# For PUBACK, PUBCOMP, PUBREC, and PUBREL | |
if dup: | |
command = command | 8 | |
remaining_length = 2 | |
packet = struct.pack('!BBH', command, remaining_length, mid) | |
return self._packet_queue(command, packet, mid, 1) | |
def _send_simple_command(self, command): | |
# For DISCONNECT, PINGREQ and PINGRESP | |
remaining_length = 0 | |
packet = struct.pack('!BB', command, remaining_length) | |
return self._packet_queue(command, packet, 0, 0) | |
def _send_connect(self, keepalive, clean_session): | |
print("_send_connect") | |
if self._protocol == MQTTv31: | |
protocol = PROTOCOL_NAMEv31 | |
proto_ver = 3 | |
else: | |
protocol = PROTOCOL_NAMEv311 | |
proto_ver = 4 | |
remaining_length = 2+len(protocol) + 1+1+2 + 2+len(self._client_id) | |
connect_flags = 0 | |
if clean_session: | |
connect_flags = connect_flags | 0x02 | |
if self._will: | |
if self._will_payload is not None: | |
remaining_length = remaining_length + 2+len(self._will_topic) + 2+len(self._will_payload) | |
else: | |
remaining_length = remaining_length + 2+len(self._will_topic) + 2 | |
connect_flags = connect_flags | 0x04 | ((self._will_qos&0x03) << 3) | ((self._will_retain&0x01) << 5) | |
if self._username: | |
remaining_length = remaining_length + 2+len(self._username) | |
connect_flags = connect_flags | 0x80 | |
if self._password: | |
connect_flags = connect_flags | 0x40 | |
remaining_length = remaining_length + 2+len(self._password) | |
command = CONNECT | |
packet = bytearray() | |
packet.extend(struct.pack("!B", command)) | |
self._pack_remaining_length(packet, remaining_length) | |
packet.extend(struct.pack("!H"+str(len(protocol))+"sBBH", len(protocol), protocol, proto_ver, connect_flags, keepalive)) | |
self._pack_str16(packet, self._client_id) | |
if self._will: | |
self._pack_str16(packet, self._will_topic) | |
if self._will_payload is None or len(self._will_payload) == 0: | |
packet.extend(struct.pack("!H", 0)) | |
else: | |
self._pack_str16(packet, self._will_payload) | |
if self._username: | |
self._pack_str16(packet, self._username) | |
if self._password: | |
self._pack_str16(packet, self._password) | |
self._keepalive = keepalive | |
return self._packet_queue(command, packet, 0, 0) | |
def _send_disconnect(self): | |
return self._send_simple_command(DISCONNECT) | |
def _send_subscribe(self, dup, topics): | |
print("_send_subscribe") | |
remaining_length = 2 | |
for t in topics: | |
remaining_length = remaining_length + 2+len(t[0])+1 | |
command = SUBSCRIBE | (dup<<3) | (1<<1) | |
packet = bytearray() | |
packet.extend(struct.pack("!B", command)) | |
self._pack_remaining_length(packet, remaining_length) | |
local_mid = self._mid_generate() | |
packet.extend(struct.pack("!H", local_mid)) | |
for t in topics: | |
self._pack_str16(packet, t[0]) | |
packet.extend(struct.pack("B", t[1])) | |
return (self._packet_queue(command, packet, local_mid, 1), local_mid) | |
def _send_unsubscribe(self, dup, topics): | |
remaining_length = 2 | |
for t in topics: | |
remaining_length = remaining_length + 2+len(t) | |
command = UNSUBSCRIBE | (dup<<3) | (1<<1) | |
packet = bytearray() | |
packet.extend(struct.pack("!B", command)) | |
self._pack_remaining_length(packet, remaining_length) | |
local_mid = self._mid_generate() | |
packet.extend(struct.pack("!H", local_mid)) | |
for t in topics: | |
self._pack_str16(packet, t) | |
return (self._packet_queue(command, packet, local_mid, 1), local_mid) | |
def _message_retry_check_actual(self, messages): | |
print("_message_retry_check_actual") #needed | |
now = time.time() | |
for m in messages: | |
if m.timestamp + self._message_retry < now: | |
if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec: | |
m.timestamp = now | |
m.dup = True | |
self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) | |
elif m.state == mqtt_ms_wait_for_pubrel: | |
m.timestamp = now | |
m.dup = True | |
self._send_pubrec(m.mid) | |
elif m.state == mqtt_ms_wait_for_pubcomp: | |
m.timestamp = now | |
m.dup = True | |
self._send_pubrel(m.mid, True) | |
def _message_retry_check(self): | |
print("_message_retry_check") #needed | |
self._message_retry_check_actual(self._out_messages) | |
self._message_retry_check_actual(self._in_messages) | |
def _messages_reconnect_reset_out(self): | |
print("_messages_reconnect_reset_out") #needed | |
self._inflight_messages = 0 | |
for m in self._out_messages: | |
m.timestamp = 0 | |
if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: | |
if m.qos == 0: | |
m.state = mqtt_ms_publish | |
elif m.qos == 1: | |
#self._inflight_messages = self._inflight_messages + 1 | |
if m.state == mqtt_ms_wait_for_puback: | |
m.dup = True | |
m.state = mqtt_ms_publish | |
elif m.qos == 2: | |
#self._inflight_messages = self._inflight_messages + 1 | |
if m.state == mqtt_ms_wait_for_pubcomp: | |
m.state = mqtt_ms_resend_pubrel | |
m.dup = True | |
else: | |
if m.state == mqtt_ms_wait_for_pubrec: | |
m.dup = True | |
m.state = mqtt_ms_publish | |
else: | |
m.state = mqtt_ms_queued | |
def _messages_reconnect_reset_in(self): | |
print("_messages_reconnect_reset_in") #needed | |
for m in self._in_messages: | |
m.timestamp = 0 | |
if m.qos != 2: | |
self._in_messages.pop(self._in_messages.index(m)) | |
else: | |
# Preserve current state | |
pass | |
def _messages_reconnect_reset(self): | |
print("_messages_reconnect_reset") #needed | |
self._messages_reconnect_reset_out() | |
self._messages_reconnect_reset_in() | |
def _packet_queue(self, command, packet, mid, qos): | |
print("_packet_queue") #needed | |
mpkt = dict( | |
command = command, | |
mid = mid, | |
qos = qos, | |
pos = 0, | |
to_process = len(packet), | |
packet = packet) | |
self._out_packet.append(mpkt) | |
if self._current_out_packet is None and len(self._out_packet) > 0: | |
self._current_out_packet = self._out_packet.pop(0) | |
print("self._out_packet =", self._out_packet) | |
# Write a single byte to sockpairW (connected to sockpairR) to break | |
# out of select() if in threaded mode. | |
#try: | |
# self._sockpairW.send(sockpair_data) | |
#except socket.error as err: | |
# if err.errno != EAGAIN: | |
# raise | |
print("self._in_callback =", self._in_callback) | |
if not self._in_callback: | |
return self.loop_write() | |
else: | |
print(MQTT_ERR_SUCCESS) | |
return MQTT_ERR_SUCCESS | |
def _packet_handle(self): | |
print("_packet_handle") | |
cmd = self._in_packet['command']&0xF0 | |
if cmd == PINGREQ: | |
return self._handle_pingreq() | |
elif cmd == PINGRESP: | |
return self._handle_pingresp() | |
elif cmd == PUBACK: | |
return self._handle_pubackcomp("PUBACK") | |
elif cmd == PUBCOMP: | |
return self._handle_pubackcomp("PUBCOMP") | |
elif cmd == PUBLISH: #appear to need PUBLISH | |
return self._handle_publish() | |
elif cmd == PUBREC: | |
return self._handle_pubrec() | |
elif cmd == PUBREL: | |
return self._handle_pubrel() | |
elif cmd == CONNACK: | |
return self._handle_connack() | |
elif cmd == SUBACK: | |
return self._handle_suback() | |
elif cmd == UNSUBACK: | |
return self._handle_unsuback() | |
else: | |
# If we don't recognise the command, return an error straight away. | |
self._easy_log(MQTT_LOG_ERR, "Error: Unrecognised command "+str(cmd)) | |
return MQTT_ERR_PROTOCOL | |
def _handle_connack(self): | |
print("_handle_connack") #needed | |
if self._strict_protocol: | |
if self._in_packet['remaining_length'] != 2: | |
return MQTT_ERR_PROTOCOL | |
if len(self._in_packet['packet']) != 2: | |
return MQTT_ERR_PROTOCOL | |
(flags, result) = struct.unpack("!BB", self._in_packet['packet']) | |
if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311: | |
self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.") | |
# Downgrade to MQTT v3.1 | |
self._protocol = MQTTv31 | |
return self.reconnect() | |
if result == 0: | |
self._state = mqtt_cs_connected | |
self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")") | |
if self.on_connect: | |
self._in_callback = True | |
flags_dict = dict() | |
flags_dict['session present'] = flags & 0x01 | |
self.on_connect(self, self._userdata, flags_dict, result) | |
self._in_callback = False | |
if result == 0: | |
rc = 0 | |
for m in self._out_messages: | |
m.timestamp = time.time() | |
if m.state == mqtt_ms_queued: | |
self.loop_write() # Process outgoing messages that have just been queued up | |
return MQTT_ERR_SUCCESS | |
if m.qos == 0: | |
self._in_callback = True # Don't call loop_write after _send_publish() | |
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) | |
self._in_callback = False | |
if rc != 0: | |
return rc | |
elif m.qos == 1: | |
if m.state == mqtt_ms_publish: | |
self._inflight_messages = self._inflight_messages + 1 | |
m.state = mqtt_ms_wait_for_puback | |
self._in_callback = True # Don't call loop_write after _send_publish() | |
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) | |
self._in_callback = False | |
if rc != 0: | |
return rc | |
elif m.qos == 2: | |
if m.state == mqtt_ms_publish: | |
self._inflight_messages = self._inflight_messages + 1 | |
m.state = mqtt_ms_wait_for_pubrec | |
self._in_callback = True # Don't call loop_write after _send_publish() | |
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) | |
self._in_callback = False | |
if rc != 0: | |
return rc | |
elif m.state == mqtt_ms_resend_pubrel: | |
self._inflight_messages = self._inflight_messages + 1 | |
m.state = mqtt_ms_wait_for_pubcomp | |
self._in_callback = True # Don't call loop_write after _send_pubrel() | |
rc = self._send_pubrel(m.mid, m.dup) | |
self._in_callback = False | |
if rc != 0: | |
return rc | |
self.loop_write() # Process outgoing messages that have just been queued up | |
return rc | |
elif result > 0 and result < 6: | |
return MQTT_ERR_CONN_REFUSED | |
else: | |
return MQTT_ERR_PROTOCOL | |
def _handle_suback(self): | |
print("_handle_suback") #needed after _packet_handle | |
self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK") | |
pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's' | |
(mid, packet) = struct.unpack(pack_format, self._in_packet['packet']) | |
pack_format = "!" + "B"*len(packet) | |
granted_qos = struct.unpack(pack_format, packet) | |
if self.on_subscribe: | |
self._in_callback = True | |
self.on_subscribe(self, self._userdata, mid, granted_qos) | |
self._in_callback = False | |
return MQTT_ERR_SUCCESS | |
def _handle_publish(self): | |
rc = 0 | |
print("_handle_publish") #needed after packet_handle | |
header = self._in_packet['command'] | |
message = MQTTMessage() | |
message.dup = (header & 0x08)>>3 | |
message.qos = (header & 0x06)>>1 | |
message.retain = (header & 0x01) | |
pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's' | |
(slen, packet) = struct.unpack(pack_format, self._in_packet['packet']) | |
pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's' | |
(message.topic, packet) = struct.unpack(pack_format, packet) | |
if len(message.topic) == 0: | |
return MQTT_ERR_PROTOCOL | |
if sys.version_info[0] >= 3: | |
message.topic = message.topic.decode('utf-8') | |
if message.qos > 0: | |
pack_format = "!H" + str(len(packet)-2) + 's' | |
(message.mid, packet) = struct.unpack(pack_format, packet) | |
message.payload = packet | |
self._easy_log( | |
MQTT_LOG_DEBUG, | |
"Received PUBLISH (d"+str(message.dup)+ | |
", q"+str(message.qos)+", r"+str(message.retain)+ | |
", m"+str(message.mid)+", '"+message.topic+ | |
"', ... ("+str(len(message.payload))+" bytes)") | |
message.timestamp = time.time() | |
if message.qos == 0: | |
self._handle_on_message(message) | |
return MQTT_ERR_SUCCESS | |
elif message.qos == 1: | |
rc = self._send_puback(message.mid) | |
self._handle_on_message(message) | |
return rc | |
elif message.qos == 2: | |
rc = self._send_pubrec(message.mid) | |
message.state = mqtt_ms_wait_for_pubrel | |
self._in_message_mutex.acquire() | |
self._in_messages.append(message) | |
self._in_message_mutex.release() | |
return rc | |
else: | |
return MQTT_ERR_PROTOCOL | |
def _handle_pubrel(self): | |
if self._strict_protocol: | |
if self._in_packet['remaining_length'] != 2: | |
return MQTT_ERR_PROTOCOL | |
if len(self._in_packet['packet']) != 2: | |
return MQTT_ERR_PROTOCOL | |
mid = struct.unpack("!H", self._in_packet['packet']) | |
mid = mid[0] | |
self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")") | |
self._in_message_mutex.acquire() | |
for i in range(len(self._in_messages)): | |
if self._in_messages[i].mid == mid: | |
# Only pass the message on if we have removed it from the queue - this | |
# prevents multiple callbacks for the same message. | |
self._handle_on_message(self._in_messages[i]) | |
self._in_messages.pop(i) | |
self._inflight_messages = self._inflight_messages - 1 | |
if self._max_inflight_messages > 0: | |
rc = self._update_inflight() | |
if rc != MQTT_ERR_SUCCESS: | |
return rc | |
self._in_message_mutex.release() | |
return self._send_pubcomp(mid) | |
return MQTT_ERR_SUCCESS | |
def _update_inflight(self): | |
# Dont lock message_mutex here | |
for m in self._out_messages: | |
if self._inflight_messages < self._max_inflight_messages: | |
if m.qos > 0 and m.state == mqtt_ms_queued: | |
self._inflight_messages = self._inflight_messages + 1 | |
if m.qos == 1: | |
m.state = mqtt_ms_wait_for_puback | |
elif m.qos == 2: | |
m.state = mqtt_ms_wait_for_pubrec | |
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) | |
if rc != 0: | |
return rc | |
else: | |
return MQTT_ERR_SUCCESS | |
return MQTT_ERR_SUCCESS | |
def _handle_pubrec(self): | |
if self._strict_protocol: | |
if self._in_packet['remaining_length'] != 2: | |
return MQTT_ERR_PROTOCOL | |
mid = struct.unpack("!H", self._in_packet['packet']) | |
mid = mid[0] | |
self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")") | |
for m in self._out_messages: | |
if m.mid == mid: | |
m.state = mqtt_ms_wait_for_pubcomp | |
m.timestamp = time.time() | |
return self._send_pubrel(mid, False) | |
return MQTT_ERR_SUCCESS | |
def _handle_unsuback(self): | |
if self._strict_protocol: | |
if self._in_packet['remaining_length'] != 2: | |
return MQTT_ERR_PROTOCOL | |
mid = struct.unpack("!H", self._in_packet['packet']) | |
mid = mid[0] | |
self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")") | |
if self.on_unsubscribe: | |
self._in_callback = True | |
self.on_unsubscribe(self, self._userdata, mid) | |
self._in_callback = False | |
return MQTT_ERR_SUCCESS | |
def _handle_pubackcomp(self, cmd): | |
if self._strict_protocol: | |
if self._in_packet['remaining_length'] != 2: | |
return MQTT_ERR_PROTOCOL | |
mid = struct.unpack("!H", self._in_packet['packet']) | |
mid = mid[0] | |
self._easy_log(MQTT_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")") | |
for i in range(len(self._out_messages)): | |
try: | |
if self._out_messages[i].mid == mid: | |
# Only inform the client the message has been sent once. | |
if self.on_publish: | |
self._in_callback = True | |
self.on_publish(self, self._userdata, mid) | |
self._in_callback = False | |
self._out_messages.pop(i) | |
self._inflight_messages = self._inflight_messages - 1 | |
if self._max_inflight_messages > 0: | |
rc = self._update_inflight() | |
if rc != MQTT_ERR_SUCCESS: | |
return rc | |
return MQTT_ERR_SUCCESS | |
except IndexError: | |
# Have removed item so i>count. | |
# Not really an error. | |
pass | |
return MQTT_ERR_SUCCESS | |
def _handle_on_message(self, message): | |
matched = False | |
for t in self.on_message_filtered: | |
if topic_matches_sub(t[0], message.topic): | |
self._in_callback = True | |
t[1](self, self._userdata, message) | |
self._in_callback = False | |
matched = True | |
if matched == False and self.on_message: | |
self._in_callback = True | |
self.on_message(self, self._userdata, message) | |
self._in_callback = False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment