Skip to content

Instantly share code, notes, and snippets.

@slzatz

slzatz/umqtt.py

Created Apr 16, 2016
Embed
What would you like to do?
This is an mqtt subscribe client for micropython
# Copyright (c) 2012-2014 Roger Light <roger@atchoo.org>
#
# This is a stripped down version of paho.mqtt intended for micropython
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
# Roger Light - initial API and implementation
"""
This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging
protocol that is easy to implement and suitable for low powered devices.
"""
import errno
import sys
import select
import socket
import ustruct as struct
import utime as time
EAGAIN = errno.EAGAIN
MQTTv31 = 3
MQTTv311 = 4
PROTOCOL_NAMEv31 = b"MQIsdp"
PROTOCOL_NAMEv311 = b"MQTT"
PROTOCOL_VERSION = 3
# Message types
CONNECT = 0x10
CONNACK = 0x20
PUBLISH = 0x30
PUBACK = 0x40
PUBREC = 0x50
PUBREL = 0x60
PUBCOMP = 0x70
SUBSCRIBE = 0x80
SUBACK = 0x90
UNSUBSCRIBE = 0xA0
UNSUBACK = 0xB0
PINGREQ = 0xC0
PINGRESP = 0xD0
DISCONNECT = 0xE0
# Log levels
MQTT_LOG_INFO = 0x01
MQTT_LOG_NOTICE = 0x02
MQTT_LOG_WARNING = 0x04
MQTT_LOG_ERR = 0x08
MQTT_LOG_DEBUG = 0x10
# CONNACK codes
CONNACK_ACCEPTED = 0
CONNACK_REFUSED_PROTOCOL_VERSION = 1
CONNACK_REFUSED_IDENTIFIER_REJECTED = 2
CONNACK_REFUSED_SERVER_UNAVAILABLE = 3
CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4
CONNACK_REFUSED_NOT_AUTHORIZED = 5
# Connection state
mqtt_cs_new = 0
mqtt_cs_connected = 1
mqtt_cs_disconnecting = 2
mqtt_cs_connect_async = 3
# Message state
mqtt_ms_invalid = 0
mqtt_ms_publish= 1
mqtt_ms_wait_for_puback = 2
mqtt_ms_wait_for_pubrec = 3
mqtt_ms_resend_pubrel = 4
mqtt_ms_wait_for_pubrel = 5
mqtt_ms_resend_pubcomp = 6
mqtt_ms_wait_for_pubcomp = 7
mqtt_ms_send_pubrec = 8
mqtt_ms_queued = 9
# Error values
MQTT_ERR_AGAIN = -1
MQTT_ERR_SUCCESS = 0
MQTT_ERR_NOMEM = 1
MQTT_ERR_PROTOCOL = 2
MQTT_ERR_INVAL = 3
MQTT_ERR_NO_CONN = 4
MQTT_ERR_CONN_REFUSED = 5
MQTT_ERR_NOT_FOUND = 6
MQTT_ERR_CONN_LOST = 7
MQTT_ERR_TLS = 8
MQTT_ERR_PAYLOAD_SIZE = 9
MQTT_ERR_NOT_SUPPORTED = 10
MQTT_ERR_AUTH = 11
MQTT_ERR_ACL_DENIED = 12
MQTT_ERR_UNKNOWN = 13
MQTT_ERR_ERRNO = 14
sockpair_data = b"0"
def error_string(mqtt_errno):
"""Return the error string associated with an mqtt error number."""
if mqtt_errno == MQTT_ERR_SUCCESS:
return "No error."
elif mqtt_errno == MQTT_ERR_NOMEM:
return "Out of memory."
elif mqtt_errno == MQTT_ERR_PROTOCOL:
return "A network protocol error occurred when communicating with the broker."
elif mqtt_errno == MQTT_ERR_INVAL:
return "Invalid function arguments provided."
elif mqtt_errno == MQTT_ERR_NO_CONN:
return "The client is not currently connected."
elif mqtt_errno == MQTT_ERR_CONN_REFUSED:
return "The connection was refused."
elif mqtt_errno == MQTT_ERR_NOT_FOUND:
return "Message not found (internal error)."
elif mqtt_errno == MQTT_ERR_CONN_LOST:
return "The connection was lost."
elif mqtt_errno == MQTT_ERR_TLS:
return "A TLS error occurred."
elif mqtt_errno == MQTT_ERR_PAYLOAD_SIZE:
return "Payload too large."
elif mqtt_errno == MQTT_ERR_NOT_SUPPORTED:
return "This feature is not supported."
elif mqtt_errno == MQTT_ERR_AUTH:
return "Authorisation failed."
elif mqtt_errno == MQTT_ERR_ACL_DENIED:
return "Access denied by ACL."
elif mqtt_errno == MQTT_ERR_UNKNOWN:
return "Unknown error."
elif mqtt_errno == MQTT_ERR_ERRNO:
return "Error defined by errno."
else:
return "Unknown error."
def connack_string(connack_code):
"""Return the string associated with a CONNACK result."""
if connack_code == 0:
return "Connection Accepted."
elif connack_code == 1:
return "Connection Refused: unacceptable protocol version."
elif connack_code == 2:
return "Connection Refused: identifier rejected."
elif connack_code == 3:
return "Connection Refused: broker unavailable."
elif connack_code == 4:
return "Connection Refused: bad user name or password."
elif connack_code == 5:
return "Connection Refused: not authorised."
else:
return "Connection Refused: unknown reason."
def topic_matches_sub(sub, topic):
"""Check whether a topic matches a subscription.
For example:
foo/bar would match the subscription foo/# or +/bar
non/matching would not match the subscription non/+/+
"""
result = True
multilevel_wildcard = False
slen = len(sub)
tlen = len(topic)
if slen > 0 and tlen > 0:
if (sub[0] == '$' and topic[0] != '$') or (topic[0] == '$' and sub[0] != '$'):
return False
spos = 0
tpos = 0
while spos < slen and tpos < tlen:
if sub[spos] == topic[tpos]:
if tpos == tlen-1:
# Check for e.g. foo matching foo/#
if spos == slen-3 and sub[spos+1] == '/' and sub[spos+2] == '#':
result = True
multilevel_wildcard = True
break
spos += 1
tpos += 1
if tpos == tlen and spos == slen-1 and sub[spos] == '+':
spos += 1
result = True
break
else:
if sub[spos] == '+':
spos += 1
while tpos < tlen and topic[tpos] != '/':
tpos += 1
if tpos == tlen and spos == slen:
result = True
break
elif sub[spos] == '#':
multilevel_wildcard = True
if spos+1 != slen:
result = False
break
else:
result = True
break
else:
result = False
break
if not multilevel_wildcard and (tpos < tlen or spos < slen):
result = False
return result
class MQTTMessage:
""" This is a class that describes an incoming message. It is passed to the
on_message callback as the message parameter.
"""
def __init__(self):
self.timestamp = 0
self.state = mqtt_ms_invalid
self.dup = False
self.mid = 0
self.topic = ""
self.payload = None
self.qos = 0
self.retain = False
class Client(object):
"""MQTT version 3.1/3.1.1 client class.
This is the main class for use communicating with an MQTT broker.
"""
def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQTTv31):
if not clean_session and (client_id == "" or client_id is None):
raise ValueError('A client id must be provided if clean session is False.')
self._protocol = protocol
self._userdata = userdata
self._sock = None
self._keepalive = 60
self._message_retry = 20
self._last_retry_check = 0
self._clean_session = clean_session
if client_id == "" or client_id is None:
self._client_id = "umqtt"
else:
self._client_id = client_id
self._username = ""
self._password = ""
self._in_packet = {
"command": 0,
"have_remaining": 0,
"remaining_count": [],
"remaining_mult": 1,
"remaining_length": 0,
"packet": b"",
"to_process": 0,
"pos": 0}
self._out_packet = []
self._current_out_packet = None
self._last_msg_in = time.time()
self._last_msg_out = time.time()
self._ping_t = 0
self._last_mid = 0
self._state = mqtt_cs_new
self._out_messages = []
self._in_messages = []
self._max_inflight_messages = 20
self._inflight_messages = 0
self._will = False
self._will_topic = ""
self._will_payload = None
self._will_qos = 0
self._will_retain = False
self.on_disconnect = None
self.on_connect = None
self.on_publish = None
self.on_message = None
self.on_message_filtered = []
self.on_subscribe = None
self.on_unsubscribe = None
self.on_log = None #None
self._host = ""
self._port = 1883
self._bind_address = ""
self._in_callback = False
self._strict_protocol = False
def __del__(self):
pass
def reinitialise(self, client_id="", clean_session=True, userdata=None):
if self._sock:
self._sock.close()
self._sock = None
self.__init__(client_id, clean_session, userdata)
def connect(self, host, port=1883, keepalive=60, bind_address=""):
"""Connect to a remote broker.
"""
print("connect")
self.connect_async(host, port, keepalive, bind_address)
return self.reconnect()
def connect_async(self, host, port=1883, keepalive=60, bind_address=""):
"""Connect to a remote broker asynchronously. This is a non-blocking
connect call that can be used with loop_start() to provide very quick
start.
"""
print("connect_async")
if host is None or len(host) == 0:
raise ValueError('Invalid host.')
if port <= 0:
raise ValueError('Invalid port number.')
if keepalive < 0:
raise ValueError('Keepalive must be >=0.')
self._host = host
self._port = port
self._keepalive = keepalive
self._bind_address = bind_address
self._state = mqtt_cs_connect_async
def reconnect(self):
"""Reconnect the client after a disconnect. Can only be called after
connect()/connect_async()."""
print("reconnect")
if len(self._host) == 0:
raise ValueError('Invalid host.')
if self._port <= 0:
raise ValueError('Invalid port number.')
self._in_packet = {
"command": 0,
"have_remaining": 0,
"remaining_count": [],
"remaining_mult": 1,
"remaining_length": 0,
"packet": b"",
"to_process": 0,
"pos": 0}
self._out_packet = []
self._current_out_packet = None
self._last_msg_in = time.time()
self._last_msg_out = time.time()
self._ping_t = 0
self._state = mqtt_cs_new
if self._sock:
self._sock.close()
self._sock = None
print("self._sock == None")
# Put messages in progress in a valid state.
self._messages_reconnect_reset()
sock = socket.create_connection((self._host, self._port), source_address=(self._bind_address, 0))
self._sock = sock
self._sock.setblocking(0)
self.ep = select.epoll()
self.fileno = self._sock.fileno()
self.ep.register(self.fileno)
print("self._sock =", self._sock)
return self._send_connect(self._keepalive, self._clean_session)
def loop(self, timeout=1.0, max_packets=1):
"""Process network events.
"""
print("loop")
if timeout < 0.0:
raise ValueError('Invalid timeout.')
print("self._current_out_packet =", self._current_out_packet)
print("self._out_packet =", self._out_packet)
if self._current_out_packet is None and len(self._out_packet) > 0:
self._current_out_packet = self._out_packet.pop(0)
print("self._current_out_packet =", self._current_out_packet)
events = self.ep.poll(1)
print("events = ", events)
for fileno, ev in events:
if ev & select.EPOLLIN:
rc = self.loop_read(max_packets)
if rc or (self._sock is None):
return rc
if ev & select.EPOLLOUT and self._current_out_packet:
rc = self.loop_write(max_packets)
if rc or (self._sock is None):
return rc
return self.loop_misc()
# def publish(self, topic, payload=None, qos=0, retain=False):
# """Publish a message on a topic.
#
# This causes a message to be sent to the broker and subsequently from
# the broker to any clients subscribing to matching topics.
#
# topic: The topic that the message should be published on.
# payload: The actual message to send. If not given, or set to None a
# zero length message will be used. Passing an int or float will result
# in the payload being converted to a string representing that number. If
# you wish to send a true int/float, use struct.pack() to create the
# payload you require.
# qos: The quality of service level to use.
# retain: If set to true, the message will be set as the "last known
# good"/retained message for the topic.
#
# Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to
# indicate success or MQTT_ERR_NO_CONN if the client is not currently
# connected. mid is the message ID for the publish request. The mid
# value can be used to track the publish request by checking against the
# mid argument in the on_publish() callback if it is defined.
#
# A ValueError will be raised if topic is None, has zero length or is
# invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if
# the length of the payload is greater than 268435455 bytes."""
# if topic is None or len(topic) == 0:
# raise ValueError('Invalid topic.')
# if qos<0 or qos>2:
# raise ValueError('Invalid QoS level.')
# if isinstance(payload, str) or isinstance(payload, bytearray):
# local_payload = payload
# elif sys.version_info[0] < 3 and isinstance(payload, unicode):
# local_payload = payload
# elif isinstance(payload, int) or isinstance(payload, float):
# local_payload = str(payload)
# elif payload is None:
# local_payload = None
# else:
# raise TypeError('payload must be a string, bytearray, int, float or None.')
#
# if local_payload is not None and len(local_payload) > 268435455:
# raise ValueError('Payload too large.')
#
# if self._topic_wildcard_len_check(topic) != MQTT_ERR_SUCCESS:
# raise ValueError('Publish topic cannot contain wildcards.')
#
# local_mid = self._mid_generate()
#
# if qos == 0:
# rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False)
# return (rc, local_mid)
# else:
# message = MQTTMessage()
# message.timestamp = time.time()
#
# message.mid = local_mid
# message.topic = topic
# if local_payload is None or len(local_payload) == 0:
# message.payload = None
# else:
# message.payload = local_payload
#
# message.qos = qos
# message.retain = retain
# message.dup = False
#
# self._out_message_mutex.acquire()
# self._out_messages.append(message)
# if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
# self._inflight_messages = self._inflight_messages+1
# if qos == 1:
# message.state = mqtt_ms_wait_for_puback
# elif qos == 2:
# message.state = mqtt_ms_wait_for_pubrec
# self._out_message_mutex.release()
#
# rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)
#
# # remove from inflight messages so it will be send after a connection is made
# if rc is MQTT_ERR_NO_CONN:
# with self._out_message_mutex:
# self._inflight_messages -= 1
# message.state = mqtt_ms_publish
#
# return (rc, local_mid)
# else:
# message.state = mqtt_ms_queued;
# self._out_message_mutex.release()
# return (MQTT_ERR_SUCCESS, local_mid)
#
# def username_pw_set(self, username, password=None):
# """Set a username and optionally a password for broker authentication.
#
# Must be called before connect() to have any effect.
# Requires a broker that supports MQTT v3.1.
#
# username: The username to authenticate with. Need have no relationship to the client id.
# password: The password to authenticate with. Optional, set to None if not required.
# """
# self._username = username.encode('utf-8')
# self._password = password
def disconnect(self):
"""Disconnect a connected client from the broker."""
self._state = mqtt_cs_disconnecting
if self._sock is None:
return MQTT_ERR_NO_CONN
return self._send_disconnect()
def subscribe(self, topic, qos=0):
"""Subscribe the client to one or more topics."""
print("subscribe")
topic_qos_list = None
if isinstance(topic, str):
if qos<0 or qos>2:
raise ValueError('Invalid QoS level.')
if topic is None or len(topic) == 0:
raise ValueError('Invalid topic.')
topic_qos_list = [(topic.encode('utf-8'), qos)]
elif isinstance(topic, tuple):
if topic[1]<0 or topic[1]>2:
raise ValueError('Invalid QoS level.')
if topic[0] is None or len(topic[0]) == 0 or not isinstance(topic[0], str):
raise ValueError('Invalid topic.')
topic_qos_list = [(topic[0].encode('utf-8'), topic[1])]
elif isinstance(topic, list):
topic_qos_list = []
for t in topic:
if t[1]<0 or t[1]>2:
raise ValueError('Invalid QoS level.')
if t[0] is None or len(t[0]) == 0 or not isinstance(t[0], str):
raise ValueError('Invalid topic.')
topic_qos_list.append((t[0].encode('utf-8'), t[1]))
if topic_qos_list is None:
raise ValueError("No topic specified, or incorrect topic type.")
if self._sock is None:
return (MQTT_ERR_NO_CONN, None)
return self._send_subscribe(False, topic_qos_list)
def unsubscribe(self, topic):
"""Unsubscribe the client from one or more topics."""
topic_list = None
if topic is None:
raise ValueError('Invalid topic.')
if isinstance(topic, str):
if len(topic) == 0:
raise ValueError('Invalid topic.')
topic_list = [topic.encode('utf-8')]
elif isinstance(topic, list):
topic_list = []
for t in topic:
if len(t) == 0 or not isinstance(t, str):
raise ValueError('Invalid topic.')
topic_list.append(t.encode('utf-8'))
if topic_list is None:
raise ValueError("No topic specified, or incorrect topic type.")
if self._sock is None and self._ssl is None:
return (MQTT_ERR_NO_CONN, None)
return self._send_unsubscribe(False, topic_list)
def loop_read(self, max_packets=1):
"""Process read network events. """
print("loop_read")
if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
max_packets = len(self._out_messages) + len(self._in_messages)
if max_packets < 1:
max_packets = 1
for i in range(0, max_packets):
rc = self._packet_read()
if rc > 0:
return self._loop_rc_handle(rc)
elif rc == MQTT_ERR_AGAIN:
return MQTT_ERR_SUCCESS
return MQTT_ERR_SUCCESS
def loop_write(self, max_packets=1):
"""Process write network events."""
print("loop_write")
if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
max_packets = len(self._out_packet) + 1
if max_packets < 1:
max_packets = 1
for i in range(0, max_packets):
rc = self._packet_write()
if rc > 0:
return self._loop_rc_handle(rc)
elif rc == MQTT_ERR_AGAIN:
return MQTT_ERR_SUCCESS
return MQTT_ERR_SUCCESS
def loop_misc(self):
"""Process miscellaneous network events."""
print("loop_misc")
if self._sock is None: # and self._ssl is None:
return MQTT_ERR_NO_CONN
now = time.time()
self._check_keepalive()
if self._last_retry_check+1 < now:
# Only check once a second at most
self._message_retry_check()
self._last_retry_check = now
if self._ping_t > 0 and now - self._ping_t >= self._keepalive:
# client->ping_t != 0 means we are waiting for a pingresp.
# This hasn't happened in the keepalive time so we should disconnect.
if self._sock:
self._sock.close()
self._sock = None
if self._state == mqtt_cs_disconnecting:
rc = MQTT_ERR_SUCCESS
else:
rc = 1
if self.on_disconnect:
self._in_callback = True
self.on_disconnect(self, self._userdata, rc)
self._in_callback = False
return MQTT_ERR_CONN_LOST
return MQTT_ERR_SUCCESS
def max_inflight_messages_set(self, inflight):
"""Set the maximum number of messages with QoS>0 that can be part way
through their network flow at once. Defaults to 20."""
if inflight < 0:
raise ValueError('Invalid inflight.')
self._max_inflight_messages = inflight
def message_retry_set(self, retry):
"""Set the timeout in seconds before a message with QoS>0 is retried.
20 seconds by default."""
if retry < 0:
raise ValueError('Invalid retry.')
self._message_retry = retry
def user_data_set(self, userdata):
"""Set the user data variable passed to callbacks. May be any data type."""
self._userdata = userdata
def will_set(self, topic, payload=None, qos=0, retain=False):
"""Set a Will to be sent by the broker in case the client disconnects unexpectedly.
This must be called before connect() to have any effect.
topic: The topic that the will message should be published on.
payload: The message to send as a will. If not given, or set to None a
zero length message will be used as the will. Passing an int or float
will result in the payload being converted to a string representing
that number. If you wish to send a true int/float, use struct.pack() to
create the payload you require.
qos: The quality of service level to use for the will.
retain: If set to true, the will message will be set as the "last known
good"/retained message for the topic.
Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
zero string length.
"""
if topic is None or len(topic) == 0:
raise ValueError('Invalid topic.')
if qos<0 or qos>2:
raise ValueError('Invalid QoS level.')
if isinstance(payload, str):
self._will_payload = payload.encode('utf-8')
elif isinstance(payload, bytearray):
self._will_payload = payload
elif isinstance(payload, int) or isinstance(payload, float):
self._will_payload = str(payload)
elif payload is None:
self._will_payload = None
else:
raise TypeError('payload must be a string, bytearray, int, float or None.')
self._will = True
self._will_topic = topic.encode('utf-8')
self._will_qos = qos
self._will_retain = retain
def will_clear(self):
""" Removes a will that was previously configured with will_set().
Must be called before connect() to have any effect."""
self._will = False
self._will_topic = ""
self._will_payload = None
self._will_qos = 0
self._will_retain = False
def socket(self):
"""Return the socket or ssl object for this client."""
return self._sock
def message_callback_add(self, sub, callback):
"""Register a message callback for a specific topic.
Messages that match 'sub' will be passed to 'callback'. Any
non-matching messages will be passed to the default on_message
callback.
Call multiple times with different 'sub' to define multiple topic
specific callbacks.
Topic specific callbacks may be removed with
message_callback_remove()."""
if callback is None or sub is None:
raise ValueError("sub and callback must both be defined.")
for i in range(0, len(self.on_message_filtered)):
if self.on_message_filtered[i][0] == sub:
self.on_message_filtered[i] = (sub, callback)
return
self.on_message_filtered.append((sub, callback))
def message_callback_remove(self, sub):
"""Remove a message callback previously registered with
message_callback_add()."""
if sub is None:
raise ValueError("sub must defined.")
for i in range(0, len(self.on_message_filtered)):
if self.on_message_filtered[i][0] == sub:
self.on_message_filtered.pop(i)
return
# ============================================================
# Private functions
# ============================================================
def _loop_rc_handle(self, rc):
if rc:
if self._ssl:
self._ssl.close()
self._ssl = None
elif self._sock:
self._sock.close()
self._sock = None
if self._state == mqtt_cs_disconnecting:
rc = MQTT_ERR_SUCCESS
if self.on_disconnect:
self._in_callback = True
self.on_disconnect(self, self._userdata, rc)
self._in_callback = False
return rc
def _packet_read(self):
# This gets called if pselect() indicates that there is network data
# available - ie. at least one byte. What we do depends on what data we
# already have.
# If we've not got a command, attempt to read one and save it. This should
# always work because it's only a single byte.
# Then try to read the remaining length. This may fail because it is may
# be more than one byte - will need to save data pending next read if it
# does fail.
# Then try to read the remaining payload, where 'payload' here means the
# combined variable header and actual payload. This is the most likely to
# fail due to longer length, so save current data and current position.
# After all data is read, send to _mqtt_handle_packet() to deal with.
# Finally, free the memory and reset everything to starting conditions.
print("_packet_read")
if self._in_packet['command'] == 0:
try:
command = self._sock.recv(1)
except socket.error as err:
if err.errno == EAGAIN:
return MQTT_ERR_AGAIN
print(err)
return 1
else:
if len(command) == 0:
return 1
command = struct.unpack("!B", command)
self._in_packet['command'] = command[0]
if self._in_packet['have_remaining'] == 0:
# Read remaining
# Algorithm for decoding taken from pseudo code at
# http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm
while True:
try:
byte = self._sock.recv(1)
except socket.error as err:
if err.errno == EAGAIN:
return MQTT_ERR_AGAIN
print(err)
return 1
else:
byte = struct.unpack("!B", byte)
byte = byte[0]
self._in_packet['remaining_count'].append(byte)
# Max 4 bytes length for remaining length as defined by protocol.
# Anything more likely means a broken/malicious client.
if len(self._in_packet['remaining_count']) > 4:
return MQTT_ERR_PROTOCOL
self._in_packet['remaining_length'] = self._in_packet['remaining_length'] + (byte & 127)*self._in_packet['remaining_mult']
self._in_packet['remaining_mult'] = self._in_packet['remaining_mult'] * 128
if (byte & 128) == 0:
break
self._in_packet['have_remaining'] = 1
self._in_packet['to_process'] = self._in_packet['remaining_length']
while self._in_packet['to_process'] > 0:
try:
data = self._sock.recv(self._in_packet['to_process'])
except socket.error as err:
if err.errno == EAGAIN:
return MQTT_ERR_AGAIN
print(err)
return 1
else:
self._in_packet['to_process'] = self._in_packet['to_process'] - len(data)
self._in_packet['packet'] = self._in_packet['packet'] + data
# All data for this packet is read.
self._in_packet['pos'] = 0
rc = self._packet_handle()
# Free data and reset values
self._in_packet = dict(
command=0,
have_remaining=0,
remaining_count=[],
remaining_mult=1,
remaining_length=0,
packet=b"",
to_process=0,
pos=0)
self._last_msg_in = time.time()
return rc
def _packet_write(self):
print("_packet_write")
while self._current_out_packet:
packet = self._current_out_packet
try:
write_length = self._sock.send(packet['packet'][packet['pos']:])
except AttributeError:
return MQTT_ERR_SUCCESS
except socket.error as err:
if err.errno == EAGAIN:
return MQTT_ERR_AGAIN
print(err)
return 1
if write_length > 0:
packet['to_process'] = packet['to_process'] - write_length
packet['pos'] = packet['pos'] + write_length
if packet['to_process'] == 0:
if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0:
if self.on_publish:
self._in_callback = True
self.on_publish(self, self._userdata, packet['mid'])
self._in_callback = False
if (packet['command'] & 0xF0) == DISCONNECT:
self._last_msg_out = time.time()
if self.on_disconnect:
self._in_callback = True
self.on_disconnect(self, self._userdata, 0)
self._in_callback = False
if self._sock:
self._sock.close()
self._sock = None
return MQTT_ERR_SUCCESS
if len(self._out_packet) > 0:
self._current_out_packet = self._out_packet.pop(0)
else:
self._current_out_packet = None
else:
break
self._last_msg_out = time.time()
print("self._current_out_packet =", self._current_out_packet)
return MQTT_ERR_SUCCESS
def _easy_log(self, level, buf):
if self.on_log:
self.on_log(self, self._userdata, level, buf)
def _check_keepalive(self):
now = time.time()
last_msg_out = self._last_msg_out
last_msg_in = self._last_msg_in
if (self._sock is not None or self._ssl is not None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
if self._state == mqtt_cs_connected and self._ping_t == 0:
self._send_pingreq()
self._last_msg_out = now
self._last_msg_in = now
else:
if self._sock:
self._sock.close()
self._sock = None
if self._state == mqtt_cs_disconnecting:
rc = MQTT_ERR_SUCCESS
else:
rc = 1
if self.on_disconnect:
self._in_callback = True
self.on_disconnect(self, self._userdata, rc)
self._in_callback = False
def _mid_generate(self):
self._last_mid = self._last_mid + 1
if self._last_mid == 65536:
self._last_mid = 1
return self._last_mid
def _topic_wildcard_len_check(self, topic):
# Search for + or # in a topic. Return MQTT_ERR_INVAL if found.
# Also returns MQTT_ERR_INVAL if the topic string is too long.
# Returns MQTT_ERR_SUCCESS if everything is fine.
if '+' in topic or '#' in topic or len(topic) == 0 or len(topic) > 65535:
return MQTT_ERR_INVAL
else:
return MQTT_ERR_SUCCESS
def _send_pingreq(self):
self._easy_log(MQTT_LOG_DEBUG, "Sending PINGREQ")
rc = self._send_simple_command(PINGREQ)
if rc == MQTT_ERR_SUCCESS:
self._ping_t = time.time()
return rc
def _send_pingresp(self):
self._easy_log(MQTT_LOG_DEBUG, "Sending PINGRESP")
return self._send_simple_command(PINGRESP)
def _send_puback(self, mid):
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")")
return self._send_command_with_mid(PUBACK, mid, False)
def _send_pubcomp(self, mid):
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")")
return self._send_command_with_mid(PUBCOMP, mid, False)
def _pack_remaining_length(self, packet, remaining_length):
remaining_bytes = []
while True:
byte = remaining_length % 128
remaining_length = remaining_length // 128
# If there are more digits to encode, set the top bit of this digit
if remaining_length > 0:
byte = byte | 0x80
remaining_bytes.append(byte)
packet.extend(struct.pack("!B", byte))
if remaining_length == 0:
# FIXME - this doesn't deal with incorrectly large payloads
return packet
def _pack_str16(self, packet, data):
if isinstance(data, bytearray) or isinstance(data, bytes):
packet.extend(struct.pack("!H", len(data)))
packet.extend(data)
elif isinstance(data, str):
udata = data.encode('utf-8')
pack_format = "!H" + str(len(udata)) + "s"
packet.extend(struct.pack(pack_format, len(udata), udata))
else:
raise TypeError
def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False):
if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN
utopic = topic.encode('utf-8')
command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain
packet = bytearray()
packet.extend(struct.pack("!B", command))
if payload is None:
remaining_length = 2+len(utopic)
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"' (NULL payload)")
else:
if isinstance(payload, str):
upayload = payload.encode('utf-8')
payloadlen = len(upayload)
elif isinstance(payload, bytearray):
payloadlen = len(payload)
elif isinstance(payload, unicode):
upayload = payload.encode('utf-8')
payloadlen = len(upayload)
remaining_length = 2+len(utopic) + payloadlen
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"', ... ("+str(payloadlen)+" bytes)")
if qos > 0:
# For message id
remaining_length = remaining_length + 2
self._pack_remaining_length(packet, remaining_length)
self._pack_str16(packet, topic)
if qos > 0:
# For message id
packet.extend(struct.pack("!H", mid))
if payload is not None:
if isinstance(payload, str):
pack_format = str(payloadlen) + "s"
packet.extend(struct.pack(pack_format, upayload))
elif isinstance(payload, bytearray):
packet.extend(payload)
elif isinstance(payload, unicode):
pack_format = str(payloadlen) + "s"
packet.extend(struct.pack(pack_format, upayload))
else:
raise TypeError('payload must be a string, unicode or a bytearray.')
return self._packet_queue(PUBLISH, packet, mid, qos)
def _send_pubrec(self, mid):
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")")
return self._send_command_with_mid(PUBREC, mid, False)
def _send_pubrel(self, mid, dup=False):
self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")")
return self._send_command_with_mid(PUBREL|2, mid, dup)
def _send_command_with_mid(self, command, mid, dup):
# For PUBACK, PUBCOMP, PUBREC, and PUBREL
if dup:
command = command | 8
remaining_length = 2
packet = struct.pack('!BBH', command, remaining_length, mid)
return self._packet_queue(command, packet, mid, 1)
def _send_simple_command(self, command):
# For DISCONNECT, PINGREQ and PINGRESP
remaining_length = 0
packet = struct.pack('!BB', command, remaining_length)
return self._packet_queue(command, packet, 0, 0)
def _send_connect(self, keepalive, clean_session):
print("_send_connect")
if self._protocol == MQTTv31:
protocol = PROTOCOL_NAMEv31
proto_ver = 3
else:
protocol = PROTOCOL_NAMEv311
proto_ver = 4
remaining_length = 2+len(protocol) + 1+1+2 + 2+len(self._client_id)
connect_flags = 0
if clean_session:
connect_flags = connect_flags | 0x02
if self._will:
if self._will_payload is not None:
remaining_length = remaining_length + 2+len(self._will_topic) + 2+len(self._will_payload)
else:
remaining_length = remaining_length + 2+len(self._will_topic) + 2
connect_flags = connect_flags | 0x04 | ((self._will_qos&0x03) << 3) | ((self._will_retain&0x01) << 5)
if self._username:
remaining_length = remaining_length + 2+len(self._username)
connect_flags = connect_flags | 0x80
if self._password:
connect_flags = connect_flags | 0x40
remaining_length = remaining_length + 2+len(self._password)
command = CONNECT
packet = bytearray()
packet.extend(struct.pack("!B", command))
self._pack_remaining_length(packet, remaining_length)
packet.extend(struct.pack("!H"+str(len(protocol))+"sBBH", len(protocol), protocol, proto_ver, connect_flags, keepalive))
self._pack_str16(packet, self._client_id)
if self._will:
self._pack_str16(packet, self._will_topic)
if self._will_payload is None or len(self._will_payload) == 0:
packet.extend(struct.pack("!H", 0))
else:
self._pack_str16(packet, self._will_payload)
if self._username:
self._pack_str16(packet, self._username)
if self._password:
self._pack_str16(packet, self._password)
self._keepalive = keepalive
return self._packet_queue(command, packet, 0, 0)
def _send_disconnect(self):
return self._send_simple_command(DISCONNECT)
def _send_subscribe(self, dup, topics):
print("_send_subscribe")
remaining_length = 2
for t in topics:
remaining_length = remaining_length + 2+len(t[0])+1
command = SUBSCRIBE | (dup<<3) | (1<<1)
packet = bytearray()
packet.extend(struct.pack("!B", command))
self._pack_remaining_length(packet, remaining_length)
local_mid = self._mid_generate()
packet.extend(struct.pack("!H", local_mid))
for t in topics:
self._pack_str16(packet, t[0])
packet.extend(struct.pack("B", t[1]))
return (self._packet_queue(command, packet, local_mid, 1), local_mid)
def _send_unsubscribe(self, dup, topics):
remaining_length = 2
for t in topics:
remaining_length = remaining_length + 2+len(t)
command = UNSUBSCRIBE | (dup<<3) | (1<<1)
packet = bytearray()
packet.extend(struct.pack("!B", command))
self._pack_remaining_length(packet, remaining_length)
local_mid = self._mid_generate()
packet.extend(struct.pack("!H", local_mid))
for t in topics:
self._pack_str16(packet, t)
return (self._packet_queue(command, packet, local_mid, 1), local_mid)
def _message_retry_check_actual(self, messages):
print("_message_retry_check_actual") #needed
now = time.time()
for m in messages:
if m.timestamp + self._message_retry < now:
if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec:
m.timestamp = now
m.dup = True
self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
elif m.state == mqtt_ms_wait_for_pubrel:
m.timestamp = now
m.dup = True
self._send_pubrec(m.mid)
elif m.state == mqtt_ms_wait_for_pubcomp:
m.timestamp = now
m.dup = True
self._send_pubrel(m.mid, True)
def _message_retry_check(self):
print("_message_retry_check") #needed
self._message_retry_check_actual(self._out_messages)
self._message_retry_check_actual(self._in_messages)
def _messages_reconnect_reset_out(self):
print("_messages_reconnect_reset_out") #needed
self._inflight_messages = 0
for m in self._out_messages:
m.timestamp = 0
if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
if m.qos == 0:
m.state = mqtt_ms_publish
elif m.qos == 1:
#self._inflight_messages = self._inflight_messages + 1
if m.state == mqtt_ms_wait_for_puback:
m.dup = True
m.state = mqtt_ms_publish
elif m.qos == 2:
#self._inflight_messages = self._inflight_messages + 1
if m.state == mqtt_ms_wait_for_pubcomp:
m.state = mqtt_ms_resend_pubrel
m.dup = True
else:
if m.state == mqtt_ms_wait_for_pubrec:
m.dup = True
m.state = mqtt_ms_publish
else:
m.state = mqtt_ms_queued
def _messages_reconnect_reset_in(self):
print("_messages_reconnect_reset_in") #needed
for m in self._in_messages:
m.timestamp = 0
if m.qos != 2:
self._in_messages.pop(self._in_messages.index(m))
else:
# Preserve current state
pass
def _messages_reconnect_reset(self):
print("_messages_reconnect_reset") #needed
self._messages_reconnect_reset_out()
self._messages_reconnect_reset_in()
def _packet_queue(self, command, packet, mid, qos):
print("_packet_queue") #needed
mpkt = dict(
command = command,
mid = mid,
qos = qos,
pos = 0,
to_process = len(packet),
packet = packet)
self._out_packet.append(mpkt)
if self._current_out_packet is None and len(self._out_packet) > 0:
self._current_out_packet = self._out_packet.pop(0)
print("self._out_packet =", self._out_packet)
# Write a single byte to sockpairW (connected to sockpairR) to break
# out of select() if in threaded mode.
#try:
# self._sockpairW.send(sockpair_data)
#except socket.error as err:
# if err.errno != EAGAIN:
# raise
print("self._in_callback =", self._in_callback)
if not self._in_callback:
return self.loop_write()
else:
print(MQTT_ERR_SUCCESS)
return MQTT_ERR_SUCCESS
def _packet_handle(self):
print("_packet_handle")
cmd = self._in_packet['command']&0xF0
if cmd == PINGREQ:
return self._handle_pingreq()
elif cmd == PINGRESP:
return self._handle_pingresp()
elif cmd == PUBACK:
return self._handle_pubackcomp("PUBACK")
elif cmd == PUBCOMP:
return self._handle_pubackcomp("PUBCOMP")
elif cmd == PUBLISH: #appear to need PUBLISH
return self._handle_publish()
elif cmd == PUBREC:
return self._handle_pubrec()
elif cmd == PUBREL:
return self._handle_pubrel()
elif cmd == CONNACK:
return self._handle_connack()
elif cmd == SUBACK:
return self._handle_suback()
elif cmd == UNSUBACK:
return self._handle_unsuback()
else:
# If we don't recognise the command, return an error straight away.
self._easy_log(MQTT_LOG_ERR, "Error: Unrecognised command "+str(cmd))
return MQTT_ERR_PROTOCOL
def _handle_connack(self):
print("_handle_connack") #needed
if self._strict_protocol:
if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
if len(self._in_packet['packet']) != 2:
return MQTT_ERR_PROTOCOL
(flags, result) = struct.unpack("!BB", self._in_packet['packet'])
if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311:
self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.")
# Downgrade to MQTT v3.1
self._protocol = MQTTv31
return self.reconnect()
if result == 0:
self._state = mqtt_cs_connected
self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")")
if self.on_connect:
self._in_callback = True
flags_dict = dict()
flags_dict['session present'] = flags & 0x01
self.on_connect(self, self._userdata, flags_dict, result)
self._in_callback = False
if result == 0:
rc = 0
for m in self._out_messages:
m.timestamp = time.time()
if m.state == mqtt_ms_queued:
self.loop_write() # Process outgoing messages that have just been queued up
return MQTT_ERR_SUCCESS
if m.qos == 0:
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0:
return rc
elif m.qos == 1:
if m.state == mqtt_ms_publish:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_puback
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0:
return rc
elif m.qos == 2:
if m.state == mqtt_ms_publish:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubrec
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0:
return rc
elif m.state == mqtt_ms_resend_pubrel:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubcomp
self._in_callback = True # Don't call loop_write after _send_pubrel()
rc = self._send_pubrel(m.mid, m.dup)
self._in_callback = False
if rc != 0:
return rc
self.loop_write() # Process outgoing messages that have just been queued up
return rc
elif result > 0 and result < 6:
return MQTT_ERR_CONN_REFUSED
else:
return MQTT_ERR_PROTOCOL
def _handle_suback(self):
print("_handle_suback") #needed after _packet_handle
self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK")
pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's'
(mid, packet) = struct.unpack(pack_format, self._in_packet['packet'])
pack_format = "!" + "B"*len(packet)
granted_qos = struct.unpack(pack_format, packet)
if self.on_subscribe:
self._in_callback = True
self.on_subscribe(self, self._userdata, mid, granted_qos)
self._in_callback = False
return MQTT_ERR_SUCCESS
def _handle_publish(self):
rc = 0
print("_handle_publish") #needed after packet_handle
header = self._in_packet['command']
message = MQTTMessage()
message.dup = (header & 0x08)>>3
message.qos = (header & 0x06)>>1
message.retain = (header & 0x01)
pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's'
(slen, packet) = struct.unpack(pack_format, self._in_packet['packet'])
pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's'
(message.topic, packet) = struct.unpack(pack_format, packet)
if len(message.topic) == 0:
return MQTT_ERR_PROTOCOL
if sys.version_info[0] >= 3:
message.topic = message.topic.decode('utf-8')
if message.qos > 0:
pack_format = "!H" + str(len(packet)-2) + 's'
(message.mid, packet) = struct.unpack(pack_format, packet)
message.payload = packet
self._easy_log(
MQTT_LOG_DEBUG,
"Received PUBLISH (d"+str(message.dup)+
", q"+str(message.qos)+", r"+str(message.retain)+
", m"+str(message.mid)+", '"+message.topic+
"', ... ("+str(len(message.payload))+" bytes)")
message.timestamp = time.time()
if message.qos == 0:
self._handle_on_message(message)
return MQTT_ERR_SUCCESS
elif message.qos == 1:
rc = self._send_puback(message.mid)
self._handle_on_message(message)
return rc
elif message.qos == 2:
rc = self._send_pubrec(message.mid)
message.state = mqtt_ms_wait_for_pubrel
self._in_message_mutex.acquire()
self._in_messages.append(message)
self._in_message_mutex.release()
return rc
else:
return MQTT_ERR_PROTOCOL
def _handle_pubrel(self):
if self._strict_protocol:
if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
if len(self._in_packet['packet']) != 2:
return MQTT_ERR_PROTOCOL
mid = struct.unpack("!H", self._in_packet['packet'])
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")")
self._in_message_mutex.acquire()
for i in range(len(self._in_messages)):
if self._in_messages[i].mid == mid:
# Only pass the message on if we have removed it from the queue - this
# prevents multiple callbacks for the same message.
self._handle_on_message(self._in_messages[i])
self._in_messages.pop(i)
self._inflight_messages = self._inflight_messages - 1
if self._max_inflight_messages > 0:
rc = self._update_inflight()
if rc != MQTT_ERR_SUCCESS:
return rc
self._in_message_mutex.release()
return self._send_pubcomp(mid)
return MQTT_ERR_SUCCESS
def _update_inflight(self):
# Dont lock message_mutex here
for m in self._out_messages:
if self._inflight_messages < self._max_inflight_messages:
if m.qos > 0 and m.state == mqtt_ms_queued:
self._inflight_messages = self._inflight_messages + 1
if m.qos == 1:
m.state = mqtt_ms_wait_for_puback
elif m.qos == 2:
m.state = mqtt_ms_wait_for_pubrec
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
if rc != 0:
return rc
else:
return MQTT_ERR_SUCCESS
return MQTT_ERR_SUCCESS
def _handle_pubrec(self):
if self._strict_protocol:
if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
mid = struct.unpack("!H", self._in_packet['packet'])
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")")
for m in self._out_messages:
if m.mid == mid:
m.state = mqtt_ms_wait_for_pubcomp
m.timestamp = time.time()
return self._send_pubrel(mid, False)
return MQTT_ERR_SUCCESS
def _handle_unsuback(self):
if self._strict_protocol:
if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
mid = struct.unpack("!H", self._in_packet['packet'])
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")")
if self.on_unsubscribe:
self._in_callback = True
self.on_unsubscribe(self, self._userdata, mid)
self._in_callback = False
return MQTT_ERR_SUCCESS
def _handle_pubackcomp(self, cmd):
if self._strict_protocol:
if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
mid = struct.unpack("!H", self._in_packet['packet'])
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")")
for i in range(len(self._out_messages)):
try:
if self._out_messages[i].mid == mid:
# Only inform the client the message has been sent once.
if self.on_publish:
self._in_callback = True
self.on_publish(self, self._userdata, mid)
self._in_callback = False
self._out_messages.pop(i)
self._inflight_messages = self._inflight_messages - 1
if self._max_inflight_messages > 0:
rc = self._update_inflight()
if rc != MQTT_ERR_SUCCESS:
return rc
return MQTT_ERR_SUCCESS
except IndexError:
# Have removed item so i>count.
# Not really an error.
pass
return MQTT_ERR_SUCCESS
def _handle_on_message(self, message):
matched = False
for t in self.on_message_filtered:
if topic_matches_sub(t[0], message.topic):
self._in_callback = True
t[1](self, self._userdata, message)
self._in_callback = False
matched = True
if matched == False and self.on_message:
self._in_callback = True
self.on_message(self, self._userdata, message)
self._in_callback = False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment