Skip to content

Instantly share code, notes, and snippets.

@mickey-happygolucky
Last active August 18, 2020 01:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mickey-happygolucky/8f311691be2b8000f82628b56ded435c to your computer and use it in GitHub Desktop.
Save mickey-happygolucky/8f311691be2b8000f82628b56ded435c to your computer and use it in GitHub Desktop.
diff --git a/include/netutils/mqtt.h b/include/netutils/mqtt.h
new file mode 100644
index 00000000..b2077e67
--- /dev/null
+++ b/include/netutils/mqtt.h
@@ -0,0 +1,1509 @@
+#ifndef __MQTT_H__
+#define __MQTT_H__
+
+#include "mqtt_pal.h"
+
+/**
+ * @file
+ * @brief Declares all the MQTT-C functions and datastructures.
+ *
+ * @note You should <code>\#include <mqtt.h></code>.
+ *
+ * @example simple_publisher.c
+ * A simple program to that publishes the current time whenever ENTER is pressed.
+ *
+ * Usage:
+ * \code{.sh}
+ * ./bin/simple_publisher [address [port [topic]]]
+ * \endcode
+ *
+ * Where \c address is the address of the MQTT broker, \c port is the port number the
+ * MQTT broker is running on, and \c topic is the name of the topic to publish with. Note
+ * that all these arguments are optional and the defaults are \c address = \c "test.mosquitto.org",
+ * \c port = \c "1883", and \c topic = "datetime".
+ *
+ * @example simple_subscriber.c
+ * A simple program that subscribes to a single topic and prints all updates that are received.
+ *
+ * Usage:
+ * \code{.sh}
+ * ./bin/simple_subscriber [address [port [topic]]]
+ * \endcode
+ *
+ * Where \c address is the address of the MQTT broker, \c port is the port number the
+ * MQTT broker is running on, and \c topic is the name of the topic subscribe to. Note
+ * that all these arguments are optional and the defaults are \c address = \c "test.mosquitto.org",
+ * \c port = \c "1883", and \c topic = "datetime".
+ *
+ * @example reconnect_subscriber.c
+ * Same program as \ref simple_subscriber.c, but using the automatic reconnect functionality.
+ *
+ * @example bio_publisher.c
+ * Same program as \ref simple_publisher.c, but uses a unencrypted BIO socket.
+ *
+ * @example openssl_publisher.c
+ * Same program as \ref simple_publisher.c, but over an encrypted connection using OpenSSL.
+ *
+ * Usage:
+ * \code{.sh}
+ * ./bin/openssl_publisher ca_file [address [port [topic]]]
+ * \endcode
+ *
+ *
+ * @defgroup api API
+ * @brief Documentation of everything you need to know to use the MQTT-C client.
+ *
+ * This module contains everything you need to know to use MQTT-C in your application.
+ * For usage examples see:
+ * - @ref simple_publisher.c
+ * - @ref simple_subscriber.c
+ * - @ref reconnect_subscriber.c
+ * - @ref bio_publisher.c
+ * - @ref openssl_publisher.c
+ *
+ * @note MQTT-C can be used in both single-threaded and multi-threaded applications. All
+ * the functions in \ref api are thread-safe.
+ *
+ * @defgroup packers Control Packet Serialization
+ * @brief Developer documentation of the functions and datastructures used for serializing MQTT
+ * control packets.
+ *
+ * @defgroup unpackers Control Packet Deserialization
+ * @brief Developer documentation of the functions and datastructures used for deserializing MQTT
+ * control packets.
+ *
+ * @defgroup details Utilities
+ * @brief Developer documentation for the utilities used to implement the MQTT-C client.
+ *
+ * @note To deserialize a packet from a buffer use \ref mqtt_unpack_response (it's the only
+ * function you need).
+ */
+
+
+ /**
+ * @brief An enumeration of the MQTT control packet types.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718021">
+ * MQTT v3.1.1: MQTT Control Packet Types
+ * </a>
+ */
+ enum MQTTControlPacketType {
+ MQTT_CONTROL_CONNECT=1u,
+ MQTT_CONTROL_CONNACK=2u,
+ MQTT_CONTROL_PUBLISH=3u,
+ MQTT_CONTROL_PUBACK=4u,
+ MQTT_CONTROL_PUBREC=5u,
+ MQTT_CONTROL_PUBREL=6u,
+ MQTT_CONTROL_PUBCOMP=7u,
+ MQTT_CONTROL_SUBSCRIBE=8u,
+ MQTT_CONTROL_SUBACK=9u,
+ MQTT_CONTROL_UNSUBSCRIBE=10u,
+ MQTT_CONTROL_UNSUBACK=11u,
+ MQTT_CONTROL_PINGREQ=12u,
+ MQTT_CONTROL_PINGRESP=13u,
+ MQTT_CONTROL_DISCONNECT=14u
+};
+
+/**
+ * @brief The fixed header of an MQTT control packet.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718020">
+ * MQTT v3.1.1: Fixed Header
+ * </a>
+ */
+struct mqtt_fixed_header {
+ /** The type of packet. */
+ enum MQTTControlPacketType control_type;
+
+ /** The packets control flags.*/
+ uint8_t control_flags: 4;
+
+ /** The remaining size of the packet in bytes (i.e. the size of variable header and payload).*/
+ uint32_t remaining_length;
+};
+
+/**
+ * @brief The protocol identifier for MQTT v3.1.1.
+ * @ingroup packers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030">
+ * MQTT v3.1.1: CONNECT Variable Header.
+ * </a>
+ */
+#define MQTT_PROTOCOL_LEVEL 0x04
+
+/**
+ * @brief A macro used to declare the enum MQTTErrors and associated
+ * error messages (the members of the num) at the same time.
+ */
+#define __ALL_MQTT_ERRORS(MQTT_ERROR) \
+ MQTT_ERROR(MQTT_ERROR_NULLPTR) \
+ MQTT_ERROR(MQTT_ERROR_CONTROL_FORBIDDEN_TYPE) \
+ MQTT_ERROR(MQTT_ERROR_CONTROL_INVALID_FLAGS) \
+ MQTT_ERROR(MQTT_ERROR_CONTROL_WRONG_TYPE) \
+ MQTT_ERROR(MQTT_ERROR_CONNECT_NULL_CLIENT_ID) \
+ MQTT_ERROR(MQTT_ERROR_CONNECT_NULL_WILL_MESSAGE) \
+ MQTT_ERROR(MQTT_ERROR_CONNECT_FORBIDDEN_WILL_QOS) \
+ MQTT_ERROR(MQTT_ERROR_CONNACK_FORBIDDEN_FLAGS) \
+ MQTT_ERROR(MQTT_ERROR_CONNACK_FORBIDDEN_CODE) \
+ MQTT_ERROR(MQTT_ERROR_PUBLISH_FORBIDDEN_QOS) \
+ MQTT_ERROR(MQTT_ERROR_SUBSCRIBE_TOO_MANY_TOPICS) \
+ MQTT_ERROR(MQTT_ERROR_MALFORMED_RESPONSE) \
+ MQTT_ERROR(MQTT_ERROR_UNSUBSCRIBE_TOO_MANY_TOPICS) \
+ MQTT_ERROR(MQTT_ERROR_RESPONSE_INVALID_CONTROL_TYPE) \
+ MQTT_ERROR(MQTT_ERROR_CONNECT_NOT_CALLED) \
+ MQTT_ERROR(MQTT_ERROR_SEND_BUFFER_IS_FULL) \
+ MQTT_ERROR(MQTT_ERROR_SOCKET_ERROR) \
+ MQTT_ERROR(MQTT_ERROR_MALFORMED_REQUEST) \
+ MQTT_ERROR(MQTT_ERROR_RECV_BUFFER_TOO_SMALL) \
+ MQTT_ERROR(MQTT_ERROR_ACK_OF_UNKNOWN) \
+ MQTT_ERROR(MQTT_ERROR_NOT_IMPLEMENTED) \
+ MQTT_ERROR(MQTT_ERROR_CONNECTION_REFUSED) \
+ MQTT_ERROR(MQTT_ERROR_SUBSCRIBE_FAILED) \
+ MQTT_ERROR(MQTT_ERROR_CONNECTION_CLOSED) \
+ MQTT_ERROR(MQTT_ERROR_INITIAL_RECONNECT) \
+
+/* todo: add more connection refused errors */
+
+/**
+ * @brief A macro used to generate the enum MQTTErrors from
+ * \ref __ALL_MQTT_ERRORS
+ * @see __ALL_MQTT_ERRORS
+*/
+#define GENERATE_ENUM(ENUM) ENUM,
+
+/**
+ * @brief A macro used to generate the error messages associated with
+ * MQTTErrors from \ref __ALL_MQTT_ERRORS
+ * @see __ALL_MQTT_ERRORS
+*/
+#define GENERATE_STRING(STRING) #STRING,
+
+
+/**
+ * @brief An enumeration of error codes. Error messages can be retrieved by calling \ref mqtt_error_str.
+ * @ingroup api
+ *
+ * @see mqtt_error_str
+ */
+enum MQTTErrors {
+ MQTT_ERROR_UNKNOWN=INT_MIN,
+ __ALL_MQTT_ERRORS(GENERATE_ENUM)
+ MQTT_OK = 1
+};
+
+/**
+ * @brief Returns an error message for error code, \p error.
+ * @ingroup api
+ *
+ * @param[in] error the error code.
+ *
+ * @returns The associated error message.
+ */
+const char* mqtt_error_str(enum MQTTErrors error);
+
+/**
+ * @brief Pack a MQTT string, given a c-string \p str.
+ *
+ * @param[out] buf the buffer that the MQTT string will be written to.
+ * @param[in] str the c-string to be written to \p buf.
+ *
+ * @warning This function provides no error checking.
+ *
+ * @returns strlen(str) + 2
+*/
+ssize_t __mqtt_pack_str(uint8_t *buf, const char* str);
+
+/** @brief A macro to get the MQTT string length from a c-string. */
+#define __mqtt_packed_cstrlen(x) (2 + strlen(x))
+
+/* RESPONSES */
+
+/**
+ * @brief An enumeration of the return codes returned in a CONNACK packet.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_3.1_-">
+ * MQTT v3.1.1: CONNACK return codes.
+ * </a>
+ */
+enum MQTTConnackReturnCode {
+ MQTT_CONNACK_ACCEPTED = 0u,
+ MQTT_CONNACK_REFUSED_PROTOCOL_VERSION = 1u,
+ MQTT_CONNACK_REFUSED_IDENTIFIER_REJECTED = 2u,
+ MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3u,
+ MQTT_CONNACK_REFUSED_BAD_USER_NAME_OR_PASSWORD = 4u,
+ MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5u
+};
+
+/**
+ * @brief A connection response datastructure.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033">
+ * MQTT v3.1.1: CONNACK - Acknowledgement connection response.
+ * </a>
+ */
+struct mqtt_response_connack {
+ /**
+ * @brief Allows client and broker to check if they have a consistent view about whether there is
+ * already a stored session state.
+ */
+ uint8_t session_present_flag;
+
+ /**
+ * @brief The return code of the connection request.
+ *
+ * @see MQTTConnackReturnCode
+ */
+ enum MQTTConnackReturnCode return_code;
+};
+
+ /**
+ * @brief A publish packet received from the broker.
+ * @ingroup unpackers
+ *
+ * A publish packet is received from the broker when a client publishes to a topic that the
+ * \em {local client} is subscribed to.
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037">
+ * MQTT v3.1.1: PUBLISH - Publish Message.
+ * </a>
+ */
+struct mqtt_response_publish {
+ /**
+ * @brief The DUP flag. DUP flag is 0 if its the first attempt to send this publish packet. A DUP flag
+ * of 1 means that this might be a re-delivery of the packet.
+ */
+ uint8_t dup_flag;
+
+ /**
+ * @brief The quality of service level.
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_3.11_-">
+ * MQTT v3.1.1: QoS Definitions
+ * </a>
+ */
+ uint8_t qos_level;
+
+ /** @brief The retain flag of this publish message. */
+ uint8_t retain_flag;
+
+ /** @brief Size of the topic name (number of characters). */
+ uint16_t topic_name_size;
+
+ /**
+ * @brief The topic name.
+ * @note topic_name is not null terminated. Therefore topic_name_size must be used to get the
+ * string length.
+ */
+ const void* topic_name;
+
+ /** @brief The publish message's packet ID. */
+ uint16_t packet_id;
+
+ /** @brief The publish message's application message.*/
+ const void* application_message;
+
+ /** @brief The size of the application message in bytes. */
+ size_t application_message_size;
+};
+
+/**
+ * @brief A publish acknowledgement for messages that were published with QoS level 1.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043">
+ * MQTT v3.1.1: PUBACK - Publish Acknowledgement.
+ * </a>
+ *
+ */
+struct mqtt_response_puback {
+ /** @brief The published messages packet ID. */
+ uint16_t packet_id;
+};
+
+/**
+ * @brief The response packet to a PUBLISH packet with QoS level 2.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048">
+ * MQTT v3.1.1: PUBREC - Publish Received.
+ * </a>
+ *
+ */
+struct mqtt_response_pubrec {
+ /** @brief The published messages packet ID. */
+ uint16_t packet_id;
+};
+
+/**
+ * @brief The response to a PUBREC packet.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718053">
+ * MQTT v3.1.1: PUBREL - Publish Release.
+ * </a>
+ *
+ */
+struct mqtt_response_pubrel {
+ /** @brief The published messages packet ID. */
+ uint16_t packet_id;
+};
+
+/**
+ * @brief The response to a PUBREL packet.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718058">
+ * MQTT v3.1.1: PUBCOMP - Publish Complete.
+ * </a>
+ *
+ */
+struct mqtt_response_pubcomp {
+ /** T@brief he published messages packet ID. */
+ uint16_t packet_id;
+};
+
+/**
+ * @brief An enumeration of subscription acknowledgement return codes.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_3.26_-">
+ * MQTT v3.1.1: SUBACK Return Codes.
+ * </a>
+ */
+enum MQTTSubackReturnCodes {
+ MQTT_SUBACK_SUCCESS_MAX_QOS_0 = 0u,
+ MQTT_SUBACK_SUCCESS_MAX_QOS_1 = 1u,
+ MQTT_SUBACK_SUCCESS_MAX_QOS_2 = 2u,
+ MQTT_SUBACK_FAILURE = 128u
+};
+
+/**
+ * @brief The response to a subscription request.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718068">
+ * MQTT v3.1.1: SUBACK - Subscription Acknowledgement.
+ * </a>
+ */
+struct mqtt_response_suback {
+ /** @brief The published messages packet ID. */
+ uint16_t packet_id;
+
+ /**
+ * Array of return codes corresponding to the requested subscribe topics.
+ *
+ * @see MQTTSubackReturnCodes
+ */
+ const uint8_t *return_codes;
+
+ /** The number of return codes. */
+ size_t num_return_codes;
+};
+
+/**
+ * @brief The brokers response to a UNSUBSCRIBE request.
+ * @ingroup unpackers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718077">
+ * MQTT v3.1.1: UNSUBACK - Unsubscribe Acknowledgement.
+ * </a>
+ */
+struct mqtt_response_unsuback {
+ /** @brief The published messages packet ID. */
+ uint16_t packet_id;
+};
+
+/**
+ * @brief The response to a ping request.
+ * @ingroup unpackers
+ *
+ * @note This response contains no members.
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718086">
+ * MQTT v3.1.1: PINGRESP - Ping Response.
+ * </a>
+ */
+struct mqtt_response_pingresp {};
+
+/**
+ * @brief A struct used to deserialize/interpret an incoming packet from the broker.
+ * @ingroup unpackers
+ */
+struct mqtt_response {
+ /** @brief The mqtt_fixed_header of the deserialized packet. */
+ struct mqtt_fixed_header fixed_header;
+
+ /**
+ * @brief A union of the possible responses from the broker.
+ *
+ * @note The fixed_header contains the control type. This control type corresponds to the
+ * member of this union that should be accessed. For example if
+ * fixed_header#control_type == \c MQTT_CONTROL_PUBLISH then
+ * decoded#publish should be accessed.
+ */
+ union {
+ struct mqtt_response_connack connack;
+ struct mqtt_response_publish publish;
+ struct mqtt_response_puback puback;
+ struct mqtt_response_pubrec pubrec;
+ struct mqtt_response_pubrel pubrel;
+ struct mqtt_response_pubcomp pubcomp;
+ struct mqtt_response_suback suback;
+ struct mqtt_response_unsuback unsuback;
+ struct mqtt_response_pingresp pingresp;
+ } decoded;
+};
+
+/**
+ * @brief Deserialize the contents of \p buf into an mqtt_fixed_header object.
+ * @ingroup unpackers
+ *
+ * @note This function performs complete error checking and a positive return value
+ * means the entire mqtt_response can be deserialized from \p buf.
+ *
+ * @param[out] response the response who's \ref mqtt_response.fixed_header will be initialized.
+ * @param[in] buf the buffer.
+ * @param[in] bufsz the total number of bytes in the buffer.
+ *
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
+ * bytes to parse the packet, or a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t *buf, size_t bufsz);
+
+/**
+ * @brief Deserialize a CONNACK response from \p buf.
+ * @ingroup unpackers
+ *
+ * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the control packet type
+ * must be \c MQTT_CONTROL_CONNACK.
+ *
+ * @param[out] mqtt_response the mqtt_response that will be initialized.
+ * @param[in] buf the buffer that contains the variable header and payload of the packet. The
+ * first byte of \p buf should be the first byte of the variable header.
+ *
+ * @relates mqtt_response_connack
+ *
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
+ * bytes to parse the packet, or a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_unpack_connack_response (struct mqtt_response *mqtt_response, const uint8_t *buf);
+
+/**
+ * @brief Deserialize a publish response from \p buf.
+ * @ingroup unpackers
+ *
+ * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must
+ * have a control type of \c MQTT_CONTROL_PUBLISH.
+ *
+ * @param[out] mqtt_response the response that is initialized from the contents of \p buf.
+ * @param[in] buf the buffer with the incoming data.
+ *
+ * @relates mqtt_response_publish
+ *
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
+ * bytes to parse the packet, or a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_unpack_publish_response (struct mqtt_response *mqtt_response, const uint8_t *buf);
+
+/**
+ * @brief Deserialize a PUBACK/PUBREC/PUBREL/PUBCOMP packet from \p buf.
+ * @ingroup unpackers
+ *
+ * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must
+ * have a control type of \c MQTT_CONTROL_PUBACK, \c MQTT_CONTROL_PUBREC, \c MQTT_CONTROL_PUBREL
+ * or \c MQTT_CONTROL_PUBCOMP.
+ *
+ * @param[out] mqtt_response the response that is initialized from the contents of \p buf.
+ * @param[in] buf the buffer with the incoming data.
+ *
+ * @relates mqtt_response_puback mqtt_response_pubrec mqtt_response_pubrel mqtt_response_pubcomp
+ *
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
+ * bytes to parse the packet, or a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const uint8_t *buf);
+
+/**
+ * @brief Deserialize a SUBACK packet from \p buf.
+ * @ingroup unpacker
+ *
+ * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must
+ * have a control type of \c MQTT_CONTROL_SUBACK.
+ *
+ * @param[out] mqtt_response the response that is initialized from the contents of \p buf.
+ * @param[in] buf the buffer with the incoming data.
+ *
+ * @relates mqtt_response_suback
+ *
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
+ * bytes to parse the packet, or a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_unpack_suback_response(struct mqtt_response *mqtt_response, const uint8_t *buf);
+
+/**
+ * @brief Deserialize an UNSUBACK packet from \p buf.
+ * @ingroup unpacker
+ *
+ * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must
+ * have a control type of \c MQTT_CONTROL_UNSUBACK.
+ *
+ * @param[out] mqtt_response the response that is initialized from the contents of \p buf.
+ * @param[in] buf the buffer with the incoming data.
+ *
+ * @relates mqtt_response_unsuback
+ *
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
+ * bytes to parse the packet, or a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const uint8_t *buf);
+
+/**
+ * @brief Deserialize a packet from the broker.
+ * @ingroup unpackers
+ *
+ * @param[out] response the mqtt_response that will be initialize from \p buf.
+ * @param[in] buf the incoming data buffer.
+ * @param[in] bufsz the number of bytes available in the buffer.
+ *
+ * @relates mqtt_response
+ *
+ * @returns The number of bytes consumed on success, zero \p buf does not contain enough bytes
+ * to deserialize the packet, a negative value if a protocol violation was encountered.
+ */
+ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf, size_t bufsz);
+
+/* REQUESTS */
+
+ /**
+ * @brief Serialize an mqtt_fixed_header and write it to \p buf.
+ * @ingroup packers
+ *
+ * @note This function performs complete error checking and a positive return value
+ * guarantees the entire packet will fit into the given buffer.
+ *
+ * @param[out] buf the buffer to write to.
+ * @param[in] bufsz the maximum number of bytes that can be put in to \p buf.
+ * @param[in] fixed_header the fixed header that will be serialized.
+ *
+ * @returns The number of bytes written to \p buf, or 0 if \p buf is too small, or a
+ * negative value if there was a protocol violation.
+ */
+ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fixed_header *fixed_header);
+
+/**
+ * @brief An enumeration of CONNECT packet flags.
+ * @ingroup packers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030">
+ * MQTT v3.1.1: CONNECT Variable Header.
+ * </a>
+ */
+enum MQTTConnectFlags {
+ MQTT_CONNECT_RESERVED = 1u,
+ MQTT_CONNECT_CLEAN_SESSION = 2u,
+ MQTT_CONNECT_WILL_FLAG = 4u,
+ MQTT_CONNECT_WILL_QOS_0 = (0u & 0x03) << 3,
+ MQTT_CONNECT_WILL_QOS_1 = (1u & 0x03) << 3,
+ MQTT_CONNECT_WILL_QOS_2 = (2u & 0x03) << 3,
+ MQTT_CONNECT_WILL_RETAIN = 32u,
+ MQTT_CONNECT_PASSWORD = 64u,
+ MQTT_CONNECT_USER_NAME = 128u,
+};
+
+/**
+ * @brief Serialize a connection request into a buffer.
+ * @ingroup packers
+ *
+ * @param[out] buf the buffer to pack the connection request packet into.
+ * @param[in] bufsz the number of bytes left in \p buf.
+ * @param[in] client_id the ID that identifies the local client. \p client_id is a required
+ * parameter.
+ * @param[in] will_topic the topic under which the local client's will message will be published.
+ * Set to \c NULL for no will message. If \p will_topic is not \c NULL a
+ * \p will_message must also be provided.
+ * @param[in] will_message the will message to be published upon a unsuccessful disconnection of
+ * the local client. Set to \c NULL if \p will_topic is \c NULL.
+ * \p will_message must \em not be \c NULL if \p will_topic is not
+ * \c NULL.
+ * @param[in] will_message_size The size of \p will_message in bytes.
+ * @param[in] user_name the username to be used to connect to the broker with. Set to \c NULL if
+ * no username is required.
+ * @param[in] password the password to be used to connect to the broker with. Set to \c NULL if
+ * no password is required.
+ * @param[in] connect_flags additional MQTTConnectFlags to be set. The only flags that need to be
+ * set manually are \c MQTT_CONNECT_CLEAN_SESSION,
+ * \c MQTT_CONNECT_WILL_QOS_X (for \c X &isin; {0, 1, 2}), and
+ * \c MQTT_CONNECT_WILL_RETAIN. Set to 0 if no additional flags are
+ * required.
+ * @param[in] keep_alive the keep alive time in seconds. It is the responsibility of the clinet
+ * to ensure packets are sent to the server \em {at least} this frequently.
+ *
+ * @note If there is a \p will_topic and no additional \p connect_flags are given, then by
+ * default \p will_message will be published at QoS level 0.
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028">
+ * MQTT v3.1.1: CONNECT - Client Requests a Connection to a Server.
+ * </a>
+ *
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the CONNECT
+ * packet, a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz,
+ const char* client_id,
+ const char* will_topic,
+ const void* will_message,
+ size_t will_message_size,
+ const char* user_name,
+ const char* password,
+ uint8_t connect_flags,
+ uint16_t keep_alive);
+
+/**
+ * @brief An enumeration of the PUBLISH flags.
+ * @ingroup packers
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037">
+ * MQTT v3.1.1: PUBLISH - Publish Message.
+ * </a>
+ */
+enum MQTTPublishFlags {
+ MQTT_PUBLISH_DUP = 8u,
+ MQTT_PUBLISH_QOS_0 = ((0u << 1) & 0x06),
+ MQTT_PUBLISH_QOS_1 = ((1u << 1) & 0x06),
+ MQTT_PUBLISH_QOS_2 = ((2u << 1) & 0x06),
+ MQTT_PUBLISH_QOS_MASK = ((3u << 1) & 0x06),
+ MQTT_PUBLISH_RETAIN = 0x01
+};
+
+/**
+ * @brief Serialize a PUBLISH request and put it in \p buf.
+ * @ingroup packers
+ *
+ * @param[out] buf the buffer to put the PUBLISH packet in.
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf.
+ * @param[in] topic_name the topic to publish \p application_message under.
+ * @param[in] packet_id this packets packet ID.
+ * @param[in] application_message the application message to be published.
+ * @param[in] application_message_size the size of \p application_message in bytes.
+ * @param[in] publish_flags The flags to publish \p application_message with. These include
+ * the \c MQTT_PUBLISH_DUP flag, \c MQTT_PUBLISH_QOS_X (\c X &isin;
+ * {0, 1, 2}), and \c MQTT_PUBLISH_RETAIN flag.
+ *
+ * @note The default QoS is level 0.
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037">
+ * MQTT v3.1.1: PUBLISH - Publish Message.
+ * </a>
+ *
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PUBLISH
+ * packet, a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz,
+ const char* topic_name,
+ uint16_t packet_id,
+ void* application_message,
+ size_t application_message_size,
+ uint8_t publish_flags);
+
+/**
+ * @brief Serialize a PUBACK, PUBREC, PUBREL, or PUBCOMP packet and put it in \p buf.
+ * @ingroup packers
+ *
+ * @param[out] buf the buffer to put the PUBXXX packet in.
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf.
+ * @param[in] control_type the type of packet. Must be one of: \c MQTT_CONTROL_PUBACK,
+ * \c MQTT_CONTROL_PUBREC, \c MQTT_CONTROL_PUBREL,
+ * or \c MQTT_CONTROL_PUBCOMP.
+ * @param[in] packet_id the packet ID of the packet being acknowledged.
+ *
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043">
+ * MQTT v3.1.1: PUBACK - Publish Acknowledgement.
+ * </a>
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048">
+ * MQTT v3.1.1: PUBREC - Publish Received.
+ * </a>
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718053">
+ * MQTT v3.1.1: PUBREL - Publish Released.
+ * </a>
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718058">
+ * MQTT v3.1.1: PUBCOMP - Publish Complete.
+ * </a>
+ *
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PUBXXX
+ * packet, a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz,
+ enum MQTTControlPacketType control_type,
+ uint16_t packet_id);
+
+/**
+ * @brief The maximum number topics that can be subscribed to in a single call to
+ * mqtt_pack_subscribe_request.
+ * @ingroup packers
+ *
+ * @see mqtt_pack_subscribe_request
+ */
+#define MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS 8
+
+/**
+ * @brief Serialize a SUBSCRIBE packet and put it in \p buf.
+ * @ingroup packers
+ *
+ * @param[out] buf the buffer to put the SUBSCRIBE packet in.
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf.
+ * @param[in] packet_id the packet ID to be used.
+ * @param[in] ... \c NULL terminated list of (\c {const char *topic_name}, \c {int max_qos_level})
+ * pairs.
+ *
+ * @note The variadic arguments, \p ..., \em must be followed by a \c NULL. For example:
+ * @code
+ * ssize_t n = mqtt_pack_subscribe_request(buf, bufsz, 1234, "topic_1", 0, "topic_2", 2, NULL);
+ * @endcode
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063">
+ * MQTT v3.1.1: SUBSCRIBE - Subscribe to Topics.
+ * </a>
+ *
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the SUBSCRIBE
+ * packet, a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz,
+ uint16_t packet_id,
+ ...); /* null terminated */
+
+/**
+ * @brief The maximum number topics that can be subscribed to in a single call to
+ * mqtt_pack_unsubscribe_request.
+ * @ingroup packers
+ *
+ * @see mqtt_pack_unsubscribe_request
+ */
+#define MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS 8
+
+/**
+ * @brief Serialize a UNSUBSCRIBE packet and put it in \p buf.
+ * @ingroup packers
+ *
+ * @param[out] buf the buffer to put the UNSUBSCRIBE packet in.
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf.
+ * @param[in] packet_id the packet ID to be used.
+ * @param[in] ... \c NULL terminated list of \c {const char *topic_name}'s to unsubscribe from.
+ *
+ * @note The variadic arguments, \p ..., \em must be followed by a \c NULL. For example:
+ * @code
+ * ssize_t n = mqtt_pack_unsubscribe_request(buf, bufsz, 4321, "topic_1", "topic_2", NULL);
+ * @endcode
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718072">
+ * MQTT v3.1.1: UNSUBSCRIBE - Unsubscribe from Topics.
+ * </a>
+ *
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the UNSUBSCRIBE
+ * packet, a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz,
+ uint16_t packet_id,
+ ...); /* null terminated */
+
+/**
+ * @brief Serialize a PINGREQ and put it into \p buf.
+ * @ingroup packers
+ *
+ * @param[out] buf the buffer to put the PINGREQ packet in.
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf.
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718081">
+ * MQTT v3.1.1: PINGREQ - Ping Request.
+ * </a>
+ *
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PINGREQ
+ * packet, a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_pack_ping_request(uint8_t *buf, size_t bufsz);
+
+/**
+ * @brief Serialize a DISCONNECT and put it into \p buf.
+ * @ingroup packers
+ *
+ * @param[out] buf the buffer to put the DISCONNECT packet in.
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf.
+ *
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090">
+ * MQTT v3.1.1: DISCONNECT - Disconnect Notification.
+ * </a>
+ *
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the DISCONNECT
+ * packet, a negative value if there was a protocol violation.
+ */
+ssize_t mqtt_pack_disconnect(uint8_t *buf, size_t bufsz);
+
+
+/**
+ * @brief An enumeration of queued message states.
+ * @ingroup details
+ */
+enum MQTTQueuedMessageState {
+ MQTT_QUEUED_UNSENT,
+ MQTT_QUEUED_AWAITING_ACK,
+ MQTT_QUEUED_COMPLETE
+};
+
+/**
+ * @brief A message in a mqtt_message_queue.
+ * @ingroup details
+ */
+struct mqtt_queued_message {
+ /** @brief A pointer to the start of the message. */
+ uint8_t *start;
+
+ /** @brief The number of bytes in the message. */
+ size_t size;
+
+ /** @brief The state of the message. */
+ enum MQTTQueuedMessageState state;
+
+ /**
+ * @brief The time at which the message was sent..
+ *
+ * @note A timeout will only occur if the message is in
+ * the MQTT_QUEUED_AWAITING_ACK \c state.
+ */
+ mqtt_pal_time_t time_sent;
+
+ /**
+ * @brief The control type of the message.
+ */
+ enum MQTTControlPacketType control_type;
+
+ /**
+ * @brief The packet id of the message.
+ *
+ * @note This field is only used if the associate \c control_type has a
+ * \c packet_id field.
+ */
+ uint16_t packet_id;
+};
+
+/**
+ * @brief A message queue.
+ * @ingroup details
+ *
+ * @note This struct is used internally to manage sending messages.
+ * @note The only members the user should use are \c curr and \c curr_sz.
+ */
+struct mqtt_message_queue {
+ /**
+ * @brief The start of the message queue's memory block.
+ *
+ * @warning This member should \em not be manually changed.
+ */
+ void *mem_start;
+
+ /** @brief The end of the message queue's memory block. */
+ void *mem_end;
+
+ /**
+ * @brief A pointer to the position in the buffer you can pack bytes at.
+ *
+ * @note Immediately after packing bytes at \c curr you \em must call
+ * mqtt_mq_register.
+ */
+ uint8_t *curr;
+
+ /**
+ * @brief The number of bytes that can be written to \c curr.
+ *
+ * @note curr_sz will decrease by more than the number of bytes you write to
+ * \c curr. This is because the mqtt_queued_message structs share the
+ * same memory (and thus, a mqtt_queued_message must be allocated in
+ * the message queue's memory whenever a new message is registered).
+ */
+ size_t curr_sz;
+
+ /**
+ * @brief The tail of the array of mqtt_queued_messages's.
+ *
+ * @note This member should not be used manually.
+ */
+ struct mqtt_queued_message *queue_tail;
+};
+
+/**
+ * @brief Initialize a message queue.
+ * @ingroup details
+ *
+ * @param[out] mq The message queue to initialize.
+ * @param[in] buf The buffer for this message queue.
+ * @param[in] bufsz The number of bytes in the buffer.
+ *
+ * @relates mqtt_message_queue
+ */
+void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz);
+
+/**
+ * @brief Clear as many messages from the front of the queue as possible.
+ * @ingroup details
+ *
+ * @note Calls to this function are the \em only way to remove messages from the queue.
+ *
+ * @param mq The message queue.
+ *
+ * @relates mqtt_message_queue
+ */
+void mqtt_mq_clean(struct mqtt_message_queue *mq);
+
+/**
+ * @brief Register a message that was just added to the buffer.
+ * @ingroup details
+ *
+ * @note This function should be called immediately following a call to a packer function
+ * that returned a positive value. The positive value (number of bytes packed) should
+ * be passed to this function.
+ *
+ * @param mq The message queue.
+ * @param[in] nbytes The number of bytes that were just packed.
+ *
+ * @note This function will step mqtt_message_queue::curr and update mqtt_message_queue::curr_sz.
+ * @relates mqtt_message_queue
+ *
+ * @returns The newly added struct mqtt_queued_message.
+ */
+struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size_t nbytes);
+
+/**
+ * @brief Find a message in the message queue.
+ * @ingroup details
+ *
+ * @param mq The message queue.
+ * @param[in] control_type The control type of the message you want to find.
+ * @param[in] packet_id The packet ID of the message you want to find. Set to \c NULL if you
+ * don't want to specify a packet ID.
+ *
+ * @relates mqtt_message_queue
+ * @returns The found message. \c NULL if the message was not found.
+ */
+struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQTTControlPacketType control_type, uint16_t *packet_id);
+
+/**
+ * @brief Returns the mqtt_queued_message at \p index.
+ * @ingroup details
+ *
+ * @param mq_ptr A pointer to the message queue.
+ * @param index The index of the message.
+ *
+ * @returns The mqtt_queued_message at \p index.
+ */
+#define mqtt_mq_get(mq_ptr, index) (((struct mqtt_queued_message*) ((mq_ptr)->mem_end)) - 1 - index)
+
+/**
+ * @brief Returns the number of messages in the message queue, \p mq_ptr.
+ * @ingroup details
+ */
+#define mqtt_mq_length(mq_ptr) (((struct mqtt_queued_message*) ((mq_ptr)->mem_end)) - (mq_ptr)->queue_tail)
+
+/**
+ * @brief Used internally to recalculate the \c curr_sz.
+ * @ingroup details
+ */
+#define mqtt_mq_currsz(mq_ptr) (mq_ptr->curr >= (uint8_t*) ((mq_ptr)->queue_tail - 1)) ? 0 : ((uint8_t*) ((mq_ptr)->queue_tail - 1)) - (mq_ptr)->curr
+
+/* CLIENT */
+
+/**
+ * @brief An MQTT client.
+ * @ingroup details
+ *
+ * @note All members can be manipulated via the related functions.
+ */
+struct mqtt_client {
+ /** @brief The socket connecting to the MQTT broker. */
+ mqtt_pal_socket_handle socketfd;
+
+ /** @brief The LFSR state used to generate packet ID's. */
+ uint16_t pid_lfsr;
+
+ /** @brief The keep-alive time in seconds. */
+ uint16_t keep_alive;
+
+ /**
+ * @brief A counter counting pings that have been sent to keep the connection alive.
+ * @see keep_alive
+ */
+ int number_of_keep_alives;
+
+ /**
+ * @brief The timestamp of the last message sent to the buffer.
+ *
+ * This is used to detect the need for keep-alive pings.
+ *
+ * @see keep_alive
+ */
+ mqtt_pal_time_t time_of_last_send;
+
+ /**
+ * @brief The error state of the client.
+ *
+ * error should be MQTT_OK for the entirety of the connection.
+ *
+ * @note The error state will be MQTT_ERROR_CONNECT_NOT_CALLED until
+ * you call mqtt_connect.
+ */
+ enum MQTTErrors error;
+
+ /**
+ * @brief The timeout period in seconds.
+ *
+ * If the broker doesn't return an ACK within response_timeout seconds a timeout
+ * will occur and the message will be retransmitted.
+ *
+ * @note The default value is 30 [seconds] but you can change it at any time.
+ */
+ int response_timeout;
+
+ /** @brief A counter counting the number of timeouts that have occurred. */
+ int number_of_timeouts;
+
+ /**
+ * @brief Approximately much time it has typically taken to receive responses from the
+ * broker.
+ *
+ * @note This is tracked using a exponential-averaging.
+ */
+ double typical_response_time;
+
+ /**
+ * @brief The callback that is called whenever a publish is received from the broker.
+ *
+ * Any topics that you have subscribed to will be returned from the broker as
+ * mqtt_response_publish messages. All the publishes received from the broker will
+ * be passed to this function.
+ *
+ * @note A pointer to publish_response_callback_state is always passed to the callback.
+ * Use publish_response_callback_state to keep track of any state information you
+ * need.
+ */
+ void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish);
+
+ /**
+ * @brief A pointer to any publish_response_callback state information you need.
+ *
+ * @note A pointer to this pointer will always be publish_response_callback upon
+ * receiving a publish message from the broker.
+ */
+ void* publish_response_callback_state;
+
+ /**
+ * @brief A user-specified callback, triggered on each \ref mqtt_sync, allowing
+ * the user to perform state inspections (and custom socket error detection)
+ * on the client.
+ *
+ * This callback is triggered on each call to \ref mqtt_sync. If it returns MQTT_OK
+ * then \ref mqtt_sync will continue normally (performing reads and writes). If it
+ * returns an error then \ref mqtt_sync will not call reads and writes.
+ *
+ * This callback can be used to perform custom error detection, namely platform
+ * specific socket error detection, and force the client into an error state.
+ *
+ * This member is always initialized to NULL but it can be manually set at any
+ * time.
+ */
+ enum MQTTErrors (*inspector_callback)(struct mqtt_client*);
+
+ /**
+ * @brief A callback that is called whenever the client is in an error state.
+ *
+ * This callback is responsible for: application level error handling, closing
+ * previous sockets, and reestabilishing the connection to the broker and
+ * session configurations (i.e. subscriptions).
+ */
+ void (*reconnect_callback)(struct mqtt_client*, void**);
+
+ /**
+ * @brief A pointer to some state. A pointer to this member is passed to
+ * \ref mqtt_client.reconnect_callback.
+ */
+ void* reconnect_state;
+
+ /**
+ * @brief The buffer where ingress data is temporarily stored.
+ */
+ struct {
+ /** @brief The start of the receive buffer's memory. */
+ uint8_t *mem_start;
+
+ /** @brief The size of the receive buffer's memory. */
+ size_t mem_size;
+
+ /** @brief A pointer to the next writtable location in the receive buffer. */
+ uint8_t *curr;
+
+ /** @brief The number of bytes that are still writable at curr. */
+ size_t curr_sz;
+ } recv_buffer;
+
+ /**
+ * @brief A variable passed to support thread-safety.
+ *
+ * A pointer to this variable is passed to \c MQTT_PAL_MUTEX_LOCK, and
+ * \c MQTT_PAL_MUTEX_UNLOCK.
+ */
+ mqtt_pal_mutex_t mutex;
+
+ /** @brief The sending message queue. */
+ struct mqtt_message_queue mq;
+};
+
+/**
+ * @brief Generate a new next packet ID.
+ * @ingroup details
+ *
+ * Packet ID's are generated using a max-length LFSR.
+ *
+ * @param client The MQTT client.
+ *
+ * @returns The new packet ID that should be used.
+ */
+uint16_t __mqtt_next_pid(struct mqtt_client *client);
+
+/**
+ * @brief Handles egress client traffic.
+ * @ingroup details
+ *
+ * @param client The MQTT client.
+ *
+ * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+ssize_t __mqtt_send(struct mqtt_client *client);
+
+/**
+ * @brief Handles ingress client traffic.
+ * @ingroup details
+ *
+ * @param client The MQTT client.
+ *
+ * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+ssize_t __mqtt_recv(struct mqtt_client *client);
+
+/**
+ * @brief Function that does the actual sending and receiving of
+ * traffic from the network.
+ * @ingroup api
+ *
+ * All the other functions in the @ref api simply stage messages for
+ * being sent to the broker. This function does the actual sending of
+ * those messages. Additionally this function receives traffic (responses and
+ * acknowledgements) from the broker and responds to that traffic accordingly.
+ * Lastly this function also calls the \c publish_response_callback when
+ * any \c MQTT_CONTROL_PUBLISH messages are received.
+ *
+ * @pre mqtt_init must have been called.
+ *
+ * @param[in,out] client The MQTT client.
+ *
+ * @attention It is the responsibility of the application programmer to
+ * call this function periodically. All functions in the @ref api are
+ * thread-safe so it is perfectly reasonable to have a thread dedicated
+ * to calling this function every 200 ms or so. MQTT-C can be used in single
+ * threaded application though by simply calling this functino periodically
+ * inside your main thread. See @ref simple_publisher.c and @ref simple_subscriber.c
+ * for examples (specifically the \c client_refresher functions).
+ *
+ * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+enum MQTTErrors mqtt_sync(struct mqtt_client *client);
+
+/**
+ * @brief Initializes an MQTT client.
+ * @ingroup api
+ *
+ * This function \em must be called before any other API function calls.
+ *
+ * @pre None.
+ *
+ * @param[out] client The MQTT client.
+ * @param[in] sockfd The socket file descriptor (or equivalent socket handle, e.g. BIO pointer
+ * for OpenSSL sockets) connected to the MQTT broker.
+ * @param[in] sendbuf A buffer that will be used for sending messages to the broker.
+ * @param[in] sendbufsz The size of \p sendbuf in bytes.
+ * @param[in] recvbuf A buffer that will be used for receiving messages from the broker.
+ * @param[in] recvbufsz The size of \p recvbuf in bytes.
+ * @param[in] publish_response_callback The callback to call whenever application messages
+ * are received from the broker.
+ *
+ * @post mqtt_connect must be called.
+ *
+ * @note \p sockfd is a non-blocking TCP connection.
+ * @note If \p sendbuf fills up completely during runtime a \c MQTT_ERROR_SEND_BUFFER_IS_FULL
+ * error will be set. Similarly if \p recvbuf is ever to small to receive a message from
+ * the broker an MQTT_ERROR_RECV_BUFFER_TOO_SMALL error will be set.
+ * @note A pointer to \ref mqtt_client.publish_response_callback_state is always passed as the
+ * \c state argument to \p publish_response_callback. Note that the second argument is
+ * the mqtt_response_publish that was received from the broker.
+ *
+ * @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or
+ * \ref mqtt_init_reconnect more than once per client).
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+enum MQTTErrors mqtt_init(struct mqtt_client *client,
+ mqtt_pal_socket_handle sockfd,
+ uint8_t *sendbuf, size_t sendbufsz,
+ uint8_t *recvbuf, size_t recvbufsz,
+ void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish));
+
+/**
+ * @brief Initializes an MQTT client and enables automatic reconnections.
+ * @ingroup api
+ *
+ * An alternative to \ref mqtt_init that allows the client to automatically reconnect to the
+ * broker after an error occurs (e.g. socket error or internal buffer overflows).
+ *
+ * This is accomplished by calling the \p reconnect_callback whenever the client enters an error
+ * state. The job of the \p reconnect_callback is to: (1) perform error handling/logging,
+ * (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the
+ * client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other
+ * API calls such as \ref mqtt_subscribe.
+ *
+ * The first argument to the \p reconnect_callback is the client (which will be in an error
+ * state) and the second argument is a pointer to a void pointer where you can store some state
+ * information. Internally, MQTT-C calls the reconnect callback like so:
+ *
+ * \code
+ * client->reconnect_callback(client, &client->reconnect_state)
+ * \endcode
+ *
+ * Note that the \p reconnect_callback is also called to setup the initial session. After
+ * calling \ref mqtt_init_reconnect the client will be in the error state
+ * \c MQTT_ERROR_INITIAL_RECONNECT.
+ *
+ * @pre None.
+ *
+ * @param[in,out] client The MQTT client that will be initialized.
+ * @param[in] reconnect_callback The callback that will be called to connect/reconnect the
+ * client to the broker and perform application level error handling.
+ * @param[in] reconnect_state A pointer to some state data for your \p reconnect_callback.
+ * If your \p reconnect_callback does not require any state information set this
+ * to NULL. A pointer to the memory address where the client stores a copy of this
+ * pointer is passed as the second argumnet to \p reconnect_callback.
+ * @param[in] publish_response_callback The callback to call whenever application messages
+ * are received from the broker.
+ *
+ * @post Call \p reconnect_callback yourself, or call \ref mqtt_sync
+ * (which will trigger the call to \p reconnect_callback).
+ *
+ * @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or
+ * \ref mqtt_init_reconnect more than once per client).
+ *
+ */
+void mqtt_init_reconnect(struct mqtt_client *client,
+ void (*reconnect_callback)(struct mqtt_client *client, void** state),
+ void *reconnect_state,
+ void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish));
+
+/**
+ * @brief Safely assign/reassign a socket and buffers to an new/existing client.
+ * @ingroup api
+ *
+ * This function also clears the \p client error state. Upon exiting this function
+ * \c client->error will be \c MQTT_ERROR_CONNECT_NOT_CALLED (which will be cleared)
+ * as soon as \ref mqtt_connect is called.
+ *
+ * @pre This function must be called BEFORE \ref mqtt_connect.
+ *
+ * @param[in,out] client The MQTT client.
+ * @param[in] socketfd The new socket connected to the broker.
+ * @param[in] sendbuf The buffer that will be used to buffer egress traffic to the broker.
+ * @param[in] sendbufsz The size of \p sendbuf in bytes.
+ * @param[in] recvbuf The buffer that will be used to buffer ingress traffic from the broker.
+ * @param[in] recvbufsz The size of \p recvbuf in bytes.
+ *
+ * @post Call \ref mqtt_connect.
+ *
+ * @attention This function should be used in conjunction with clients that have been
+ * initialzed with \ref mqtt_init_reconnect.
+ */
+void mqtt_reinit(struct mqtt_client* client,
+ mqtt_pal_socket_handle socketfd,
+ uint8_t *sendbuf, size_t sendbufsz,
+ uint8_t *recvbuf, size_t recvbufsz);
+
+/**
+ * @brief Establishes a session with the MQTT broker.
+ * @ingroup api
+ *
+ * @pre mqtt_init must have been called.
+ *
+ * @param[in,out] client The MQTT client.
+ * @param[in] client_id The unique name identifying the client.
+ * @param[in] will_topic The topic name of client's \p will_message. If no will message is
+ * desired set to \c NULL.
+ * @param[in] will_message The application message (data) to be published in the event the
+ * client ungracefully disconnects. Set to \c NULL if \p will_topic is \c NULL.
+ * @param[in] will_message_size The size of \p will_message in bytes.
+ * @param[in] user_name The username to use when establishing the session with the MQTT broker.
+ * Set to \c NULL if a username is not required.
+ * @param[in] password The password to use when establishing the session with the MQTT broker.
+ * Set to \c NULL if a password is not required.
+ * @param[in] connect_flags Additional \ref MQTTConnectFlags to use when establishing the connection.
+ * These flags are for forcing the session to start clean,
+ * \c MQTT_CONNECT_CLEAN_SESSION, the QOS level to publish the \p will_message with
+ * (provided \c will_message != \c NULL), MQTT_CONNECT_WILL_QOS_[0,1,2], and whether
+ * or not the broker should retain the \c will_message, MQTT_CONNECT_WILL_RETAIN.
+ * @param[in] keep_alive The keep-alive time in seconds. A reasonable value for this is 400 [seconds].
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+enum MQTTErrors mqtt_connect(struct mqtt_client *client,
+ const char* client_id,
+ const char* will_topic,
+ const void* will_message,
+ size_t will_message_size,
+ const char* user_name,
+ const char* password,
+ uint8_t connect_flags,
+ uint16_t keep_alive);
+
+/*
+ todo: will_message should be a void*
+*/
+
+/**
+ * @brief Publish an application message.
+ * @ingroup api
+ *
+ * Publishes an application message to the MQTT broker.
+ *
+ * @pre mqtt_connect must have been called.
+ *
+ * @param[in,out] client The MQTT client.
+ * @param[in] topic_name The name of the topic.
+ * @param[in] application_message The data to be published.
+ * @param[in] application_message_size The size of \p application_message in bytes.
+ * @param[in] publish_flags \ref MQTTPublishFlags to be used, namely the QOS level to
+ * publish at (MQTT_PUBLISH_QOS_[0,1,2]) or whether or not the broker should
+ * retain the publish (MQTT_PUBLISH_RETAIN).
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+enum MQTTErrors mqtt_publish(struct mqtt_client *client,
+ const char* topic_name,
+ void* application_message,
+ size_t application_message_size,
+ uint8_t publish_flags);
+
+/**
+ * @brief Acknowledge an ingree publish with QOS==1.
+ * @ingroup details
+ *
+ * @param[in,out] client The MQTT client.
+ * @param[in] packet_id The packet ID of the ingress publish being acknowledged.
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+ssize_t __mqtt_puback(struct mqtt_client *client, uint16_t packet_id);
+
+/**
+ * @brief Acknowledge an ingree publish with QOS==2.
+ * @ingroup details
+ *
+ * @param[in,out] client The MQTT client.
+ * @param[in] packet_id The packet ID of the ingress publish being acknowledged.
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+ssize_t __mqtt_pubrec(struct mqtt_client *client, uint16_t packet_id);
+
+/**
+ * @brief Acknowledge an ingree PUBREC packet.
+ * @ingroup details
+ *
+ * @param[in,out] client The MQTT client.
+ * @param[in] packet_id The packet ID of the ingress PUBREC being acknowledged.
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+ssize_t __mqtt_pubrel(struct mqtt_client *client, uint16_t packet_id);
+
+/**
+ * @brief Acknowledge an ingree PUBREL packet.
+ * @ingroup details
+ *
+ * @param[in,out] client The MQTT client.
+ * @param[in] packet_id The packet ID of the ingress PUBREL being acknowledged.
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+ssize_t __mqtt_pubcomp(struct mqtt_client *client, uint16_t packet_id);
+
+
+/**
+ * @brief Subscribe to a topic.
+ * @ingroup api
+ *
+ * @pre mqtt_connect must have been called.
+ *
+ * @param[in,out] client The MQTT client.
+ * @param[in] topic_name The name of the topic to subscribe to.
+ * @param[in] max_qos_level The maximum QOS level with which the broker can send application
+ * messages for this topic.
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+enum MQTTErrors mqtt_subscribe(struct mqtt_client *client,
+ const char* topic_name,
+ int max_qos_level);
+
+/**
+ * @brief Unsubscribe from a topic.
+ * @ingroup api
+ *
+ * @pre mqtt_connect must have been called.
+ *
+ * @param[in,out] client The MQTT client.
+ * @param[in] topic_name The name of the topic to unsubscribe from.
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client,
+ const char* topic_name);
+
+/**
+ * @brief Ping the broker.
+ * @ingroup api
+ *
+ * @pre mqtt_connect must have been called.
+ *
+ * @param[in,out] client The MQTT client.
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+enum MQTTErrors mqtt_ping(struct mqtt_client *client);
+
+/**
+ * @brief Ping the broker without locking/unlocking the mutex.
+ * @see mqtt_ping
+ */
+enum MQTTErrors __mqtt_ping(struct mqtt_client *client);
+
+/**
+ * @brief Terminate the session with the MQTT broker.
+ * @ingroup api
+ *
+ * @pre mqtt_connect must have been called.
+ *
+ * @param[in,out] client The MQTT client.
+ *
+ * @note To re-establish the session, mqtt_connect must be called.
+ *
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
+ */
+enum MQTTErrors mqtt_disconnect(struct mqtt_client *client);
+
+#endif
diff --git a/include/netutils/mqtt_pal.h b/include/netutils/mqtt_pal.h
new file mode 100644
index 00000000..5c83d64a
--- /dev/null
+++ b/include/netutils/mqtt_pal.h
@@ -0,0 +1,99 @@
+#ifndef __MQTT_PAL_H__
+#define __MQTT_PAL_H__
+
+/**
+ * @file
+ * @brief Includes/supports the types/calls required by the MQTT-C client.
+ *
+ * @note This is the \em only file included in mqtt.h, and mqtt.c. It is therefore
+ * responsible for including/supporting all the required types and calls.
+ *
+ * @defgroup pal Platform abstraction layer
+ * @brief Documentation of the types and calls required to port MQTT-C to a new platform.
+ *
+ * mqtt_pal.h is the \em only header file included in mqtt.c. Therefore, to port MQTT-C to a
+ * new platform the following types, functions, constants, and macros must be defined in
+ * mqtt_pal.h:
+ * - Types:
+ * - \c size_t, \c ssize_t
+ * - \c uint8_t, \c uint16_t, \c uint32_t
+ * - \c va_list
+ * - \c mqtt_pal_time_t : return type of \c MQTT_PAL_TIME()
+ * - \c mqtt_pal_mutex_t : type of the argument that is passed to \c MQTT_PAL_MUTEX_LOCK and
+ * \c MQTT_PAL_MUTEX_RELEASE
+ * - Functions:
+ * - \c memcpy, \c strlen
+ * - \c va_start, \c va_arg, \c va_end
+ * - Constants:
+ * - \c INT_MIN
+ *
+ * Additionally, three macro's are required:
+ * - \c MQTT_PAL_HTONS(s) : host-to-network endian conversion for uint16_t.
+ * - \c MQTT_PAL_NTOHS(s) : network-to-host endian conversion for uint16_t.
+ * - \c MQTT_PAL_TIME() : returns [type: \c mqtt_pal_time_t] current time in seconds.
+ * - \c MQTT_PAL_MUTEX_LOCK(mtx_pointer) : macro that locks the mutex pointed to by \c mtx_pointer.
+ * - \c MQTT_PAL_MUTEX_RELEASE(mtx_pointer) : macro that unlocks the mutex pointed to by
+ * \c mtx_pointer.
+ *
+ * Lastly, \ref mqtt_pal_sendall and \ref mqtt_pal_recvall, must be implemented in mqtt_pal.c
+ * for sending and receiving data using the platforms socket calls.
+ */
+
+
+/* UNIX-like platform support */
+#define __unix__ 1
+#ifdef __unix__
+ #include <limits.h>
+ #include <string.h>
+ #include <stdarg.h>
+ #include <time.h>
+ #include <arpa/inet.h>
+ #include <pthread.h>
+
+ #define MQTT_PAL_HTONS(s) htons(s)
+ #define MQTT_PAL_NTOHS(s) ntohs(s)
+
+ #define MQTT_PAL_TIME() time(NULL)
+
+ typedef time_t mqtt_pal_time_t;
+ typedef pthread_mutex_t mqtt_pal_mutex_t;
+
+ #define MQTT_PAL_MUTEX_INIT(mtx_ptr) pthread_mutex_init(mtx_ptr, NULL)
+ #define MQTT_PAL_MUTEX_LOCK(mtx_ptr) pthread_mutex_lock(mtx_ptr)
+ #define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) pthread_mutex_unlock(mtx_ptr)
+
+ #ifdef MQTT_USE_BIO
+ #include <openssl/bio.h>
+ typedef BIO* mqtt_pal_socket_handle;
+ #else
+ typedef int mqtt_pal_socket_handle;
+ #endif
+#endif
+
+/**
+ * @brief Sends all the bytes in a buffer.
+ * @ingroup pal
+ *
+ * @param[in] fd The file-descriptor (or handle) of the socket.
+ * @param[in] buf A pointer to the first byte in the buffer to send.
+ * @param[in] len The number of bytes to send (starting at \p buf).
+ * @param[in] flags Flags which are passed to the underlying socket.
+ *
+ * @returns The number of bytes sent if successful, an \ref MQTTErrors otherwise.
+ */
+ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags);
+
+/**
+ * @brief Non-blocking receive all the byte available.
+ * @ingroup pal
+ *
+ * @param[in] fd The file-descriptor (or handle) of the socket.
+ * @param[in] buf A pointer to the receive buffer.
+ * @param[in] bufsz The max number of bytes that can be put into \p buf.
+ * @param[in] flags Flags which are passed to the underlying socket.
+ *
+ * @returns The number of bytes received if successful, an \ref MQTTErrors otherwise.
+ */
+ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags);
+
+#endif
diff --git a/netutils/mqttc/Kconfig b/netutils/mqttc/Kconfig
new file mode 100644
index 00000000..35723ba6
--- /dev/null
+++ b/netutils/mqttc/Kconfig
@@ -0,0 +1,19 @@
+#
+# For a description of the syntax of this configuration file,
+# see the file kconfig-language.txt in the NuttX tools repository.
+#
+
+config NETUTILS_MQTTC
+ bool "MQTTC client library"
+ default n
+ depends on NET && NET_TCP
+ ---help---
+ Enable support for the mqtt. This is a public domain
+ Telnet client library available from https://github.com/LiamBindle/MQTT-C
+ modified for use with NuttX. Original Authors:
+
+ Liam Bindle
+ Demilade Adeoye
+
+if NETUTILS_MQTTC
+endif
diff --git a/netutils/mqttc/Make.defs b/netutils/mqttc/Make.defs
new file mode 100644
index 00000000..3832c0fd
--- /dev/null
+++ b/netutils/mqttc/Make.defs
@@ -0,0 +1,39 @@
+# apps/netutils/telnetc/Make.defs
+# Adds selected applications to apps/ build
+#
+# Copyright (C) 2017 Gregory Nutt. All rights reserved.
+# Author: Gregory Nutt <gnutt@nuttx.org>
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# 3. Neither the name NuttX nor the names of its contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+############################################################################
+
+ifeq ($(CONFIG_NETUTILS_MQTTC),y)
+CONFIGURED_APPS += netutils/mqttc
+endif
+
diff --git a/netutils/mqttc/Makefile b/netutils/mqttc/Makefile
new file mode 100644
index 00000000..b29bc0e5
--- /dev/null
+++ b/netutils/mqttc/Makefile
@@ -0,0 +1,106 @@
+############################################################################
+# apps/netutils/mqttc/Makefile
+#
+# Copyright (C) 2015 Max Nekludov. All rights reserved.
+# Author: Max Nekludov <macscomp@gmail.com>
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# 3. Neither the name NuttX nor the names of its contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+############################################################################
+
+-include $(TOPDIR)/.config
+-include $(TOPDIR)/Make.defs
+include $(APPDIR)/Make.defs
+
+ASRCS =
+CSRCS =
+
+CSRCS += mqtt.c mqtt_pal.c
+
+ifeq ($(CONFIG_NETUTILS_MQTTC),y)
+#CSRCS += pap.c
+CSRCS +=
+endif
+
+AOBJS = $(ASRCS:.S=$(OBJEXT))
+COBJS = $(CSRCS:.c=$(OBJEXT))
+
+SRCS = $(ASRCS) $(CSRCS)
+OBJS = $(AOBJS) $(COBJS)
+
+ifeq ($(CONFIG_WINDOWS_NATIVE),y)
+ BIN = ..\..\libapps$(LIBEXT)
+else
+ifeq ($(WINTOOL),y)
+ BIN = ..\\..\\libapps$(LIBEXT)
+else
+ BIN = ../../libapps$(LIBEXT)
+endif
+endif
+
+ROOTDEPPATH = --dep-path .
+
+# Common build
+
+VPATH =
+
+all: .built
+.PHONY: context depend clean distclean preconfig
+.PRECIOUS: ../../libapps$(LIBEXT)
+
+$(AOBJS): %$(OBJEXT): %.S
+ $(call ASSEMBLE, $<, $@)
+
+$(COBJS): %$(OBJEXT): %.c
+ $(call COMPILE, $<, $@)
+
+.built: $(OBJS)
+ $(call ARCHIVE, $(BIN), $(OBJS))
+ $(Q) touch .built
+
+install:
+
+context:
+
+.depend: Makefile $(SRCS)
+ $(Q) $(MKDEP) $(ROOTDEPPATH) "$(CC)" -- $(CFLAGS) -- $(SRCS) >Make.dep
+ $(Q) touch $@
+
+depend: .depend
+
+clean:
+ $(call DELFILE, .built)
+ $(call CLEAN)
+
+distclean: clean
+ $(call DELFILE, Make.dep)
+ $(call DELFILE, .depend)
+
+preconfig:
+
+-include Make.dep
diff --git a/netutils/mqttc/mqtt.c b/netutils/mqttc/mqtt.c
new file mode 100644
index 00000000..d876abb4
--- /dev/null
+++ b/netutils/mqttc/mqtt.c
@@ -0,0 +1,1663 @@
+#include "netutils/mqtt.h"
+
+/**
+ * @file
+ * @brief Implements the functionality of MQTT-C.
+ * @note The only files that are included are mqtt.h and mqtt_pal.h.
+ *
+ * @cond Doxygen_Suppress
+ */
+
+enum MQTTErrors mqtt_sync(struct mqtt_client *client) {
+ /* Recover from any errors */
+ MQTT_PAL_MUTEX_LOCK(&client->mutex);
+ if (client->error != MQTT_OK && client->reconnect_callback != NULL) {
+ client->reconnect_callback(client, &client->reconnect_state);
+ /* unlocked during CONNECT */
+ } else {
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ }
+
+ /* Call inspector callback if necessary */
+ enum MQTTErrors err;
+ if (client->inspector_callback != NULL) {
+ MQTT_PAL_MUTEX_LOCK(&client->mutex);
+ err = client->inspector_callback(client);
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ if (err != MQTT_OK) return err;
+ }
+ /* Call receive */
+ err = __mqtt_recv(client);
+ if (err != MQTT_OK) return err;
+ /* Call send */
+ err = __mqtt_send(client);
+ return err;
+}
+
+uint16_t __mqtt_next_pid(struct mqtt_client *client) {
+ if (client->pid_lfsr == 0) {
+ client->pid_lfsr = 163u;
+ }
+ /* LFSR taps taken from: https://en.wikipedia.org/wiki/Linear-feedback_shift_register */
+ int pid_exists = 0;
+ do {
+ unsigned lsb = client->pid_lfsr & 1;
+ (client->pid_lfsr) >>= 1;
+ if (lsb) {
+ client->pid_lfsr ^= 0xB400u;
+ }
+
+ /* check that the PID is unique */
+ pid_exists = 0;
+ struct mqtt_queued_message *curr;
+ for(curr = mqtt_mq_get(&(client->mq), 0); curr >= client->mq.queue_tail; --curr) {
+ if (curr->packet_id == client->pid_lfsr) {
+ pid_exists = 1;
+ break;
+ }
+ }
+
+ } while(pid_exists);
+ return client->pid_lfsr;
+}
+
+enum MQTTErrors mqtt_init(struct mqtt_client *client,
+ mqtt_pal_socket_handle sockfd,
+ uint8_t *sendbuf, size_t sendbufsz,
+ uint8_t *recvbuf, size_t recvbufsz,
+ void (*publish_response_callback)(void** state,struct mqtt_response_publish *publish))
+{
+ if (client == NULL || sendbuf == NULL || recvbuf == NULL) {
+ return MQTT_ERROR_NULLPTR;
+ }
+
+ /* initialize mutex */
+ MQTT_PAL_MUTEX_INIT(&client->mutex);
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); /* unlocked during CONNECT */
+
+ client->socketfd = sockfd;
+
+ mqtt_mq_init(&client->mq, sendbuf, sendbufsz);
+
+ client->recv_buffer.mem_start = recvbuf;
+ client->recv_buffer.mem_size = recvbufsz;
+ client->recv_buffer.curr = client->recv_buffer.mem_start;
+ client->recv_buffer.curr_sz = client->recv_buffer.mem_size;
+
+ client->error = MQTT_ERROR_CONNECT_NOT_CALLED;
+ client->response_timeout = 30;
+ client->number_of_timeouts = 0;
+ client->number_of_keep_alives = 0;
+ client->typical_response_time = -1.0;
+ client->publish_response_callback = publish_response_callback;
+
+ client->inspector_callback = NULL;
+ client->reconnect_callback = NULL;
+ client->reconnect_state = NULL;
+
+ return MQTT_OK;
+}
+
+void mqtt_init_reconnect(struct mqtt_client *client,
+ void (*reconnect)(struct mqtt_client *, void**),
+ void *reconnect_state,
+ void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish))
+{
+ /* initialize mutex */
+ MQTT_PAL_MUTEX_INIT(&client->mutex);
+
+ client->socketfd = (mqtt_pal_socket_handle) -1;
+
+ mqtt_mq_init(&client->mq, NULL, 0);
+
+ client->recv_buffer.mem_start = NULL;
+ client->recv_buffer.mem_size = 0;
+ client->recv_buffer.curr = NULL;
+ client->recv_buffer.curr_sz = 0;
+
+ client->error = MQTT_ERROR_INITIAL_RECONNECT;
+ client->response_timeout = 30;
+ client->number_of_timeouts = 0;
+ client->number_of_keep_alives = 0;
+ client->typical_response_time = -1.0;
+ client->publish_response_callback = publish_response_callback;
+
+ client->inspector_callback = NULL;
+ client->reconnect_callback = reconnect;
+ client->reconnect_state = reconnect_state;
+}
+
+void mqtt_reinit(struct mqtt_client* client,
+ mqtt_pal_socket_handle socketfd,
+ uint8_t *sendbuf, size_t sendbufsz,
+ uint8_t *recvbuf, size_t recvbufsz)
+{
+ client->error = MQTT_ERROR_CONNECT_NOT_CALLED;
+ client->socketfd = socketfd;
+
+ mqtt_mq_init(&client->mq, sendbuf, sendbufsz);
+
+ client->recv_buffer.mem_start = recvbuf;
+ client->recv_buffer.mem_size = recvbufsz;
+ client->recv_buffer.curr = client->recv_buffer.mem_start;
+ client->recv_buffer.curr_sz = client->recv_buffer.mem_size;
+}
+
+/**
+ * A macro function that:
+ * 1) Checks that the client isn't in an error state.
+ * 2) Attempts to pack to client's message queue.
+ * a) handles errors
+ * b) if mq buffer is too small, cleans it and tries again
+ * 3) Upon successful pack, registers the new message.
+ */
+#define MQTT_CLIENT_TRY_PACK(tmp, msg, client, pack_call, release) \
+ if (client->error < 0) { \
+ if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
+ return client->error; \
+ } \
+ tmp = pack_call; \
+ if (tmp < 0) { \
+ client->error = tmp; \
+ if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
+ return tmp; \
+ } else if (tmp == 0) { \
+ mqtt_mq_clean(&client->mq); \
+ tmp = pack_call; \
+ if (tmp < 0) { \
+ client->error = tmp; \
+ if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
+ return tmp; \
+ } else if(tmp == 0) { \
+ client->error = MQTT_ERROR_SEND_BUFFER_IS_FULL; \
+ if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
+ return MQTT_ERROR_SEND_BUFFER_IS_FULL; \
+ } \
+ } \
+ msg = mqtt_mq_register(&client->mq, tmp); \
+
+
+enum MQTTErrors mqtt_connect(struct mqtt_client *client,
+ const char* client_id,
+ const char* will_topic,
+ const void* will_message,
+ size_t will_message_size,
+ const char* user_name,
+ const char* password,
+ uint8_t connect_flags,
+ uint16_t keep_alive)
+{
+ ssize_t rv;
+ struct mqtt_queued_message *msg;
+
+ /* Note: Current thread already has mutex locked. */
+
+ /* update the client's state */
+ client->keep_alive = keep_alive;
+ if (client->error == MQTT_ERROR_CONNECT_NOT_CALLED) {
+ client->error = MQTT_OK;
+ }
+
+ /* try to pack the message */
+ MQTT_CLIENT_TRY_PACK(rv, msg, client,
+ mqtt_pack_connection_request(
+ client->mq.curr, client->mq.curr_sz,
+ client_id, will_topic, will_message,
+ will_message_size,user_name, password,
+ connect_flags, keep_alive
+ ),
+ 1
+ );
+ /* save the control type of the message */
+ msg->control_type = MQTT_CONTROL_CONNECT;
+
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_OK;
+}
+
+enum MQTTErrors mqtt_publish(struct mqtt_client *client,
+ const char* topic_name,
+ void* application_message,
+ size_t application_message_size,
+ uint8_t publish_flags)
+{
+ MQTT_PAL_MUTEX_LOCK(&client->mutex);
+ uint16_t packet_id = __mqtt_next_pid(client);
+ ssize_t rv;
+ struct mqtt_queued_message *msg;
+ printf("\npacket_id = 0x%08X,%d\n",packet_id);
+ /* try to pack the message */
+ MQTT_CLIENT_TRY_PACK(
+ rv, msg, client,
+ mqtt_pack_publish_request(
+ client->mq.curr, client->mq.curr_sz,
+ topic_name,
+ packet_id,
+ application_message,
+ application_message_size,
+ publish_flags
+ ),
+ 1
+ );
+ /* save the control type and packet id of the message */
+ msg->control_type = MQTT_CONTROL_PUBLISH;
+ msg->packet_id = packet_id;
+
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_OK;
+}
+
+ssize_t __mqtt_puback(struct mqtt_client *client, uint16_t packet_id) {
+ ssize_t rv;
+ struct mqtt_queued_message *msg;
+
+ /* try to pack the message */
+ MQTT_CLIENT_TRY_PACK(
+ rv, msg, client,
+ mqtt_pack_pubxxx_request(
+ client->mq.curr, client->mq.curr_sz,
+ MQTT_CONTROL_PUBACK,
+ packet_id
+ ),
+ 0
+ );
+ /* save the control type and packet id of the message */
+ msg->control_type = MQTT_CONTROL_PUBACK;
+ msg->packet_id = packet_id;
+
+ return MQTT_OK;
+}
+
+ssize_t __mqtt_pubrec(struct mqtt_client *client, uint16_t packet_id) {
+ ssize_t rv;
+ struct mqtt_queued_message *msg;
+
+ /* try to pack the message */
+ MQTT_CLIENT_TRY_PACK(
+ rv, msg, client,
+ mqtt_pack_pubxxx_request(
+ client->mq.curr, client->mq.curr_sz,
+ MQTT_CONTROL_PUBREC,
+ packet_id
+ ),
+ 0
+ );
+ /* save the control type and packet id of the message */
+ msg->control_type = MQTT_CONTROL_PUBREC;
+ msg->packet_id = packet_id;
+
+ return MQTT_OK;
+}
+
+ssize_t __mqtt_pubrel(struct mqtt_client *client, uint16_t packet_id) {
+ ssize_t rv;
+ struct mqtt_queued_message *msg;
+
+ /* try to pack the message */
+ MQTT_CLIENT_TRY_PACK(
+ rv, msg, client,
+ mqtt_pack_pubxxx_request(
+ client->mq.curr, client->mq.curr_sz,
+ MQTT_CONTROL_PUBREL,
+ packet_id
+ ),
+ 0
+ );
+ /* save the control type and packet id of the message */
+ msg->control_type = MQTT_CONTROL_PUBREL;
+ msg->packet_id = packet_id;
+
+ return MQTT_OK;
+}
+
+ssize_t __mqtt_pubcomp(struct mqtt_client *client, uint16_t packet_id) {
+ ssize_t rv;
+ struct mqtt_queued_message *msg;
+
+ /* try to pack the message */
+ MQTT_CLIENT_TRY_PACK(
+ rv, msg, client,
+ mqtt_pack_pubxxx_request(
+ client->mq.curr, client->mq.curr_sz,
+ MQTT_CONTROL_PUBCOMP,
+ packet_id
+ ),
+ 0
+ );
+ /* save the control type and packet id of the message */
+ msg->control_type = MQTT_CONTROL_PUBCOMP;
+ msg->packet_id = packet_id;
+
+ return MQTT_OK;
+}
+
+enum MQTTErrors mqtt_subscribe(struct mqtt_client *client,
+ const char* topic_name,
+ int max_qos_level)
+{
+ MQTT_PAL_MUTEX_LOCK(&client->mutex);
+ uint16_t packet_id = __mqtt_next_pid(client);
+ ssize_t rv;
+ struct mqtt_queued_message *msg;
+
+ /* try to pack the message */
+ MQTT_CLIENT_TRY_PACK(
+ rv, msg, client,
+ mqtt_pack_subscribe_request(
+ client->mq.curr, client->mq.curr_sz,
+ packet_id,
+ topic_name,
+ max_qos_level,
+ NULL
+ ),
+ 1
+ );
+ /* save the control type and packet id of the message */
+ msg->control_type = MQTT_CONTROL_SUBSCRIBE;
+ msg->packet_id = packet_id;
+
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_OK;
+}
+
+enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client,
+ const char* topic_name)
+{
+ MQTT_PAL_MUTEX_LOCK(&client->mutex);
+ uint16_t packet_id = __mqtt_next_pid(client);
+ ssize_t rv;
+ struct mqtt_queued_message *msg;
+
+ /* try to pack the message */
+ MQTT_CLIENT_TRY_PACK(
+ rv, msg, client,
+ mqtt_pack_unsubscribe_request(
+ client->mq.curr, client->mq.curr_sz,
+ packet_id,
+ topic_name,
+ NULL
+ ),
+ 1
+ );
+ /* save the control type and packet id of the message */
+ msg->control_type = MQTT_CONTROL_UNSUBSCRIBE;
+ msg->packet_id = packet_id;
+
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_OK;
+}
+
+enum MQTTErrors mqtt_ping(struct mqtt_client *client) {
+ enum MQTTErrors rv;
+ MQTT_PAL_MUTEX_LOCK(&client->mutex);
+ rv = __mqtt_ping(client);
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return rv;
+}
+
+enum MQTTErrors __mqtt_ping(struct mqtt_client *client)
+{
+ ssize_t rv;
+ struct mqtt_queued_message *msg;
+
+ /* try to pack the message */
+ MQTT_CLIENT_TRY_PACK(
+ rv, msg, client,
+ mqtt_pack_ping_request(
+ client->mq.curr, client->mq.curr_sz
+ ),
+ 0
+ );
+ /* save the control type and packet id of the message */
+ msg->control_type = MQTT_CONTROL_PINGREQ;
+
+
+ return MQTT_OK;
+}
+
+enum MQTTErrors mqtt_disconnect(struct mqtt_client *client)
+{
+ MQTT_PAL_MUTEX_LOCK(&client->mutex);
+ ssize_t rv;
+ struct mqtt_queued_message *msg;
+
+ /* try to pack the message */
+ MQTT_CLIENT_TRY_PACK(
+ rv, msg, client,
+ mqtt_pack_disconnect(
+ client->mq.curr, client->mq.curr_sz
+ ),
+ 1
+ );
+ /* save the control type and packet id of the message */
+ msg->control_type = MQTT_CONTROL_DISCONNECT;
+
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_OK;
+}
+
+ssize_t __mqtt_send(struct mqtt_client *client)
+{
+ MQTT_PAL_MUTEX_LOCK(&client->mutex);
+ uint8_t inspected;
+ int i;
+ if (client->error < 0 && client->error != MQTT_ERROR_SEND_BUFFER_IS_FULL) {
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return client->error;
+ }
+
+ /* loop through all messages in the queue */
+ int len = mqtt_mq_length(&client->mq);
+ int inflight_qos2 = 0;
+ for(i = 0; i < len; ++i) {
+ struct mqtt_queued_message *msg = mqtt_mq_get(&client->mq, i);
+ int resend = 0;
+ if (msg->state == MQTT_QUEUED_UNSENT) {
+ /* message has not been sent to lets send it */
+ resend = 1;
+ } else if (msg->state == MQTT_QUEUED_AWAITING_ACK) {
+ /* check for timeout */
+ if (MQTT_PAL_TIME() > msg->time_sent + client->response_timeout) {
+ resend = 1;
+ client->number_of_timeouts += 1;
+ }
+ }
+
+ /* only send QoS 2 message if there are no inflight QoS 2 PUBLISH messages */
+ if (msg->control_type == MQTT_CONTROL_PUBLISH
+ && (msg->state == MQTT_QUEUED_UNSENT || msg->state == MQTT_QUEUED_AWAITING_ACK))
+ {
+ inspected = 0x03 & ((msg->start[0]) >> 1); /* qos */
+ if (inspected == 2) {
+ if (inflight_qos2) {
+ resend = 0;
+ }
+ inflight_qos2 = 1;
+ }
+ }
+
+ /* goto next message if we don't need to send */
+ if (!resend) {
+ continue;
+ }
+
+ /* we're sending the message */
+ ssize_t tmp = mqtt_pal_sendall(client->socketfd, msg->start, msg->size, 0);
+ if (tmp < 0) {
+ client->error = tmp;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return tmp;
+ }
+
+ /* update timeout watcher */
+ client->time_of_last_send = MQTT_PAL_TIME();
+ msg->time_sent = client->time_of_last_send;
+
+ /*
+ Determine the state to put the message in.
+ Control Types:
+ MQTT_CONTROL_CONNECT -> awaiting
+ MQTT_CONTROL_CONNACK -> n/a
+ MQTT_CONTROL_PUBLISH -> qos == 0 ? complete : awaiting
+ MQTT_CONTROL_PUBACK -> complete
+ MQTT_CONTROL_PUBREC -> awaiting
+ MQTT_CONTROL_PUBREL -> awaiting
+ MQTT_CONTROL_PUBCOMP -> complete
+ MQTT_CONTROL_SUBSCRIBE -> awaiting
+ MQTT_CONTROL_SUBACK -> n/a
+ MQTT_CONTROL_UNSUBSCRIBE -> awaiting
+ MQTT_CONTROL_UNSUBACK -> n/a
+ MQTT_CONTROL_PINGREQ -> awaiting
+ MQTT_CONTROL_PINGRESP -> n/a
+ MQTT_CONTROL_DISCONNECT -> complete
+ */
+ switch (msg->control_type) {
+ case MQTT_CONTROL_PUBACK:
+ case MQTT_CONTROL_PUBCOMP:
+ case MQTT_CONTROL_DISCONNECT:
+ msg->state = MQTT_QUEUED_COMPLETE;
+ break;
+ case MQTT_CONTROL_PUBLISH:
+ inspected = 0x03 & ((msg->start[0]) >> 1); /* qos */
+ if (inspected == 0) {
+ msg->state = MQTT_QUEUED_COMPLETE;
+ } else if (inspected == 1) {
+ msg->state = MQTT_QUEUED_AWAITING_ACK;
+ /*set DUP flag for subsequent sends */
+ msg->start[1] |= MQTT_PUBLISH_DUP;
+ } else {
+ msg->state = MQTT_QUEUED_AWAITING_ACK;
+ }
+ break;
+ case MQTT_CONTROL_CONNECT:
+ case MQTT_CONTROL_PUBREC:
+ case MQTT_CONTROL_PUBREL:
+ case MQTT_CONTROL_SUBSCRIBE:
+ case MQTT_CONTROL_UNSUBSCRIBE:
+ case MQTT_CONTROL_PINGREQ:
+ msg->state = MQTT_QUEUED_AWAITING_ACK;
+ break;
+ default:
+ client->error = MQTT_ERROR_MALFORMED_REQUEST;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_MALFORMED_REQUEST;
+ }
+ }
+
+ /* check for keep-alive */
+ mqtt_pal_time_t keep_alive_timeout = client->time_of_last_send + (mqtt_pal_time_t) ((float) (client->keep_alive) * 0.75);
+ if (MQTT_PAL_TIME() > keep_alive_timeout) {
+ ssize_t rv = __mqtt_ping(client);
+ if (rv != MQTT_OK) {
+ client->error = rv;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return rv;
+ }
+ }
+
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_OK;
+}
+
+ssize_t __mqtt_recv(struct mqtt_client *client)
+{
+ MQTT_PAL_MUTEX_LOCK(&client->mutex);
+ struct mqtt_response response;
+ /* read until there is nothing left to read */
+ while(1) {
+ /* read in as many bytes as possible */
+ ssize_t rv, consumed;
+
+ rv = mqtt_pal_recvall(client->socketfd, client->recv_buffer.curr, client->recv_buffer.curr_sz, 0);
+ if (rv < 0) {
+ /* an error occurred */
+ client->error = rv;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return rv;
+ } else {
+ client->recv_buffer.curr += rv;
+ client->recv_buffer.curr_sz -= rv;
+ }
+
+ /* attempt to parse */
+ consumed = mqtt_unpack_response(&response, client->recv_buffer.mem_start, client->recv_buffer.curr - client->recv_buffer.mem_start);
+
+ if (consumed < 0) {
+ client->error = consumed;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return consumed;
+ } else if (consumed == 0) {
+ /* if curr_sz is 0 then the buffer is too small to ever fit the message */
+ if (client->recv_buffer.curr_sz == 0) {
+ client->error = MQTT_ERROR_RECV_BUFFER_TOO_SMALL;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_RECV_BUFFER_TOO_SMALL;
+ }
+
+ /* just need to wait for the rest of the data */
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_OK;
+ }
+
+ /* response was unpacked successfully */
+ struct mqtt_queued_message *msg = NULL;
+
+ /*
+ The switch statement below manages how the client responds to messages from the broker.
+
+ Control Types (that we expect to receive from the broker):
+ MQTT_CONTROL_CONNACK:
+ -> release associated CONNECT
+ -> handle response
+ MQTT_CONTROL_PUBLISH:
+ -> stage response, none if qos==0, PUBACK if qos==1, PUBREC if qos==2
+ -> call publish callback
+ MQTT_CONTROL_PUBACK:
+ -> release associated PUBLISH
+ MQTT_CONTROL_PUBREC:
+ -> release PUBLISH
+ -> stage PUBREL
+ MQTT_CONTROL_PUBREL:
+ -> release associated PUBREC
+ -> stage PUBCOMP
+ MQTT_CONTROL_PUBCOMP:
+ -> release PUBREL
+ MQTT_CONTROL_SUBACK:
+ -> release SUBSCRIBE
+ -> handle response
+ MQTT_CONTROL_UNSUBACK:
+ -> release UNSUBSCRIBE
+ MQTT_CONTROL_PINGRESP:
+ -> release PINGREQ
+ */
+ switch (response.fixed_header.control_type) {
+ case MQTT_CONTROL_CONNACK:
+ /* release associated CONNECT */
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_CONNECT, NULL);
+ if (msg == NULL) {
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_ACK_OF_UNKNOWN;
+ }
+ msg->state = MQTT_QUEUED_COMPLETE;
+ /* initialize typical response time */
+ client->typical_response_time = (double) (MQTT_PAL_TIME() - msg->time_sent);
+ /* check that connection was successful */
+ if (response.decoded.connack.return_code != MQTT_CONNACK_ACCEPTED) {
+ client->error = MQTT_ERROR_CONNECTION_REFUSED;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_CONNECTION_REFUSED;
+ }
+ break;
+ case MQTT_CONTROL_PUBLISH:
+ /* stage response, none if qos==0, PUBACK if qos==1, PUBREC if qos==2 */
+ if (response.decoded.publish.qos_level == 1) {
+ rv = __mqtt_puback(client, response.decoded.publish.packet_id);
+ if (rv != MQTT_OK) {
+ client->error = rv;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return rv;
+ }
+ } else if (response.decoded.publish.qos_level == 2) {
+ /* check if this is a duplicate */
+ if (mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREC, &response.decoded.publish.packet_id) != NULL) {
+ break;
+ }
+
+ rv = __mqtt_pubrec(client, response.decoded.publish.packet_id);
+ if (rv != MQTT_OK) {
+ client->error = rv;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return rv;
+ }
+ }
+ /* call publish callback */
+ client->publish_response_callback(&client->publish_response_callback_state, &response.decoded.publish);
+ break;
+ case MQTT_CONTROL_PUBACK:
+ /* release associated PUBLISH */
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBLISH, &response.decoded.puback.packet_id);
+ if (msg == NULL) {
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_ACK_OF_UNKNOWN;
+ }
+ msg->state = MQTT_QUEUED_COMPLETE;
+ /* update response time */
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
+ break;
+ case MQTT_CONTROL_PUBREC:
+ /* check if this is a duplicate */
+ if (mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREL, &response.decoded.pubrec.packet_id) != NULL) {
+ break;
+ }
+ /* release associated PUBLISH */
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBLISH, &response.decoded.pubrec.packet_id);
+ if (msg == NULL) {
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_ACK_OF_UNKNOWN;
+ }
+ msg->state = MQTT_QUEUED_COMPLETE;
+ /* update response time */
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
+ /* stage PUBREL */
+ rv = __mqtt_pubrel(client, response.decoded.pubrec.packet_id);
+ if (rv != MQTT_OK) {
+ client->error = rv;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return rv;
+ }
+ break;
+ case MQTT_CONTROL_PUBREL:
+ /* release associated PUBREC */
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREC, &response.decoded.pubrel.packet_id);
+ if (msg == NULL) {
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_ACK_OF_UNKNOWN;
+ }
+ msg->state = MQTT_QUEUED_COMPLETE;
+ /* update response time */
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
+ /* stage PUBCOMP */
+ rv = __mqtt_pubcomp(client, response.decoded.pubrec.packet_id);
+ if (rv != MQTT_OK) {
+ client->error = rv;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return rv;
+ }
+ break;
+ case MQTT_CONTROL_PUBCOMP:
+ /* release associated PUBREL */
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREL, &response.decoded.pubcomp.packet_id);
+ if (msg == NULL) {
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_ACK_OF_UNKNOWN;
+ }
+ msg->state = MQTT_QUEUED_COMPLETE;
+ /* update response time */
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
+ break;
+ case MQTT_CONTROL_SUBACK:
+ /* release associated SUBSCRIBE */
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_SUBSCRIBE, &response.decoded.suback.packet_id);
+ if (msg == NULL) {
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_ACK_OF_UNKNOWN;
+ }
+ msg->state = MQTT_QUEUED_COMPLETE;
+ /* update response time */
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
+ /* check that subscription was successful (not currently only one subscribe at a time) */
+ if (response.decoded.suback.return_codes[0] == MQTT_SUBACK_FAILURE) {
+ client->error = MQTT_ERROR_SUBSCRIBE_FAILED;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_SUBSCRIBE_FAILED;
+ }
+ break;
+ case MQTT_CONTROL_UNSUBACK:
+ /* release associated UNSUBSCRIBE */
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_UNSUBSCRIBE, &response.decoded.unsuback.packet_id);
+ if (msg == NULL) {
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_ACK_OF_UNKNOWN;
+ }
+ msg->state = MQTT_QUEUED_COMPLETE;
+ /* update response time */
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
+ break;
+ case MQTT_CONTROL_PINGRESP:
+ /* release associated PINGREQ */
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PINGREQ, NULL);
+ if (msg == NULL) {
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_ACK_OF_UNKNOWN;
+ }
+ msg->state = MQTT_QUEUED_COMPLETE;
+ /* update response time */
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
+ break;
+ default:
+ client->error = MQTT_ERROR_MALFORMED_RESPONSE;
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_ERROR_MALFORMED_RESPONSE;
+ }
+
+ /* we've handled the response, now clean the buffer */
+ void *dest = client->recv_buffer.mem_start;
+ void *src = client->recv_buffer.mem_start + consumed;
+ size_t n = client->recv_buffer.curr - client->recv_buffer.mem_start - consumed;
+ memmove(dest, src, n);
+ client->recv_buffer.curr -= consumed;
+ client->recv_buffer.curr_sz += consumed;
+ }
+
+ /* never hit (always return once there's nothing left. */
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
+ return MQTT_OK;
+}
+
+/* FIXED HEADER */
+
+#define MQTT_BITFIELD_RULE_VIOLOATION(bitfield, rule_value, rule_mask) ((bitfield ^ rule_value) & rule_mask)
+
+struct {
+ const uint8_t control_type_is_valid[16];
+ const uint8_t required_flags[16];
+ const uint8_t mask_required_flags[16];
+} mqtt_fixed_header_rules = {
+ { /* boolean value, true if type is valid */
+ 0x00, /* MQTT_CONTROL_RESERVED */
+ 0x01, /* MQTT_CONTROL_CONNECT */
+ 0x01, /* MQTT_CONTROL_CONNACK */
+ 0x01, /* MQTT_CONTROL_PUBLISH */
+ 0x01, /* MQTT_CONTROL_PUBACK */
+ 0x01, /* MQTT_CONTROL_PUBREC */
+ 0x01, /* MQTT_CONTROL_PUBREL */
+ 0x01, /* MQTT_CONTROL_PUBCOMP */
+ 0x01, /* MQTT_CONTROL_SUBSCRIBE */
+ 0x01, /* MQTT_CONTROL_SUBACK */
+ 0x01, /* MQTT_CONTROL_UNSUBSCRIBE */
+ 0x01, /* MQTT_CONTROL_UNSUBACK */
+ 0x01, /* MQTT_CONTROL_PINGREQ */
+ 0x01, /* MQTT_CONTROL_PINGRESP */
+ 0x01, /* MQTT_CONTROL_DISCONNECT */
+ 0x00 /* MQTT_CONTROL_RESERVED */
+ },
+ { /* flags that must be set for the associated control type */
+ 0x00, /* MQTT_CONTROL_RESERVED */
+ 0x00, /* MQTT_CONTROL_CONNECT */
+ 0x00, /* MQTT_CONTROL_CONNACK */
+ 0x00, /* MQTT_CONTROL_PUBLISH */
+ 0x00, /* MQTT_CONTROL_PUBACK */
+ 0x00, /* MQTT_CONTROL_PUBREC */
+ 0x02, /* MQTT_CONTROL_PUBREL */
+ 0x00, /* MQTT_CONTROL_PUBCOMP */
+ 0x02, /* MQTT_CONTROL_SUBSCRIBE */
+ 0x00, /* MQTT_CONTROL_SUBACK */
+ 0x02, /* MQTT_CONTROL_UNSUBSCRIBE */
+ 0x00, /* MQTT_CONTROL_UNSUBACK */
+ 0x00, /* MQTT_CONTROL_PINGREQ */
+ 0x00, /* MQTT_CONTROL_PINGRESP */
+ 0x00, /* MQTT_CONTROL_DISCONNECT */
+ 0x00 /* MQTT_CONTROL_RESERVED */
+ },
+ { /* mask of flags that must be specific values for the associated control type*/
+ 0x00, /* MQTT_CONTROL_RESERVED */
+ 0x0F, /* MQTT_CONTROL_CONNECT */
+ 0x0F, /* MQTT_CONTROL_CONNACK */
+ 0x00, /* MQTT_CONTROL_PUBLISH */
+ 0x0F, /* MQTT_CONTROL_PUBACK */
+ 0x0F, /* MQTT_CONTROL_PUBREC */
+ 0x0F, /* MQTT_CONTROL_PUBREL */
+ 0x0F, /* MQTT_CONTROL_PUBCOMP */
+ 0x0F, /* MQTT_CONTROL_SUBSCRIBE */
+ 0x0F, /* MQTT_CONTROL_SUBACK */
+ 0x0F, /* MQTT_CONTROL_UNSUBSCRIBE */
+ 0x0F, /* MQTT_CONTROL_UNSUBACK */
+ 0x0F, /* MQTT_CONTROL_PINGREQ */
+ 0x0F, /* MQTT_CONTROL_PINGRESP */
+ 0x0F, /* MQTT_CONTROL_DISCONNECT */
+ 0x00 /* MQTT_CONTROL_RESERVED */
+ }
+};
+
+ssize_t mqtt_fixed_header_rule_violation(const struct mqtt_fixed_header *fixed_header) {
+ uint8_t control_type;
+ uint8_t control_flags;
+ uint8_t required_flags;
+ uint8_t mask_required_flags;
+
+ /* get value and rules */
+ control_type = fixed_header->control_type;
+ control_flags = fixed_header->control_flags;
+ required_flags = mqtt_fixed_header_rules.required_flags[control_type];
+ mask_required_flags = mqtt_fixed_header_rules.mask_required_flags[control_type];
+
+ /* check for valid type */
+ if (!mqtt_fixed_header_rules.control_type_is_valid[control_type]) {
+ return MQTT_ERROR_CONTROL_FORBIDDEN_TYPE;
+ }
+
+ /* check that flags are appropriate */
+ if(MQTT_BITFIELD_RULE_VIOLOATION(control_flags, required_flags, mask_required_flags)) {
+ return MQTT_ERROR_CONTROL_INVALID_FLAGS;
+ }
+
+ return 0;
+}
+
+ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t *buf, size_t bufsz) {
+ struct mqtt_fixed_header *fixed_header;
+ const uint8_t *start = buf;
+ int lshift;
+ ssize_t errcode;
+
+ /* check for null pointers or empty buffer */
+ if (response == NULL || buf == NULL) {
+ return MQTT_ERROR_NULLPTR;
+ }
+ fixed_header = &(response->fixed_header);
+
+ /* check that bufsz is not zero */
+ if (bufsz == 0) return 0;
+
+ /* parse control type and flags */
+ fixed_header->control_type = *buf >> 4;
+ fixed_header->control_flags = *buf & 0x0F;
+
+ /* parse remaining size */
+ fixed_header->remaining_length = 0;
+
+ lshift = 0;
+ do {
+ /* consume byte and assert at least 1 byte left */
+ --bufsz;
+ ++buf;
+ if (bufsz == 0) return 0;
+
+ /* parse next byte*/
+ fixed_header->remaining_length += (*buf & 0x7F) << lshift;
+ lshift += 7;
+ } while(*buf & 0x80); /* while continue bit is set */
+
+ /* consume last byte */
+ --bufsz;
+ ++buf;
+
+ /* check that the fixed header is valid */
+ errcode = mqtt_fixed_header_rule_violation(fixed_header);
+ if (errcode) {
+ return errcode;
+ }
+
+ /* check that the buffer size if GT remaining length */
+ if (bufsz < fixed_header->remaining_length) {
+ return 0;
+ }
+
+ /* return how many bytes were consumed */
+ return buf - start;
+}
+
+ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fixed_header *fixed_header) {
+ const uint8_t *start = buf;
+ ssize_t errcode;
+ uint32_t remaining_length;
+
+ /* check for null pointers or empty buffer */
+ if (fixed_header == NULL || buf == NULL) {
+ return MQTT_ERROR_NULLPTR;
+ }
+
+ /* check that the fixed header is valid */
+ errcode = mqtt_fixed_header_rule_violation(fixed_header);
+ if (errcode) {
+ return errcode;
+ }
+
+ /* check that bufsz is not zero */
+ if (bufsz == 0) return 0;
+
+ /* pack control type and flags */
+ *buf = (((uint8_t) fixed_header->control_type) << 4) & 0xF0;
+ *buf |= ((uint8_t) fixed_header->control_flags) & 0x0F;
+
+ remaining_length = fixed_header->remaining_length;
+ do {
+ /* consume byte and assert at least 1 byte left */
+ --bufsz;
+ ++buf;
+ if (bufsz == 0) return 0;
+
+ /* pack next byte */
+ *buf = remaining_length & 0x7F;
+ if(remaining_length > 127) *buf |= 0x80;
+ remaining_length = remaining_length >> 7;
+ } while(*buf & 0x80);
+
+ /* consume last byte */
+ --bufsz;
+ ++buf;
+
+ /* check that there's still enough space in buffer for packet */
+ if (bufsz < fixed_header->remaining_length) {
+ return 0;
+ }
+
+ /* return how many bytes were consumed */
+ return buf - start;
+}
+
+/* CONNECT */
+ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz,
+ const char* client_id,
+ const char* will_topic,
+ const void* will_message,
+ size_t will_message_size,
+ const char* user_name,
+ const char* password,
+ uint8_t connect_flags,
+ uint16_t keep_alive)
+{
+ struct mqtt_fixed_header fixed_header;
+ uint32_t remaining_length;
+ const uint8_t const* start = buf;
+ ssize_t rv;
+ uint8_t temp;
+
+ /* pack the fixed headr */
+ fixed_header.control_type = MQTT_CONTROL_CONNECT;
+ fixed_header.control_flags = 0x00;
+
+ /* calculate remaining length and build connect_flags at the same time */
+ connect_flags = connect_flags & ~MQTT_CONNECT_RESERVED;
+ remaining_length = 10; /* size of variable header */
+
+ if (client_id == NULL) {
+ /* client_id is a mandatory parameter */
+ return MQTT_ERROR_CONNECT_NULL_CLIENT_ID;
+ } else {
+ /* mqtt_string length is strlen + 2 */
+ remaining_length += __mqtt_packed_cstrlen(client_id);
+ }
+
+ if (will_topic != NULL) {
+ /* there is a will */
+ connect_flags |= MQTT_CONNECT_WILL_FLAG;
+ remaining_length += __mqtt_packed_cstrlen(will_topic);
+
+ if (will_message == NULL) {
+ /* if there's a will there MUST be a will message */
+ return MQTT_ERROR_CONNECT_NULL_WILL_MESSAGE;
+ }
+ remaining_length += 2 + will_message_size; /* size of will_message */
+
+ /* assert that the will QOS is valid (i.e. not 3) */
+ temp = connect_flags & 0x18; /* mask to QOS */
+ if (temp == 0x18) {
+ /* bitwise equality with QoS 3 (invalid)*/
+ return MQTT_ERROR_CONNECT_FORBIDDEN_WILL_QOS;
+ }
+ } else {
+ /* there is no will so set all will flags to zero */
+ connect_flags &= ~MQTT_CONNECT_WILL_FLAG;
+ connect_flags &= ~0x18;
+ connect_flags &= ~MQTT_CONNECT_WILL_RETAIN;
+ }
+
+ if (user_name != NULL) {
+ /* a user name is present */
+ connect_flags |= MQTT_CONNECT_USER_NAME;
+ remaining_length += __mqtt_packed_cstrlen(user_name);
+ } else {
+ connect_flags &= ~MQTT_CONNECT_USER_NAME;
+ }
+
+ if (password != NULL) {
+ /* a password is present */
+ connect_flags |= MQTT_CONNECT_PASSWORD;
+ remaining_length += __mqtt_packed_cstrlen(password);
+ } else {
+ connect_flags &= ~MQTT_CONNECT_PASSWORD;
+ }
+
+ /* fixed header length is now calculated*/
+ fixed_header.remaining_length = remaining_length;
+
+ /* pack fixed header and perform error checks */
+ rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
+ if (rv <= 0) {
+ /* something went wrong */
+ return rv;
+ }
+ buf += rv;
+ bufsz -= rv;
+
+ /* check that the buffer has enough space to fit the remaining length */
+ if (bufsz < fixed_header.remaining_length) {
+ return 0;
+ }
+
+ /* pack the variable header */
+ *buf++ = 0x00;
+ *buf++ = 0x04;
+ *buf++ = (uint8_t) 'M';
+ *buf++ = (uint8_t) 'Q';
+ *buf++ = (uint8_t) 'T';
+ *buf++ = (uint8_t) 'T';
+ *buf++ = MQTT_PROTOCOL_LEVEL;
+ *buf++ = connect_flags;
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(keep_alive);
+ buf += 2;
+
+ /* pack the payload */
+ buf += __mqtt_pack_str(buf, client_id);
+ if (connect_flags & MQTT_CONNECT_WILL_FLAG) {
+ buf += __mqtt_pack_str(buf, will_topic);
+ memcpy(buf, will_message, will_message_size);
+ buf += will_message_size;
+ }
+ if (connect_flags & MQTT_CONNECT_USER_NAME) {
+ buf += __mqtt_pack_str(buf, user_name);
+ }
+ if (connect_flags & MQTT_CONNECT_PASSWORD) {
+ buf += __mqtt_pack_str(buf, password);
+ }
+
+ /* return the number of bytes that were consumed */
+ return buf - start;
+}
+
+/* CONNACK */
+ssize_t mqtt_unpack_connack_response(struct mqtt_response *mqtt_response, const uint8_t *buf) {
+ const uint8_t const *start = buf;
+ struct mqtt_response_connack *response;
+
+ /* check that remaining length is 2 */
+ if (mqtt_response->fixed_header.remaining_length != 2) {
+ return MQTT_ERROR_MALFORMED_RESPONSE;
+ }
+
+ response = &(mqtt_response->decoded.connack);
+ /* unpack */
+ if (*buf & 0xFE) {
+ /* only bit 1 can be set */
+ return MQTT_ERROR_CONNACK_FORBIDDEN_FLAGS;
+ } else {
+ response->session_present_flag = *buf++;
+ }
+
+ if (*buf > 5u) {
+ /* only bit 1 can be set */
+ return MQTT_ERROR_CONNACK_FORBIDDEN_CODE;
+ } else {
+ response->return_code = (enum MQTTConnackReturnCode) *buf++;
+ }
+ return buf - start;
+}
+
+/* DISCONNECT */
+ssize_t mqtt_pack_disconnect(uint8_t *buf, size_t bufsz) {
+ struct mqtt_fixed_header fixed_header;
+ fixed_header.control_type = MQTT_CONTROL_DISCONNECT;
+ fixed_header.control_flags = 0;
+ fixed_header.remaining_length = 0;
+ return mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
+}
+
+/* PING */
+ssize_t mqtt_pack_ping_request(uint8_t *buf, size_t bufsz) {
+ struct mqtt_fixed_header fixed_header;
+ fixed_header.control_type = MQTT_CONTROL_PINGREQ;
+ fixed_header.control_flags = 0;
+ fixed_header.remaining_length = 0;
+ return mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
+}
+
+/* PUBLISH */
+ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz,
+ const char* topic_name,
+ uint16_t packet_id,
+ void* application_message,
+ size_t application_message_size,
+ uint8_t publish_flags)
+{
+ const uint8_t const *start = buf;
+ ssize_t rv;
+ struct mqtt_fixed_header fixed_header;
+ uint16_t remaining_length;
+ uint8_t inspected_qos;
+
+ /* check for null pointers */
+ if(buf == NULL || topic_name == NULL) {
+ return MQTT_ERROR_NULLPTR;
+ }
+
+ /* inspect QoS level */
+ inspected_qos = (publish_flags & 0x06) >> 1; /* mask */
+
+ /* build the fixed header */
+ fixed_header.control_type = MQTT_CONTROL_PUBLISH;
+
+ /* calculate remaining length */
+ remaining_length = __mqtt_packed_cstrlen(topic_name);
+ if (inspected_qos > 0) {
+ remaining_length += 2;
+ }
+ remaining_length += application_message_size;
+ fixed_header.remaining_length = remaining_length;
+
+ /* force dup to 0 if qos is 0 */
+ if (inspected_qos == 0) {
+ publish_flags &= ~MQTT_PUBLISH_DUP;
+ }
+
+ /* make sure that qos is not 3 */
+ if (inspected_qos == 3) {
+ return MQTT_ERROR_PUBLISH_FORBIDDEN_QOS;
+ }
+ fixed_header.control_flags = publish_flags;
+
+ /* pack fixed header */
+ rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
+ if (rv <= 0) {
+ /* something went wrong */
+ return rv;
+ }
+ buf += rv;
+ bufsz -= rv;
+
+ /* check that buffer is big enough */
+ if (bufsz < remaining_length) {
+ return 0;
+ }
+
+ /* pack variable header */
+ buf += __mqtt_pack_str(buf, topic_name);
+ if (inspected_qos > 0) {
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id);
+ buf += 2;
+ }
+
+ /* pack payload */
+ memcpy(buf, application_message, application_message_size);
+ buf += application_message_size;
+
+ return buf - start;
+}
+
+ssize_t mqtt_unpack_publish_response(struct mqtt_response *mqtt_response, const uint8_t *buf)
+{
+ const uint8_t const *start = buf;
+ struct mqtt_fixed_header *fixed_header;
+ struct mqtt_response_publish *response;
+
+ fixed_header = &(mqtt_response->fixed_header);
+ response = &(mqtt_response->decoded.publish);
+
+ /* get flags */
+ response->dup_flag = (fixed_header->control_flags & MQTT_PUBLISH_DUP) >> 3;
+ response->qos_level = (fixed_header->control_flags & 0x06) >> 1;
+ response->retain_flag = fixed_header->control_flags & MQTT_PUBLISH_RETAIN;
+
+ /* make sure that remaining length is valid */
+ if (mqtt_response->fixed_header.remaining_length < 4) {
+ return MQTT_ERROR_MALFORMED_RESPONSE;
+ }
+
+ /* parse variable header */
+ response->topic_name_size = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf);
+ buf += 2;
+ response->topic_name = buf;
+ buf += response->topic_name_size;
+
+ if (response->qos_level > 0) {
+ response->packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf);
+ buf += 2;
+ }
+
+ /* get payload */
+ response->application_message = buf;
+ if (response->qos_level == 0) {
+ response->application_message_size = fixed_header->remaining_length - response->topic_name_size - 2;
+ } else {
+ response->application_message_size = fixed_header->remaining_length - response->topic_name_size - 4;
+ }
+ buf += response->application_message_size;
+
+ /* return number of bytes consumed */
+ return buf - start;
+}
+
+/* PUBXXX */
+ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz,
+ enum MQTTControlPacketType control_type,
+ uint16_t packet_id)
+{
+ const uint8_t const *start = buf;
+ struct mqtt_fixed_header fixed_header;
+ ssize_t rv;
+ if (buf == NULL) {
+ return MQTT_ERROR_NULLPTR;
+ }
+
+ /* pack fixed header */
+ fixed_header.control_type = control_type;
+ if (control_type == MQTT_CONTROL_PUBREL) {
+ fixed_header.control_flags = 0x02;
+ } else {
+ fixed_header.control_flags = 0;
+ }
+ fixed_header.remaining_length = 2;
+ rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
+ if (rv <= 0) {
+ return rv;
+ }
+ buf += rv;
+ bufsz -= rv;
+
+ if (bufsz < fixed_header.remaining_length) {
+ return 0;
+ }
+
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id);
+ buf += 2;
+
+ return buf - start;
+}
+
+ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const uint8_t *buf)
+{
+ const uint8_t const *start = buf;
+ uint16_t packet_id;
+
+ /* assert remaining length is correct */
+ if (mqtt_response->fixed_header.remaining_length != 2) {
+ return MQTT_ERROR_MALFORMED_RESPONSE;
+ }
+
+ /* parse packet_id */
+ packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf);
+ buf += 2;
+
+ if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBACK) {
+ mqtt_response->decoded.puback.packet_id = packet_id;
+ } else if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBREC) {
+ mqtt_response->decoded.pubrec.packet_id = packet_id;
+ } else if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBREL) {
+ mqtt_response->decoded.pubrel.packet_id = packet_id;
+ } else {
+ mqtt_response->decoded.pubcomp.packet_id = packet_id;
+ }
+
+ return buf - start;
+}
+
+/* SUBACK */
+ssize_t mqtt_unpack_suback_response (struct mqtt_response *mqtt_response, const uint8_t *buf) {
+ const uint8_t const *start = buf;
+ uint32_t remaining_length = mqtt_response->fixed_header.remaining_length;
+
+ /* assert remaining length is at least 3 (for packet id and at least 1 topic) */
+ if (remaining_length < 3) {
+ return MQTT_ERROR_MALFORMED_RESPONSE;
+ }
+
+ /* unpack packet_id */
+ mqtt_response->decoded.suback.packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf);
+ buf += 2;
+ remaining_length -= 2;
+
+ /* unpack return codes */
+ mqtt_response->decoded.suback.num_return_codes = (size_t) remaining_length;
+ mqtt_response->decoded.suback.return_codes = buf;
+ buf += remaining_length;
+
+ return buf - start;
+}
+
+/* SUBSCRIBE */
+ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, uint16_t packet_id, ...) {
+ va_list args;
+ const uint8_t const *start = buf;
+ ssize_t rv;
+ struct mqtt_fixed_header fixed_header;
+ unsigned int num_subs = 0;
+ unsigned int i;
+ const char *topic[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
+ uint8_t max_qos[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
+
+ /* parse all subscriptions */
+ va_start(args, packet_id);
+ while(1) {
+ topic[num_subs] = va_arg(args, const char*);
+ if (topic[num_subs] == NULL) {
+ /* end of list */
+ break;
+ }
+
+ max_qos[num_subs] = (uint8_t) va_arg(args, unsigned int);
+
+ ++num_subs;
+ if (num_subs >= MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS) {
+ return MQTT_ERROR_SUBSCRIBE_TOO_MANY_TOPICS;
+ }
+ }
+ va_end(args);
+
+ /* build the fixed header */
+ fixed_header.control_type = MQTT_CONTROL_SUBSCRIBE;
+ fixed_header.control_flags = 2u;
+ fixed_header.remaining_length = 2u; /* size of variable header */
+ for(i = 0; i < num_subs; ++i) {
+ /* payload is topic name + max qos (1 byte) */
+ fixed_header.remaining_length += __mqtt_packed_cstrlen(topic[i]) + 1;
+ }
+
+ /* pack the fixed header */
+ rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
+ if (rv <= 0) {
+ return rv;
+ }
+ buf += rv;
+ bufsz -= rv;
+
+ /* check that the buffer has enough space */
+ if (bufsz < fixed_header.remaining_length) {
+ return 0;
+ }
+
+ /* pack variable header */
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id);
+ buf += 2;
+
+
+ /* pack payload */
+ for(i = 0; i < num_subs; ++i) {
+ buf += __mqtt_pack_str(buf, topic[i]);
+ *buf++ = max_qos[i];
+ }
+
+ return buf - start;
+}
+
+/* UNSUBACK */
+ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const uint8_t *buf)
+{
+ const uint8_t const *start = buf;
+
+ if (mqtt_response->fixed_header.remaining_length != 2) {
+ return MQTT_ERROR_MALFORMED_RESPONSE;
+ }
+
+ /* parse packet_id */
+ mqtt_response->decoded.unsuback.packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf);
+ buf += 2;
+
+ return buf - start;
+}
+
+/* UNSUBSCRIBE */
+ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, uint16_t packet_id, ...) {
+ va_list args;
+ const uint8_t const *start = buf;
+ ssize_t rv;
+ struct mqtt_fixed_header fixed_header;
+ unsigned int num_subs = 0;
+ unsigned int i;
+ const char *topic[MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
+
+ /* parse all subscriptions */
+ va_start(args, packet_id);
+ while(1) {
+ topic[num_subs] = va_arg(args, const char*);
+ if (topic[num_subs] == NULL) {
+ /* end of list */
+ break;
+ }
+
+ ++num_subs;
+ if (num_subs >= MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS) {
+ return MQTT_ERROR_UNSUBSCRIBE_TOO_MANY_TOPICS;
+ }
+ }
+ va_end(args);
+
+ /* build the fixed header */
+ fixed_header.control_type = MQTT_CONTROL_UNSUBSCRIBE;
+ fixed_header.control_flags = 2u;
+ fixed_header.remaining_length = 2u; /* size of variable header */
+ for(i = 0; i < num_subs; ++i) {
+ /* payload is topic name */
+ fixed_header.remaining_length += __mqtt_packed_cstrlen(topic[i]);
+ }
+
+ /* pack the fixed header */
+ rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
+ if (rv <= 0) {
+ return rv;
+ }
+ buf += rv;
+ bufsz -= rv;
+
+ /* check that the buffer has enough space */
+ if (bufsz < fixed_header.remaining_length) {
+ return 0;
+ }
+
+ /* pack variable header */
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id);
+ buf += 2;
+
+
+ /* pack payload */
+ for(i = 0; i < num_subs; ++i) {
+ buf += __mqtt_pack_str(buf, topic[i]);
+ }
+
+ return buf - start;
+}
+
+/* MESSAGE QUEUE */
+void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz)
+{
+ mq->mem_start = buf;
+ mq->mem_end = buf + bufsz;
+ mq->curr = buf;
+ mq->queue_tail = mq->mem_end;
+ mq->curr_sz = mqtt_mq_currsz(mq);
+}
+
+struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size_t nbytes)
+{
+ /* make queued message header */
+ --(mq->queue_tail);
+ mq->queue_tail->start = mq->curr;
+ mq->queue_tail->size = nbytes;
+ mq->queue_tail->state = MQTT_QUEUED_UNSENT;
+
+ /* move curr and recalculate curr_sz */
+ mq->curr += nbytes;
+ mq->curr_sz = mqtt_mq_currsz(mq);
+
+ return mq->queue_tail;
+}
+
+void mqtt_mq_clean(struct mqtt_message_queue *mq) {
+ struct mqtt_queued_message *new_head;
+ ssize_t i;
+
+ for(new_head = mqtt_mq_get(mq, 0); new_head >= mq->queue_tail; --new_head) {
+ if (new_head->state != MQTT_QUEUED_COMPLETE) break;
+ }
+
+ /* check if everything can be removed */
+ if (new_head < mq->queue_tail) {
+ mq->curr = mq->mem_start;
+ mq->queue_tail = mq->mem_end;
+ mq->curr_sz = mqtt_mq_currsz(mq);
+ return;
+ } else if (new_head == mqtt_mq_get(mq, 0)) {
+ /* do nothing */
+ return;
+ }
+
+ /* move buffered data */
+ size_t n = mq->curr - new_head->start;
+ size_t removing = new_head->start - (uint8_t*) mq->mem_start;
+ memmove(mq->mem_start, new_head->start, n);
+ mq->curr = mq->mem_start + n;
+
+ /* move queue */
+ ssize_t new_tail_idx = new_head - mq->queue_tail;
+ memmove(mqtt_mq_get(mq, new_tail_idx), mq->queue_tail, sizeof(struct mqtt_queued_message) * (new_tail_idx + 1));
+ mq->queue_tail = mqtt_mq_get(mq, new_tail_idx);
+
+ /* bump back start's */
+ for(i = 0; i < new_tail_idx + 1; ++i) {
+ mqtt_mq_get(mq, i)->start -= removing;
+ }
+
+ /* get curr_sz */
+ mq->curr_sz = mqtt_mq_currsz(mq);
+}
+
+struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQTTControlPacketType control_type, uint16_t *packet_id)
+{
+ struct mqtt_queued_message *curr;
+ for(curr = mqtt_mq_get(mq, 0); curr >= mq->queue_tail; --curr) {
+ if (curr->control_type == control_type) {
+ if (packet_id == NULL || (packet_id != NULL && *packet_id == curr->packet_id)) {
+ return curr;
+ }
+ }
+ }
+ return NULL;
+}
+
+
+/* RESPONSE UNPACKING */
+ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf, size_t bufsz) {
+ const uint8_t const *start = buf;
+ ssize_t rv = mqtt_unpack_fixed_header(response, buf, bufsz);
+ if (rv <= 0) return rv;
+ else buf += rv;
+ switch(response->fixed_header.control_type) {
+ case MQTT_CONTROL_CONNACK:
+ rv = mqtt_unpack_connack_response(response, buf);
+ break;
+ case MQTT_CONTROL_PUBLISH:
+ rv = mqtt_unpack_publish_response(response, buf);
+ break;
+ case MQTT_CONTROL_PUBACK:
+ rv = mqtt_unpack_pubxxx_response(response, buf);
+ break;
+ case MQTT_CONTROL_PUBREC:
+ rv = mqtt_unpack_pubxxx_response(response, buf);
+ break;
+ case MQTT_CONTROL_PUBREL:
+ rv = mqtt_unpack_pubxxx_response(response, buf);
+ break;
+ case MQTT_CONTROL_PUBCOMP:
+ rv = mqtt_unpack_pubxxx_response(response, buf);
+ break;
+ case MQTT_CONTROL_SUBACK:
+ rv = mqtt_unpack_suback_response(response, buf);
+ break;
+ case MQTT_CONTROL_UNSUBACK:
+ rv = mqtt_unpack_unsuback_response(response, buf);
+ break;
+ case MQTT_CONTROL_PINGRESP:
+ return rv;
+ default:
+ return MQTT_ERROR_RESPONSE_INVALID_CONTROL_TYPE;
+ }
+
+ if (rv < 0) return rv;
+ buf += rv;
+ return buf - start;
+}
+
+/* EXTRA DETAILS */
+ssize_t __mqtt_pack_str(uint8_t *buf, const char* str) {
+ uint16_t length = strlen(str);
+ int i;
+
+ /* pack string length */
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(length);
+ buf += 2;
+
+ /* pack string */
+ for(i = 0; i < length; ++i) {
+ *(buf++) = str[i];
+ }
+
+ /* return number of bytes consumed */
+ return length + 2;
+}
+
+static const char *MQTT_ERRORS_STR[] = {
+ "MQTT_UNKNOWN_ERROR",
+ __ALL_MQTT_ERRORS(GENERATE_STRING)
+};
+
+const char* mqtt_error_str(enum MQTTErrors error) {
+ int offset = error - MQTT_ERROR_UNKNOWN;
+ if (offset >= 0) {
+ return MQTT_ERRORS_STR[offset];
+ } else if (error == 0) {
+ return "MQTT_ERROR: Buffer too small.";
+ } else if (error > 0) {
+ return "MQTT_OK";
+ } else {
+ return MQTT_ERRORS_STR[0];
+ }
+}
+
+/** @endcond*/
diff --git a/netutils/mqttc/mqtt_pal.c b/netutils/mqttc/mqtt_pal.c
new file mode 100644
index 00000000..4c6a534d
--- /dev/null
+++ b/netutils/mqttc/mqtt_pal.c
@@ -0,0 +1,90 @@
+#include "netutils/mqtt.h"
+
+/**
+ * @file
+ * @brief Implements @ref mqtt_pal_sendall and @ref mqtt_pal_recvall and
+ * any platform-specific helpers you'd like.
+ * @cond Doxygen_Suppress
+ */
+
+//#define __unix__ 1
+#ifdef __unix__
+
+#ifdef MQTT_USE_BIO
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
+ size_t sent = 0;
+
+ while(sent < len) {
+ int tmp = BIO_write(fd, buf + sent, len - sent);
+ if (tmp > 0) {
+ sent += (size_t) tmp;
+ } else if (tmp <= 0 && !BIO_should_retry(fd)) {
+ return MQTT_ERROR_SOCKET_ERROR;
+ }
+ }
+
+ return sent;
+}
+
+ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
+ const void const *start = buf;
+ int rv;
+ do {
+ rv = BIO_read(fd, buf, bufsz);
+ if (rv > 0) {
+ /* successfully read bytes from the socket */
+ buf += rv;
+ bufsz -= rv;
+ } else if (!BIO_should_retry(fd)) {
+ /* an error occurred that wasn't "nothing to read". */
+ return MQTT_ERROR_SOCKET_ERROR;
+ }
+ } while (!BIO_should_read(fd));
+
+ return (ssize_t)(buf - start);
+}
+
+#else
+#include <errno.h>
+
+ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
+ size_t sent = 0;
+
+ while(sent < len) {
+ ssize_t tmp = send(fd, buf + sent, len - sent, flags);
+ if (tmp < 1) {
+ return MQTT_ERROR_SOCKET_ERROR;
+ }
+ sent += (size_t) tmp;
+ }
+ return sent;
+}
+
+ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
+ const void const *start = buf;
+ ssize_t rv;
+
+ do {
+ rv = recv(fd, buf, bufsz, flags);
+ if (rv > 0) {
+ /* successfully read bytes from the socket */
+ buf += rv;
+ bufsz -= rv;
+ } else if (rv < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ /* an error occurred that wasn't "nothing to read". */
+ return MQTT_ERROR_SOCKET_ERROR;
+ }
+ } while (rv > 0);
+
+ return buf - start;
+}
+
+#endif
+
+#endif
+
+/** @endcond */
diff --git a/system/mqttc/.gitignore b/system/mqttc/.gitignore
new file mode 100644
index 00000000..83bd7b81
--- /dev/null
+++ b/system/mqttc/.gitignore
@@ -0,0 +1,11 @@
+/Make.dep
+/.depend
+/.built
+/*.asm
+/*.rel
+/*.lst
+/*.sym
+/*.adb
+/*.lib
+/*.src
+/*.obj
diff --git a/system/mqttc/Kconfig b/system/mqttc/Kconfig
new file mode 100644
index 00000000..d5bc2737
--- /dev/null
+++ b/system/mqttc/Kconfig
@@ -0,0 +1,31 @@
+#
+# For a description of the syntax of this configuration file,
+# see the file kconfig-language.txt in the NuttX tools repository.
+#
+
+config SYSTEM_MQTTC
+ bool "'mqttc' command"
+ default n
+ depends on NET
+ ---help---
+ Enable support for the 'mqttc' command.
+
+if SYSTEM_MQTTC
+config SYSTEM_MQTTC_PROGNAME
+ string "MQTTC program name"
+ default "mqttc"
+ depends on BUILD_KERNEL
+ ---help---
+ This is the name of the program that will be use when the NSH ELF
+ program is installed.
+
+config SYSTEM_MQTTC_PRIORITY
+ int "MQTTC task priority"
+ default 100
+
+config SYSTEM_MQTTC_STACKSIZE
+ int "MQTTC stack size"
+ default 8192
+
+endif
+
diff --git a/system/mqttc/Make.defs b/system/mqttc/Make.defs
new file mode 100644
index 00000000..07090fcc
--- /dev/null
+++ b/system/mqttc/Make.defs
@@ -0,0 +1,40 @@
+############################################################################
+# apps/system/mqttc/Make.defs
+# Adds selected applications to apps/ build
+#
+# Copyright (C) 2017 Gregory Nutt. All rights reserved.
+# Author: Gregory Nutt <gnutt@nuttx.org>
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# 3. Neither the name NuttX nor the names of its contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+############################################################################
+
+ifeq ($(CONFIG_SYSTEM_MQTTC),y)
+CONFIGURED_APPS += system/mqttc
+endif
+
diff --git a/system/mqttc/Makefile b/system/mqttc/Makefile
new file mode 100644
index 00000000..27286eea
--- /dev/null
+++ b/system/mqttc/Makefile
@@ -0,0 +1,147 @@
+############################################################################
+# apps/system/mqttc/Makefile
+#
+# Copyright (C) 2017 Gregory Nutt. All rights reserved.
+# Author: Gregory Nutt <gnutt@nuttx.org>
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# 3. Neither the name NuttX nor the names of its contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+############################################################################
+
+-include $(TOPDIR)/.config
+-include $(TOPDIR)/Make.defs
+include $(APPDIR)/Make.defs
+
+# mqttc command
+
+CONFIG_SYSTEM_MQTTC_PRIORITY ?= SCHED_PRIORITY_DEFAULT
+CONFIG_SYSTEM_MQTTC_STACKSIZE ?= 2048
+
+APPNAME = mqttc
+PRIORITY = $(CONFIG_SYSTEM_MQTTC_PRIORITY)
+STACKSIZE = $(CONFIG_SYSTEM_MQTTC_STACKSIZE)
+
+CONFIG_SYSTEM_MQTTC_PROGNAME ?= mqttc$(EXEEXT)
+PROGNAME = $(CONFIG_SYSTEM_MQTTC_PROGNAME)
+
+# Files
+
+ASRCS =
+CSRCS =
+MAINSRC = simple_publisher.c
+
+AOBJS = $(ASRCS:.S=$(OBJEXT))
+COBJS = $(CSRCS:.c=$(OBJEXT))
+MAINOBJ = $(MAINSRC:.c=$(OBJEXT))
+
+SRCS = $(ASRCS) $(CSRCS) $(MAINSRC)
+OBJS = $(AOBJS) $(COBJS)
+
+ifneq ($(CONFIG_BUILD_KERNEL),y)
+ OBJS += $(MAINOBJ)
+endif
+
+ifeq ($(CONFIG_WINDOWS_NATIVE),y)
+ BIN = ..\..\libapps$(LIBEXT)
+else
+ifeq ($(WINTOOL),y)
+ BIN = ..\\..\\libapps$(LIBEXT)
+else
+ BIN = ../../libapps$(LIBEXT)
+endif
+endif
+
+ifeq ($(WINTOOL),y)
+ INSTALL_DIR = "${shell cygpath -w $(BIN_DIR)}"
+else
+ INSTALL_DIR = $(BIN_DIR)
+endif
+
+ROOTDEPPATH = --dep-path .
+
+# Common build
+
+VPATH =
+
+all: .built
+.PHONY: context depend clean distclean preconfig
+.PRECIOUS: ../../libapps$(LIBEXT)
+
+$(AOBJS): %$(OBJEXT): %.S
+ $(call ASSEMBLE, $<, $@)
+
+$(COBJS) $(MAINOBJ): %$(OBJEXT): %.c
+ $(call COMPILE, $<, $@)
+
+.built: $(OBJS)
+ $(call ARCHIVE, $(BIN), $(OBJS))
+ $(Q) touch .built
+
+ifeq ($(CONFIG_BUILD_KERNEL),y)
+$(BIN_DIR)$(DELIM)$(PROGNAME): $(OBJS) $(MAINOBJ)
+ @echo "LD: $(PROGNAME)"
+ $(Q) $(LD) $(LDELFFLAGS) $(LDLIBPATH) -o $(INSTALL_DIR)$(DELIM)$(PROGNAME) $(ARCHCRT0OBJ) $(MAINOBJ) $(LDLIBS)
+ $(Q) $(NM) -u $(INSTALL_DIR)$(DELIM)$(PROGNAME)
+
+install: $(BIN_DIR)$(DELIM)$(PROGNAME)
+
+else
+install:
+
+endif
+
+# Register application
+
+ifeq ($(CONFIG_NSH_BUILTIN_APPS),y)
+$(BUILTIN_REGISTRY)$(DELIM)$(APPNAME)_main.bdat: $(DEPCONFIG) Makefile
+ $(call REGISTER,$(APPNAME),$(PRIORITY),$(STACKSIZE),$(APPNAME)_main)
+
+context: $(BUILTIN_REGISTRY)$(DELIM)$(APPNAME)_main.bdat
+else
+context:
+endif
+
+# Create dependencies
+
+.depend: Makefile $(SRCS)
+ $(Q) $(MKDEP) $(ROOTDEPPATH) "$(CC)" -- $(CFLAGS) -- $(SRCS) >Make.dep
+ $(Q) touch $@
+
+depend: .depend
+
+clean:
+ $(call DELFILE, .built)
+ $(call CLEAN)
+
+distclean: clean
+ $(call DELFILE, Make.dep)
+ $(call DELFILE, .depend)
+
+preconfig:
+
+-include Make.dep
diff --git a/system/mqttc/bio_publisher.c b/system/mqttc/bio_publisher.c
new file mode 100644
index 00000000..8c92b549
--- /dev/null
+++ b/system/mqttc/bio_publisher.c
@@ -0,0 +1,156 @@
+
+/**
+ * @file
+ * A simple program to that publishes the current time whenever ENTER is pressed.
+ */
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include <mqtt.h>
+#include "templates/bio_sockets.h"
+
+
+/**
+ * @brief The function that would be called whenever a PUBLISH is received.
+ *
+ * @note This function is not used in this example.
+ */
+void publish_callback(void** unused, struct mqtt_response_publish *published);
+
+/**
+ * @brief The client's refresher. This function triggers back-end routines to
+ * handle ingress/egress traffic to the broker.
+ *
+ * @note All this function needs to do is call \ref __mqtt_recv and
+ * \ref __mqtt_send every so often. I've picked 100 ms meaning that
+ * client ingress/egress traffic will be handled every 100 ms.
+ */
+void* client_refresher(void* client);
+
+/**
+ * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
+ */
+void exit_example(int status, BIO* sockfd, pthread_t *client_daemon);
+
+/**
+ * A simple program to that publishes the current time whenever ENTER is pressed.
+ */
+int main(int argc, const char *argv[])
+{
+ const char* addr;
+ const char* port;
+ const char* topic;
+
+ /* Load OpenSSL */
+ SSL_load_error_strings();
+ ERR_load_BIO_strings();
+ OpenSSL_add_all_algorithms();
+
+ /* get address (argv[1] if present) */
+ if (argc > 1) {
+ addr = argv[1];
+ } else {
+ addr = "test.mosquitto.org";
+ }
+
+ /* get port number (argv[2] if present) */
+ if (argc > 2) {
+ port = argv[2];
+ } else {
+ port = "1883";
+ }
+
+ /* get the topic name to publish */
+ if (argc > 3) {
+ topic = argv[3];
+ } else {
+ topic = "datetime";
+ }
+
+ /* open the non-blocking TCP socket (connecting to the broker) */
+ BIO* sockfd = open_nb_socket(addr, port);
+
+ if (sockfd == NULL) {
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+ }
+
+ /* setup a client */
+ struct mqtt_client client;
+ uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
+ uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
+ mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
+ mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
+
+ /* check that we don't have any errors */
+ if (client.error != MQTT_OK) {
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+ }
+
+ /* start a thread to refresh the client (handle egress and ingree client traffic) */
+ pthread_t client_daemon;
+ if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
+ fprintf(stderr, "Failed to start client daemon.\n");
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+
+ }
+
+ /* start publishing the time */
+ printf("%s is ready to begin publishing the time.\n", argv[0]);
+ printf("Press ENTER to publish the current time.\n");
+ printf("Press CTRL-D (or any other key) to exit.\n\n");
+ while(fgetc(stdin) == '\n') {
+ /* get the current time */
+ time_t timer;
+ time(&timer);
+ struct tm* tm_info = localtime(&timer);
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
+
+ /* print a message */
+ char application_message[256];
+ snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
+ printf("%s published : \"%s\"", argv[0], application_message);
+
+ /* publish the time */
+ mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
+
+ /* check for errors */
+ if (client.error != MQTT_OK) {
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
+ exit_example(EXIT_FAILURE, sockfd, &client_daemon);
+ }
+ }
+
+ /* disconnect */
+ printf("\n%s disconnecting from %s\n", argv[0], addr);
+ sleep(1);
+
+ /* exit */
+ exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
+}
+
+void exit_example(int status, BIO* sockfd, pthread_t *client_daemon)
+{
+ if (sockfd != NULL) BIO_free_all(sockfd);
+ if (client_daemon != NULL) pthread_cancel(*client_daemon);
+ exit(status);
+}
+
+
+
+void publish_callback(void** unused, struct mqtt_response_publish *published)
+{
+ /* not used in this example */
+}
+
+void* client_refresher(void* client)
+{
+ while(1)
+ {
+ mqtt_sync((struct mqtt_client*) client);
+ usleep(100000U);
+ }
+ return NULL;
+}
\ No newline at end of file
diff --git a/system/mqttc/openssl_publisher.c b/system/mqttc/openssl_publisher.c
new file mode 100644
index 00000000..a09fbbe8
--- /dev/null
+++ b/system/mqttc/openssl_publisher.c
@@ -0,0 +1,167 @@
+
+/**
+ * @file
+ */
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include <mqtt.h>
+#include "templates/openssl_sockets.h"
+
+
+/**
+ * @brief The function that would be called whenever a PUBLISH is received.
+ *
+ * @note This function is not used in this example.
+ */
+void publish_callback(void** unused, struct mqtt_response_publish *published);
+
+/**
+ * @brief The client's refresher. This function triggers back-end routines to
+ * handle ingress/egress traffic to the broker.
+ *
+ * @note All this function needs to do is call \ref __mqtt_recv and
+ * \ref __mqtt_send every so often. I've picked 100 ms meaning that
+ * client ingress/egress traffic will be handled every 100 ms.
+ */
+void* client_refresher(void* client);
+
+/**
+ * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
+ */
+void exit_example(int status, BIO* sockfd, pthread_t *client_daemon);
+
+/**
+ * A simple program to that publishes the current time whenever ENTER is pressed.
+ */
+int main(int argc, const char *argv[])
+{
+ const char* addr;
+ const char* port;
+ const char* topic;
+ const char* ca_file;
+
+ /* Load OpenSSL */
+ SSL_load_error_strings();
+ ERR_load_BIO_strings();
+ OpenSSL_add_all_algorithms();
+ SSL_library_init();
+
+ SSL_CTX* ssl_ctx;
+ BIO* sockfd;
+
+ if (argc > 1) {
+ ca_file = argv[1];
+ } else {
+ printf("error: path to the CA certificate to use\n");
+ exit(1);
+ }
+
+ /* get address (argv[2] if present) */
+ if (argc > 2) {
+ addr = argv[2];
+ } else {
+ addr = "test.mosquitto.org";
+ }
+
+ /* get port number (argv[3] if present) */
+ if (argc > 3) {
+ port = argv[3];
+ } else {
+ port = "8883";
+ }
+
+ /* get the topic name to publish */
+ if (argc > 4) {
+ topic = argv[4];
+ } else {
+ topic = "datetime";
+ }
+
+ /* open the non-blocking TCP socket (connecting to the broker) */
+ open_nb_socket(&sockfd, &ssl_ctx, addr, port, ca_file, NULL);
+
+ if (sockfd == NULL) {
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+ }
+
+ /* setup a client */
+ struct mqtt_client client;
+ uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
+ uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
+ mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
+ mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
+
+ /* check that we don't have any errors */
+ if (client.error != MQTT_OK) {
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+ }
+
+ /* start a thread to refresh the client (handle egress and ingree client traffic) */
+ pthread_t client_daemon;
+ if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
+ fprintf(stderr, "Failed to start client daemon.\n");
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+
+ }
+
+ /* start publishing the time */
+ printf("%s is ready to begin publishing the time.\n", argv[0]);
+ printf("Press ENTER to publish the current time.\n");
+ printf("Press CTRL-D (or any other key) to exit.\n\n");
+ while(fgetc(stdin) == '\n') {
+ /* get the current time */
+ time_t timer;
+ time(&timer);
+ struct tm* tm_info = localtime(&timer);
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
+
+ /* print a message */
+ char application_message[256];
+ snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
+ printf("%s published : \"%s\"", argv[0], application_message);
+
+ /* publish the time */
+ mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
+
+ /* check for errors */
+ if (client.error != MQTT_OK) {
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
+ exit_example(EXIT_FAILURE, sockfd, &client_daemon);
+ }
+ }
+
+ /* disconnect */
+ printf("\n%s disconnecting from %s\n", argv[0], addr);
+ sleep(1);
+
+ /* exit */
+ exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
+}
+
+void exit_example(int status, BIO* sockfd, pthread_t *client_daemon)
+{
+ if (sockfd != NULL) BIO_free_all(sockfd);
+ if (client_daemon != NULL) pthread_cancel(*client_daemon);
+ exit(status);
+}
+
+
+
+void publish_callback(void** unused, struct mqtt_response_publish *published)
+{
+ /* not used in this example */
+}
+
+void* client_refresher(void* client)
+{
+ while(1)
+ {
+ mqtt_sync((struct mqtt_client*) client);
+ usleep(100000U);
+ }
+ return NULL;
+}
\ No newline at end of file
diff --git a/system/mqttc/ping.c b/system/mqttc/ping.c
new file mode 100644
index 00000000..b99c5af4
--- /dev/null
+++ b/system/mqttc/ping.c
@@ -0,0 +1,573 @@
+/****************************************************************************
+ * apps/system/ping/ping.c
+ *
+ * Copyright (C) 2017-2018 Gregory Nutt. All rights reserved.
+ * Author: Gregory Nutt <gnutt@nuttx.org>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name NuttX nor the names of its contributors may be
+ * used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+ * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ ****************************************************************************/
+
+/****************************************************************************
+ * Included Files
+ ****************************************************************************/
+
+#include <nuttx/config.h>
+
+#include <sys/socket.h>
+#include <sys/socket.h>
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <time.h>
+#include <poll.h>
+#include <string.h>
+#include <errno.h>
+
+#if defined(CONFIG_LIBC_NETDB) && defined(CONFIG_NETDB_DNSCLIENT)
+# include <netdb.h>
+#endif
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <nuttx/net/icmp.h>
+
+/****************************************************************************
+ * Pre-processor Definitions
+ ****************************************************************************/
+
+#define ICMP_PING_DATALEN 56
+#define ICMP_IOBUFFER_SIZE sizeof(struct icmp_hdr_s) + ICMP_PING_DATALEN
+
+#define ICMP_NPINGS 10 /* Default number of pings */
+#define ICMP_POLL_DELAY 1000 /* 1 second in milliseconds */
+
+/****************************************************************************
+ * Private Types
+ ****************************************************************************/
+
+struct ping_info_s
+{
+ int sockfd; /* Open IPPROTO_ICMP socket */
+ FAR struct in_addr dest; /* Target address to ping */
+ uint16_t count; /* Number of pings requested */
+ uint16_t nrequests; /* Number of ICMP ECHO requests sent */
+ uint16_t nreplies; /* Number of matching ICMP ECHO replies received */
+ int16_t delay; /* Deciseconds to delay between pings */
+
+ /* I/O buffer for data transfers */
+
+ uint8_t iobuffer[ICMP_IOBUFFER_SIZE];
+};
+
+/****************************************************************************
+ * Private Data
+ ****************************************************************************/
+
+/* NOTE: This will not work in the kernel build where there will be a
+ * separate instance of g_pingid in every process space.
+ */
+
+static uint16_t g_pingid = 0;
+
+/****************************************************************************
+ * Private Functions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Name: ping_newid
+ ****************************************************************************/
+
+static inline uint16_t ping_newid(void)
+{
+ /* Revisit: No thread safe */
+
+ return ++g_pingid;
+}
+
+/****************************************************************************
+ * Name: ping_gethostip
+ *
+ * Description:
+ * Call gethostbyname() to get the IP address associated with a hostname.
+ *
+ * Input Parameters
+ * hostname - The host name to use in the nslookup.
+ * ipv4addr - The location to return the IPv4 address.
+ *
+ * Returned Value:
+ * Zero (OK) on success; a negated errno value on failure.
+ *
+ ****************************************************************************/
+
+static int ping_gethostip(FAR char *hostname, FAR struct ping_info_s *info)
+{
+#if defined(CONFIG_LIBC_NETDB) && defined(CONFIG_NETDB_DNSCLIENT)
+ /* Netdb DNS client support is enabled */
+
+ FAR struct hostent *he;
+
+ he = gethostbyname(hostname);
+ if (he == NULL)
+ {
+ nerr("ERROR: gethostbyname failed: %d\n", h_errno);
+ return -ENOENT;
+ }
+ else if (he->h_addrtype == AF_INET)
+ {
+ memcpy(&info->dest, he->h_addr, sizeof(in_addr_t));
+ }
+ else
+ {
+ nerr("ERROR: gethostbyname returned an address of type: %d\n",
+ he->h_addrtype);
+ return -ENOEXEC;
+ }
+
+ return OK;
+
+#else /* CONFIG_LIBC_NETDB */
+
+ /* No host name support */
+ /* Convert strings to numeric IPv6 address */
+
+ int ret = inet_pton(AF_INET, hostname, &info->dest);
+
+ /* The inet_pton() function returns 1 if the conversion succeeds. It will
+ * return 0 if the input is not a valid IPv4 dotted-decimal string or -1
+ * with errno set to EAFNOSUPPORT if the address family argument is
+ * unsupported.
+ */
+
+ return (ret > 0) ? OK : ERROR;
+
+#endif /* CONFIG_LIBC_NETDB */
+}
+
+/****************************************************************************
+ * Name: icmp_ping
+ ****************************************************************************/
+
+static void icmp_ping(FAR struct ping_info_s *info)
+{
+ struct sockaddr_in destaddr;
+ struct sockaddr_in fromaddr;
+ struct icmp_hdr_s outhdr;
+ FAR struct icmp_hdr_s *inhdr;
+ struct pollfd recvfd;
+ FAR uint8_t *ptr;
+ int32_t elapsed;
+ clock_t start;
+ socklen_t addrlen;
+ ssize_t nsent;
+ ssize_t nrecvd;
+ size_t outsize;
+ bool retry;
+ int delay;
+ int ret;
+ int ch;
+ int i;
+
+ memset(&destaddr, 0, sizeof(struct sockaddr_in));
+ destaddr.sin_family = AF_INET;
+ destaddr.sin_port = 0;
+ destaddr.sin_addr.s_addr = info->dest.s_addr;
+
+ memset(&outhdr, 0, sizeof(struct icmp_hdr_s));
+ outhdr.type = ICMP_ECHO_REQUEST;
+ outhdr.id = ping_newid();
+ outhdr.seqno = 0;
+
+ printf("PING %u.%u.%u.%u %d bytes of data\n",
+ (info->dest.s_addr ) & 0xff,
+ (info->dest.s_addr >> 8 ) & 0xff,
+ (info->dest.s_addr >> 16) & 0xff,
+ (info->dest.s_addr >> 24) & 0xff,
+ ICMP_PING_DATALEN);
+
+ while (info->nrequests < info->count)
+ {
+ /* Copy the ICMP header into the I/O buffer */
+
+ memcpy(info->iobuffer, &outhdr, sizeof(struct icmp_hdr_s));
+
+ /* Add some easily verifiable payload data */
+
+ ptr = &info->iobuffer[sizeof(struct icmp_hdr_s)];
+ ch = 0x20;
+
+ for (i = 0; i < ICMP_PING_DATALEN; i++)
+ {
+ *ptr++ = ch;
+ if (++ch > 0x7e)
+ {
+ ch = 0x20;
+ }
+ }
+
+ start = clock();
+ outsize = sizeof(struct icmp_hdr_s) + ICMP_PING_DATALEN;
+ nsent = sendto(info->sockfd, info->iobuffer, outsize, 0,
+ (FAR struct sockaddr*)&destaddr,
+ sizeof(struct sockaddr_in));
+ if (nsent < 0)
+ {
+ fprintf(stderr, "ERROR: sendto failed at seqno %u: %d\n",
+ outhdr.seqno, errno);
+ return;
+ }
+ else if (nsent != outsize)
+ {
+ fprintf(stderr, "ERROR: sendto returned %ld, expected %lu\n",
+ (long)nsent, (unsigned long)outsize);
+ return;
+ }
+
+ info->nrequests++;
+
+ delay = info->delay;
+ do
+ {
+ /* Wait for a reply with a timeout */
+
+ retry = false;
+
+ recvfd.fd = info->sockfd;
+ recvfd.events = POLLIN;
+ recvfd.revents = 0;
+
+ ret = poll(&recvfd, 1, delay);
+ if (ret < 0)
+ {
+ fprintf(stderr, "ERROR: poll failed: %d\n", errno);
+ return;
+ }
+ else if (ret == 0)
+ {
+ printf("No response from %u.%u.%u.%u: icmp_seq=%u time=%u ms\n",
+ (info->dest.s_addr ) & 0xff,
+ (info->dest.s_addr >> 8 ) & 0xff,
+ (info->dest.s_addr >> 16) & 0xff,
+ (info->dest.s_addr >> 24) & 0xff,
+ outhdr.seqno, info->delay);
+ continue;
+ }
+
+ /* Get the ICMP response (ignoring the sender) */
+
+ addrlen = sizeof(struct sockaddr_in);
+ nrecvd = recvfrom(info->sockfd, info->iobuffer,
+ ICMP_IOBUFFER_SIZE, 0,
+ (FAR struct sockaddr *)&fromaddr, &addrlen);
+ if (nrecvd < 0)
+ {
+ fprintf(stderr, "ERROR: recvfrom failed: %d\n", errno);
+ return;
+ }
+ else if (nrecvd < sizeof(struct icmp_hdr_s))
+ {
+ fprintf(stderr, "ERROR: short ICMP packet: %ld\n", (long)nrecvd);
+ return;
+ }
+
+ elapsed = (unsigned int)TICK2MSEC(clock() - start);
+ inhdr = (FAR struct icmp_hdr_s *)info->iobuffer;
+
+ if (inhdr->type == ICMP_ECHO_REPLY)
+ {
+ if (inhdr->id != outhdr.id)
+ {
+ fprintf(stderr,
+ "WARNING: Ignoring ICMP reply with ID %u. "
+ "Expected %u\n",
+ inhdr->id, outhdr.id);
+ retry = true;
+ }
+ else if (inhdr->seqno > outhdr.seqno)
+ {
+ fprintf(stderr,
+ "WARNING: Ignoring ICMP reply to sequence %u. "
+ "Expected <= &u\n",
+ inhdr->seqno, outhdr.seqno);
+ retry = true;
+ }
+ else
+ {
+ bool verified = true;
+ int32_t pktdelay = elapsed;
+
+ if (inhdr->seqno < outhdr.seqno)
+ {
+ fprintf(stderr, "WARNING: Received after timeout\n");
+ pktdelay += info->delay;
+ retry = true;
+ }
+
+ printf("%ld bytes from %u.%u.%u.%u: icmp_seq=%u time=%u ms\n",
+ nrecvd - sizeof(struct icmp_hdr_s),
+ (info->dest.s_addr ) & 0xff,
+ (info->dest.s_addr >> 8 ) & 0xff,
+ (info->dest.s_addr >> 16) & 0xff,
+ (info->dest.s_addr >> 24) & 0xff,
+ inhdr->seqno, pktdelay);
+
+ /* Verify the payload data */
+
+ if (nrecvd != outsize)
+ {
+ fprintf(stderr,
+ "WARNING: Ignoring ICMP reply with different payload "
+ "size: %ld vs %lu\n",
+ (long)nrecvd, (unsigned long)outsize);
+ verified = false;
+ }
+ else
+ {
+ ptr = &info->iobuffer[sizeof(struct icmp_hdr_s)];
+ ch = 0x20;
+
+ for (i = 0; i < ICMP_PING_DATALEN; i++, ptr++)
+ {
+ if (*ptr != ch)
+ {
+ fprintf(stderr, "WARNING: Echoed data corrupted\n");
+ verified = false;
+ break;
+ }
+
+ if (++ch > 0x7e)
+ {
+ ch = 0x20;
+ }
+ }
+ }
+
+ /* Only count the number of good replies */
+
+ if (verified)
+ {
+ info->nreplies++;
+ }
+ }
+ }
+ else
+ {
+ fprintf(stderr, "WARNING: ICMP packet with unknown type: %u\n",
+ inhdr->type);
+ }
+
+ delay -= elapsed;
+ }
+ while (retry && delay > 0);
+
+ /* Wait if necessary to preserved the requested ping rate */
+
+ elapsed = (unsigned int)TICK2MSEC(clock() - start);
+ if (elapsed < info->delay)
+ {
+ struct timespec rqt;
+ unsigned int remaining;
+ unsigned int sec;
+ unsigned int frac; /* In deciseconds */
+
+ remaining = info->delay - elapsed;
+ sec = remaining / MSEC_PER_SEC;
+ frac = remaining - MSEC_PER_SEC * sec;
+
+ rqt.tv_sec = sec;
+ rqt.tv_nsec = frac * NSEC_PER_MSEC;
+
+ (void)nanosleep(&rqt, NULL);
+ }
+
+ outhdr.seqno++;
+ }
+}
+
+/****************************************************************************
+ * Name: show_usage
+ ****************************************************************************/
+
+static void show_usage(FAR const char *progname, int exitcode) noreturn_function;
+static void show_usage(FAR const char *progname, int exitcode)
+{
+#if defined(CONFIG_LIBC_NETDB) && defined(CONFIG_NETDB_DNSCLIENT)
+ printf("\nUsage: %s [-c <count>] [-i <interval>] <hostname>\n", progname);
+ printf(" %s -h\n", progname);
+ printf("\nWhere:\n");
+ printf(" <hostname> is either an IPv6 address or the name of the remote host\n");
+ printf(" that is requested the ICMPv6 ECHO reply.\n");
+#else
+ printf("\nUsage: %s [-c <count>] [-i <interval>] <ip-address>\n", progname);
+ printf(" %s -h\n", progname);
+ printf("\nWhere:\n");
+ printf(" <ip-address> is the IPv4 address request the ICMP ECHO reply.\n");
+#endif
+ printf(" -c <count> determines the number of pings. Default %u.\n",
+ ICMP_NPINGS);
+ printf(" -i <interval> is the default delay between pings (milliseconds).\n");
+ printf(" Default %d.\n", ICMP_POLL_DELAY);
+ printf(" -h shows this text and exits.\n");
+ exit(exitcode);
+}
+
+/****************************************************************************
+ * Public Functions
+ ****************************************************************************/
+
+#ifdef CONFIG_BUILD_KERNEL
+int main(int argc, FAR char *argv[])
+#else
+int ping_main(int argc, char **argv)
+#endif
+{
+ FAR struct ping_info_s *info;
+ FAR char *endptr;
+ clock_t start;
+ int32_t elapsed;
+ int exitcode;
+ int option;
+
+ /* Allocate memory to hold ping information */
+
+ info = (FAR struct ping_info_s *)zalloc(sizeof(struct ping_info_s));
+ if (info == NULL)
+ {
+ fprintf(stderr, "ERROR: Failed to allocate memory\n", argv[1]);
+ return EXIT_FAILURE;
+ }
+
+ info->count = ICMP_NPINGS;
+ info->delay = ICMP_POLL_DELAY;
+
+ /* Parse command line options */
+
+ exitcode = EXIT_FAILURE;
+
+ while ((option = getopt(argc, argv, ":c:i:h")) != ERROR)
+ {
+ switch (option)
+ {
+ case 'c':
+ {
+ long count = strtol(optarg, &endptr, 10);
+ if (count < 1 || count > UINT16_MAX)
+ {
+ fprintf(stderr, "ERROR: <count> out of range: %ld\n", count);
+ goto errout_with_usage;
+ }
+
+ info->count = (uint16_t)count;
+ }
+ break;
+
+ case 'i':
+ {
+ long delay = strtol(optarg, &endptr, 10);
+ if (delay < 1 || delay > INT16_MAX)
+ {
+ fprintf(stderr, "ERROR: <interval> out of range: %ld\n", delay);
+ goto errout_with_usage;
+ }
+
+ info->delay = (int16_t)delay;
+ }
+ break;
+
+ case 'h':
+ exitcode = EXIT_SUCCESS;
+ goto errout_with_usage;
+
+ case ':':
+ fprintf(stderr, "ERROR: Missing required argument\n");
+ goto errout_with_usage;
+
+ case '?':
+ default:
+ fprintf(stderr, "ERROR: Unrecognized option\n");
+ goto errout_with_usage;
+ }
+ }
+
+ /* There should be one final parameters remaining on the command line */
+
+ if (optind >= argc)
+ {
+ printf("ERROR: Missing required <ip-address> argument\n");
+ free(info);
+ show_usage(argv[0], EXIT_FAILURE);
+ }
+
+ if (ping_gethostip(argv[optind], info) < 0)
+ {
+ fprintf(stderr, "ERROR: ping_gethostip(%s) failed\n", argv[optind]);
+ goto errout_with_info;
+ }
+
+ info->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP);
+ if (info->sockfd < 0)
+ {
+ fprintf(stderr, "ERROR: socket() failed: %d\n", errno);
+ goto errout_with_info;
+ }
+
+ start = clock();
+ icmp_ping(info);
+
+ /* Get the total elapsed time */
+
+ elapsed = (int32_t)TICK2MSEC(clock() - start);
+
+ if (info->nrequests > 0)
+ {
+ unsigned int tmp;
+
+ /* Calculate the percentage of lost packets */
+
+ tmp = (100 * (info->nrequests - info->nreplies) + (info->nrequests >> 1)) /
+ info->nrequests;
+
+ printf("%u packets transmitted, %u received, %u%% packet loss, time %ld ms\n",
+ info->nrequests, info->nreplies, tmp, (long)elapsed);
+ }
+
+ close(info->sockfd);
+ free(info);
+ return EXIT_SUCCESS;
+
+errout_with_usage:
+ free(info);
+ show_usage(argv[0], exitcode);
+ return exitcode; /* Not reachable */
+
+errout_with_info:
+ free(info);
+ return EXIT_FAILURE;
+}
diff --git a/system/mqttc/reconnect_subscriber.c b/system/mqttc/reconnect_subscriber.c
new file mode 100644
index 00000000..04748abe
--- /dev/null
+++ b/system/mqttc/reconnect_subscriber.c
@@ -0,0 +1,195 @@
+
+/**
+ * @file
+ * A simple subscriber program that performs automatic reconnections.
+ */
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include <mqtt.h>
+#include "templates/posix_sockets.h"
+
+/**
+ * @brief A structure that I will use to keep track of some data needed
+ * to setup the connection to the broker.
+ *
+ * An instance of this struct will be created in my \c main(). Then, whenever
+ * \ref reconnect_client is called, this instance will be passed.
+ */
+struct reconnect_state_t {
+ const char* hostname;
+ const char* port;
+ const char* topic;
+ uint8_t* sendbuf;
+ size_t sendbufsz;
+ uint8_t* recvbuf;
+ size_t recvbufsz;
+};
+
+
+/**
+ * @brief My reconnect callback. It will reestablish the connection whenever
+ * an error occurs.
+ */
+void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr);
+
+/**
+ * @brief The function will be called whenever a PUBLISH message is received.
+ */
+void publish_callback(void** unused, struct mqtt_response_publish *published);
+
+/**
+ * @brief The client's refresher. This function triggers back-end routines to
+ * handle ingress/egress traffic to the broker.
+ *
+ * @note All this function needs to do is call \ref __mqtt_recv and
+ * \ref __mqtt_send every so often. I've picked 100 ms meaning that
+ * client ingress/egress traffic will be handled every 100 ms.
+ */
+void* client_refresher(void* client);
+
+/**
+ * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
+ */
+void exit_example(int status, int sockfd, pthread_t *client_daemon);
+
+
+int main(int argc, const char *argv[])
+{
+ const char* addr;
+ const char* port;
+ const char* topic;
+
+ /* get address (argv[1] if present) */
+ if (argc > 1) {
+ addr = argv[1];
+ } else {
+ addr = "test.mosquitto.org";
+ }
+
+ /* get port number (argv[2] if present) */
+ if (argc > 2) {
+ port = argv[2];
+ } else {
+ port = "1883";
+ }
+
+ /* get the topic name to publish */
+ if (argc > 3) {
+ topic = argv[3];
+ } else {
+ topic = "datetime";
+ }
+
+ /* build the reconnect_state structure which will be passed to reconnect */
+ struct reconnect_state_t reconnect_state;
+ reconnect_state.hostname = addr;
+ reconnect_state.port = port;
+ reconnect_state.topic = topic;
+ uint8_t sendbuf[2048];
+ uint8_t recvbuf[1024];
+ reconnect_state.sendbuf = sendbuf;
+ reconnect_state.sendbufsz = sizeof(sendbuf);
+ reconnect_state.recvbuf = recvbuf;
+ reconnect_state.recvbufsz = sizeof(recvbuf);
+
+ /* setup a client */
+ struct mqtt_client client;
+
+ mqtt_init_reconnect(&client,
+ reconnect_client, &reconnect_state,
+ publish_callback
+ );
+
+ /* start a thread to refresh the client (handle egress and ingree client traffic) */
+ pthread_t client_daemon;
+ if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
+ fprintf(stderr, "Failed to start client daemon.\n");
+ exit_example(EXIT_FAILURE, -1, NULL);
+
+ }
+
+ /* start publishing the time */
+ printf("%s listening for '%s' messages.\n", argv[0], topic);
+ printf("Press ENTER to inject an error.\n");
+ printf("Press CTRL-D to exit.\n\n");
+
+ /* block */
+ while(fgetc(stdin) != EOF) {
+ printf("Injecting error: \"MQTT_ERROR_SOCKET_ERROR\"\n");
+ client.error = MQTT_ERROR_SOCKET_ERROR;
+ }
+
+ /* disconnect */
+ printf("\n%s disconnecting from %s\n", argv[0], addr);
+ sleep(1);
+
+ /* exit */
+ exit_example(EXIT_SUCCESS, client.socketfd, &client_daemon);
+}
+
+void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr)
+{
+ struct reconnect_state_t *reconnect_state = *((struct reconnect_state_t**) reconnect_state_vptr);
+
+ /* Close the clients socket if this isn't the initial reconnect call */
+ if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
+ close(client->socketfd);
+ }
+
+ /* Perform error handling here. */
+ if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
+ printf("reconnect_client: called while client was in error state \"%s\"\n",
+ mqtt_error_str(client->error)
+ );
+ }
+
+ /* Open a new socket. */
+ int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
+ if (sockfd == -1) {
+ perror("Failed to open socket: ");
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+ }
+
+ /* Reinitialize the client. */
+ mqtt_reinit(client, sockfd,
+ reconnect_state->sendbuf, reconnect_state->sendbufsz,
+ reconnect_state->recvbuf, reconnect_state->recvbufsz
+ );
+
+ /* Send connection request to the broker. */
+ mqtt_connect(client, "subscribing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
+
+ /* Subscribe to the topic. */
+ mqtt_subscribe(client, reconnect_state->topic, 0);
+}
+
+void exit_example(int status, int sockfd, pthread_t *client_daemon)
+{
+ if (sockfd != -1) close(sockfd);
+ if (client_daemon != NULL) pthread_cancel(*client_daemon);
+ exit(status);
+}
+
+void publish_callback(void** unused, struct mqtt_response_publish *published)
+{
+ /* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
+ char* topic_name = (char*) malloc(published->topic_name_size + 1);
+ memcpy(topic_name, published->topic_name, published->topic_name_size);
+ topic_name[published->topic_name_size] = '\0';
+
+ printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message);
+
+ free(topic_name);
+}
+
+void* client_refresher(void* client)
+{
+ while(1)
+ {
+ mqtt_sync((struct mqtt_client*) client);
+ usleep(100000U);
+ }
+ return NULL;
+}
\ No newline at end of file
diff --git a/system/mqttc/simple_publisher.c b/system/mqttc/simple_publisher.c
new file mode 100644
index 00000000..1368cfe3
--- /dev/null
+++ b/system/mqttc/simple_publisher.c
@@ -0,0 +1,163 @@
+
+/**
+ * @file
+ * A simple program to that publishes the current time whenever ENTER is pressed.
+ */
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "netutils/mqtt.h"
+#include "templates/posix_sockets.h"
+
+
+/**
+ * @brief The function that would be called whenever a PUBLISH is received.
+ *
+ * @note This function is not used in this example.
+ */
+void publish_callback(void** unused, struct mqtt_response_publish *published);
+
+/**
+ * @brief The client's refresher. This function triggers back-end routines to
+ * handle ingress/egress traffic to the broker.
+ *
+ * @note All this function needs to do is call \ref __mqtt_recv and
+ * \ref __mqtt_send every so often. I've picked 100 ms meaning that
+ * client ingress/egress traffic will be handled every 100 ms.
+ */
+void* client_refresher(void* client);
+
+/**
+ * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
+ */
+void exit_example(int status, int sockfd, pthread_t *client_daemon);
+
+/**
+ * A simple program to that publishes the current time whenever ENTER is pressed.
+ */
+#ifdef CONFIG_BUILD_KERNEL
+int main(int argc, FAR char *argv[])
+#else
+int mqttc_main(int argc, char **argv)
+#endif
+{
+ const char* addr;
+ const char* port;
+ const char* topic;
+ char application_message[256];
+ char stat[256];
+ int i;
+
+ /* get address (argv[1] if present) */
+ if (argc > 1) {
+ addr = argv[1];
+ } else {
+ addr = "test.mosquitto.org";
+ }
+
+ /* get port number (argv[2] if present) */
+ if (argc > 2) {
+ port = argv[2];
+ } else {
+ port = "1883";
+ }
+
+ /* get the topic name to publish */
+ if (argc > 3) {
+ topic = argv[3];
+ } else {
+ topic = "datetime";
+ }
+
+ /* open the non-blocking TCP socket (connecting to the broker) */
+ int sockfd = open_nb_socket(addr, port);
+
+ if (sockfd == -1) {
+ perror("Failed to open socket: ");
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+ }
+
+ /* setup a client */
+ struct mqtt_client client;
+ uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
+ uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
+ mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
+ mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
+
+ /* check that we don't have any errors */
+ if (client.error != MQTT_OK) {
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+ }
+
+ /* start a thread to refresh the client (handle egress and ingree client traffic) */
+ pthread_t client_daemon;
+ if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
+ fprintf(stderr, "Failed to start client daemon.\n");
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+
+ }
+
+ /* start publishing the time */
+ printf("%s is ready to begin publishing the time.\n", argv[0]);
+ printf("Press ENTER to publish the current time.\n");
+ printf("Press CTRL-D (or any other key) to exit.\n\n");
+ i=0;
+ while(fgetc(stdin) == '\r') {
+
+ /* get the current time */
+ time_t timer;
+ time(&timer);
+ struct tm* tm_info = localtime(&timer);
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
+
+ /* print a message */
+
+ snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
+ printf("%s published : \"%s\"", argv[0], application_message);
+
+ /* publish the time */
+ mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_0);
+
+ /* check for errors */
+ if (client.error != MQTT_OK) {
+ printf("error: %s\n", mqtt_error_str(client.error));
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
+ exit_example(EXIT_FAILURE, sockfd, &client_daemon);
+ }
+ i+=1;
+ }
+
+ /* disconnect */
+ printf("\n%s disconnecting from %s\n", argv[0], addr);
+ sleep(1);
+
+ /* exit */
+ exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
+}
+
+void exit_example(int status, int sockfd, pthread_t *client_daemon)
+{
+ if (sockfd != -1) close(sockfd);
+ if (client_daemon != NULL) pthread_cancel(*client_daemon);
+ exit(status);
+}
+
+
+
+void publish_callback(void** unused, struct mqtt_response_publish *published)
+{
+ /* not used in this example */
+}
+
+void* client_refresher(void* client)
+{
+ while(1)
+ {
+ mqtt_sync((struct mqtt_client*) client);
+ usleep(100000U);
+ }
+ return NULL;
+}
diff --git a/system/mqttc/simple_subscriber.c b/system/mqttc/simple_subscriber.c
new file mode 100644
index 00000000..f385086a
--- /dev/null
+++ b/system/mqttc/simple_subscriber.c
@@ -0,0 +1,137 @@
+
+/**
+ * @file
+ * A simple program that subscribes to a topic.
+ */
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include <mqtt.h>
+#include "templates/posix_sockets.h"
+
+
+/**
+ * @brief The function will be called whenever a PUBLISH message is received.
+ */
+void publish_callback(void** unused, struct mqtt_response_publish *published);
+
+/**
+ * @brief The client's refresher. This function triggers back-end routines to
+ * handle ingress/egress traffic to the broker.
+ *
+ * @note All this function needs to do is call \ref __mqtt_recv and
+ * \ref __mqtt_send every so often. I've picked 100 ms meaning that
+ * client ingress/egress traffic will be handled every 100 ms.
+ */
+void* client_refresher(void* client);
+
+/**
+ * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
+ */
+void exit_example(int status, int sockfd, pthread_t *client_daemon);
+
+int main(int argc, const char *argv[])
+{
+ const char* addr;
+ const char* port;
+ const char* topic;
+
+ /* get address (argv[1] if present) */
+ if (argc > 1) {
+ addr = argv[1];
+ } else {
+ addr = "test.mosquitto.org";
+ }
+
+ /* get port number (argv[2] if present) */
+ if (argc > 2) {
+ port = argv[2];
+ } else {
+ port = "1883";
+ }
+
+ /* get the topic name to publish */
+ if (argc > 3) {
+ topic = argv[3];
+ } else {
+ topic = "datetime";
+ }
+
+ /* open the non-blocking TCP socket (connecting to the broker) */
+ int sockfd = open_nb_socket(addr, port);
+
+ if (sockfd == -1) {
+ perror("Failed to open socket: ");
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+ }
+
+ /* setup a client */
+ struct mqtt_client client;
+ uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
+ uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
+ mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
+ mqtt_connect(&client, "subscribing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
+
+ /* check that we don't have any errors */
+ if (client.error != MQTT_OK) {
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+ }
+
+ /* start a thread to refresh the client (handle egress and ingree client traffic) */
+ pthread_t client_daemon;
+ if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
+ fprintf(stderr, "Failed to start client daemon.\n");
+ exit_example(EXIT_FAILURE, sockfd, NULL);
+
+ }
+
+ /* subscribe */
+ mqtt_subscribe(&client, topic, 0);
+
+ /* start publishing the time */
+ printf("%s listening for '%s' messages.\n", argv[0], topic);
+ printf("Press CTRL-D to exit.\n\n");
+
+ /* block */
+ while(fgetc(stdin) != EOF);
+
+ /* disconnect */
+ printf("\n%s disconnecting from %s\n", argv[0], addr);
+ sleep(1);
+
+ /* exit */
+ exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
+}
+
+void exit_example(int status, int sockfd, pthread_t *client_daemon)
+{
+ if (sockfd != -1) close(sockfd);
+ if (client_daemon != NULL) pthread_cancel(*client_daemon);
+ exit(status);
+}
+
+
+
+void publish_callback(void** unused, struct mqtt_response_publish *published)
+{
+ /* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
+ char* topic_name = (char*) malloc(published->topic_name_size + 1);
+ memcpy(topic_name, published->topic_name, published->topic_name_size);
+ topic_name[published->topic_name_size] = '\0';
+
+ printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message);
+
+ free(topic_name);
+}
+
+void* client_refresher(void* client)
+{
+ while(1)
+ {
+ mqtt_sync((struct mqtt_client*) client);
+ usleep(100000U);
+ }
+ return NULL;
+}
\ No newline at end of file
diff --git a/system/mqttc/templates/bio_sockets.h b/system/mqttc/templates/bio_sockets.h
new file mode 100644
index 00000000..78acd606
--- /dev/null
+++ b/system/mqttc/templates/bio_sockets.h
@@ -0,0 +1,28 @@
+#ifndef __BIO_SOCKET_TEMPLATE_H__
+#define __BIO_SOCKET_TEMPLATE_H__
+
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+/*
+ A template for opening a non-blocking BIO socket.
+*/
+BIO* open_nb_socket(const char* addr, const char* port) {
+ BIO* bio = BIO_new_connect(addr);
+ BIO_set_nbio(bio, 1);
+ BIO_set_conn_port(bio, port);
+
+ /* timeout after 10 seconds */
+ int start_time = time(NULL);
+ while(BIO_do_connect(bio) == 0 && (int)time(NULL) - start_time < 10);
+
+ if (BIO_do_connect(bio) <= 0) {
+ fprintf(stderr, "Failed to open socket: BIO_do_connect returned <= 0\n");
+ return NULL;
+ }
+
+ return bio;
+}
+
+#endif
\ No newline at end of file
diff --git a/system/mqttc/templates/openssl_sockets.h b/system/mqttc/templates/openssl_sockets.h
new file mode 100644
index 00000000..7e86857a
--- /dev/null
+++ b/system/mqttc/templates/openssl_sockets.h
@@ -0,0 +1,49 @@
+#ifndef __OPENSSL_SOCKET_TEMPLATE_H__
+#define __OPENSSL_SOCKET_TEMPLATE_H__
+
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+/*
+ A template for opening a non-blocking OpenSSL connection.
+*/
+void open_nb_socket(BIO** bio, SSL_CTX** ssl_ctx, const char* addr, const char* port, const char* ca_file, const char* ca_path) {
+ *ssl_ctx = SSL_CTX_new(SSLv23_client_method());
+ SSL* ssl;
+
+ /* load certificate */
+ if (!SSL_CTX_load_verify_locations(*ssl_ctx, ca_file, ca_path)) {
+ printf("error: failed to load certificate\n");
+ exit(1);
+ }
+
+ /* open BIO socket */
+ *bio = BIO_new_ssl_connect(*ssl_ctx);
+ BIO_get_ssl(*bio, &ssl);
+ SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);
+ BIO_set_conn_hostname(*bio, addr);
+ BIO_set_nbio(*bio, 1);
+ BIO_set_conn_port(*bio, port);
+
+ /* wait for connect with 10 second timeout */
+ int start_time = time(NULL);
+ while(BIO_do_connect(*bio) <= 0 && (int)time(NULL) - start_time < 10);
+ if (BIO_do_connect(*bio) <= 0) {
+ printf("error: %s\n", ERR_reason_error_string(ERR_get_error()));
+ BIO_free_all(*bio);
+ SSL_CTX_free(*ssl_ctx);
+ *bio = NULL;
+ *ssl_ctx=NULL;
+ return;
+ }
+
+ /* verify certificate */
+ if (SSL_get_verify_result(ssl) != X509_V_OK) {
+ /* Handle the failed verification */
+ printf("error: x509 certificate verification failed\n");
+ exit(1);
+ }
+}
+
+#endif
\ No newline at end of file
diff --git a/system/mqttc/templates/posix_sockets.h b/system/mqttc/templates/posix_sockets.h
new file mode 100644
index 00000000..dcd172ac
--- /dev/null
+++ b/system/mqttc/templates/posix_sockets.h
@@ -0,0 +1,50 @@
+#ifndef __POSIX_SOCKET_TEMPLATE_H__
+#define __POSIX_SOCKET_TEMPLATE_H__
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <fcntl.h>
+
+/*
+ A template for opening a non-blocking POSIX socket.
+*/
+int open_nb_socket(const char* addr, const char* port) {
+ struct addrinfo hints = {0};
+
+ hints.ai_family = AF_UNSPEC; /* IPv4 or IPv6 */
+ hints.ai_socktype = SOCK_STREAM; /* Must be TCP */
+ int sockfd = -1;
+ int rv;
+ struct addrinfo *p, *servinfo;
+
+ /* get address information */
+ rv = getaddrinfo(addr, port, &hints, &servinfo);
+ if(rv != 0) {
+ fprintf(stderr, "Failed to open socket (getaddrinfo): %s\n", gai_strerror(rv));
+ return -1;
+ }
+
+ /* open the first possible socket */
+ for(p = servinfo; p != NULL; p = p->ai_next) {
+ sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
+ if (sockfd == -1) continue;
+
+ /* connect to server */
+ rv = connect(sockfd, servinfo->ai_addr, servinfo->ai_addrlen);
+ if(rv == -1) continue;
+ break;
+ }
+
+ /* free servinfo */
+ freeaddrinfo(servinfo);
+
+ /* make non-blocking */
+ if (sockfd != -1) fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK);
+
+ /* return the new socket fd */
+ return sockfd;
+}
+
+#endif
\ No newline at end of file
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment