Last active
August 18, 2020 01:49
-
-
Save mickey-happygolucky/8f311691be2b8000f82628b56ded435c to your computer and use it in GitHub Desktop.
a patch to work mqtt-c on NuttX(based on https://bitbucket.org/nuttx/nuttx/issues/attachments/100/nuttx/nuttx/1528910417.89/100/mqtt_nuttx_apps.patch)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/include/netutils/mqtt.h b/include/netutils/mqtt.h | |
new file mode 100644 | |
index 00000000..b2077e67 | |
--- /dev/null | |
+++ b/include/netutils/mqtt.h | |
@@ -0,0 +1,1509 @@ | |
+#ifndef __MQTT_H__ | |
+#define __MQTT_H__ | |
+ | |
+#include "mqtt_pal.h" | |
+ | |
+/** | |
+ * @file | |
+ * @brief Declares all the MQTT-C functions and datastructures. | |
+ * | |
+ * @note You should <code>\#include <mqtt.h></code>. | |
+ * | |
+ * @example simple_publisher.c | |
+ * A simple program to that publishes the current time whenever ENTER is pressed. | |
+ * | |
+ * Usage: | |
+ * \code{.sh} | |
+ * ./bin/simple_publisher [address [port [topic]]] | |
+ * \endcode | |
+ * | |
+ * Where \c address is the address of the MQTT broker, \c port is the port number the | |
+ * MQTT broker is running on, and \c topic is the name of the topic to publish with. Note | |
+ * that all these arguments are optional and the defaults are \c address = \c "test.mosquitto.org", | |
+ * \c port = \c "1883", and \c topic = "datetime". | |
+ * | |
+ * @example simple_subscriber.c | |
+ * A simple program that subscribes to a single topic and prints all updates that are received. | |
+ * | |
+ * Usage: | |
+ * \code{.sh} | |
+ * ./bin/simple_subscriber [address [port [topic]]] | |
+ * \endcode | |
+ * | |
+ * Where \c address is the address of the MQTT broker, \c port is the port number the | |
+ * MQTT broker is running on, and \c topic is the name of the topic subscribe to. Note | |
+ * that all these arguments are optional and the defaults are \c address = \c "test.mosquitto.org", | |
+ * \c port = \c "1883", and \c topic = "datetime". | |
+ * | |
+ * @example reconnect_subscriber.c | |
+ * Same program as \ref simple_subscriber.c, but using the automatic reconnect functionality. | |
+ * | |
+ * @example bio_publisher.c | |
+ * Same program as \ref simple_publisher.c, but uses a unencrypted BIO socket. | |
+ * | |
+ * @example openssl_publisher.c | |
+ * Same program as \ref simple_publisher.c, but over an encrypted connection using OpenSSL. | |
+ * | |
+ * Usage: | |
+ * \code{.sh} | |
+ * ./bin/openssl_publisher ca_file [address [port [topic]]] | |
+ * \endcode | |
+ * | |
+ * | |
+ * @defgroup api API | |
+ * @brief Documentation of everything you need to know to use the MQTT-C client. | |
+ * | |
+ * This module contains everything you need to know to use MQTT-C in your application. | |
+ * For usage examples see: | |
+ * - @ref simple_publisher.c | |
+ * - @ref simple_subscriber.c | |
+ * - @ref reconnect_subscriber.c | |
+ * - @ref bio_publisher.c | |
+ * - @ref openssl_publisher.c | |
+ * | |
+ * @note MQTT-C can be used in both single-threaded and multi-threaded applications. All | |
+ * the functions in \ref api are thread-safe. | |
+ * | |
+ * @defgroup packers Control Packet Serialization | |
+ * @brief Developer documentation of the functions and datastructures used for serializing MQTT | |
+ * control packets. | |
+ * | |
+ * @defgroup unpackers Control Packet Deserialization | |
+ * @brief Developer documentation of the functions and datastructures used for deserializing MQTT | |
+ * control packets. | |
+ * | |
+ * @defgroup details Utilities | |
+ * @brief Developer documentation for the utilities used to implement the MQTT-C client. | |
+ * | |
+ * @note To deserialize a packet from a buffer use \ref mqtt_unpack_response (it's the only | |
+ * function you need). | |
+ */ | |
+ | |
+ | |
+ /** | |
+ * @brief An enumeration of the MQTT control packet types. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718021"> | |
+ * MQTT v3.1.1: MQTT Control Packet Types | |
+ * </a> | |
+ */ | |
+ enum MQTTControlPacketType { | |
+ MQTT_CONTROL_CONNECT=1u, | |
+ MQTT_CONTROL_CONNACK=2u, | |
+ MQTT_CONTROL_PUBLISH=3u, | |
+ MQTT_CONTROL_PUBACK=4u, | |
+ MQTT_CONTROL_PUBREC=5u, | |
+ MQTT_CONTROL_PUBREL=6u, | |
+ MQTT_CONTROL_PUBCOMP=7u, | |
+ MQTT_CONTROL_SUBSCRIBE=8u, | |
+ MQTT_CONTROL_SUBACK=9u, | |
+ MQTT_CONTROL_UNSUBSCRIBE=10u, | |
+ MQTT_CONTROL_UNSUBACK=11u, | |
+ MQTT_CONTROL_PINGREQ=12u, | |
+ MQTT_CONTROL_PINGRESP=13u, | |
+ MQTT_CONTROL_DISCONNECT=14u | |
+}; | |
+ | |
+/** | |
+ * @brief The fixed header of an MQTT control packet. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718020"> | |
+ * MQTT v3.1.1: Fixed Header | |
+ * </a> | |
+ */ | |
+struct mqtt_fixed_header { | |
+ /** The type of packet. */ | |
+ enum MQTTControlPacketType control_type; | |
+ | |
+ /** The packets control flags.*/ | |
+ uint8_t control_flags: 4; | |
+ | |
+ /** The remaining size of the packet in bytes (i.e. the size of variable header and payload).*/ | |
+ uint32_t remaining_length; | |
+}; | |
+ | |
+/** | |
+ * @brief The protocol identifier for MQTT v3.1.1. | |
+ * @ingroup packers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030"> | |
+ * MQTT v3.1.1: CONNECT Variable Header. | |
+ * </a> | |
+ */ | |
+#define MQTT_PROTOCOL_LEVEL 0x04 | |
+ | |
+/** | |
+ * @brief A macro used to declare the enum MQTTErrors and associated | |
+ * error messages (the members of the num) at the same time. | |
+ */ | |
+#define __ALL_MQTT_ERRORS(MQTT_ERROR) \ | |
+ MQTT_ERROR(MQTT_ERROR_NULLPTR) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONTROL_FORBIDDEN_TYPE) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONTROL_INVALID_FLAGS) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONTROL_WRONG_TYPE) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONNECT_NULL_CLIENT_ID) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONNECT_NULL_WILL_MESSAGE) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONNECT_FORBIDDEN_WILL_QOS) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONNACK_FORBIDDEN_FLAGS) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONNACK_FORBIDDEN_CODE) \ | |
+ MQTT_ERROR(MQTT_ERROR_PUBLISH_FORBIDDEN_QOS) \ | |
+ MQTT_ERROR(MQTT_ERROR_SUBSCRIBE_TOO_MANY_TOPICS) \ | |
+ MQTT_ERROR(MQTT_ERROR_MALFORMED_RESPONSE) \ | |
+ MQTT_ERROR(MQTT_ERROR_UNSUBSCRIBE_TOO_MANY_TOPICS) \ | |
+ MQTT_ERROR(MQTT_ERROR_RESPONSE_INVALID_CONTROL_TYPE) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONNECT_NOT_CALLED) \ | |
+ MQTT_ERROR(MQTT_ERROR_SEND_BUFFER_IS_FULL) \ | |
+ MQTT_ERROR(MQTT_ERROR_SOCKET_ERROR) \ | |
+ MQTT_ERROR(MQTT_ERROR_MALFORMED_REQUEST) \ | |
+ MQTT_ERROR(MQTT_ERROR_RECV_BUFFER_TOO_SMALL) \ | |
+ MQTT_ERROR(MQTT_ERROR_ACK_OF_UNKNOWN) \ | |
+ MQTT_ERROR(MQTT_ERROR_NOT_IMPLEMENTED) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONNECTION_REFUSED) \ | |
+ MQTT_ERROR(MQTT_ERROR_SUBSCRIBE_FAILED) \ | |
+ MQTT_ERROR(MQTT_ERROR_CONNECTION_CLOSED) \ | |
+ MQTT_ERROR(MQTT_ERROR_INITIAL_RECONNECT) \ | |
+ | |
+/* todo: add more connection refused errors */ | |
+ | |
+/** | |
+ * @brief A macro used to generate the enum MQTTErrors from | |
+ * \ref __ALL_MQTT_ERRORS | |
+ * @see __ALL_MQTT_ERRORS | |
+*/ | |
+#define GENERATE_ENUM(ENUM) ENUM, | |
+ | |
+/** | |
+ * @brief A macro used to generate the error messages associated with | |
+ * MQTTErrors from \ref __ALL_MQTT_ERRORS | |
+ * @see __ALL_MQTT_ERRORS | |
+*/ | |
+#define GENERATE_STRING(STRING) #STRING, | |
+ | |
+ | |
+/** | |
+ * @brief An enumeration of error codes. Error messages can be retrieved by calling \ref mqtt_error_str. | |
+ * @ingroup api | |
+ * | |
+ * @see mqtt_error_str | |
+ */ | |
+enum MQTTErrors { | |
+ MQTT_ERROR_UNKNOWN=INT_MIN, | |
+ __ALL_MQTT_ERRORS(GENERATE_ENUM) | |
+ MQTT_OK = 1 | |
+}; | |
+ | |
+/** | |
+ * @brief Returns an error message for error code, \p error. | |
+ * @ingroup api | |
+ * | |
+ * @param[in] error the error code. | |
+ * | |
+ * @returns The associated error message. | |
+ */ | |
+const char* mqtt_error_str(enum MQTTErrors error); | |
+ | |
+/** | |
+ * @brief Pack a MQTT string, given a c-string \p str. | |
+ * | |
+ * @param[out] buf the buffer that the MQTT string will be written to. | |
+ * @param[in] str the c-string to be written to \p buf. | |
+ * | |
+ * @warning This function provides no error checking. | |
+ * | |
+ * @returns strlen(str) + 2 | |
+*/ | |
+ssize_t __mqtt_pack_str(uint8_t *buf, const char* str); | |
+ | |
+/** @brief A macro to get the MQTT string length from a c-string. */ | |
+#define __mqtt_packed_cstrlen(x) (2 + strlen(x)) | |
+ | |
+/* RESPONSES */ | |
+ | |
+/** | |
+ * @brief An enumeration of the return codes returned in a CONNACK packet. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_3.1_-"> | |
+ * MQTT v3.1.1: CONNACK return codes. | |
+ * </a> | |
+ */ | |
+enum MQTTConnackReturnCode { | |
+ MQTT_CONNACK_ACCEPTED = 0u, | |
+ MQTT_CONNACK_REFUSED_PROTOCOL_VERSION = 1u, | |
+ MQTT_CONNACK_REFUSED_IDENTIFIER_REJECTED = 2u, | |
+ MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3u, | |
+ MQTT_CONNACK_REFUSED_BAD_USER_NAME_OR_PASSWORD = 4u, | |
+ MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5u | |
+}; | |
+ | |
+/** | |
+ * @brief A connection response datastructure. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033"> | |
+ * MQTT v3.1.1: CONNACK - Acknowledgement connection response. | |
+ * </a> | |
+ */ | |
+struct mqtt_response_connack { | |
+ /** | |
+ * @brief Allows client and broker to check if they have a consistent view about whether there is | |
+ * already a stored session state. | |
+ */ | |
+ uint8_t session_present_flag; | |
+ | |
+ /** | |
+ * @brief The return code of the connection request. | |
+ * | |
+ * @see MQTTConnackReturnCode | |
+ */ | |
+ enum MQTTConnackReturnCode return_code; | |
+}; | |
+ | |
+ /** | |
+ * @brief A publish packet received from the broker. | |
+ * @ingroup unpackers | |
+ * | |
+ * A publish packet is received from the broker when a client publishes to a topic that the | |
+ * \em {local client} is subscribed to. | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037"> | |
+ * MQTT v3.1.1: PUBLISH - Publish Message. | |
+ * </a> | |
+ */ | |
+struct mqtt_response_publish { | |
+ /** | |
+ * @brief The DUP flag. DUP flag is 0 if its the first attempt to send this publish packet. A DUP flag | |
+ * of 1 means that this might be a re-delivery of the packet. | |
+ */ | |
+ uint8_t dup_flag; | |
+ | |
+ /** | |
+ * @brief The quality of service level. | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_3.11_-"> | |
+ * MQTT v3.1.1: QoS Definitions | |
+ * </a> | |
+ */ | |
+ uint8_t qos_level; | |
+ | |
+ /** @brief The retain flag of this publish message. */ | |
+ uint8_t retain_flag; | |
+ | |
+ /** @brief Size of the topic name (number of characters). */ | |
+ uint16_t topic_name_size; | |
+ | |
+ /** | |
+ * @brief The topic name. | |
+ * @note topic_name is not null terminated. Therefore topic_name_size must be used to get the | |
+ * string length. | |
+ */ | |
+ const void* topic_name; | |
+ | |
+ /** @brief The publish message's packet ID. */ | |
+ uint16_t packet_id; | |
+ | |
+ /** @brief The publish message's application message.*/ | |
+ const void* application_message; | |
+ | |
+ /** @brief The size of the application message in bytes. */ | |
+ size_t application_message_size; | |
+}; | |
+ | |
+/** | |
+ * @brief A publish acknowledgement for messages that were published with QoS level 1. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043"> | |
+ * MQTT v3.1.1: PUBACK - Publish Acknowledgement. | |
+ * </a> | |
+ * | |
+ */ | |
+struct mqtt_response_puback { | |
+ /** @brief The published messages packet ID. */ | |
+ uint16_t packet_id; | |
+}; | |
+ | |
+/** | |
+ * @brief The response packet to a PUBLISH packet with QoS level 2. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048"> | |
+ * MQTT v3.1.1: PUBREC - Publish Received. | |
+ * </a> | |
+ * | |
+ */ | |
+struct mqtt_response_pubrec { | |
+ /** @brief The published messages packet ID. */ | |
+ uint16_t packet_id; | |
+}; | |
+ | |
+/** | |
+ * @brief The response to a PUBREC packet. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718053"> | |
+ * MQTT v3.1.1: PUBREL - Publish Release. | |
+ * </a> | |
+ * | |
+ */ | |
+struct mqtt_response_pubrel { | |
+ /** @brief The published messages packet ID. */ | |
+ uint16_t packet_id; | |
+}; | |
+ | |
+/** | |
+ * @brief The response to a PUBREL packet. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718058"> | |
+ * MQTT v3.1.1: PUBCOMP - Publish Complete. | |
+ * </a> | |
+ * | |
+ */ | |
+struct mqtt_response_pubcomp { | |
+ /** T@brief he published messages packet ID. */ | |
+ uint16_t packet_id; | |
+}; | |
+ | |
+/** | |
+ * @brief An enumeration of subscription acknowledgement return codes. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_3.26_-"> | |
+ * MQTT v3.1.1: SUBACK Return Codes. | |
+ * </a> | |
+ */ | |
+enum MQTTSubackReturnCodes { | |
+ MQTT_SUBACK_SUCCESS_MAX_QOS_0 = 0u, | |
+ MQTT_SUBACK_SUCCESS_MAX_QOS_1 = 1u, | |
+ MQTT_SUBACK_SUCCESS_MAX_QOS_2 = 2u, | |
+ MQTT_SUBACK_FAILURE = 128u | |
+}; | |
+ | |
+/** | |
+ * @brief The response to a subscription request. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718068"> | |
+ * MQTT v3.1.1: SUBACK - Subscription Acknowledgement. | |
+ * </a> | |
+ */ | |
+struct mqtt_response_suback { | |
+ /** @brief The published messages packet ID. */ | |
+ uint16_t packet_id; | |
+ | |
+ /** | |
+ * Array of return codes corresponding to the requested subscribe topics. | |
+ * | |
+ * @see MQTTSubackReturnCodes | |
+ */ | |
+ const uint8_t *return_codes; | |
+ | |
+ /** The number of return codes. */ | |
+ size_t num_return_codes; | |
+}; | |
+ | |
+/** | |
+ * @brief The brokers response to a UNSUBSCRIBE request. | |
+ * @ingroup unpackers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718077"> | |
+ * MQTT v3.1.1: UNSUBACK - Unsubscribe Acknowledgement. | |
+ * </a> | |
+ */ | |
+struct mqtt_response_unsuback { | |
+ /** @brief The published messages packet ID. */ | |
+ uint16_t packet_id; | |
+}; | |
+ | |
+/** | |
+ * @brief The response to a ping request. | |
+ * @ingroup unpackers | |
+ * | |
+ * @note This response contains no members. | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718086"> | |
+ * MQTT v3.1.1: PINGRESP - Ping Response. | |
+ * </a> | |
+ */ | |
+struct mqtt_response_pingresp {}; | |
+ | |
+/** | |
+ * @brief A struct used to deserialize/interpret an incoming packet from the broker. | |
+ * @ingroup unpackers | |
+ */ | |
+struct mqtt_response { | |
+ /** @brief The mqtt_fixed_header of the deserialized packet. */ | |
+ struct mqtt_fixed_header fixed_header; | |
+ | |
+ /** | |
+ * @brief A union of the possible responses from the broker. | |
+ * | |
+ * @note The fixed_header contains the control type. This control type corresponds to the | |
+ * member of this union that should be accessed. For example if | |
+ * fixed_header#control_type == \c MQTT_CONTROL_PUBLISH then | |
+ * decoded#publish should be accessed. | |
+ */ | |
+ union { | |
+ struct mqtt_response_connack connack; | |
+ struct mqtt_response_publish publish; | |
+ struct mqtt_response_puback puback; | |
+ struct mqtt_response_pubrec pubrec; | |
+ struct mqtt_response_pubrel pubrel; | |
+ struct mqtt_response_pubcomp pubcomp; | |
+ struct mqtt_response_suback suback; | |
+ struct mqtt_response_unsuback unsuback; | |
+ struct mqtt_response_pingresp pingresp; | |
+ } decoded; | |
+}; | |
+ | |
+/** | |
+ * @brief Deserialize the contents of \p buf into an mqtt_fixed_header object. | |
+ * @ingroup unpackers | |
+ * | |
+ * @note This function performs complete error checking and a positive return value | |
+ * means the entire mqtt_response can be deserialized from \p buf. | |
+ * | |
+ * @param[out] response the response who's \ref mqtt_response.fixed_header will be initialized. | |
+ * @param[in] buf the buffer. | |
+ * @param[in] bufsz the total number of bytes in the buffer. | |
+ * | |
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough | |
+ * bytes to parse the packet, or a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t *buf, size_t bufsz); | |
+ | |
+/** | |
+ * @brief Deserialize a CONNACK response from \p buf. | |
+ * @ingroup unpackers | |
+ * | |
+ * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the control packet type | |
+ * must be \c MQTT_CONTROL_CONNACK. | |
+ * | |
+ * @param[out] mqtt_response the mqtt_response that will be initialized. | |
+ * @param[in] buf the buffer that contains the variable header and payload of the packet. The | |
+ * first byte of \p buf should be the first byte of the variable header. | |
+ * | |
+ * @relates mqtt_response_connack | |
+ * | |
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough | |
+ * bytes to parse the packet, or a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_unpack_connack_response (struct mqtt_response *mqtt_response, const uint8_t *buf); | |
+ | |
+/** | |
+ * @brief Deserialize a publish response from \p buf. | |
+ * @ingroup unpackers | |
+ * | |
+ * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must | |
+ * have a control type of \c MQTT_CONTROL_PUBLISH. | |
+ * | |
+ * @param[out] mqtt_response the response that is initialized from the contents of \p buf. | |
+ * @param[in] buf the buffer with the incoming data. | |
+ * | |
+ * @relates mqtt_response_publish | |
+ * | |
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough | |
+ * bytes to parse the packet, or a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_unpack_publish_response (struct mqtt_response *mqtt_response, const uint8_t *buf); | |
+ | |
+/** | |
+ * @brief Deserialize a PUBACK/PUBREC/PUBREL/PUBCOMP packet from \p buf. | |
+ * @ingroup unpackers | |
+ * | |
+ * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must | |
+ * have a control type of \c MQTT_CONTROL_PUBACK, \c MQTT_CONTROL_PUBREC, \c MQTT_CONTROL_PUBREL | |
+ * or \c MQTT_CONTROL_PUBCOMP. | |
+ * | |
+ * @param[out] mqtt_response the response that is initialized from the contents of \p buf. | |
+ * @param[in] buf the buffer with the incoming data. | |
+ * | |
+ * @relates mqtt_response_puback mqtt_response_pubrec mqtt_response_pubrel mqtt_response_pubcomp | |
+ * | |
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough | |
+ * bytes to parse the packet, or a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const uint8_t *buf); | |
+ | |
+/** | |
+ * @brief Deserialize a SUBACK packet from \p buf. | |
+ * @ingroup unpacker | |
+ * | |
+ * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must | |
+ * have a control type of \c MQTT_CONTROL_SUBACK. | |
+ * | |
+ * @param[out] mqtt_response the response that is initialized from the contents of \p buf. | |
+ * @param[in] buf the buffer with the incoming data. | |
+ * | |
+ * @relates mqtt_response_suback | |
+ * | |
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough | |
+ * bytes to parse the packet, or a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_unpack_suback_response(struct mqtt_response *mqtt_response, const uint8_t *buf); | |
+ | |
+/** | |
+ * @brief Deserialize an UNSUBACK packet from \p buf. | |
+ * @ingroup unpacker | |
+ * | |
+ * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must | |
+ * have a control type of \c MQTT_CONTROL_UNSUBACK. | |
+ * | |
+ * @param[out] mqtt_response the response that is initialized from the contents of \p buf. | |
+ * @param[in] buf the buffer with the incoming data. | |
+ * | |
+ * @relates mqtt_response_unsuback | |
+ * | |
+ * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough | |
+ * bytes to parse the packet, or a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const uint8_t *buf); | |
+ | |
+/** | |
+ * @brief Deserialize a packet from the broker. | |
+ * @ingroup unpackers | |
+ * | |
+ * @param[out] response the mqtt_response that will be initialize from \p buf. | |
+ * @param[in] buf the incoming data buffer. | |
+ * @param[in] bufsz the number of bytes available in the buffer. | |
+ * | |
+ * @relates mqtt_response | |
+ * | |
+ * @returns The number of bytes consumed on success, zero \p buf does not contain enough bytes | |
+ * to deserialize the packet, a negative value if a protocol violation was encountered. | |
+ */ | |
+ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf, size_t bufsz); | |
+ | |
+/* REQUESTS */ | |
+ | |
+ /** | |
+ * @brief Serialize an mqtt_fixed_header and write it to \p buf. | |
+ * @ingroup packers | |
+ * | |
+ * @note This function performs complete error checking and a positive return value | |
+ * guarantees the entire packet will fit into the given buffer. | |
+ * | |
+ * @param[out] buf the buffer to write to. | |
+ * @param[in] bufsz the maximum number of bytes that can be put in to \p buf. | |
+ * @param[in] fixed_header the fixed header that will be serialized. | |
+ * | |
+ * @returns The number of bytes written to \p buf, or 0 if \p buf is too small, or a | |
+ * negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fixed_header *fixed_header); | |
+ | |
+/** | |
+ * @brief An enumeration of CONNECT packet flags. | |
+ * @ingroup packers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030"> | |
+ * MQTT v3.1.1: CONNECT Variable Header. | |
+ * </a> | |
+ */ | |
+enum MQTTConnectFlags { | |
+ MQTT_CONNECT_RESERVED = 1u, | |
+ MQTT_CONNECT_CLEAN_SESSION = 2u, | |
+ MQTT_CONNECT_WILL_FLAG = 4u, | |
+ MQTT_CONNECT_WILL_QOS_0 = (0u & 0x03) << 3, | |
+ MQTT_CONNECT_WILL_QOS_1 = (1u & 0x03) << 3, | |
+ MQTT_CONNECT_WILL_QOS_2 = (2u & 0x03) << 3, | |
+ MQTT_CONNECT_WILL_RETAIN = 32u, | |
+ MQTT_CONNECT_PASSWORD = 64u, | |
+ MQTT_CONNECT_USER_NAME = 128u, | |
+}; | |
+ | |
+/** | |
+ * @brief Serialize a connection request into a buffer. | |
+ * @ingroup packers | |
+ * | |
+ * @param[out] buf the buffer to pack the connection request packet into. | |
+ * @param[in] bufsz the number of bytes left in \p buf. | |
+ * @param[in] client_id the ID that identifies the local client. \p client_id is a required | |
+ * parameter. | |
+ * @param[in] will_topic the topic under which the local client's will message will be published. | |
+ * Set to \c NULL for no will message. If \p will_topic is not \c NULL a | |
+ * \p will_message must also be provided. | |
+ * @param[in] will_message the will message to be published upon a unsuccessful disconnection of | |
+ * the local client. Set to \c NULL if \p will_topic is \c NULL. | |
+ * \p will_message must \em not be \c NULL if \p will_topic is not | |
+ * \c NULL. | |
+ * @param[in] will_message_size The size of \p will_message in bytes. | |
+ * @param[in] user_name the username to be used to connect to the broker with. Set to \c NULL if | |
+ * no username is required. | |
+ * @param[in] password the password to be used to connect to the broker with. Set to \c NULL if | |
+ * no password is required. | |
+ * @param[in] connect_flags additional MQTTConnectFlags to be set. The only flags that need to be | |
+ * set manually are \c MQTT_CONNECT_CLEAN_SESSION, | |
+ * \c MQTT_CONNECT_WILL_QOS_X (for \c X ∈ {0, 1, 2}), and | |
+ * \c MQTT_CONNECT_WILL_RETAIN. Set to 0 if no additional flags are | |
+ * required. | |
+ * @param[in] keep_alive the keep alive time in seconds. It is the responsibility of the clinet | |
+ * to ensure packets are sent to the server \em {at least} this frequently. | |
+ * | |
+ * @note If there is a \p will_topic and no additional \p connect_flags are given, then by | |
+ * default \p will_message will be published at QoS level 0. | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028"> | |
+ * MQTT v3.1.1: CONNECT - Client Requests a Connection to a Server. | |
+ * </a> | |
+ * | |
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the CONNECT | |
+ * packet, a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, | |
+ const char* client_id, | |
+ const char* will_topic, | |
+ const void* will_message, | |
+ size_t will_message_size, | |
+ const char* user_name, | |
+ const char* password, | |
+ uint8_t connect_flags, | |
+ uint16_t keep_alive); | |
+ | |
+/** | |
+ * @brief An enumeration of the PUBLISH flags. | |
+ * @ingroup packers | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037"> | |
+ * MQTT v3.1.1: PUBLISH - Publish Message. | |
+ * </a> | |
+ */ | |
+enum MQTTPublishFlags { | |
+ MQTT_PUBLISH_DUP = 8u, | |
+ MQTT_PUBLISH_QOS_0 = ((0u << 1) & 0x06), | |
+ MQTT_PUBLISH_QOS_1 = ((1u << 1) & 0x06), | |
+ MQTT_PUBLISH_QOS_2 = ((2u << 1) & 0x06), | |
+ MQTT_PUBLISH_QOS_MASK = ((3u << 1) & 0x06), | |
+ MQTT_PUBLISH_RETAIN = 0x01 | |
+}; | |
+ | |
+/** | |
+ * @brief Serialize a PUBLISH request and put it in \p buf. | |
+ * @ingroup packers | |
+ * | |
+ * @param[out] buf the buffer to put the PUBLISH packet in. | |
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf. | |
+ * @param[in] topic_name the topic to publish \p application_message under. | |
+ * @param[in] packet_id this packets packet ID. | |
+ * @param[in] application_message the application message to be published. | |
+ * @param[in] application_message_size the size of \p application_message in bytes. | |
+ * @param[in] publish_flags The flags to publish \p application_message with. These include | |
+ * the \c MQTT_PUBLISH_DUP flag, \c MQTT_PUBLISH_QOS_X (\c X ∈ | |
+ * {0, 1, 2}), and \c MQTT_PUBLISH_RETAIN flag. | |
+ * | |
+ * @note The default QoS is level 0. | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037"> | |
+ * MQTT v3.1.1: PUBLISH - Publish Message. | |
+ * </a> | |
+ * | |
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PUBLISH | |
+ * packet, a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz, | |
+ const char* topic_name, | |
+ uint16_t packet_id, | |
+ void* application_message, | |
+ size_t application_message_size, | |
+ uint8_t publish_flags); | |
+ | |
+/** | |
+ * @brief Serialize a PUBACK, PUBREC, PUBREL, or PUBCOMP packet and put it in \p buf. | |
+ * @ingroup packers | |
+ * | |
+ * @param[out] buf the buffer to put the PUBXXX packet in. | |
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf. | |
+ * @param[in] control_type the type of packet. Must be one of: \c MQTT_CONTROL_PUBACK, | |
+ * \c MQTT_CONTROL_PUBREC, \c MQTT_CONTROL_PUBREL, | |
+ * or \c MQTT_CONTROL_PUBCOMP. | |
+ * @param[in] packet_id the packet ID of the packet being acknowledged. | |
+ * | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043"> | |
+ * MQTT v3.1.1: PUBACK - Publish Acknowledgement. | |
+ * </a> | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048"> | |
+ * MQTT v3.1.1: PUBREC - Publish Received. | |
+ * </a> | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718053"> | |
+ * MQTT v3.1.1: PUBREL - Publish Released. | |
+ * </a> | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718058"> | |
+ * MQTT v3.1.1: PUBCOMP - Publish Complete. | |
+ * </a> | |
+ * | |
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PUBXXX | |
+ * packet, a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz, | |
+ enum MQTTControlPacketType control_type, | |
+ uint16_t packet_id); | |
+ | |
+/** | |
+ * @brief The maximum number topics that can be subscribed to in a single call to | |
+ * mqtt_pack_subscribe_request. | |
+ * @ingroup packers | |
+ * | |
+ * @see mqtt_pack_subscribe_request | |
+ */ | |
+#define MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS 8 | |
+ | |
+/** | |
+ * @brief Serialize a SUBSCRIBE packet and put it in \p buf. | |
+ * @ingroup packers | |
+ * | |
+ * @param[out] buf the buffer to put the SUBSCRIBE packet in. | |
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf. | |
+ * @param[in] packet_id the packet ID to be used. | |
+ * @param[in] ... \c NULL terminated list of (\c {const char *topic_name}, \c {int max_qos_level}) | |
+ * pairs. | |
+ * | |
+ * @note The variadic arguments, \p ..., \em must be followed by a \c NULL. For example: | |
+ * @code | |
+ * ssize_t n = mqtt_pack_subscribe_request(buf, bufsz, 1234, "topic_1", 0, "topic_2", 2, NULL); | |
+ * @endcode | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063"> | |
+ * MQTT v3.1.1: SUBSCRIBE - Subscribe to Topics. | |
+ * </a> | |
+ * | |
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the SUBSCRIBE | |
+ * packet, a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, | |
+ uint16_t packet_id, | |
+ ...); /* null terminated */ | |
+ | |
+/** | |
+ * @brief The maximum number topics that can be subscribed to in a single call to | |
+ * mqtt_pack_unsubscribe_request. | |
+ * @ingroup packers | |
+ * | |
+ * @see mqtt_pack_unsubscribe_request | |
+ */ | |
+#define MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS 8 | |
+ | |
+/** | |
+ * @brief Serialize a UNSUBSCRIBE packet and put it in \p buf. | |
+ * @ingroup packers | |
+ * | |
+ * @param[out] buf the buffer to put the UNSUBSCRIBE packet in. | |
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf. | |
+ * @param[in] packet_id the packet ID to be used. | |
+ * @param[in] ... \c NULL terminated list of \c {const char *topic_name}'s to unsubscribe from. | |
+ * | |
+ * @note The variadic arguments, \p ..., \em must be followed by a \c NULL. For example: | |
+ * @code | |
+ * ssize_t n = mqtt_pack_unsubscribe_request(buf, bufsz, 4321, "topic_1", "topic_2", NULL); | |
+ * @endcode | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718072"> | |
+ * MQTT v3.1.1: UNSUBSCRIBE - Unsubscribe from Topics. | |
+ * </a> | |
+ * | |
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the UNSUBSCRIBE | |
+ * packet, a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, | |
+ uint16_t packet_id, | |
+ ...); /* null terminated */ | |
+ | |
+/** | |
+ * @brief Serialize a PINGREQ and put it into \p buf. | |
+ * @ingroup packers | |
+ * | |
+ * @param[out] buf the buffer to put the PINGREQ packet in. | |
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf. | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718081"> | |
+ * MQTT v3.1.1: PINGREQ - Ping Request. | |
+ * </a> | |
+ * | |
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PINGREQ | |
+ * packet, a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_pack_ping_request(uint8_t *buf, size_t bufsz); | |
+ | |
+/** | |
+ * @brief Serialize a DISCONNECT and put it into \p buf. | |
+ * @ingroup packers | |
+ * | |
+ * @param[out] buf the buffer to put the DISCONNECT packet in. | |
+ * @param[in] bufsz the maximum number of bytes that can be put into \p buf. | |
+ * | |
+ * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090"> | |
+ * MQTT v3.1.1: DISCONNECT - Disconnect Notification. | |
+ * </a> | |
+ * | |
+ * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the DISCONNECT | |
+ * packet, a negative value if there was a protocol violation. | |
+ */ | |
+ssize_t mqtt_pack_disconnect(uint8_t *buf, size_t bufsz); | |
+ | |
+ | |
+/** | |
+ * @brief An enumeration of queued message states. | |
+ * @ingroup details | |
+ */ | |
+enum MQTTQueuedMessageState { | |
+ MQTT_QUEUED_UNSENT, | |
+ MQTT_QUEUED_AWAITING_ACK, | |
+ MQTT_QUEUED_COMPLETE | |
+}; | |
+ | |
+/** | |
+ * @brief A message in a mqtt_message_queue. | |
+ * @ingroup details | |
+ */ | |
+struct mqtt_queued_message { | |
+ /** @brief A pointer to the start of the message. */ | |
+ uint8_t *start; | |
+ | |
+ /** @brief The number of bytes in the message. */ | |
+ size_t size; | |
+ | |
+ /** @brief The state of the message. */ | |
+ enum MQTTQueuedMessageState state; | |
+ | |
+ /** | |
+ * @brief The time at which the message was sent.. | |
+ * | |
+ * @note A timeout will only occur if the message is in | |
+ * the MQTT_QUEUED_AWAITING_ACK \c state. | |
+ */ | |
+ mqtt_pal_time_t time_sent; | |
+ | |
+ /** | |
+ * @brief The control type of the message. | |
+ */ | |
+ enum MQTTControlPacketType control_type; | |
+ | |
+ /** | |
+ * @brief The packet id of the message. | |
+ * | |
+ * @note This field is only used if the associate \c control_type has a | |
+ * \c packet_id field. | |
+ */ | |
+ uint16_t packet_id; | |
+}; | |
+ | |
+/** | |
+ * @brief A message queue. | |
+ * @ingroup details | |
+ * | |
+ * @note This struct is used internally to manage sending messages. | |
+ * @note The only members the user should use are \c curr and \c curr_sz. | |
+ */ | |
+struct mqtt_message_queue { | |
+ /** | |
+ * @brief The start of the message queue's memory block. | |
+ * | |
+ * @warning This member should \em not be manually changed. | |
+ */ | |
+ void *mem_start; | |
+ | |
+ /** @brief The end of the message queue's memory block. */ | |
+ void *mem_end; | |
+ | |
+ /** | |
+ * @brief A pointer to the position in the buffer you can pack bytes at. | |
+ * | |
+ * @note Immediately after packing bytes at \c curr you \em must call | |
+ * mqtt_mq_register. | |
+ */ | |
+ uint8_t *curr; | |
+ | |
+ /** | |
+ * @brief The number of bytes that can be written to \c curr. | |
+ * | |
+ * @note curr_sz will decrease by more than the number of bytes you write to | |
+ * \c curr. This is because the mqtt_queued_message structs share the | |
+ * same memory (and thus, a mqtt_queued_message must be allocated in | |
+ * the message queue's memory whenever a new message is registered). | |
+ */ | |
+ size_t curr_sz; | |
+ | |
+ /** | |
+ * @brief The tail of the array of mqtt_queued_messages's. | |
+ * | |
+ * @note This member should not be used manually. | |
+ */ | |
+ struct mqtt_queued_message *queue_tail; | |
+}; | |
+ | |
+/** | |
+ * @brief Initialize a message queue. | |
+ * @ingroup details | |
+ * | |
+ * @param[out] mq The message queue to initialize. | |
+ * @param[in] buf The buffer for this message queue. | |
+ * @param[in] bufsz The number of bytes in the buffer. | |
+ * | |
+ * @relates mqtt_message_queue | |
+ */ | |
+void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz); | |
+ | |
+/** | |
+ * @brief Clear as many messages from the front of the queue as possible. | |
+ * @ingroup details | |
+ * | |
+ * @note Calls to this function are the \em only way to remove messages from the queue. | |
+ * | |
+ * @param mq The message queue. | |
+ * | |
+ * @relates mqtt_message_queue | |
+ */ | |
+void mqtt_mq_clean(struct mqtt_message_queue *mq); | |
+ | |
+/** | |
+ * @brief Register a message that was just added to the buffer. | |
+ * @ingroup details | |
+ * | |
+ * @note This function should be called immediately following a call to a packer function | |
+ * that returned a positive value. The positive value (number of bytes packed) should | |
+ * be passed to this function. | |
+ * | |
+ * @param mq The message queue. | |
+ * @param[in] nbytes The number of bytes that were just packed. | |
+ * | |
+ * @note This function will step mqtt_message_queue::curr and update mqtt_message_queue::curr_sz. | |
+ * @relates mqtt_message_queue | |
+ * | |
+ * @returns The newly added struct mqtt_queued_message. | |
+ */ | |
+struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size_t nbytes); | |
+ | |
+/** | |
+ * @brief Find a message in the message queue. | |
+ * @ingroup details | |
+ * | |
+ * @param mq The message queue. | |
+ * @param[in] control_type The control type of the message you want to find. | |
+ * @param[in] packet_id The packet ID of the message you want to find. Set to \c NULL if you | |
+ * don't want to specify a packet ID. | |
+ * | |
+ * @relates mqtt_message_queue | |
+ * @returns The found message. \c NULL if the message was not found. | |
+ */ | |
+struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQTTControlPacketType control_type, uint16_t *packet_id); | |
+ | |
+/** | |
+ * @brief Returns the mqtt_queued_message at \p index. | |
+ * @ingroup details | |
+ * | |
+ * @param mq_ptr A pointer to the message queue. | |
+ * @param index The index of the message. | |
+ * | |
+ * @returns The mqtt_queued_message at \p index. | |
+ */ | |
+#define mqtt_mq_get(mq_ptr, index) (((struct mqtt_queued_message*) ((mq_ptr)->mem_end)) - 1 - index) | |
+ | |
+/** | |
+ * @brief Returns the number of messages in the message queue, \p mq_ptr. | |
+ * @ingroup details | |
+ */ | |
+#define mqtt_mq_length(mq_ptr) (((struct mqtt_queued_message*) ((mq_ptr)->mem_end)) - (mq_ptr)->queue_tail) | |
+ | |
+/** | |
+ * @brief Used internally to recalculate the \c curr_sz. | |
+ * @ingroup details | |
+ */ | |
+#define mqtt_mq_currsz(mq_ptr) (mq_ptr->curr >= (uint8_t*) ((mq_ptr)->queue_tail - 1)) ? 0 : ((uint8_t*) ((mq_ptr)->queue_tail - 1)) - (mq_ptr)->curr | |
+ | |
+/* CLIENT */ | |
+ | |
+/** | |
+ * @brief An MQTT client. | |
+ * @ingroup details | |
+ * | |
+ * @note All members can be manipulated via the related functions. | |
+ */ | |
+struct mqtt_client { | |
+ /** @brief The socket connecting to the MQTT broker. */ | |
+ mqtt_pal_socket_handle socketfd; | |
+ | |
+ /** @brief The LFSR state used to generate packet ID's. */ | |
+ uint16_t pid_lfsr; | |
+ | |
+ /** @brief The keep-alive time in seconds. */ | |
+ uint16_t keep_alive; | |
+ | |
+ /** | |
+ * @brief A counter counting pings that have been sent to keep the connection alive. | |
+ * @see keep_alive | |
+ */ | |
+ int number_of_keep_alives; | |
+ | |
+ /** | |
+ * @brief The timestamp of the last message sent to the buffer. | |
+ * | |
+ * This is used to detect the need for keep-alive pings. | |
+ * | |
+ * @see keep_alive | |
+ */ | |
+ mqtt_pal_time_t time_of_last_send; | |
+ | |
+ /** | |
+ * @brief The error state of the client. | |
+ * | |
+ * error should be MQTT_OK for the entirety of the connection. | |
+ * | |
+ * @note The error state will be MQTT_ERROR_CONNECT_NOT_CALLED until | |
+ * you call mqtt_connect. | |
+ */ | |
+ enum MQTTErrors error; | |
+ | |
+ /** | |
+ * @brief The timeout period in seconds. | |
+ * | |
+ * If the broker doesn't return an ACK within response_timeout seconds a timeout | |
+ * will occur and the message will be retransmitted. | |
+ * | |
+ * @note The default value is 30 [seconds] but you can change it at any time. | |
+ */ | |
+ int response_timeout; | |
+ | |
+ /** @brief A counter counting the number of timeouts that have occurred. */ | |
+ int number_of_timeouts; | |
+ | |
+ /** | |
+ * @brief Approximately much time it has typically taken to receive responses from the | |
+ * broker. | |
+ * | |
+ * @note This is tracked using a exponential-averaging. | |
+ */ | |
+ double typical_response_time; | |
+ | |
+ /** | |
+ * @brief The callback that is called whenever a publish is received from the broker. | |
+ * | |
+ * Any topics that you have subscribed to will be returned from the broker as | |
+ * mqtt_response_publish messages. All the publishes received from the broker will | |
+ * be passed to this function. | |
+ * | |
+ * @note A pointer to publish_response_callback_state is always passed to the callback. | |
+ * Use publish_response_callback_state to keep track of any state information you | |
+ * need. | |
+ */ | |
+ void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish); | |
+ | |
+ /** | |
+ * @brief A pointer to any publish_response_callback state information you need. | |
+ * | |
+ * @note A pointer to this pointer will always be publish_response_callback upon | |
+ * receiving a publish message from the broker. | |
+ */ | |
+ void* publish_response_callback_state; | |
+ | |
+ /** | |
+ * @brief A user-specified callback, triggered on each \ref mqtt_sync, allowing | |
+ * the user to perform state inspections (and custom socket error detection) | |
+ * on the client. | |
+ * | |
+ * This callback is triggered on each call to \ref mqtt_sync. If it returns MQTT_OK | |
+ * then \ref mqtt_sync will continue normally (performing reads and writes). If it | |
+ * returns an error then \ref mqtt_sync will not call reads and writes. | |
+ * | |
+ * This callback can be used to perform custom error detection, namely platform | |
+ * specific socket error detection, and force the client into an error state. | |
+ * | |
+ * This member is always initialized to NULL but it can be manually set at any | |
+ * time. | |
+ */ | |
+ enum MQTTErrors (*inspector_callback)(struct mqtt_client*); | |
+ | |
+ /** | |
+ * @brief A callback that is called whenever the client is in an error state. | |
+ * | |
+ * This callback is responsible for: application level error handling, closing | |
+ * previous sockets, and reestabilishing the connection to the broker and | |
+ * session configurations (i.e. subscriptions). | |
+ */ | |
+ void (*reconnect_callback)(struct mqtt_client*, void**); | |
+ | |
+ /** | |
+ * @brief A pointer to some state. A pointer to this member is passed to | |
+ * \ref mqtt_client.reconnect_callback. | |
+ */ | |
+ void* reconnect_state; | |
+ | |
+ /** | |
+ * @brief The buffer where ingress data is temporarily stored. | |
+ */ | |
+ struct { | |
+ /** @brief The start of the receive buffer's memory. */ | |
+ uint8_t *mem_start; | |
+ | |
+ /** @brief The size of the receive buffer's memory. */ | |
+ size_t mem_size; | |
+ | |
+ /** @brief A pointer to the next writtable location in the receive buffer. */ | |
+ uint8_t *curr; | |
+ | |
+ /** @brief The number of bytes that are still writable at curr. */ | |
+ size_t curr_sz; | |
+ } recv_buffer; | |
+ | |
+ /** | |
+ * @brief A variable passed to support thread-safety. | |
+ * | |
+ * A pointer to this variable is passed to \c MQTT_PAL_MUTEX_LOCK, and | |
+ * \c MQTT_PAL_MUTEX_UNLOCK. | |
+ */ | |
+ mqtt_pal_mutex_t mutex; | |
+ | |
+ /** @brief The sending message queue. */ | |
+ struct mqtt_message_queue mq; | |
+}; | |
+ | |
+/** | |
+ * @brief Generate a new next packet ID. | |
+ * @ingroup details | |
+ * | |
+ * Packet ID's are generated using a max-length LFSR. | |
+ * | |
+ * @param client The MQTT client. | |
+ * | |
+ * @returns The new packet ID that should be used. | |
+ */ | |
+uint16_t __mqtt_next_pid(struct mqtt_client *client); | |
+ | |
+/** | |
+ * @brief Handles egress client traffic. | |
+ * @ingroup details | |
+ * | |
+ * @param client The MQTT client. | |
+ * | |
+ * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+ssize_t __mqtt_send(struct mqtt_client *client); | |
+ | |
+/** | |
+ * @brief Handles ingress client traffic. | |
+ * @ingroup details | |
+ * | |
+ * @param client The MQTT client. | |
+ * | |
+ * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+ssize_t __mqtt_recv(struct mqtt_client *client); | |
+ | |
+/** | |
+ * @brief Function that does the actual sending and receiving of | |
+ * traffic from the network. | |
+ * @ingroup api | |
+ * | |
+ * All the other functions in the @ref api simply stage messages for | |
+ * being sent to the broker. This function does the actual sending of | |
+ * those messages. Additionally this function receives traffic (responses and | |
+ * acknowledgements) from the broker and responds to that traffic accordingly. | |
+ * Lastly this function also calls the \c publish_response_callback when | |
+ * any \c MQTT_CONTROL_PUBLISH messages are received. | |
+ * | |
+ * @pre mqtt_init must have been called. | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * | |
+ * @attention It is the responsibility of the application programmer to | |
+ * call this function periodically. All functions in the @ref api are | |
+ * thread-safe so it is perfectly reasonable to have a thread dedicated | |
+ * to calling this function every 200 ms or so. MQTT-C can be used in single | |
+ * threaded application though by simply calling this functino periodically | |
+ * inside your main thread. See @ref simple_publisher.c and @ref simple_subscriber.c | |
+ * for examples (specifically the \c client_refresher functions). | |
+ * | |
+ * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+enum MQTTErrors mqtt_sync(struct mqtt_client *client); | |
+ | |
+/** | |
+ * @brief Initializes an MQTT client. | |
+ * @ingroup api | |
+ * | |
+ * This function \em must be called before any other API function calls. | |
+ * | |
+ * @pre None. | |
+ * | |
+ * @param[out] client The MQTT client. | |
+ * @param[in] sockfd The socket file descriptor (or equivalent socket handle, e.g. BIO pointer | |
+ * for OpenSSL sockets) connected to the MQTT broker. | |
+ * @param[in] sendbuf A buffer that will be used for sending messages to the broker. | |
+ * @param[in] sendbufsz The size of \p sendbuf in bytes. | |
+ * @param[in] recvbuf A buffer that will be used for receiving messages from the broker. | |
+ * @param[in] recvbufsz The size of \p recvbuf in bytes. | |
+ * @param[in] publish_response_callback The callback to call whenever application messages | |
+ * are received from the broker. | |
+ * | |
+ * @post mqtt_connect must be called. | |
+ * | |
+ * @note \p sockfd is a non-blocking TCP connection. | |
+ * @note If \p sendbuf fills up completely during runtime a \c MQTT_ERROR_SEND_BUFFER_IS_FULL | |
+ * error will be set. Similarly if \p recvbuf is ever to small to receive a message from | |
+ * the broker an MQTT_ERROR_RECV_BUFFER_TOO_SMALL error will be set. | |
+ * @note A pointer to \ref mqtt_client.publish_response_callback_state is always passed as the | |
+ * \c state argument to \p publish_response_callback. Note that the second argument is | |
+ * the mqtt_response_publish that was received from the broker. | |
+ * | |
+ * @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or | |
+ * \ref mqtt_init_reconnect more than once per client). | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+enum MQTTErrors mqtt_init(struct mqtt_client *client, | |
+ mqtt_pal_socket_handle sockfd, | |
+ uint8_t *sendbuf, size_t sendbufsz, | |
+ uint8_t *recvbuf, size_t recvbufsz, | |
+ void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)); | |
+ | |
+/** | |
+ * @brief Initializes an MQTT client and enables automatic reconnections. | |
+ * @ingroup api | |
+ * | |
+ * An alternative to \ref mqtt_init that allows the client to automatically reconnect to the | |
+ * broker after an error occurs (e.g. socket error or internal buffer overflows). | |
+ * | |
+ * This is accomplished by calling the \p reconnect_callback whenever the client enters an error | |
+ * state. The job of the \p reconnect_callback is to: (1) perform error handling/logging, | |
+ * (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the | |
+ * client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other | |
+ * API calls such as \ref mqtt_subscribe. | |
+ * | |
+ * The first argument to the \p reconnect_callback is the client (which will be in an error | |
+ * state) and the second argument is a pointer to a void pointer where you can store some state | |
+ * information. Internally, MQTT-C calls the reconnect callback like so: | |
+ * | |
+ * \code | |
+ * client->reconnect_callback(client, &client->reconnect_state) | |
+ * \endcode | |
+ * | |
+ * Note that the \p reconnect_callback is also called to setup the initial session. After | |
+ * calling \ref mqtt_init_reconnect the client will be in the error state | |
+ * \c MQTT_ERROR_INITIAL_RECONNECT. | |
+ * | |
+ * @pre None. | |
+ * | |
+ * @param[in,out] client The MQTT client that will be initialized. | |
+ * @param[in] reconnect_callback The callback that will be called to connect/reconnect the | |
+ * client to the broker and perform application level error handling. | |
+ * @param[in] reconnect_state A pointer to some state data for your \p reconnect_callback. | |
+ * If your \p reconnect_callback does not require any state information set this | |
+ * to NULL. A pointer to the memory address where the client stores a copy of this | |
+ * pointer is passed as the second argumnet to \p reconnect_callback. | |
+ * @param[in] publish_response_callback The callback to call whenever application messages | |
+ * are received from the broker. | |
+ * | |
+ * @post Call \p reconnect_callback yourself, or call \ref mqtt_sync | |
+ * (which will trigger the call to \p reconnect_callback). | |
+ * | |
+ * @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or | |
+ * \ref mqtt_init_reconnect more than once per client). | |
+ * | |
+ */ | |
+void mqtt_init_reconnect(struct mqtt_client *client, | |
+ void (*reconnect_callback)(struct mqtt_client *client, void** state), | |
+ void *reconnect_state, | |
+ void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)); | |
+ | |
+/** | |
+ * @brief Safely assign/reassign a socket and buffers to an new/existing client. | |
+ * @ingroup api | |
+ * | |
+ * This function also clears the \p client error state. Upon exiting this function | |
+ * \c client->error will be \c MQTT_ERROR_CONNECT_NOT_CALLED (which will be cleared) | |
+ * as soon as \ref mqtt_connect is called. | |
+ * | |
+ * @pre This function must be called BEFORE \ref mqtt_connect. | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * @param[in] socketfd The new socket connected to the broker. | |
+ * @param[in] sendbuf The buffer that will be used to buffer egress traffic to the broker. | |
+ * @param[in] sendbufsz The size of \p sendbuf in bytes. | |
+ * @param[in] recvbuf The buffer that will be used to buffer ingress traffic from the broker. | |
+ * @param[in] recvbufsz The size of \p recvbuf in bytes. | |
+ * | |
+ * @post Call \ref mqtt_connect. | |
+ * | |
+ * @attention This function should be used in conjunction with clients that have been | |
+ * initialzed with \ref mqtt_init_reconnect. | |
+ */ | |
+void mqtt_reinit(struct mqtt_client* client, | |
+ mqtt_pal_socket_handle socketfd, | |
+ uint8_t *sendbuf, size_t sendbufsz, | |
+ uint8_t *recvbuf, size_t recvbufsz); | |
+ | |
+/** | |
+ * @brief Establishes a session with the MQTT broker. | |
+ * @ingroup api | |
+ * | |
+ * @pre mqtt_init must have been called. | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * @param[in] client_id The unique name identifying the client. | |
+ * @param[in] will_topic The topic name of client's \p will_message. If no will message is | |
+ * desired set to \c NULL. | |
+ * @param[in] will_message The application message (data) to be published in the event the | |
+ * client ungracefully disconnects. Set to \c NULL if \p will_topic is \c NULL. | |
+ * @param[in] will_message_size The size of \p will_message in bytes. | |
+ * @param[in] user_name The username to use when establishing the session with the MQTT broker. | |
+ * Set to \c NULL if a username is not required. | |
+ * @param[in] password The password to use when establishing the session with the MQTT broker. | |
+ * Set to \c NULL if a password is not required. | |
+ * @param[in] connect_flags Additional \ref MQTTConnectFlags to use when establishing the connection. | |
+ * These flags are for forcing the session to start clean, | |
+ * \c MQTT_CONNECT_CLEAN_SESSION, the QOS level to publish the \p will_message with | |
+ * (provided \c will_message != \c NULL), MQTT_CONNECT_WILL_QOS_[0,1,2], and whether | |
+ * or not the broker should retain the \c will_message, MQTT_CONNECT_WILL_RETAIN. | |
+ * @param[in] keep_alive The keep-alive time in seconds. A reasonable value for this is 400 [seconds]. | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+enum MQTTErrors mqtt_connect(struct mqtt_client *client, | |
+ const char* client_id, | |
+ const char* will_topic, | |
+ const void* will_message, | |
+ size_t will_message_size, | |
+ const char* user_name, | |
+ const char* password, | |
+ uint8_t connect_flags, | |
+ uint16_t keep_alive); | |
+ | |
+/* | |
+ todo: will_message should be a void* | |
+*/ | |
+ | |
+/** | |
+ * @brief Publish an application message. | |
+ * @ingroup api | |
+ * | |
+ * Publishes an application message to the MQTT broker. | |
+ * | |
+ * @pre mqtt_connect must have been called. | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * @param[in] topic_name The name of the topic. | |
+ * @param[in] application_message The data to be published. | |
+ * @param[in] application_message_size The size of \p application_message in bytes. | |
+ * @param[in] publish_flags \ref MQTTPublishFlags to be used, namely the QOS level to | |
+ * publish at (MQTT_PUBLISH_QOS_[0,1,2]) or whether or not the broker should | |
+ * retain the publish (MQTT_PUBLISH_RETAIN). | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+enum MQTTErrors mqtt_publish(struct mqtt_client *client, | |
+ const char* topic_name, | |
+ void* application_message, | |
+ size_t application_message_size, | |
+ uint8_t publish_flags); | |
+ | |
+/** | |
+ * @brief Acknowledge an ingree publish with QOS==1. | |
+ * @ingroup details | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * @param[in] packet_id The packet ID of the ingress publish being acknowledged. | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+ssize_t __mqtt_puback(struct mqtt_client *client, uint16_t packet_id); | |
+ | |
+/** | |
+ * @brief Acknowledge an ingree publish with QOS==2. | |
+ * @ingroup details | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * @param[in] packet_id The packet ID of the ingress publish being acknowledged. | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+ssize_t __mqtt_pubrec(struct mqtt_client *client, uint16_t packet_id); | |
+ | |
+/** | |
+ * @brief Acknowledge an ingree PUBREC packet. | |
+ * @ingroup details | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * @param[in] packet_id The packet ID of the ingress PUBREC being acknowledged. | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+ssize_t __mqtt_pubrel(struct mqtt_client *client, uint16_t packet_id); | |
+ | |
+/** | |
+ * @brief Acknowledge an ingree PUBREL packet. | |
+ * @ingroup details | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * @param[in] packet_id The packet ID of the ingress PUBREL being acknowledged. | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+ssize_t __mqtt_pubcomp(struct mqtt_client *client, uint16_t packet_id); | |
+ | |
+ | |
+/** | |
+ * @brief Subscribe to a topic. | |
+ * @ingroup api | |
+ * | |
+ * @pre mqtt_connect must have been called. | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * @param[in] topic_name The name of the topic to subscribe to. | |
+ * @param[in] max_qos_level The maximum QOS level with which the broker can send application | |
+ * messages for this topic. | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+enum MQTTErrors mqtt_subscribe(struct mqtt_client *client, | |
+ const char* topic_name, | |
+ int max_qos_level); | |
+ | |
+/** | |
+ * @brief Unsubscribe from a topic. | |
+ * @ingroup api | |
+ * | |
+ * @pre mqtt_connect must have been called. | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * @param[in] topic_name The name of the topic to unsubscribe from. | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client, | |
+ const char* topic_name); | |
+ | |
+/** | |
+ * @brief Ping the broker. | |
+ * @ingroup api | |
+ * | |
+ * @pre mqtt_connect must have been called. | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+enum MQTTErrors mqtt_ping(struct mqtt_client *client); | |
+ | |
+/** | |
+ * @brief Ping the broker without locking/unlocking the mutex. | |
+ * @see mqtt_ping | |
+ */ | |
+enum MQTTErrors __mqtt_ping(struct mqtt_client *client); | |
+ | |
+/** | |
+ * @brief Terminate the session with the MQTT broker. | |
+ * @ingroup api | |
+ * | |
+ * @pre mqtt_connect must have been called. | |
+ * | |
+ * @param[in,out] client The MQTT client. | |
+ * | |
+ * @note To re-establish the session, mqtt_connect must be called. | |
+ * | |
+ * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. | |
+ */ | |
+enum MQTTErrors mqtt_disconnect(struct mqtt_client *client); | |
+ | |
+#endif | |
diff --git a/include/netutils/mqtt_pal.h b/include/netutils/mqtt_pal.h | |
new file mode 100644 | |
index 00000000..5c83d64a | |
--- /dev/null | |
+++ b/include/netutils/mqtt_pal.h | |
@@ -0,0 +1,99 @@ | |
+#ifndef __MQTT_PAL_H__ | |
+#define __MQTT_PAL_H__ | |
+ | |
+/** | |
+ * @file | |
+ * @brief Includes/supports the types/calls required by the MQTT-C client. | |
+ * | |
+ * @note This is the \em only file included in mqtt.h, and mqtt.c. It is therefore | |
+ * responsible for including/supporting all the required types and calls. | |
+ * | |
+ * @defgroup pal Platform abstraction layer | |
+ * @brief Documentation of the types and calls required to port MQTT-C to a new platform. | |
+ * | |
+ * mqtt_pal.h is the \em only header file included in mqtt.c. Therefore, to port MQTT-C to a | |
+ * new platform the following types, functions, constants, and macros must be defined in | |
+ * mqtt_pal.h: | |
+ * - Types: | |
+ * - \c size_t, \c ssize_t | |
+ * - \c uint8_t, \c uint16_t, \c uint32_t | |
+ * - \c va_list | |
+ * - \c mqtt_pal_time_t : return type of \c MQTT_PAL_TIME() | |
+ * - \c mqtt_pal_mutex_t : type of the argument that is passed to \c MQTT_PAL_MUTEX_LOCK and | |
+ * \c MQTT_PAL_MUTEX_RELEASE | |
+ * - Functions: | |
+ * - \c memcpy, \c strlen | |
+ * - \c va_start, \c va_arg, \c va_end | |
+ * - Constants: | |
+ * - \c INT_MIN | |
+ * | |
+ * Additionally, three macro's are required: | |
+ * - \c MQTT_PAL_HTONS(s) : host-to-network endian conversion for uint16_t. | |
+ * - \c MQTT_PAL_NTOHS(s) : network-to-host endian conversion for uint16_t. | |
+ * - \c MQTT_PAL_TIME() : returns [type: \c mqtt_pal_time_t] current time in seconds. | |
+ * - \c MQTT_PAL_MUTEX_LOCK(mtx_pointer) : macro that locks the mutex pointed to by \c mtx_pointer. | |
+ * - \c MQTT_PAL_MUTEX_RELEASE(mtx_pointer) : macro that unlocks the mutex pointed to by | |
+ * \c mtx_pointer. | |
+ * | |
+ * Lastly, \ref mqtt_pal_sendall and \ref mqtt_pal_recvall, must be implemented in mqtt_pal.c | |
+ * for sending and receiving data using the platforms socket calls. | |
+ */ | |
+ | |
+ | |
+/* UNIX-like platform support */ | |
+#define __unix__ 1 | |
+#ifdef __unix__ | |
+ #include <limits.h> | |
+ #include <string.h> | |
+ #include <stdarg.h> | |
+ #include <time.h> | |
+ #include <arpa/inet.h> | |
+ #include <pthread.h> | |
+ | |
+ #define MQTT_PAL_HTONS(s) htons(s) | |
+ #define MQTT_PAL_NTOHS(s) ntohs(s) | |
+ | |
+ #define MQTT_PAL_TIME() time(NULL) | |
+ | |
+ typedef time_t mqtt_pal_time_t; | |
+ typedef pthread_mutex_t mqtt_pal_mutex_t; | |
+ | |
+ #define MQTT_PAL_MUTEX_INIT(mtx_ptr) pthread_mutex_init(mtx_ptr, NULL) | |
+ #define MQTT_PAL_MUTEX_LOCK(mtx_ptr) pthread_mutex_lock(mtx_ptr) | |
+ #define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) pthread_mutex_unlock(mtx_ptr) | |
+ | |
+ #ifdef MQTT_USE_BIO | |
+ #include <openssl/bio.h> | |
+ typedef BIO* mqtt_pal_socket_handle; | |
+ #else | |
+ typedef int mqtt_pal_socket_handle; | |
+ #endif | |
+#endif | |
+ | |
+/** | |
+ * @brief Sends all the bytes in a buffer. | |
+ * @ingroup pal | |
+ * | |
+ * @param[in] fd The file-descriptor (or handle) of the socket. | |
+ * @param[in] buf A pointer to the first byte in the buffer to send. | |
+ * @param[in] len The number of bytes to send (starting at \p buf). | |
+ * @param[in] flags Flags which are passed to the underlying socket. | |
+ * | |
+ * @returns The number of bytes sent if successful, an \ref MQTTErrors otherwise. | |
+ */ | |
+ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags); | |
+ | |
+/** | |
+ * @brief Non-blocking receive all the byte available. | |
+ * @ingroup pal | |
+ * | |
+ * @param[in] fd The file-descriptor (or handle) of the socket. | |
+ * @param[in] buf A pointer to the receive buffer. | |
+ * @param[in] bufsz The max number of bytes that can be put into \p buf. | |
+ * @param[in] flags Flags which are passed to the underlying socket. | |
+ * | |
+ * @returns The number of bytes received if successful, an \ref MQTTErrors otherwise. | |
+ */ | |
+ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags); | |
+ | |
+#endif | |
diff --git a/netutils/mqttc/Kconfig b/netutils/mqttc/Kconfig | |
new file mode 100644 | |
index 00000000..35723ba6 | |
--- /dev/null | |
+++ b/netutils/mqttc/Kconfig | |
@@ -0,0 +1,19 @@ | |
+# | |
+# For a description of the syntax of this configuration file, | |
+# see the file kconfig-language.txt in the NuttX tools repository. | |
+# | |
+ | |
+config NETUTILS_MQTTC | |
+ bool "MQTTC client library" | |
+ default n | |
+ depends on NET && NET_TCP | |
+ ---help--- | |
+ Enable support for the mqtt. This is a public domain | |
+ Telnet client library available from https://github.com/LiamBindle/MQTT-C | |
+ modified for use with NuttX. Original Authors: | |
+ | |
+ Liam Bindle | |
+ Demilade Adeoye | |
+ | |
+if NETUTILS_MQTTC | |
+endif | |
diff --git a/netutils/mqttc/Make.defs b/netutils/mqttc/Make.defs | |
new file mode 100644 | |
index 00000000..3832c0fd | |
--- /dev/null | |
+++ b/netutils/mqttc/Make.defs | |
@@ -0,0 +1,39 @@ | |
+# apps/netutils/telnetc/Make.defs | |
+# Adds selected applications to apps/ build | |
+# | |
+# Copyright (C) 2017 Gregory Nutt. All rights reserved. | |
+# Author: Gregory Nutt <gnutt@nuttx.org> | |
+# | |
+# Redistribution and use in source and binary forms, with or without | |
+# modification, are permitted provided that the following conditions | |
+# are met: | |
+# | |
+# 1. Redistributions of source code must retain the above copyright | |
+# notice, this list of conditions and the following disclaimer. | |
+# 2. Redistributions in binary form must reproduce the above copyright | |
+# notice, this list of conditions and the following disclaimer in | |
+# the documentation and/or other materials provided with the | |
+# distribution. | |
+# 3. Neither the name NuttX nor the names of its contributors may be | |
+# used to endorse or promote products derived from this software | |
+# without specific prior written permission. | |
+# | |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS | |
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE | |
+# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, | |
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, | |
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS | |
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED | |
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN | |
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
+# POSSIBILITY OF SUCH DAMAGE. | |
+# | |
+############################################################################ | |
+ | |
+ifeq ($(CONFIG_NETUTILS_MQTTC),y) | |
+CONFIGURED_APPS += netutils/mqttc | |
+endif | |
+ | |
diff --git a/netutils/mqttc/Makefile b/netutils/mqttc/Makefile | |
new file mode 100644 | |
index 00000000..b29bc0e5 | |
--- /dev/null | |
+++ b/netutils/mqttc/Makefile | |
@@ -0,0 +1,106 @@ | |
+############################################################################ | |
+# apps/netutils/mqttc/Makefile | |
+# | |
+# Copyright (C) 2015 Max Nekludov. All rights reserved. | |
+# Author: Max Nekludov <macscomp@gmail.com> | |
+# | |
+# Redistribution and use in source and binary forms, with or without | |
+# modification, are permitted provided that the following conditions | |
+# are met: | |
+# | |
+# 1. Redistributions of source code must retain the above copyright | |
+# notice, this list of conditions and the following disclaimer. | |
+# 2. Redistributions in binary form must reproduce the above copyright | |
+# notice, this list of conditions and the following disclaimer in | |
+# the documentation and/or other materials provided with the | |
+# distribution. | |
+# 3. Neither the name NuttX nor the names of its contributors may be | |
+# used to endorse or promote products derived from this software | |
+# without specific prior written permission. | |
+# | |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS | |
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE | |
+# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, | |
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, | |
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS | |
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED | |
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN | |
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
+# POSSIBILITY OF SUCH DAMAGE. | |
+# | |
+############################################################################ | |
+ | |
+-include $(TOPDIR)/.config | |
+-include $(TOPDIR)/Make.defs | |
+include $(APPDIR)/Make.defs | |
+ | |
+ASRCS = | |
+CSRCS = | |
+ | |
+CSRCS += mqtt.c mqtt_pal.c | |
+ | |
+ifeq ($(CONFIG_NETUTILS_MQTTC),y) | |
+#CSRCS += pap.c | |
+CSRCS += | |
+endif | |
+ | |
+AOBJS = $(ASRCS:.S=$(OBJEXT)) | |
+COBJS = $(CSRCS:.c=$(OBJEXT)) | |
+ | |
+SRCS = $(ASRCS) $(CSRCS) | |
+OBJS = $(AOBJS) $(COBJS) | |
+ | |
+ifeq ($(CONFIG_WINDOWS_NATIVE),y) | |
+ BIN = ..\..\libapps$(LIBEXT) | |
+else | |
+ifeq ($(WINTOOL),y) | |
+ BIN = ..\\..\\libapps$(LIBEXT) | |
+else | |
+ BIN = ../../libapps$(LIBEXT) | |
+endif | |
+endif | |
+ | |
+ROOTDEPPATH = --dep-path . | |
+ | |
+# Common build | |
+ | |
+VPATH = | |
+ | |
+all: .built | |
+.PHONY: context depend clean distclean preconfig | |
+.PRECIOUS: ../../libapps$(LIBEXT) | |
+ | |
+$(AOBJS): %$(OBJEXT): %.S | |
+ $(call ASSEMBLE, $<, $@) | |
+ | |
+$(COBJS): %$(OBJEXT): %.c | |
+ $(call COMPILE, $<, $@) | |
+ | |
+.built: $(OBJS) | |
+ $(call ARCHIVE, $(BIN), $(OBJS)) | |
+ $(Q) touch .built | |
+ | |
+install: | |
+ | |
+context: | |
+ | |
+.depend: Makefile $(SRCS) | |
+ $(Q) $(MKDEP) $(ROOTDEPPATH) "$(CC)" -- $(CFLAGS) -- $(SRCS) >Make.dep | |
+ $(Q) touch $@ | |
+ | |
+depend: .depend | |
+ | |
+clean: | |
+ $(call DELFILE, .built) | |
+ $(call CLEAN) | |
+ | |
+distclean: clean | |
+ $(call DELFILE, Make.dep) | |
+ $(call DELFILE, .depend) | |
+ | |
+preconfig: | |
+ | |
+-include Make.dep | |
diff --git a/netutils/mqttc/mqtt.c b/netutils/mqttc/mqtt.c | |
new file mode 100644 | |
index 00000000..d876abb4 | |
--- /dev/null | |
+++ b/netutils/mqttc/mqtt.c | |
@@ -0,0 +1,1663 @@ | |
+#include "netutils/mqtt.h" | |
+ | |
+/** | |
+ * @file | |
+ * @brief Implements the functionality of MQTT-C. | |
+ * @note The only files that are included are mqtt.h and mqtt_pal.h. | |
+ * | |
+ * @cond Doxygen_Suppress | |
+ */ | |
+ | |
+enum MQTTErrors mqtt_sync(struct mqtt_client *client) { | |
+ /* Recover from any errors */ | |
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); | |
+ if (client->error != MQTT_OK && client->reconnect_callback != NULL) { | |
+ client->reconnect_callback(client, &client->reconnect_state); | |
+ /* unlocked during CONNECT */ | |
+ } else { | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ } | |
+ | |
+ /* Call inspector callback if necessary */ | |
+ enum MQTTErrors err; | |
+ if (client->inspector_callback != NULL) { | |
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); | |
+ err = client->inspector_callback(client); | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ if (err != MQTT_OK) return err; | |
+ } | |
+ /* Call receive */ | |
+ err = __mqtt_recv(client); | |
+ if (err != MQTT_OK) return err; | |
+ /* Call send */ | |
+ err = __mqtt_send(client); | |
+ return err; | |
+} | |
+ | |
+uint16_t __mqtt_next_pid(struct mqtt_client *client) { | |
+ if (client->pid_lfsr == 0) { | |
+ client->pid_lfsr = 163u; | |
+ } | |
+ /* LFSR taps taken from: https://en.wikipedia.org/wiki/Linear-feedback_shift_register */ | |
+ int pid_exists = 0; | |
+ do { | |
+ unsigned lsb = client->pid_lfsr & 1; | |
+ (client->pid_lfsr) >>= 1; | |
+ if (lsb) { | |
+ client->pid_lfsr ^= 0xB400u; | |
+ } | |
+ | |
+ /* check that the PID is unique */ | |
+ pid_exists = 0; | |
+ struct mqtt_queued_message *curr; | |
+ for(curr = mqtt_mq_get(&(client->mq), 0); curr >= client->mq.queue_tail; --curr) { | |
+ if (curr->packet_id == client->pid_lfsr) { | |
+ pid_exists = 1; | |
+ break; | |
+ } | |
+ } | |
+ | |
+ } while(pid_exists); | |
+ return client->pid_lfsr; | |
+} | |
+ | |
+enum MQTTErrors mqtt_init(struct mqtt_client *client, | |
+ mqtt_pal_socket_handle sockfd, | |
+ uint8_t *sendbuf, size_t sendbufsz, | |
+ uint8_t *recvbuf, size_t recvbufsz, | |
+ void (*publish_response_callback)(void** state,struct mqtt_response_publish *publish)) | |
+{ | |
+ if (client == NULL || sendbuf == NULL || recvbuf == NULL) { | |
+ return MQTT_ERROR_NULLPTR; | |
+ } | |
+ | |
+ /* initialize mutex */ | |
+ MQTT_PAL_MUTEX_INIT(&client->mutex); | |
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); /* unlocked during CONNECT */ | |
+ | |
+ client->socketfd = sockfd; | |
+ | |
+ mqtt_mq_init(&client->mq, sendbuf, sendbufsz); | |
+ | |
+ client->recv_buffer.mem_start = recvbuf; | |
+ client->recv_buffer.mem_size = recvbufsz; | |
+ client->recv_buffer.curr = client->recv_buffer.mem_start; | |
+ client->recv_buffer.curr_sz = client->recv_buffer.mem_size; | |
+ | |
+ client->error = MQTT_ERROR_CONNECT_NOT_CALLED; | |
+ client->response_timeout = 30; | |
+ client->number_of_timeouts = 0; | |
+ client->number_of_keep_alives = 0; | |
+ client->typical_response_time = -1.0; | |
+ client->publish_response_callback = publish_response_callback; | |
+ | |
+ client->inspector_callback = NULL; | |
+ client->reconnect_callback = NULL; | |
+ client->reconnect_state = NULL; | |
+ | |
+ return MQTT_OK; | |
+} | |
+ | |
+void mqtt_init_reconnect(struct mqtt_client *client, | |
+ void (*reconnect)(struct mqtt_client *, void**), | |
+ void *reconnect_state, | |
+ void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)) | |
+{ | |
+ /* initialize mutex */ | |
+ MQTT_PAL_MUTEX_INIT(&client->mutex); | |
+ | |
+ client->socketfd = (mqtt_pal_socket_handle) -1; | |
+ | |
+ mqtt_mq_init(&client->mq, NULL, 0); | |
+ | |
+ client->recv_buffer.mem_start = NULL; | |
+ client->recv_buffer.mem_size = 0; | |
+ client->recv_buffer.curr = NULL; | |
+ client->recv_buffer.curr_sz = 0; | |
+ | |
+ client->error = MQTT_ERROR_INITIAL_RECONNECT; | |
+ client->response_timeout = 30; | |
+ client->number_of_timeouts = 0; | |
+ client->number_of_keep_alives = 0; | |
+ client->typical_response_time = -1.0; | |
+ client->publish_response_callback = publish_response_callback; | |
+ | |
+ client->inspector_callback = NULL; | |
+ client->reconnect_callback = reconnect; | |
+ client->reconnect_state = reconnect_state; | |
+} | |
+ | |
+void mqtt_reinit(struct mqtt_client* client, | |
+ mqtt_pal_socket_handle socketfd, | |
+ uint8_t *sendbuf, size_t sendbufsz, | |
+ uint8_t *recvbuf, size_t recvbufsz) | |
+{ | |
+ client->error = MQTT_ERROR_CONNECT_NOT_CALLED; | |
+ client->socketfd = socketfd; | |
+ | |
+ mqtt_mq_init(&client->mq, sendbuf, sendbufsz); | |
+ | |
+ client->recv_buffer.mem_start = recvbuf; | |
+ client->recv_buffer.mem_size = recvbufsz; | |
+ client->recv_buffer.curr = client->recv_buffer.mem_start; | |
+ client->recv_buffer.curr_sz = client->recv_buffer.mem_size; | |
+} | |
+ | |
+/** | |
+ * A macro function that: | |
+ * 1) Checks that the client isn't in an error state. | |
+ * 2) Attempts to pack to client's message queue. | |
+ * a) handles errors | |
+ * b) if mq buffer is too small, cleans it and tries again | |
+ * 3) Upon successful pack, registers the new message. | |
+ */ | |
+#define MQTT_CLIENT_TRY_PACK(tmp, msg, client, pack_call, release) \ | |
+ if (client->error < 0) { \ | |
+ if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \ | |
+ return client->error; \ | |
+ } \ | |
+ tmp = pack_call; \ | |
+ if (tmp < 0) { \ | |
+ client->error = tmp; \ | |
+ if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \ | |
+ return tmp; \ | |
+ } else if (tmp == 0) { \ | |
+ mqtt_mq_clean(&client->mq); \ | |
+ tmp = pack_call; \ | |
+ if (tmp < 0) { \ | |
+ client->error = tmp; \ | |
+ if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \ | |
+ return tmp; \ | |
+ } else if(tmp == 0) { \ | |
+ client->error = MQTT_ERROR_SEND_BUFFER_IS_FULL; \ | |
+ if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \ | |
+ return MQTT_ERROR_SEND_BUFFER_IS_FULL; \ | |
+ } \ | |
+ } \ | |
+ msg = mqtt_mq_register(&client->mq, tmp); \ | |
+ | |
+ | |
+enum MQTTErrors mqtt_connect(struct mqtt_client *client, | |
+ const char* client_id, | |
+ const char* will_topic, | |
+ const void* will_message, | |
+ size_t will_message_size, | |
+ const char* user_name, | |
+ const char* password, | |
+ uint8_t connect_flags, | |
+ uint16_t keep_alive) | |
+{ | |
+ ssize_t rv; | |
+ struct mqtt_queued_message *msg; | |
+ | |
+ /* Note: Current thread already has mutex locked. */ | |
+ | |
+ /* update the client's state */ | |
+ client->keep_alive = keep_alive; | |
+ if (client->error == MQTT_ERROR_CONNECT_NOT_CALLED) { | |
+ client->error = MQTT_OK; | |
+ } | |
+ | |
+ /* try to pack the message */ | |
+ MQTT_CLIENT_TRY_PACK(rv, msg, client, | |
+ mqtt_pack_connection_request( | |
+ client->mq.curr, client->mq.curr_sz, | |
+ client_id, will_topic, will_message, | |
+ will_message_size,user_name, password, | |
+ connect_flags, keep_alive | |
+ ), | |
+ 1 | |
+ ); | |
+ /* save the control type of the message */ | |
+ msg->control_type = MQTT_CONTROL_CONNECT; | |
+ | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_OK; | |
+} | |
+ | |
+enum MQTTErrors mqtt_publish(struct mqtt_client *client, | |
+ const char* topic_name, | |
+ void* application_message, | |
+ size_t application_message_size, | |
+ uint8_t publish_flags) | |
+{ | |
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); | |
+ uint16_t packet_id = __mqtt_next_pid(client); | |
+ ssize_t rv; | |
+ struct mqtt_queued_message *msg; | |
+ printf("\npacket_id = 0x%08X,%d\n",packet_id); | |
+ /* try to pack the message */ | |
+ MQTT_CLIENT_TRY_PACK( | |
+ rv, msg, client, | |
+ mqtt_pack_publish_request( | |
+ client->mq.curr, client->mq.curr_sz, | |
+ topic_name, | |
+ packet_id, | |
+ application_message, | |
+ application_message_size, | |
+ publish_flags | |
+ ), | |
+ 1 | |
+ ); | |
+ /* save the control type and packet id of the message */ | |
+ msg->control_type = MQTT_CONTROL_PUBLISH; | |
+ msg->packet_id = packet_id; | |
+ | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_OK; | |
+} | |
+ | |
+ssize_t __mqtt_puback(struct mqtt_client *client, uint16_t packet_id) { | |
+ ssize_t rv; | |
+ struct mqtt_queued_message *msg; | |
+ | |
+ /* try to pack the message */ | |
+ MQTT_CLIENT_TRY_PACK( | |
+ rv, msg, client, | |
+ mqtt_pack_pubxxx_request( | |
+ client->mq.curr, client->mq.curr_sz, | |
+ MQTT_CONTROL_PUBACK, | |
+ packet_id | |
+ ), | |
+ 0 | |
+ ); | |
+ /* save the control type and packet id of the message */ | |
+ msg->control_type = MQTT_CONTROL_PUBACK; | |
+ msg->packet_id = packet_id; | |
+ | |
+ return MQTT_OK; | |
+} | |
+ | |
+ssize_t __mqtt_pubrec(struct mqtt_client *client, uint16_t packet_id) { | |
+ ssize_t rv; | |
+ struct mqtt_queued_message *msg; | |
+ | |
+ /* try to pack the message */ | |
+ MQTT_CLIENT_TRY_PACK( | |
+ rv, msg, client, | |
+ mqtt_pack_pubxxx_request( | |
+ client->mq.curr, client->mq.curr_sz, | |
+ MQTT_CONTROL_PUBREC, | |
+ packet_id | |
+ ), | |
+ 0 | |
+ ); | |
+ /* save the control type and packet id of the message */ | |
+ msg->control_type = MQTT_CONTROL_PUBREC; | |
+ msg->packet_id = packet_id; | |
+ | |
+ return MQTT_OK; | |
+} | |
+ | |
+ssize_t __mqtt_pubrel(struct mqtt_client *client, uint16_t packet_id) { | |
+ ssize_t rv; | |
+ struct mqtt_queued_message *msg; | |
+ | |
+ /* try to pack the message */ | |
+ MQTT_CLIENT_TRY_PACK( | |
+ rv, msg, client, | |
+ mqtt_pack_pubxxx_request( | |
+ client->mq.curr, client->mq.curr_sz, | |
+ MQTT_CONTROL_PUBREL, | |
+ packet_id | |
+ ), | |
+ 0 | |
+ ); | |
+ /* save the control type and packet id of the message */ | |
+ msg->control_type = MQTT_CONTROL_PUBREL; | |
+ msg->packet_id = packet_id; | |
+ | |
+ return MQTT_OK; | |
+} | |
+ | |
+ssize_t __mqtt_pubcomp(struct mqtt_client *client, uint16_t packet_id) { | |
+ ssize_t rv; | |
+ struct mqtt_queued_message *msg; | |
+ | |
+ /* try to pack the message */ | |
+ MQTT_CLIENT_TRY_PACK( | |
+ rv, msg, client, | |
+ mqtt_pack_pubxxx_request( | |
+ client->mq.curr, client->mq.curr_sz, | |
+ MQTT_CONTROL_PUBCOMP, | |
+ packet_id | |
+ ), | |
+ 0 | |
+ ); | |
+ /* save the control type and packet id of the message */ | |
+ msg->control_type = MQTT_CONTROL_PUBCOMP; | |
+ msg->packet_id = packet_id; | |
+ | |
+ return MQTT_OK; | |
+} | |
+ | |
+enum MQTTErrors mqtt_subscribe(struct mqtt_client *client, | |
+ const char* topic_name, | |
+ int max_qos_level) | |
+{ | |
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); | |
+ uint16_t packet_id = __mqtt_next_pid(client); | |
+ ssize_t rv; | |
+ struct mqtt_queued_message *msg; | |
+ | |
+ /* try to pack the message */ | |
+ MQTT_CLIENT_TRY_PACK( | |
+ rv, msg, client, | |
+ mqtt_pack_subscribe_request( | |
+ client->mq.curr, client->mq.curr_sz, | |
+ packet_id, | |
+ topic_name, | |
+ max_qos_level, | |
+ NULL | |
+ ), | |
+ 1 | |
+ ); | |
+ /* save the control type and packet id of the message */ | |
+ msg->control_type = MQTT_CONTROL_SUBSCRIBE; | |
+ msg->packet_id = packet_id; | |
+ | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_OK; | |
+} | |
+ | |
+enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client, | |
+ const char* topic_name) | |
+{ | |
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); | |
+ uint16_t packet_id = __mqtt_next_pid(client); | |
+ ssize_t rv; | |
+ struct mqtt_queued_message *msg; | |
+ | |
+ /* try to pack the message */ | |
+ MQTT_CLIENT_TRY_PACK( | |
+ rv, msg, client, | |
+ mqtt_pack_unsubscribe_request( | |
+ client->mq.curr, client->mq.curr_sz, | |
+ packet_id, | |
+ topic_name, | |
+ NULL | |
+ ), | |
+ 1 | |
+ ); | |
+ /* save the control type and packet id of the message */ | |
+ msg->control_type = MQTT_CONTROL_UNSUBSCRIBE; | |
+ msg->packet_id = packet_id; | |
+ | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_OK; | |
+} | |
+ | |
+enum MQTTErrors mqtt_ping(struct mqtt_client *client) { | |
+ enum MQTTErrors rv; | |
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); | |
+ rv = __mqtt_ping(client); | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return rv; | |
+} | |
+ | |
+enum MQTTErrors __mqtt_ping(struct mqtt_client *client) | |
+{ | |
+ ssize_t rv; | |
+ struct mqtt_queued_message *msg; | |
+ | |
+ /* try to pack the message */ | |
+ MQTT_CLIENT_TRY_PACK( | |
+ rv, msg, client, | |
+ mqtt_pack_ping_request( | |
+ client->mq.curr, client->mq.curr_sz | |
+ ), | |
+ 0 | |
+ ); | |
+ /* save the control type and packet id of the message */ | |
+ msg->control_type = MQTT_CONTROL_PINGREQ; | |
+ | |
+ | |
+ return MQTT_OK; | |
+} | |
+ | |
+enum MQTTErrors mqtt_disconnect(struct mqtt_client *client) | |
+{ | |
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); | |
+ ssize_t rv; | |
+ struct mqtt_queued_message *msg; | |
+ | |
+ /* try to pack the message */ | |
+ MQTT_CLIENT_TRY_PACK( | |
+ rv, msg, client, | |
+ mqtt_pack_disconnect( | |
+ client->mq.curr, client->mq.curr_sz | |
+ ), | |
+ 1 | |
+ ); | |
+ /* save the control type and packet id of the message */ | |
+ msg->control_type = MQTT_CONTROL_DISCONNECT; | |
+ | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_OK; | |
+} | |
+ | |
+ssize_t __mqtt_send(struct mqtt_client *client) | |
+{ | |
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); | |
+ uint8_t inspected; | |
+ int i; | |
+ if (client->error < 0 && client->error != MQTT_ERROR_SEND_BUFFER_IS_FULL) { | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return client->error; | |
+ } | |
+ | |
+ /* loop through all messages in the queue */ | |
+ int len = mqtt_mq_length(&client->mq); | |
+ int inflight_qos2 = 0; | |
+ for(i = 0; i < len; ++i) { | |
+ struct mqtt_queued_message *msg = mqtt_mq_get(&client->mq, i); | |
+ int resend = 0; | |
+ if (msg->state == MQTT_QUEUED_UNSENT) { | |
+ /* message has not been sent to lets send it */ | |
+ resend = 1; | |
+ } else if (msg->state == MQTT_QUEUED_AWAITING_ACK) { | |
+ /* check for timeout */ | |
+ if (MQTT_PAL_TIME() > msg->time_sent + client->response_timeout) { | |
+ resend = 1; | |
+ client->number_of_timeouts += 1; | |
+ } | |
+ } | |
+ | |
+ /* only send QoS 2 message if there are no inflight QoS 2 PUBLISH messages */ | |
+ if (msg->control_type == MQTT_CONTROL_PUBLISH | |
+ && (msg->state == MQTT_QUEUED_UNSENT || msg->state == MQTT_QUEUED_AWAITING_ACK)) | |
+ { | |
+ inspected = 0x03 & ((msg->start[0]) >> 1); /* qos */ | |
+ if (inspected == 2) { | |
+ if (inflight_qos2) { | |
+ resend = 0; | |
+ } | |
+ inflight_qos2 = 1; | |
+ } | |
+ } | |
+ | |
+ /* goto next message if we don't need to send */ | |
+ if (!resend) { | |
+ continue; | |
+ } | |
+ | |
+ /* we're sending the message */ | |
+ ssize_t tmp = mqtt_pal_sendall(client->socketfd, msg->start, msg->size, 0); | |
+ if (tmp < 0) { | |
+ client->error = tmp; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return tmp; | |
+ } | |
+ | |
+ /* update timeout watcher */ | |
+ client->time_of_last_send = MQTT_PAL_TIME(); | |
+ msg->time_sent = client->time_of_last_send; | |
+ | |
+ /* | |
+ Determine the state to put the message in. | |
+ Control Types: | |
+ MQTT_CONTROL_CONNECT -> awaiting | |
+ MQTT_CONTROL_CONNACK -> n/a | |
+ MQTT_CONTROL_PUBLISH -> qos == 0 ? complete : awaiting | |
+ MQTT_CONTROL_PUBACK -> complete | |
+ MQTT_CONTROL_PUBREC -> awaiting | |
+ MQTT_CONTROL_PUBREL -> awaiting | |
+ MQTT_CONTROL_PUBCOMP -> complete | |
+ MQTT_CONTROL_SUBSCRIBE -> awaiting | |
+ MQTT_CONTROL_SUBACK -> n/a | |
+ MQTT_CONTROL_UNSUBSCRIBE -> awaiting | |
+ MQTT_CONTROL_UNSUBACK -> n/a | |
+ MQTT_CONTROL_PINGREQ -> awaiting | |
+ MQTT_CONTROL_PINGRESP -> n/a | |
+ MQTT_CONTROL_DISCONNECT -> complete | |
+ */ | |
+ switch (msg->control_type) { | |
+ case MQTT_CONTROL_PUBACK: | |
+ case MQTT_CONTROL_PUBCOMP: | |
+ case MQTT_CONTROL_DISCONNECT: | |
+ msg->state = MQTT_QUEUED_COMPLETE; | |
+ break; | |
+ case MQTT_CONTROL_PUBLISH: | |
+ inspected = 0x03 & ((msg->start[0]) >> 1); /* qos */ | |
+ if (inspected == 0) { | |
+ msg->state = MQTT_QUEUED_COMPLETE; | |
+ } else if (inspected == 1) { | |
+ msg->state = MQTT_QUEUED_AWAITING_ACK; | |
+ /*set DUP flag for subsequent sends */ | |
+ msg->start[1] |= MQTT_PUBLISH_DUP; | |
+ } else { | |
+ msg->state = MQTT_QUEUED_AWAITING_ACK; | |
+ } | |
+ break; | |
+ case MQTT_CONTROL_CONNECT: | |
+ case MQTT_CONTROL_PUBREC: | |
+ case MQTT_CONTROL_PUBREL: | |
+ case MQTT_CONTROL_SUBSCRIBE: | |
+ case MQTT_CONTROL_UNSUBSCRIBE: | |
+ case MQTT_CONTROL_PINGREQ: | |
+ msg->state = MQTT_QUEUED_AWAITING_ACK; | |
+ break; | |
+ default: | |
+ client->error = MQTT_ERROR_MALFORMED_REQUEST; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_MALFORMED_REQUEST; | |
+ } | |
+ } | |
+ | |
+ /* check for keep-alive */ | |
+ mqtt_pal_time_t keep_alive_timeout = client->time_of_last_send + (mqtt_pal_time_t) ((float) (client->keep_alive) * 0.75); | |
+ if (MQTT_PAL_TIME() > keep_alive_timeout) { | |
+ ssize_t rv = __mqtt_ping(client); | |
+ if (rv != MQTT_OK) { | |
+ client->error = rv; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return rv; | |
+ } | |
+ } | |
+ | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_OK; | |
+} | |
+ | |
+ssize_t __mqtt_recv(struct mqtt_client *client) | |
+{ | |
+ MQTT_PAL_MUTEX_LOCK(&client->mutex); | |
+ struct mqtt_response response; | |
+ /* read until there is nothing left to read */ | |
+ while(1) { | |
+ /* read in as many bytes as possible */ | |
+ ssize_t rv, consumed; | |
+ | |
+ rv = mqtt_pal_recvall(client->socketfd, client->recv_buffer.curr, client->recv_buffer.curr_sz, 0); | |
+ if (rv < 0) { | |
+ /* an error occurred */ | |
+ client->error = rv; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return rv; | |
+ } else { | |
+ client->recv_buffer.curr += rv; | |
+ client->recv_buffer.curr_sz -= rv; | |
+ } | |
+ | |
+ /* attempt to parse */ | |
+ consumed = mqtt_unpack_response(&response, client->recv_buffer.mem_start, client->recv_buffer.curr - client->recv_buffer.mem_start); | |
+ | |
+ if (consumed < 0) { | |
+ client->error = consumed; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return consumed; | |
+ } else if (consumed == 0) { | |
+ /* if curr_sz is 0 then the buffer is too small to ever fit the message */ | |
+ if (client->recv_buffer.curr_sz == 0) { | |
+ client->error = MQTT_ERROR_RECV_BUFFER_TOO_SMALL; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_RECV_BUFFER_TOO_SMALL; | |
+ } | |
+ | |
+ /* just need to wait for the rest of the data */ | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_OK; | |
+ } | |
+ | |
+ /* response was unpacked successfully */ | |
+ struct mqtt_queued_message *msg = NULL; | |
+ | |
+ /* | |
+ The switch statement below manages how the client responds to messages from the broker. | |
+ | |
+ Control Types (that we expect to receive from the broker): | |
+ MQTT_CONTROL_CONNACK: | |
+ -> release associated CONNECT | |
+ -> handle response | |
+ MQTT_CONTROL_PUBLISH: | |
+ -> stage response, none if qos==0, PUBACK if qos==1, PUBREC if qos==2 | |
+ -> call publish callback | |
+ MQTT_CONTROL_PUBACK: | |
+ -> release associated PUBLISH | |
+ MQTT_CONTROL_PUBREC: | |
+ -> release PUBLISH | |
+ -> stage PUBREL | |
+ MQTT_CONTROL_PUBREL: | |
+ -> release associated PUBREC | |
+ -> stage PUBCOMP | |
+ MQTT_CONTROL_PUBCOMP: | |
+ -> release PUBREL | |
+ MQTT_CONTROL_SUBACK: | |
+ -> release SUBSCRIBE | |
+ -> handle response | |
+ MQTT_CONTROL_UNSUBACK: | |
+ -> release UNSUBSCRIBE | |
+ MQTT_CONTROL_PINGRESP: | |
+ -> release PINGREQ | |
+ */ | |
+ switch (response.fixed_header.control_type) { | |
+ case MQTT_CONTROL_CONNACK: | |
+ /* release associated CONNECT */ | |
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_CONNECT, NULL); | |
+ if (msg == NULL) { | |
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ } | |
+ msg->state = MQTT_QUEUED_COMPLETE; | |
+ /* initialize typical response time */ | |
+ client->typical_response_time = (double) (MQTT_PAL_TIME() - msg->time_sent); | |
+ /* check that connection was successful */ | |
+ if (response.decoded.connack.return_code != MQTT_CONNACK_ACCEPTED) { | |
+ client->error = MQTT_ERROR_CONNECTION_REFUSED; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_CONNECTION_REFUSED; | |
+ } | |
+ break; | |
+ case MQTT_CONTROL_PUBLISH: | |
+ /* stage response, none if qos==0, PUBACK if qos==1, PUBREC if qos==2 */ | |
+ if (response.decoded.publish.qos_level == 1) { | |
+ rv = __mqtt_puback(client, response.decoded.publish.packet_id); | |
+ if (rv != MQTT_OK) { | |
+ client->error = rv; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return rv; | |
+ } | |
+ } else if (response.decoded.publish.qos_level == 2) { | |
+ /* check if this is a duplicate */ | |
+ if (mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREC, &response.decoded.publish.packet_id) != NULL) { | |
+ break; | |
+ } | |
+ | |
+ rv = __mqtt_pubrec(client, response.decoded.publish.packet_id); | |
+ if (rv != MQTT_OK) { | |
+ client->error = rv; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return rv; | |
+ } | |
+ } | |
+ /* call publish callback */ | |
+ client->publish_response_callback(&client->publish_response_callback_state, &response.decoded.publish); | |
+ break; | |
+ case MQTT_CONTROL_PUBACK: | |
+ /* release associated PUBLISH */ | |
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBLISH, &response.decoded.puback.packet_id); | |
+ if (msg == NULL) { | |
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ } | |
+ msg->state = MQTT_QUEUED_COMPLETE; | |
+ /* update response time */ | |
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); | |
+ break; | |
+ case MQTT_CONTROL_PUBREC: | |
+ /* check if this is a duplicate */ | |
+ if (mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREL, &response.decoded.pubrec.packet_id) != NULL) { | |
+ break; | |
+ } | |
+ /* release associated PUBLISH */ | |
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBLISH, &response.decoded.pubrec.packet_id); | |
+ if (msg == NULL) { | |
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ } | |
+ msg->state = MQTT_QUEUED_COMPLETE; | |
+ /* update response time */ | |
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); | |
+ /* stage PUBREL */ | |
+ rv = __mqtt_pubrel(client, response.decoded.pubrec.packet_id); | |
+ if (rv != MQTT_OK) { | |
+ client->error = rv; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return rv; | |
+ } | |
+ break; | |
+ case MQTT_CONTROL_PUBREL: | |
+ /* release associated PUBREC */ | |
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREC, &response.decoded.pubrel.packet_id); | |
+ if (msg == NULL) { | |
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ } | |
+ msg->state = MQTT_QUEUED_COMPLETE; | |
+ /* update response time */ | |
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); | |
+ /* stage PUBCOMP */ | |
+ rv = __mqtt_pubcomp(client, response.decoded.pubrec.packet_id); | |
+ if (rv != MQTT_OK) { | |
+ client->error = rv; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return rv; | |
+ } | |
+ break; | |
+ case MQTT_CONTROL_PUBCOMP: | |
+ /* release associated PUBREL */ | |
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREL, &response.decoded.pubcomp.packet_id); | |
+ if (msg == NULL) { | |
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ } | |
+ msg->state = MQTT_QUEUED_COMPLETE; | |
+ /* update response time */ | |
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); | |
+ break; | |
+ case MQTT_CONTROL_SUBACK: | |
+ /* release associated SUBSCRIBE */ | |
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_SUBSCRIBE, &response.decoded.suback.packet_id); | |
+ if (msg == NULL) { | |
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ } | |
+ msg->state = MQTT_QUEUED_COMPLETE; | |
+ /* update response time */ | |
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); | |
+ /* check that subscription was successful (not currently only one subscribe at a time) */ | |
+ if (response.decoded.suback.return_codes[0] == MQTT_SUBACK_FAILURE) { | |
+ client->error = MQTT_ERROR_SUBSCRIBE_FAILED; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_SUBSCRIBE_FAILED; | |
+ } | |
+ break; | |
+ case MQTT_CONTROL_UNSUBACK: | |
+ /* release associated UNSUBSCRIBE */ | |
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_UNSUBSCRIBE, &response.decoded.unsuback.packet_id); | |
+ if (msg == NULL) { | |
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ } | |
+ msg->state = MQTT_QUEUED_COMPLETE; | |
+ /* update response time */ | |
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); | |
+ break; | |
+ case MQTT_CONTROL_PINGRESP: | |
+ /* release associated PINGREQ */ | |
+ msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PINGREQ, NULL); | |
+ if (msg == NULL) { | |
+ client->error = MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_ACK_OF_UNKNOWN; | |
+ } | |
+ msg->state = MQTT_QUEUED_COMPLETE; | |
+ /* update response time */ | |
+ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); | |
+ break; | |
+ default: | |
+ client->error = MQTT_ERROR_MALFORMED_RESPONSE; | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_ERROR_MALFORMED_RESPONSE; | |
+ } | |
+ | |
+ /* we've handled the response, now clean the buffer */ | |
+ void *dest = client->recv_buffer.mem_start; | |
+ void *src = client->recv_buffer.mem_start + consumed; | |
+ size_t n = client->recv_buffer.curr - client->recv_buffer.mem_start - consumed; | |
+ memmove(dest, src, n); | |
+ client->recv_buffer.curr -= consumed; | |
+ client->recv_buffer.curr_sz += consumed; | |
+ } | |
+ | |
+ /* never hit (always return once there's nothing left. */ | |
+ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); | |
+ return MQTT_OK; | |
+} | |
+ | |
+/* FIXED HEADER */ | |
+ | |
+#define MQTT_BITFIELD_RULE_VIOLOATION(bitfield, rule_value, rule_mask) ((bitfield ^ rule_value) & rule_mask) | |
+ | |
+struct { | |
+ const uint8_t control_type_is_valid[16]; | |
+ const uint8_t required_flags[16]; | |
+ const uint8_t mask_required_flags[16]; | |
+} mqtt_fixed_header_rules = { | |
+ { /* boolean value, true if type is valid */ | |
+ 0x00, /* MQTT_CONTROL_RESERVED */ | |
+ 0x01, /* MQTT_CONTROL_CONNECT */ | |
+ 0x01, /* MQTT_CONTROL_CONNACK */ | |
+ 0x01, /* MQTT_CONTROL_PUBLISH */ | |
+ 0x01, /* MQTT_CONTROL_PUBACK */ | |
+ 0x01, /* MQTT_CONTROL_PUBREC */ | |
+ 0x01, /* MQTT_CONTROL_PUBREL */ | |
+ 0x01, /* MQTT_CONTROL_PUBCOMP */ | |
+ 0x01, /* MQTT_CONTROL_SUBSCRIBE */ | |
+ 0x01, /* MQTT_CONTROL_SUBACK */ | |
+ 0x01, /* MQTT_CONTROL_UNSUBSCRIBE */ | |
+ 0x01, /* MQTT_CONTROL_UNSUBACK */ | |
+ 0x01, /* MQTT_CONTROL_PINGREQ */ | |
+ 0x01, /* MQTT_CONTROL_PINGRESP */ | |
+ 0x01, /* MQTT_CONTROL_DISCONNECT */ | |
+ 0x00 /* MQTT_CONTROL_RESERVED */ | |
+ }, | |
+ { /* flags that must be set for the associated control type */ | |
+ 0x00, /* MQTT_CONTROL_RESERVED */ | |
+ 0x00, /* MQTT_CONTROL_CONNECT */ | |
+ 0x00, /* MQTT_CONTROL_CONNACK */ | |
+ 0x00, /* MQTT_CONTROL_PUBLISH */ | |
+ 0x00, /* MQTT_CONTROL_PUBACK */ | |
+ 0x00, /* MQTT_CONTROL_PUBREC */ | |
+ 0x02, /* MQTT_CONTROL_PUBREL */ | |
+ 0x00, /* MQTT_CONTROL_PUBCOMP */ | |
+ 0x02, /* MQTT_CONTROL_SUBSCRIBE */ | |
+ 0x00, /* MQTT_CONTROL_SUBACK */ | |
+ 0x02, /* MQTT_CONTROL_UNSUBSCRIBE */ | |
+ 0x00, /* MQTT_CONTROL_UNSUBACK */ | |
+ 0x00, /* MQTT_CONTROL_PINGREQ */ | |
+ 0x00, /* MQTT_CONTROL_PINGRESP */ | |
+ 0x00, /* MQTT_CONTROL_DISCONNECT */ | |
+ 0x00 /* MQTT_CONTROL_RESERVED */ | |
+ }, | |
+ { /* mask of flags that must be specific values for the associated control type*/ | |
+ 0x00, /* MQTT_CONTROL_RESERVED */ | |
+ 0x0F, /* MQTT_CONTROL_CONNECT */ | |
+ 0x0F, /* MQTT_CONTROL_CONNACK */ | |
+ 0x00, /* MQTT_CONTROL_PUBLISH */ | |
+ 0x0F, /* MQTT_CONTROL_PUBACK */ | |
+ 0x0F, /* MQTT_CONTROL_PUBREC */ | |
+ 0x0F, /* MQTT_CONTROL_PUBREL */ | |
+ 0x0F, /* MQTT_CONTROL_PUBCOMP */ | |
+ 0x0F, /* MQTT_CONTROL_SUBSCRIBE */ | |
+ 0x0F, /* MQTT_CONTROL_SUBACK */ | |
+ 0x0F, /* MQTT_CONTROL_UNSUBSCRIBE */ | |
+ 0x0F, /* MQTT_CONTROL_UNSUBACK */ | |
+ 0x0F, /* MQTT_CONTROL_PINGREQ */ | |
+ 0x0F, /* MQTT_CONTROL_PINGRESP */ | |
+ 0x0F, /* MQTT_CONTROL_DISCONNECT */ | |
+ 0x00 /* MQTT_CONTROL_RESERVED */ | |
+ } | |
+}; | |
+ | |
+ssize_t mqtt_fixed_header_rule_violation(const struct mqtt_fixed_header *fixed_header) { | |
+ uint8_t control_type; | |
+ uint8_t control_flags; | |
+ uint8_t required_flags; | |
+ uint8_t mask_required_flags; | |
+ | |
+ /* get value and rules */ | |
+ control_type = fixed_header->control_type; | |
+ control_flags = fixed_header->control_flags; | |
+ required_flags = mqtt_fixed_header_rules.required_flags[control_type]; | |
+ mask_required_flags = mqtt_fixed_header_rules.mask_required_flags[control_type]; | |
+ | |
+ /* check for valid type */ | |
+ if (!mqtt_fixed_header_rules.control_type_is_valid[control_type]) { | |
+ return MQTT_ERROR_CONTROL_FORBIDDEN_TYPE; | |
+ } | |
+ | |
+ /* check that flags are appropriate */ | |
+ if(MQTT_BITFIELD_RULE_VIOLOATION(control_flags, required_flags, mask_required_flags)) { | |
+ return MQTT_ERROR_CONTROL_INVALID_FLAGS; | |
+ } | |
+ | |
+ return 0; | |
+} | |
+ | |
+ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t *buf, size_t bufsz) { | |
+ struct mqtt_fixed_header *fixed_header; | |
+ const uint8_t *start = buf; | |
+ int lshift; | |
+ ssize_t errcode; | |
+ | |
+ /* check for null pointers or empty buffer */ | |
+ if (response == NULL || buf == NULL) { | |
+ return MQTT_ERROR_NULLPTR; | |
+ } | |
+ fixed_header = &(response->fixed_header); | |
+ | |
+ /* check that bufsz is not zero */ | |
+ if (bufsz == 0) return 0; | |
+ | |
+ /* parse control type and flags */ | |
+ fixed_header->control_type = *buf >> 4; | |
+ fixed_header->control_flags = *buf & 0x0F; | |
+ | |
+ /* parse remaining size */ | |
+ fixed_header->remaining_length = 0; | |
+ | |
+ lshift = 0; | |
+ do { | |
+ /* consume byte and assert at least 1 byte left */ | |
+ --bufsz; | |
+ ++buf; | |
+ if (bufsz == 0) return 0; | |
+ | |
+ /* parse next byte*/ | |
+ fixed_header->remaining_length += (*buf & 0x7F) << lshift; | |
+ lshift += 7; | |
+ } while(*buf & 0x80); /* while continue bit is set */ | |
+ | |
+ /* consume last byte */ | |
+ --bufsz; | |
+ ++buf; | |
+ | |
+ /* check that the fixed header is valid */ | |
+ errcode = mqtt_fixed_header_rule_violation(fixed_header); | |
+ if (errcode) { | |
+ return errcode; | |
+ } | |
+ | |
+ /* check that the buffer size if GT remaining length */ | |
+ if (bufsz < fixed_header->remaining_length) { | |
+ return 0; | |
+ } | |
+ | |
+ /* return how many bytes were consumed */ | |
+ return buf - start; | |
+} | |
+ | |
+ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fixed_header *fixed_header) { | |
+ const uint8_t *start = buf; | |
+ ssize_t errcode; | |
+ uint32_t remaining_length; | |
+ | |
+ /* check for null pointers or empty buffer */ | |
+ if (fixed_header == NULL || buf == NULL) { | |
+ return MQTT_ERROR_NULLPTR; | |
+ } | |
+ | |
+ /* check that the fixed header is valid */ | |
+ errcode = mqtt_fixed_header_rule_violation(fixed_header); | |
+ if (errcode) { | |
+ return errcode; | |
+ } | |
+ | |
+ /* check that bufsz is not zero */ | |
+ if (bufsz == 0) return 0; | |
+ | |
+ /* pack control type and flags */ | |
+ *buf = (((uint8_t) fixed_header->control_type) << 4) & 0xF0; | |
+ *buf |= ((uint8_t) fixed_header->control_flags) & 0x0F; | |
+ | |
+ remaining_length = fixed_header->remaining_length; | |
+ do { | |
+ /* consume byte and assert at least 1 byte left */ | |
+ --bufsz; | |
+ ++buf; | |
+ if (bufsz == 0) return 0; | |
+ | |
+ /* pack next byte */ | |
+ *buf = remaining_length & 0x7F; | |
+ if(remaining_length > 127) *buf |= 0x80; | |
+ remaining_length = remaining_length >> 7; | |
+ } while(*buf & 0x80); | |
+ | |
+ /* consume last byte */ | |
+ --bufsz; | |
+ ++buf; | |
+ | |
+ /* check that there's still enough space in buffer for packet */ | |
+ if (bufsz < fixed_header->remaining_length) { | |
+ return 0; | |
+ } | |
+ | |
+ /* return how many bytes were consumed */ | |
+ return buf - start; | |
+} | |
+ | |
+/* CONNECT */ | |
+ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, | |
+ const char* client_id, | |
+ const char* will_topic, | |
+ const void* will_message, | |
+ size_t will_message_size, | |
+ const char* user_name, | |
+ const char* password, | |
+ uint8_t connect_flags, | |
+ uint16_t keep_alive) | |
+{ | |
+ struct mqtt_fixed_header fixed_header; | |
+ uint32_t remaining_length; | |
+ const uint8_t const* start = buf; | |
+ ssize_t rv; | |
+ uint8_t temp; | |
+ | |
+ /* pack the fixed headr */ | |
+ fixed_header.control_type = MQTT_CONTROL_CONNECT; | |
+ fixed_header.control_flags = 0x00; | |
+ | |
+ /* calculate remaining length and build connect_flags at the same time */ | |
+ connect_flags = connect_flags & ~MQTT_CONNECT_RESERVED; | |
+ remaining_length = 10; /* size of variable header */ | |
+ | |
+ if (client_id == NULL) { | |
+ /* client_id is a mandatory parameter */ | |
+ return MQTT_ERROR_CONNECT_NULL_CLIENT_ID; | |
+ } else { | |
+ /* mqtt_string length is strlen + 2 */ | |
+ remaining_length += __mqtt_packed_cstrlen(client_id); | |
+ } | |
+ | |
+ if (will_topic != NULL) { | |
+ /* there is a will */ | |
+ connect_flags |= MQTT_CONNECT_WILL_FLAG; | |
+ remaining_length += __mqtt_packed_cstrlen(will_topic); | |
+ | |
+ if (will_message == NULL) { | |
+ /* if there's a will there MUST be a will message */ | |
+ return MQTT_ERROR_CONNECT_NULL_WILL_MESSAGE; | |
+ } | |
+ remaining_length += 2 + will_message_size; /* size of will_message */ | |
+ | |
+ /* assert that the will QOS is valid (i.e. not 3) */ | |
+ temp = connect_flags & 0x18; /* mask to QOS */ | |
+ if (temp == 0x18) { | |
+ /* bitwise equality with QoS 3 (invalid)*/ | |
+ return MQTT_ERROR_CONNECT_FORBIDDEN_WILL_QOS; | |
+ } | |
+ } else { | |
+ /* there is no will so set all will flags to zero */ | |
+ connect_flags &= ~MQTT_CONNECT_WILL_FLAG; | |
+ connect_flags &= ~0x18; | |
+ connect_flags &= ~MQTT_CONNECT_WILL_RETAIN; | |
+ } | |
+ | |
+ if (user_name != NULL) { | |
+ /* a user name is present */ | |
+ connect_flags |= MQTT_CONNECT_USER_NAME; | |
+ remaining_length += __mqtt_packed_cstrlen(user_name); | |
+ } else { | |
+ connect_flags &= ~MQTT_CONNECT_USER_NAME; | |
+ } | |
+ | |
+ if (password != NULL) { | |
+ /* a password is present */ | |
+ connect_flags |= MQTT_CONNECT_PASSWORD; | |
+ remaining_length += __mqtt_packed_cstrlen(password); | |
+ } else { | |
+ connect_flags &= ~MQTT_CONNECT_PASSWORD; | |
+ } | |
+ | |
+ /* fixed header length is now calculated*/ | |
+ fixed_header.remaining_length = remaining_length; | |
+ | |
+ /* pack fixed header and perform error checks */ | |
+ rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header); | |
+ if (rv <= 0) { | |
+ /* something went wrong */ | |
+ return rv; | |
+ } | |
+ buf += rv; | |
+ bufsz -= rv; | |
+ | |
+ /* check that the buffer has enough space to fit the remaining length */ | |
+ if (bufsz < fixed_header.remaining_length) { | |
+ return 0; | |
+ } | |
+ | |
+ /* pack the variable header */ | |
+ *buf++ = 0x00; | |
+ *buf++ = 0x04; | |
+ *buf++ = (uint8_t) 'M'; | |
+ *buf++ = (uint8_t) 'Q'; | |
+ *buf++ = (uint8_t) 'T'; | |
+ *buf++ = (uint8_t) 'T'; | |
+ *buf++ = MQTT_PROTOCOL_LEVEL; | |
+ *buf++ = connect_flags; | |
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(keep_alive); | |
+ buf += 2; | |
+ | |
+ /* pack the payload */ | |
+ buf += __mqtt_pack_str(buf, client_id); | |
+ if (connect_flags & MQTT_CONNECT_WILL_FLAG) { | |
+ buf += __mqtt_pack_str(buf, will_topic); | |
+ memcpy(buf, will_message, will_message_size); | |
+ buf += will_message_size; | |
+ } | |
+ if (connect_flags & MQTT_CONNECT_USER_NAME) { | |
+ buf += __mqtt_pack_str(buf, user_name); | |
+ } | |
+ if (connect_flags & MQTT_CONNECT_PASSWORD) { | |
+ buf += __mqtt_pack_str(buf, password); | |
+ } | |
+ | |
+ /* return the number of bytes that were consumed */ | |
+ return buf - start; | |
+} | |
+ | |
+/* CONNACK */ | |
+ssize_t mqtt_unpack_connack_response(struct mqtt_response *mqtt_response, const uint8_t *buf) { | |
+ const uint8_t const *start = buf; | |
+ struct mqtt_response_connack *response; | |
+ | |
+ /* check that remaining length is 2 */ | |
+ if (mqtt_response->fixed_header.remaining_length != 2) { | |
+ return MQTT_ERROR_MALFORMED_RESPONSE; | |
+ } | |
+ | |
+ response = &(mqtt_response->decoded.connack); | |
+ /* unpack */ | |
+ if (*buf & 0xFE) { | |
+ /* only bit 1 can be set */ | |
+ return MQTT_ERROR_CONNACK_FORBIDDEN_FLAGS; | |
+ } else { | |
+ response->session_present_flag = *buf++; | |
+ } | |
+ | |
+ if (*buf > 5u) { | |
+ /* only bit 1 can be set */ | |
+ return MQTT_ERROR_CONNACK_FORBIDDEN_CODE; | |
+ } else { | |
+ response->return_code = (enum MQTTConnackReturnCode) *buf++; | |
+ } | |
+ return buf - start; | |
+} | |
+ | |
+/* DISCONNECT */ | |
+ssize_t mqtt_pack_disconnect(uint8_t *buf, size_t bufsz) { | |
+ struct mqtt_fixed_header fixed_header; | |
+ fixed_header.control_type = MQTT_CONTROL_DISCONNECT; | |
+ fixed_header.control_flags = 0; | |
+ fixed_header.remaining_length = 0; | |
+ return mqtt_pack_fixed_header(buf, bufsz, &fixed_header); | |
+} | |
+ | |
+/* PING */ | |
+ssize_t mqtt_pack_ping_request(uint8_t *buf, size_t bufsz) { | |
+ struct mqtt_fixed_header fixed_header; | |
+ fixed_header.control_type = MQTT_CONTROL_PINGREQ; | |
+ fixed_header.control_flags = 0; | |
+ fixed_header.remaining_length = 0; | |
+ return mqtt_pack_fixed_header(buf, bufsz, &fixed_header); | |
+} | |
+ | |
+/* PUBLISH */ | |
+ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz, | |
+ const char* topic_name, | |
+ uint16_t packet_id, | |
+ void* application_message, | |
+ size_t application_message_size, | |
+ uint8_t publish_flags) | |
+{ | |
+ const uint8_t const *start = buf; | |
+ ssize_t rv; | |
+ struct mqtt_fixed_header fixed_header; | |
+ uint16_t remaining_length; | |
+ uint8_t inspected_qos; | |
+ | |
+ /* check for null pointers */ | |
+ if(buf == NULL || topic_name == NULL) { | |
+ return MQTT_ERROR_NULLPTR; | |
+ } | |
+ | |
+ /* inspect QoS level */ | |
+ inspected_qos = (publish_flags & 0x06) >> 1; /* mask */ | |
+ | |
+ /* build the fixed header */ | |
+ fixed_header.control_type = MQTT_CONTROL_PUBLISH; | |
+ | |
+ /* calculate remaining length */ | |
+ remaining_length = __mqtt_packed_cstrlen(topic_name); | |
+ if (inspected_qos > 0) { | |
+ remaining_length += 2; | |
+ } | |
+ remaining_length += application_message_size; | |
+ fixed_header.remaining_length = remaining_length; | |
+ | |
+ /* force dup to 0 if qos is 0 */ | |
+ if (inspected_qos == 0) { | |
+ publish_flags &= ~MQTT_PUBLISH_DUP; | |
+ } | |
+ | |
+ /* make sure that qos is not 3 */ | |
+ if (inspected_qos == 3) { | |
+ return MQTT_ERROR_PUBLISH_FORBIDDEN_QOS; | |
+ } | |
+ fixed_header.control_flags = publish_flags; | |
+ | |
+ /* pack fixed header */ | |
+ rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header); | |
+ if (rv <= 0) { | |
+ /* something went wrong */ | |
+ return rv; | |
+ } | |
+ buf += rv; | |
+ bufsz -= rv; | |
+ | |
+ /* check that buffer is big enough */ | |
+ if (bufsz < remaining_length) { | |
+ return 0; | |
+ } | |
+ | |
+ /* pack variable header */ | |
+ buf += __mqtt_pack_str(buf, topic_name); | |
+ if (inspected_qos > 0) { | |
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id); | |
+ buf += 2; | |
+ } | |
+ | |
+ /* pack payload */ | |
+ memcpy(buf, application_message, application_message_size); | |
+ buf += application_message_size; | |
+ | |
+ return buf - start; | |
+} | |
+ | |
+ssize_t mqtt_unpack_publish_response(struct mqtt_response *mqtt_response, const uint8_t *buf) | |
+{ | |
+ const uint8_t const *start = buf; | |
+ struct mqtt_fixed_header *fixed_header; | |
+ struct mqtt_response_publish *response; | |
+ | |
+ fixed_header = &(mqtt_response->fixed_header); | |
+ response = &(mqtt_response->decoded.publish); | |
+ | |
+ /* get flags */ | |
+ response->dup_flag = (fixed_header->control_flags & MQTT_PUBLISH_DUP) >> 3; | |
+ response->qos_level = (fixed_header->control_flags & 0x06) >> 1; | |
+ response->retain_flag = fixed_header->control_flags & MQTT_PUBLISH_RETAIN; | |
+ | |
+ /* make sure that remaining length is valid */ | |
+ if (mqtt_response->fixed_header.remaining_length < 4) { | |
+ return MQTT_ERROR_MALFORMED_RESPONSE; | |
+ } | |
+ | |
+ /* parse variable header */ | |
+ response->topic_name_size = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf); | |
+ buf += 2; | |
+ response->topic_name = buf; | |
+ buf += response->topic_name_size; | |
+ | |
+ if (response->qos_level > 0) { | |
+ response->packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf); | |
+ buf += 2; | |
+ } | |
+ | |
+ /* get payload */ | |
+ response->application_message = buf; | |
+ if (response->qos_level == 0) { | |
+ response->application_message_size = fixed_header->remaining_length - response->topic_name_size - 2; | |
+ } else { | |
+ response->application_message_size = fixed_header->remaining_length - response->topic_name_size - 4; | |
+ } | |
+ buf += response->application_message_size; | |
+ | |
+ /* return number of bytes consumed */ | |
+ return buf - start; | |
+} | |
+ | |
+/* PUBXXX */ | |
+ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz, | |
+ enum MQTTControlPacketType control_type, | |
+ uint16_t packet_id) | |
+{ | |
+ const uint8_t const *start = buf; | |
+ struct mqtt_fixed_header fixed_header; | |
+ ssize_t rv; | |
+ if (buf == NULL) { | |
+ return MQTT_ERROR_NULLPTR; | |
+ } | |
+ | |
+ /* pack fixed header */ | |
+ fixed_header.control_type = control_type; | |
+ if (control_type == MQTT_CONTROL_PUBREL) { | |
+ fixed_header.control_flags = 0x02; | |
+ } else { | |
+ fixed_header.control_flags = 0; | |
+ } | |
+ fixed_header.remaining_length = 2; | |
+ rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header); | |
+ if (rv <= 0) { | |
+ return rv; | |
+ } | |
+ buf += rv; | |
+ bufsz -= rv; | |
+ | |
+ if (bufsz < fixed_header.remaining_length) { | |
+ return 0; | |
+ } | |
+ | |
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id); | |
+ buf += 2; | |
+ | |
+ return buf - start; | |
+} | |
+ | |
+ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const uint8_t *buf) | |
+{ | |
+ const uint8_t const *start = buf; | |
+ uint16_t packet_id; | |
+ | |
+ /* assert remaining length is correct */ | |
+ if (mqtt_response->fixed_header.remaining_length != 2) { | |
+ return MQTT_ERROR_MALFORMED_RESPONSE; | |
+ } | |
+ | |
+ /* parse packet_id */ | |
+ packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf); | |
+ buf += 2; | |
+ | |
+ if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBACK) { | |
+ mqtt_response->decoded.puback.packet_id = packet_id; | |
+ } else if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBREC) { | |
+ mqtt_response->decoded.pubrec.packet_id = packet_id; | |
+ } else if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBREL) { | |
+ mqtt_response->decoded.pubrel.packet_id = packet_id; | |
+ } else { | |
+ mqtt_response->decoded.pubcomp.packet_id = packet_id; | |
+ } | |
+ | |
+ return buf - start; | |
+} | |
+ | |
+/* SUBACK */ | |
+ssize_t mqtt_unpack_suback_response (struct mqtt_response *mqtt_response, const uint8_t *buf) { | |
+ const uint8_t const *start = buf; | |
+ uint32_t remaining_length = mqtt_response->fixed_header.remaining_length; | |
+ | |
+ /* assert remaining length is at least 3 (for packet id and at least 1 topic) */ | |
+ if (remaining_length < 3) { | |
+ return MQTT_ERROR_MALFORMED_RESPONSE; | |
+ } | |
+ | |
+ /* unpack packet_id */ | |
+ mqtt_response->decoded.suback.packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf); | |
+ buf += 2; | |
+ remaining_length -= 2; | |
+ | |
+ /* unpack return codes */ | |
+ mqtt_response->decoded.suback.num_return_codes = (size_t) remaining_length; | |
+ mqtt_response->decoded.suback.return_codes = buf; | |
+ buf += remaining_length; | |
+ | |
+ return buf - start; | |
+} | |
+ | |
+/* SUBSCRIBE */ | |
+ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, uint16_t packet_id, ...) { | |
+ va_list args; | |
+ const uint8_t const *start = buf; | |
+ ssize_t rv; | |
+ struct mqtt_fixed_header fixed_header; | |
+ unsigned int num_subs = 0; | |
+ unsigned int i; | |
+ const char *topic[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS]; | |
+ uint8_t max_qos[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS]; | |
+ | |
+ /* parse all subscriptions */ | |
+ va_start(args, packet_id); | |
+ while(1) { | |
+ topic[num_subs] = va_arg(args, const char*); | |
+ if (topic[num_subs] == NULL) { | |
+ /* end of list */ | |
+ break; | |
+ } | |
+ | |
+ max_qos[num_subs] = (uint8_t) va_arg(args, unsigned int); | |
+ | |
+ ++num_subs; | |
+ if (num_subs >= MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS) { | |
+ return MQTT_ERROR_SUBSCRIBE_TOO_MANY_TOPICS; | |
+ } | |
+ } | |
+ va_end(args); | |
+ | |
+ /* build the fixed header */ | |
+ fixed_header.control_type = MQTT_CONTROL_SUBSCRIBE; | |
+ fixed_header.control_flags = 2u; | |
+ fixed_header.remaining_length = 2u; /* size of variable header */ | |
+ for(i = 0; i < num_subs; ++i) { | |
+ /* payload is topic name + max qos (1 byte) */ | |
+ fixed_header.remaining_length += __mqtt_packed_cstrlen(topic[i]) + 1; | |
+ } | |
+ | |
+ /* pack the fixed header */ | |
+ rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header); | |
+ if (rv <= 0) { | |
+ return rv; | |
+ } | |
+ buf += rv; | |
+ bufsz -= rv; | |
+ | |
+ /* check that the buffer has enough space */ | |
+ if (bufsz < fixed_header.remaining_length) { | |
+ return 0; | |
+ } | |
+ | |
+ /* pack variable header */ | |
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id); | |
+ buf += 2; | |
+ | |
+ | |
+ /* pack payload */ | |
+ for(i = 0; i < num_subs; ++i) { | |
+ buf += __mqtt_pack_str(buf, topic[i]); | |
+ *buf++ = max_qos[i]; | |
+ } | |
+ | |
+ return buf - start; | |
+} | |
+ | |
+/* UNSUBACK */ | |
+ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const uint8_t *buf) | |
+{ | |
+ const uint8_t const *start = buf; | |
+ | |
+ if (mqtt_response->fixed_header.remaining_length != 2) { | |
+ return MQTT_ERROR_MALFORMED_RESPONSE; | |
+ } | |
+ | |
+ /* parse packet_id */ | |
+ mqtt_response->decoded.unsuback.packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf); | |
+ buf += 2; | |
+ | |
+ return buf - start; | |
+} | |
+ | |
+/* UNSUBSCRIBE */ | |
+ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, uint16_t packet_id, ...) { | |
+ va_list args; | |
+ const uint8_t const *start = buf; | |
+ ssize_t rv; | |
+ struct mqtt_fixed_header fixed_header; | |
+ unsigned int num_subs = 0; | |
+ unsigned int i; | |
+ const char *topic[MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS]; | |
+ | |
+ /* parse all subscriptions */ | |
+ va_start(args, packet_id); | |
+ while(1) { | |
+ topic[num_subs] = va_arg(args, const char*); | |
+ if (topic[num_subs] == NULL) { | |
+ /* end of list */ | |
+ break; | |
+ } | |
+ | |
+ ++num_subs; | |
+ if (num_subs >= MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS) { | |
+ return MQTT_ERROR_UNSUBSCRIBE_TOO_MANY_TOPICS; | |
+ } | |
+ } | |
+ va_end(args); | |
+ | |
+ /* build the fixed header */ | |
+ fixed_header.control_type = MQTT_CONTROL_UNSUBSCRIBE; | |
+ fixed_header.control_flags = 2u; | |
+ fixed_header.remaining_length = 2u; /* size of variable header */ | |
+ for(i = 0; i < num_subs; ++i) { | |
+ /* payload is topic name */ | |
+ fixed_header.remaining_length += __mqtt_packed_cstrlen(topic[i]); | |
+ } | |
+ | |
+ /* pack the fixed header */ | |
+ rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header); | |
+ if (rv <= 0) { | |
+ return rv; | |
+ } | |
+ buf += rv; | |
+ bufsz -= rv; | |
+ | |
+ /* check that the buffer has enough space */ | |
+ if (bufsz < fixed_header.remaining_length) { | |
+ return 0; | |
+ } | |
+ | |
+ /* pack variable header */ | |
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id); | |
+ buf += 2; | |
+ | |
+ | |
+ /* pack payload */ | |
+ for(i = 0; i < num_subs; ++i) { | |
+ buf += __mqtt_pack_str(buf, topic[i]); | |
+ } | |
+ | |
+ return buf - start; | |
+} | |
+ | |
+/* MESSAGE QUEUE */ | |
+void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz) | |
+{ | |
+ mq->mem_start = buf; | |
+ mq->mem_end = buf + bufsz; | |
+ mq->curr = buf; | |
+ mq->queue_tail = mq->mem_end; | |
+ mq->curr_sz = mqtt_mq_currsz(mq); | |
+} | |
+ | |
+struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size_t nbytes) | |
+{ | |
+ /* make queued message header */ | |
+ --(mq->queue_tail); | |
+ mq->queue_tail->start = mq->curr; | |
+ mq->queue_tail->size = nbytes; | |
+ mq->queue_tail->state = MQTT_QUEUED_UNSENT; | |
+ | |
+ /* move curr and recalculate curr_sz */ | |
+ mq->curr += nbytes; | |
+ mq->curr_sz = mqtt_mq_currsz(mq); | |
+ | |
+ return mq->queue_tail; | |
+} | |
+ | |
+void mqtt_mq_clean(struct mqtt_message_queue *mq) { | |
+ struct mqtt_queued_message *new_head; | |
+ ssize_t i; | |
+ | |
+ for(new_head = mqtt_mq_get(mq, 0); new_head >= mq->queue_tail; --new_head) { | |
+ if (new_head->state != MQTT_QUEUED_COMPLETE) break; | |
+ } | |
+ | |
+ /* check if everything can be removed */ | |
+ if (new_head < mq->queue_tail) { | |
+ mq->curr = mq->mem_start; | |
+ mq->queue_tail = mq->mem_end; | |
+ mq->curr_sz = mqtt_mq_currsz(mq); | |
+ return; | |
+ } else if (new_head == mqtt_mq_get(mq, 0)) { | |
+ /* do nothing */ | |
+ return; | |
+ } | |
+ | |
+ /* move buffered data */ | |
+ size_t n = mq->curr - new_head->start; | |
+ size_t removing = new_head->start - (uint8_t*) mq->mem_start; | |
+ memmove(mq->mem_start, new_head->start, n); | |
+ mq->curr = mq->mem_start + n; | |
+ | |
+ /* move queue */ | |
+ ssize_t new_tail_idx = new_head - mq->queue_tail; | |
+ memmove(mqtt_mq_get(mq, new_tail_idx), mq->queue_tail, sizeof(struct mqtt_queued_message) * (new_tail_idx + 1)); | |
+ mq->queue_tail = mqtt_mq_get(mq, new_tail_idx); | |
+ | |
+ /* bump back start's */ | |
+ for(i = 0; i < new_tail_idx + 1; ++i) { | |
+ mqtt_mq_get(mq, i)->start -= removing; | |
+ } | |
+ | |
+ /* get curr_sz */ | |
+ mq->curr_sz = mqtt_mq_currsz(mq); | |
+} | |
+ | |
+struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQTTControlPacketType control_type, uint16_t *packet_id) | |
+{ | |
+ struct mqtt_queued_message *curr; | |
+ for(curr = mqtt_mq_get(mq, 0); curr >= mq->queue_tail; --curr) { | |
+ if (curr->control_type == control_type) { | |
+ if (packet_id == NULL || (packet_id != NULL && *packet_id == curr->packet_id)) { | |
+ return curr; | |
+ } | |
+ } | |
+ } | |
+ return NULL; | |
+} | |
+ | |
+ | |
+/* RESPONSE UNPACKING */ | |
+ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf, size_t bufsz) { | |
+ const uint8_t const *start = buf; | |
+ ssize_t rv = mqtt_unpack_fixed_header(response, buf, bufsz); | |
+ if (rv <= 0) return rv; | |
+ else buf += rv; | |
+ switch(response->fixed_header.control_type) { | |
+ case MQTT_CONTROL_CONNACK: | |
+ rv = mqtt_unpack_connack_response(response, buf); | |
+ break; | |
+ case MQTT_CONTROL_PUBLISH: | |
+ rv = mqtt_unpack_publish_response(response, buf); | |
+ break; | |
+ case MQTT_CONTROL_PUBACK: | |
+ rv = mqtt_unpack_pubxxx_response(response, buf); | |
+ break; | |
+ case MQTT_CONTROL_PUBREC: | |
+ rv = mqtt_unpack_pubxxx_response(response, buf); | |
+ break; | |
+ case MQTT_CONTROL_PUBREL: | |
+ rv = mqtt_unpack_pubxxx_response(response, buf); | |
+ break; | |
+ case MQTT_CONTROL_PUBCOMP: | |
+ rv = mqtt_unpack_pubxxx_response(response, buf); | |
+ break; | |
+ case MQTT_CONTROL_SUBACK: | |
+ rv = mqtt_unpack_suback_response(response, buf); | |
+ break; | |
+ case MQTT_CONTROL_UNSUBACK: | |
+ rv = mqtt_unpack_unsuback_response(response, buf); | |
+ break; | |
+ case MQTT_CONTROL_PINGRESP: | |
+ return rv; | |
+ default: | |
+ return MQTT_ERROR_RESPONSE_INVALID_CONTROL_TYPE; | |
+ } | |
+ | |
+ if (rv < 0) return rv; | |
+ buf += rv; | |
+ return buf - start; | |
+} | |
+ | |
+/* EXTRA DETAILS */ | |
+ssize_t __mqtt_pack_str(uint8_t *buf, const char* str) { | |
+ uint16_t length = strlen(str); | |
+ int i; | |
+ | |
+ /* pack string length */ | |
+ *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(length); | |
+ buf += 2; | |
+ | |
+ /* pack string */ | |
+ for(i = 0; i < length; ++i) { | |
+ *(buf++) = str[i]; | |
+ } | |
+ | |
+ /* return number of bytes consumed */ | |
+ return length + 2; | |
+} | |
+ | |
+static const char *MQTT_ERRORS_STR[] = { | |
+ "MQTT_UNKNOWN_ERROR", | |
+ __ALL_MQTT_ERRORS(GENERATE_STRING) | |
+}; | |
+ | |
+const char* mqtt_error_str(enum MQTTErrors error) { | |
+ int offset = error - MQTT_ERROR_UNKNOWN; | |
+ if (offset >= 0) { | |
+ return MQTT_ERRORS_STR[offset]; | |
+ } else if (error == 0) { | |
+ return "MQTT_ERROR: Buffer too small."; | |
+ } else if (error > 0) { | |
+ return "MQTT_OK"; | |
+ } else { | |
+ return MQTT_ERRORS_STR[0]; | |
+ } | |
+} | |
+ | |
+/** @endcond*/ | |
diff --git a/netutils/mqttc/mqtt_pal.c b/netutils/mqttc/mqtt_pal.c | |
new file mode 100644 | |
index 00000000..4c6a534d | |
--- /dev/null | |
+++ b/netutils/mqttc/mqtt_pal.c | |
@@ -0,0 +1,90 @@ | |
+#include "netutils/mqtt.h" | |
+ | |
+/** | |
+ * @file | |
+ * @brief Implements @ref mqtt_pal_sendall and @ref mqtt_pal_recvall and | |
+ * any platform-specific helpers you'd like. | |
+ * @cond Doxygen_Suppress | |
+ */ | |
+ | |
+//#define __unix__ 1 | |
+#ifdef __unix__ | |
+ | |
+#ifdef MQTT_USE_BIO | |
+#include <openssl/bio.h> | |
+#include <openssl/ssl.h> | |
+#include <openssl/err.h> | |
+ | |
+ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) { | |
+ size_t sent = 0; | |
+ | |
+ while(sent < len) { | |
+ int tmp = BIO_write(fd, buf + sent, len - sent); | |
+ if (tmp > 0) { | |
+ sent += (size_t) tmp; | |
+ } else if (tmp <= 0 && !BIO_should_retry(fd)) { | |
+ return MQTT_ERROR_SOCKET_ERROR; | |
+ } | |
+ } | |
+ | |
+ return sent; | |
+} | |
+ | |
+ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) { | |
+ const void const *start = buf; | |
+ int rv; | |
+ do { | |
+ rv = BIO_read(fd, buf, bufsz); | |
+ if (rv > 0) { | |
+ /* successfully read bytes from the socket */ | |
+ buf += rv; | |
+ bufsz -= rv; | |
+ } else if (!BIO_should_retry(fd)) { | |
+ /* an error occurred that wasn't "nothing to read". */ | |
+ return MQTT_ERROR_SOCKET_ERROR; | |
+ } | |
+ } while (!BIO_should_read(fd)); | |
+ | |
+ return (ssize_t)(buf - start); | |
+} | |
+ | |
+#else | |
+#include <errno.h> | |
+ | |
+ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) { | |
+ size_t sent = 0; | |
+ | |
+ while(sent < len) { | |
+ ssize_t tmp = send(fd, buf + sent, len - sent, flags); | |
+ if (tmp < 1) { | |
+ return MQTT_ERROR_SOCKET_ERROR; | |
+ } | |
+ sent += (size_t) tmp; | |
+ } | |
+ return sent; | |
+} | |
+ | |
+ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) { | |
+ const void const *start = buf; | |
+ ssize_t rv; | |
+ | |
+ do { | |
+ rv = recv(fd, buf, bufsz, flags); | |
+ if (rv > 0) { | |
+ /* successfully read bytes from the socket */ | |
+ buf += rv; | |
+ bufsz -= rv; | |
+ } else if (rv < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { | |
+ /* an error occurred that wasn't "nothing to read". */ | |
+ return MQTT_ERROR_SOCKET_ERROR; | |
+ } | |
+ } while (rv > 0); | |
+ | |
+ return buf - start; | |
+} | |
+ | |
+#endif | |
+ | |
+#endif | |
+ | |
+/** @endcond */ | |
diff --git a/system/mqttc/.gitignore b/system/mqttc/.gitignore | |
new file mode 100644 | |
index 00000000..83bd7b81 | |
--- /dev/null | |
+++ b/system/mqttc/.gitignore | |
@@ -0,0 +1,11 @@ | |
+/Make.dep | |
+/.depend | |
+/.built | |
+/*.asm | |
+/*.rel | |
+/*.lst | |
+/*.sym | |
+/*.adb | |
+/*.lib | |
+/*.src | |
+/*.obj | |
diff --git a/system/mqttc/Kconfig b/system/mqttc/Kconfig | |
new file mode 100644 | |
index 00000000..d5bc2737 | |
--- /dev/null | |
+++ b/system/mqttc/Kconfig | |
@@ -0,0 +1,31 @@ | |
+# | |
+# For a description of the syntax of this configuration file, | |
+# see the file kconfig-language.txt in the NuttX tools repository. | |
+# | |
+ | |
+config SYSTEM_MQTTC | |
+ bool "'mqttc' command" | |
+ default n | |
+ depends on NET | |
+ ---help--- | |
+ Enable support for the 'mqttc' command. | |
+ | |
+if SYSTEM_MQTTC | |
+config SYSTEM_MQTTC_PROGNAME | |
+ string "MQTTC program name" | |
+ default "mqttc" | |
+ depends on BUILD_KERNEL | |
+ ---help--- | |
+ This is the name of the program that will be use when the NSH ELF | |
+ program is installed. | |
+ | |
+config SYSTEM_MQTTC_PRIORITY | |
+ int "MQTTC task priority" | |
+ default 100 | |
+ | |
+config SYSTEM_MQTTC_STACKSIZE | |
+ int "MQTTC stack size" | |
+ default 8192 | |
+ | |
+endif | |
+ | |
diff --git a/system/mqttc/Make.defs b/system/mqttc/Make.defs | |
new file mode 100644 | |
index 00000000..07090fcc | |
--- /dev/null | |
+++ b/system/mqttc/Make.defs | |
@@ -0,0 +1,40 @@ | |
+############################################################################ | |
+# apps/system/mqttc/Make.defs | |
+# Adds selected applications to apps/ build | |
+# | |
+# Copyright (C) 2017 Gregory Nutt. All rights reserved. | |
+# Author: Gregory Nutt <gnutt@nuttx.org> | |
+# | |
+# Redistribution and use in source and binary forms, with or without | |
+# modification, are permitted provided that the following conditions | |
+# are met: | |
+# | |
+# 1. Redistributions of source code must retain the above copyright | |
+# notice, this list of conditions and the following disclaimer. | |
+# 2. Redistributions in binary form must reproduce the above copyright | |
+# notice, this list of conditions and the following disclaimer in | |
+# the documentation and/or other materials provided with the | |
+# distribution. | |
+# 3. Neither the name NuttX nor the names of its contributors may be | |
+# used to endorse or promote products derived from this software | |
+# without specific prior written permission. | |
+# | |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS | |
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE | |
+# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, | |
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, | |
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS | |
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED | |
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN | |
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
+# POSSIBILITY OF SUCH DAMAGE. | |
+# | |
+############################################################################ | |
+ | |
+ifeq ($(CONFIG_SYSTEM_MQTTC),y) | |
+CONFIGURED_APPS += system/mqttc | |
+endif | |
+ | |
diff --git a/system/mqttc/Makefile b/system/mqttc/Makefile | |
new file mode 100644 | |
index 00000000..27286eea | |
--- /dev/null | |
+++ b/system/mqttc/Makefile | |
@@ -0,0 +1,147 @@ | |
+############################################################################ | |
+# apps/system/mqttc/Makefile | |
+# | |
+# Copyright (C) 2017 Gregory Nutt. All rights reserved. | |
+# Author: Gregory Nutt <gnutt@nuttx.org> | |
+# | |
+# Redistribution and use in source and binary forms, with or without | |
+# modification, are permitted provided that the following conditions | |
+# are met: | |
+# | |
+# 1. Redistributions of source code must retain the above copyright | |
+# notice, this list of conditions and the following disclaimer. | |
+# 2. Redistributions in binary form must reproduce the above copyright | |
+# notice, this list of conditions and the following disclaimer in | |
+# the documentation and/or other materials provided with the | |
+# distribution. | |
+# 3. Neither the name NuttX nor the names of its contributors may be | |
+# used to endorse or promote products derived from this software | |
+# without specific prior written permission. | |
+# | |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS | |
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE | |
+# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, | |
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, | |
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS | |
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED | |
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN | |
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
+# POSSIBILITY OF SUCH DAMAGE. | |
+# | |
+############################################################################ | |
+ | |
+-include $(TOPDIR)/.config | |
+-include $(TOPDIR)/Make.defs | |
+include $(APPDIR)/Make.defs | |
+ | |
+# mqttc command | |
+ | |
+CONFIG_SYSTEM_MQTTC_PRIORITY ?= SCHED_PRIORITY_DEFAULT | |
+CONFIG_SYSTEM_MQTTC_STACKSIZE ?= 2048 | |
+ | |
+APPNAME = mqttc | |
+PRIORITY = $(CONFIG_SYSTEM_MQTTC_PRIORITY) | |
+STACKSIZE = $(CONFIG_SYSTEM_MQTTC_STACKSIZE) | |
+ | |
+CONFIG_SYSTEM_MQTTC_PROGNAME ?= mqttc$(EXEEXT) | |
+PROGNAME = $(CONFIG_SYSTEM_MQTTC_PROGNAME) | |
+ | |
+# Files | |
+ | |
+ASRCS = | |
+CSRCS = | |
+MAINSRC = simple_publisher.c | |
+ | |
+AOBJS = $(ASRCS:.S=$(OBJEXT)) | |
+COBJS = $(CSRCS:.c=$(OBJEXT)) | |
+MAINOBJ = $(MAINSRC:.c=$(OBJEXT)) | |
+ | |
+SRCS = $(ASRCS) $(CSRCS) $(MAINSRC) | |
+OBJS = $(AOBJS) $(COBJS) | |
+ | |
+ifneq ($(CONFIG_BUILD_KERNEL),y) | |
+ OBJS += $(MAINOBJ) | |
+endif | |
+ | |
+ifeq ($(CONFIG_WINDOWS_NATIVE),y) | |
+ BIN = ..\..\libapps$(LIBEXT) | |
+else | |
+ifeq ($(WINTOOL),y) | |
+ BIN = ..\\..\\libapps$(LIBEXT) | |
+else | |
+ BIN = ../../libapps$(LIBEXT) | |
+endif | |
+endif | |
+ | |
+ifeq ($(WINTOOL),y) | |
+ INSTALL_DIR = "${shell cygpath -w $(BIN_DIR)}" | |
+else | |
+ INSTALL_DIR = $(BIN_DIR) | |
+endif | |
+ | |
+ROOTDEPPATH = --dep-path . | |
+ | |
+# Common build | |
+ | |
+VPATH = | |
+ | |
+all: .built | |
+.PHONY: context depend clean distclean preconfig | |
+.PRECIOUS: ../../libapps$(LIBEXT) | |
+ | |
+$(AOBJS): %$(OBJEXT): %.S | |
+ $(call ASSEMBLE, $<, $@) | |
+ | |
+$(COBJS) $(MAINOBJ): %$(OBJEXT): %.c | |
+ $(call COMPILE, $<, $@) | |
+ | |
+.built: $(OBJS) | |
+ $(call ARCHIVE, $(BIN), $(OBJS)) | |
+ $(Q) touch .built | |
+ | |
+ifeq ($(CONFIG_BUILD_KERNEL),y) | |
+$(BIN_DIR)$(DELIM)$(PROGNAME): $(OBJS) $(MAINOBJ) | |
+ @echo "LD: $(PROGNAME)" | |
+ $(Q) $(LD) $(LDELFFLAGS) $(LDLIBPATH) -o $(INSTALL_DIR)$(DELIM)$(PROGNAME) $(ARCHCRT0OBJ) $(MAINOBJ) $(LDLIBS) | |
+ $(Q) $(NM) -u $(INSTALL_DIR)$(DELIM)$(PROGNAME) | |
+ | |
+install: $(BIN_DIR)$(DELIM)$(PROGNAME) | |
+ | |
+else | |
+install: | |
+ | |
+endif | |
+ | |
+# Register application | |
+ | |
+ifeq ($(CONFIG_NSH_BUILTIN_APPS),y) | |
+$(BUILTIN_REGISTRY)$(DELIM)$(APPNAME)_main.bdat: $(DEPCONFIG) Makefile | |
+ $(call REGISTER,$(APPNAME),$(PRIORITY),$(STACKSIZE),$(APPNAME)_main) | |
+ | |
+context: $(BUILTIN_REGISTRY)$(DELIM)$(APPNAME)_main.bdat | |
+else | |
+context: | |
+endif | |
+ | |
+# Create dependencies | |
+ | |
+.depend: Makefile $(SRCS) | |
+ $(Q) $(MKDEP) $(ROOTDEPPATH) "$(CC)" -- $(CFLAGS) -- $(SRCS) >Make.dep | |
+ $(Q) touch $@ | |
+ | |
+depend: .depend | |
+ | |
+clean: | |
+ $(call DELFILE, .built) | |
+ $(call CLEAN) | |
+ | |
+distclean: clean | |
+ $(call DELFILE, Make.dep) | |
+ $(call DELFILE, .depend) | |
+ | |
+preconfig: | |
+ | |
+-include Make.dep | |
diff --git a/system/mqttc/bio_publisher.c b/system/mqttc/bio_publisher.c | |
new file mode 100644 | |
index 00000000..8c92b549 | |
--- /dev/null | |
+++ b/system/mqttc/bio_publisher.c | |
@@ -0,0 +1,156 @@ | |
+ | |
+/** | |
+ * @file | |
+ * A simple program to that publishes the current time whenever ENTER is pressed. | |
+ */ | |
+#include <unistd.h> | |
+#include <stdlib.h> | |
+#include <stdio.h> | |
+ | |
+#include <mqtt.h> | |
+#include "templates/bio_sockets.h" | |
+ | |
+ | |
+/** | |
+ * @brief The function that would be called whenever a PUBLISH is received. | |
+ * | |
+ * @note This function is not used in this example. | |
+ */ | |
+void publish_callback(void** unused, struct mqtt_response_publish *published); | |
+ | |
+/** | |
+ * @brief The client's refresher. This function triggers back-end routines to | |
+ * handle ingress/egress traffic to the broker. | |
+ * | |
+ * @note All this function needs to do is call \ref __mqtt_recv and | |
+ * \ref __mqtt_send every so often. I've picked 100 ms meaning that | |
+ * client ingress/egress traffic will be handled every 100 ms. | |
+ */ | |
+void* client_refresher(void* client); | |
+ | |
+/** | |
+ * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. | |
+ */ | |
+void exit_example(int status, BIO* sockfd, pthread_t *client_daemon); | |
+ | |
+/** | |
+ * A simple program to that publishes the current time whenever ENTER is pressed. | |
+ */ | |
+int main(int argc, const char *argv[]) | |
+{ | |
+ const char* addr; | |
+ const char* port; | |
+ const char* topic; | |
+ | |
+ /* Load OpenSSL */ | |
+ SSL_load_error_strings(); | |
+ ERR_load_BIO_strings(); | |
+ OpenSSL_add_all_algorithms(); | |
+ | |
+ /* get address (argv[1] if present) */ | |
+ if (argc > 1) { | |
+ addr = argv[1]; | |
+ } else { | |
+ addr = "test.mosquitto.org"; | |
+ } | |
+ | |
+ /* get port number (argv[2] if present) */ | |
+ if (argc > 2) { | |
+ port = argv[2]; | |
+ } else { | |
+ port = "1883"; | |
+ } | |
+ | |
+ /* get the topic name to publish */ | |
+ if (argc > 3) { | |
+ topic = argv[3]; | |
+ } else { | |
+ topic = "datetime"; | |
+ } | |
+ | |
+ /* open the non-blocking TCP socket (connecting to the broker) */ | |
+ BIO* sockfd = open_nb_socket(addr, port); | |
+ | |
+ if (sockfd == NULL) { | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ } | |
+ | |
+ /* setup a client */ | |
+ struct mqtt_client client; | |
+ uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */ | |
+ uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */ | |
+ mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback); | |
+ mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400); | |
+ | |
+ /* check that we don't have any errors */ | |
+ if (client.error != MQTT_OK) { | |
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ } | |
+ | |
+ /* start a thread to refresh the client (handle egress and ingree client traffic) */ | |
+ pthread_t client_daemon; | |
+ if(pthread_create(&client_daemon, NULL, client_refresher, &client)) { | |
+ fprintf(stderr, "Failed to start client daemon.\n"); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ | |
+ } | |
+ | |
+ /* start publishing the time */ | |
+ printf("%s is ready to begin publishing the time.\n", argv[0]); | |
+ printf("Press ENTER to publish the current time.\n"); | |
+ printf("Press CTRL-D (or any other key) to exit.\n\n"); | |
+ while(fgetc(stdin) == '\n') { | |
+ /* get the current time */ | |
+ time_t timer; | |
+ time(&timer); | |
+ struct tm* tm_info = localtime(&timer); | |
+ char timebuf[26]; | |
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info); | |
+ | |
+ /* print a message */ | |
+ char application_message[256]; | |
+ snprintf(application_message, sizeof(application_message), "The time is %s", timebuf); | |
+ printf("%s published : \"%s\"", argv[0], application_message); | |
+ | |
+ /* publish the time */ | |
+ mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2); | |
+ | |
+ /* check for errors */ | |
+ if (client.error != MQTT_OK) { | |
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); | |
+ exit_example(EXIT_FAILURE, sockfd, &client_daemon); | |
+ } | |
+ } | |
+ | |
+ /* disconnect */ | |
+ printf("\n%s disconnecting from %s\n", argv[0], addr); | |
+ sleep(1); | |
+ | |
+ /* exit */ | |
+ exit_example(EXIT_SUCCESS, sockfd, &client_daemon); | |
+} | |
+ | |
+void exit_example(int status, BIO* sockfd, pthread_t *client_daemon) | |
+{ | |
+ if (sockfd != NULL) BIO_free_all(sockfd); | |
+ if (client_daemon != NULL) pthread_cancel(*client_daemon); | |
+ exit(status); | |
+} | |
+ | |
+ | |
+ | |
+void publish_callback(void** unused, struct mqtt_response_publish *published) | |
+{ | |
+ /* not used in this example */ | |
+} | |
+ | |
+void* client_refresher(void* client) | |
+{ | |
+ while(1) | |
+ { | |
+ mqtt_sync((struct mqtt_client*) client); | |
+ usleep(100000U); | |
+ } | |
+ return NULL; | |
+} | |
\ No newline at end of file | |
diff --git a/system/mqttc/openssl_publisher.c b/system/mqttc/openssl_publisher.c | |
new file mode 100644 | |
index 00000000..a09fbbe8 | |
--- /dev/null | |
+++ b/system/mqttc/openssl_publisher.c | |
@@ -0,0 +1,167 @@ | |
+ | |
+/** | |
+ * @file | |
+ */ | |
+#include <unistd.h> | |
+#include <stdlib.h> | |
+#include <stdio.h> | |
+ | |
+#include <mqtt.h> | |
+#include "templates/openssl_sockets.h" | |
+ | |
+ | |
+/** | |
+ * @brief The function that would be called whenever a PUBLISH is received. | |
+ * | |
+ * @note This function is not used in this example. | |
+ */ | |
+void publish_callback(void** unused, struct mqtt_response_publish *published); | |
+ | |
+/** | |
+ * @brief The client's refresher. This function triggers back-end routines to | |
+ * handle ingress/egress traffic to the broker. | |
+ * | |
+ * @note All this function needs to do is call \ref __mqtt_recv and | |
+ * \ref __mqtt_send every so often. I've picked 100 ms meaning that | |
+ * client ingress/egress traffic will be handled every 100 ms. | |
+ */ | |
+void* client_refresher(void* client); | |
+ | |
+/** | |
+ * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. | |
+ */ | |
+void exit_example(int status, BIO* sockfd, pthread_t *client_daemon); | |
+ | |
+/** | |
+ * A simple program to that publishes the current time whenever ENTER is pressed. | |
+ */ | |
+int main(int argc, const char *argv[]) | |
+{ | |
+ const char* addr; | |
+ const char* port; | |
+ const char* topic; | |
+ const char* ca_file; | |
+ | |
+ /* Load OpenSSL */ | |
+ SSL_load_error_strings(); | |
+ ERR_load_BIO_strings(); | |
+ OpenSSL_add_all_algorithms(); | |
+ SSL_library_init(); | |
+ | |
+ SSL_CTX* ssl_ctx; | |
+ BIO* sockfd; | |
+ | |
+ if (argc > 1) { | |
+ ca_file = argv[1]; | |
+ } else { | |
+ printf("error: path to the CA certificate to use\n"); | |
+ exit(1); | |
+ } | |
+ | |
+ /* get address (argv[2] if present) */ | |
+ if (argc > 2) { | |
+ addr = argv[2]; | |
+ } else { | |
+ addr = "test.mosquitto.org"; | |
+ } | |
+ | |
+ /* get port number (argv[3] if present) */ | |
+ if (argc > 3) { | |
+ port = argv[3]; | |
+ } else { | |
+ port = "8883"; | |
+ } | |
+ | |
+ /* get the topic name to publish */ | |
+ if (argc > 4) { | |
+ topic = argv[4]; | |
+ } else { | |
+ topic = "datetime"; | |
+ } | |
+ | |
+ /* open the non-blocking TCP socket (connecting to the broker) */ | |
+ open_nb_socket(&sockfd, &ssl_ctx, addr, port, ca_file, NULL); | |
+ | |
+ if (sockfd == NULL) { | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ } | |
+ | |
+ /* setup a client */ | |
+ struct mqtt_client client; | |
+ uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */ | |
+ uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */ | |
+ mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback); | |
+ mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400); | |
+ | |
+ /* check that we don't have any errors */ | |
+ if (client.error != MQTT_OK) { | |
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ } | |
+ | |
+ /* start a thread to refresh the client (handle egress and ingree client traffic) */ | |
+ pthread_t client_daemon; | |
+ if(pthread_create(&client_daemon, NULL, client_refresher, &client)) { | |
+ fprintf(stderr, "Failed to start client daemon.\n"); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ | |
+ } | |
+ | |
+ /* start publishing the time */ | |
+ printf("%s is ready to begin publishing the time.\n", argv[0]); | |
+ printf("Press ENTER to publish the current time.\n"); | |
+ printf("Press CTRL-D (or any other key) to exit.\n\n"); | |
+ while(fgetc(stdin) == '\n') { | |
+ /* get the current time */ | |
+ time_t timer; | |
+ time(&timer); | |
+ struct tm* tm_info = localtime(&timer); | |
+ char timebuf[26]; | |
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info); | |
+ | |
+ /* print a message */ | |
+ char application_message[256]; | |
+ snprintf(application_message, sizeof(application_message), "The time is %s", timebuf); | |
+ printf("%s published : \"%s\"", argv[0], application_message); | |
+ | |
+ /* publish the time */ | |
+ mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2); | |
+ | |
+ /* check for errors */ | |
+ if (client.error != MQTT_OK) { | |
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); | |
+ exit_example(EXIT_FAILURE, sockfd, &client_daemon); | |
+ } | |
+ } | |
+ | |
+ /* disconnect */ | |
+ printf("\n%s disconnecting from %s\n", argv[0], addr); | |
+ sleep(1); | |
+ | |
+ /* exit */ | |
+ exit_example(EXIT_SUCCESS, sockfd, &client_daemon); | |
+} | |
+ | |
+void exit_example(int status, BIO* sockfd, pthread_t *client_daemon) | |
+{ | |
+ if (sockfd != NULL) BIO_free_all(sockfd); | |
+ if (client_daemon != NULL) pthread_cancel(*client_daemon); | |
+ exit(status); | |
+} | |
+ | |
+ | |
+ | |
+void publish_callback(void** unused, struct mqtt_response_publish *published) | |
+{ | |
+ /* not used in this example */ | |
+} | |
+ | |
+void* client_refresher(void* client) | |
+{ | |
+ while(1) | |
+ { | |
+ mqtt_sync((struct mqtt_client*) client); | |
+ usleep(100000U); | |
+ } | |
+ return NULL; | |
+} | |
\ No newline at end of file | |
diff --git a/system/mqttc/ping.c b/system/mqttc/ping.c | |
new file mode 100644 | |
index 00000000..b99c5af4 | |
--- /dev/null | |
+++ b/system/mqttc/ping.c | |
@@ -0,0 +1,573 @@ | |
+/**************************************************************************** | |
+ * apps/system/ping/ping.c | |
+ * | |
+ * Copyright (C) 2017-2018 Gregory Nutt. All rights reserved. | |
+ * Author: Gregory Nutt <gnutt@nuttx.org> | |
+ * | |
+ * Redistribution and use in source and binary forms, with or without | |
+ * modification, are permitted provided that the following conditions | |
+ * are met: | |
+ * | |
+ * 1. Redistributions of source code must retain the above copyright | |
+ * notice, this list of conditions and the following disclaimer. | |
+ * 2. Redistributions in binary form must reproduce the above copyright | |
+ * notice, this list of conditions and the following disclaimer in | |
+ * the documentation and/or other materials provided with the | |
+ * distribution. | |
+ * 3. Neither the name NuttX nor the names of its contributors may be | |
+ * used to endorse or promote products derived from this software | |
+ * without specific prior written permission. | |
+ * | |
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | |
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | |
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS | |
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE | |
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, | |
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, | |
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS | |
+ * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED | |
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN | |
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
+ * POSSIBILITY OF SUCH DAMAGE. | |
+ * | |
+ ****************************************************************************/ | |
+ | |
+/**************************************************************************** | |
+ * Included Files | |
+ ****************************************************************************/ | |
+ | |
+#include <nuttx/config.h> | |
+ | |
+#include <sys/socket.h> | |
+#include <sys/socket.h> | |
+ | |
+#include <unistd.h> | |
+#include <stdlib.h> | |
+#include <stdio.h> | |
+#include <time.h> | |
+#include <poll.h> | |
+#include <string.h> | |
+#include <errno.h> | |
+ | |
+#if defined(CONFIG_LIBC_NETDB) && defined(CONFIG_NETDB_DNSCLIENT) | |
+# include <netdb.h> | |
+#endif | |
+ | |
+#include <netinet/in.h> | |
+#include <arpa/inet.h> | |
+ | |
+#include <nuttx/net/icmp.h> | |
+ | |
+/**************************************************************************** | |
+ * Pre-processor Definitions | |
+ ****************************************************************************/ | |
+ | |
+#define ICMP_PING_DATALEN 56 | |
+#define ICMP_IOBUFFER_SIZE sizeof(struct icmp_hdr_s) + ICMP_PING_DATALEN | |
+ | |
+#define ICMP_NPINGS 10 /* Default number of pings */ | |
+#define ICMP_POLL_DELAY 1000 /* 1 second in milliseconds */ | |
+ | |
+/**************************************************************************** | |
+ * Private Types | |
+ ****************************************************************************/ | |
+ | |
+struct ping_info_s | |
+{ | |
+ int sockfd; /* Open IPPROTO_ICMP socket */ | |
+ FAR struct in_addr dest; /* Target address to ping */ | |
+ uint16_t count; /* Number of pings requested */ | |
+ uint16_t nrequests; /* Number of ICMP ECHO requests sent */ | |
+ uint16_t nreplies; /* Number of matching ICMP ECHO replies received */ | |
+ int16_t delay; /* Deciseconds to delay between pings */ | |
+ | |
+ /* I/O buffer for data transfers */ | |
+ | |
+ uint8_t iobuffer[ICMP_IOBUFFER_SIZE]; | |
+}; | |
+ | |
+/**************************************************************************** | |
+ * Private Data | |
+ ****************************************************************************/ | |
+ | |
+/* NOTE: This will not work in the kernel build where there will be a | |
+ * separate instance of g_pingid in every process space. | |
+ */ | |
+ | |
+static uint16_t g_pingid = 0; | |
+ | |
+/**************************************************************************** | |
+ * Private Functions | |
+ ****************************************************************************/ | |
+ | |
+/**************************************************************************** | |
+ * Name: ping_newid | |
+ ****************************************************************************/ | |
+ | |
+static inline uint16_t ping_newid(void) | |
+{ | |
+ /* Revisit: No thread safe */ | |
+ | |
+ return ++g_pingid; | |
+} | |
+ | |
+/**************************************************************************** | |
+ * Name: ping_gethostip | |
+ * | |
+ * Description: | |
+ * Call gethostbyname() to get the IP address associated with a hostname. | |
+ * | |
+ * Input Parameters | |
+ * hostname - The host name to use in the nslookup. | |
+ * ipv4addr - The location to return the IPv4 address. | |
+ * | |
+ * Returned Value: | |
+ * Zero (OK) on success; a negated errno value on failure. | |
+ * | |
+ ****************************************************************************/ | |
+ | |
+static int ping_gethostip(FAR char *hostname, FAR struct ping_info_s *info) | |
+{ | |
+#if defined(CONFIG_LIBC_NETDB) && defined(CONFIG_NETDB_DNSCLIENT) | |
+ /* Netdb DNS client support is enabled */ | |
+ | |
+ FAR struct hostent *he; | |
+ | |
+ he = gethostbyname(hostname); | |
+ if (he == NULL) | |
+ { | |
+ nerr("ERROR: gethostbyname failed: %d\n", h_errno); | |
+ return -ENOENT; | |
+ } | |
+ else if (he->h_addrtype == AF_INET) | |
+ { | |
+ memcpy(&info->dest, he->h_addr, sizeof(in_addr_t)); | |
+ } | |
+ else | |
+ { | |
+ nerr("ERROR: gethostbyname returned an address of type: %d\n", | |
+ he->h_addrtype); | |
+ return -ENOEXEC; | |
+ } | |
+ | |
+ return OK; | |
+ | |
+#else /* CONFIG_LIBC_NETDB */ | |
+ | |
+ /* No host name support */ | |
+ /* Convert strings to numeric IPv6 address */ | |
+ | |
+ int ret = inet_pton(AF_INET, hostname, &info->dest); | |
+ | |
+ /* The inet_pton() function returns 1 if the conversion succeeds. It will | |
+ * return 0 if the input is not a valid IPv4 dotted-decimal string or -1 | |
+ * with errno set to EAFNOSUPPORT if the address family argument is | |
+ * unsupported. | |
+ */ | |
+ | |
+ return (ret > 0) ? OK : ERROR; | |
+ | |
+#endif /* CONFIG_LIBC_NETDB */ | |
+} | |
+ | |
+/**************************************************************************** | |
+ * Name: icmp_ping | |
+ ****************************************************************************/ | |
+ | |
+static void icmp_ping(FAR struct ping_info_s *info) | |
+{ | |
+ struct sockaddr_in destaddr; | |
+ struct sockaddr_in fromaddr; | |
+ struct icmp_hdr_s outhdr; | |
+ FAR struct icmp_hdr_s *inhdr; | |
+ struct pollfd recvfd; | |
+ FAR uint8_t *ptr; | |
+ int32_t elapsed; | |
+ clock_t start; | |
+ socklen_t addrlen; | |
+ ssize_t nsent; | |
+ ssize_t nrecvd; | |
+ size_t outsize; | |
+ bool retry; | |
+ int delay; | |
+ int ret; | |
+ int ch; | |
+ int i; | |
+ | |
+ memset(&destaddr, 0, sizeof(struct sockaddr_in)); | |
+ destaddr.sin_family = AF_INET; | |
+ destaddr.sin_port = 0; | |
+ destaddr.sin_addr.s_addr = info->dest.s_addr; | |
+ | |
+ memset(&outhdr, 0, sizeof(struct icmp_hdr_s)); | |
+ outhdr.type = ICMP_ECHO_REQUEST; | |
+ outhdr.id = ping_newid(); | |
+ outhdr.seqno = 0; | |
+ | |
+ printf("PING %u.%u.%u.%u %d bytes of data\n", | |
+ (info->dest.s_addr ) & 0xff, | |
+ (info->dest.s_addr >> 8 ) & 0xff, | |
+ (info->dest.s_addr >> 16) & 0xff, | |
+ (info->dest.s_addr >> 24) & 0xff, | |
+ ICMP_PING_DATALEN); | |
+ | |
+ while (info->nrequests < info->count) | |
+ { | |
+ /* Copy the ICMP header into the I/O buffer */ | |
+ | |
+ memcpy(info->iobuffer, &outhdr, sizeof(struct icmp_hdr_s)); | |
+ | |
+ /* Add some easily verifiable payload data */ | |
+ | |
+ ptr = &info->iobuffer[sizeof(struct icmp_hdr_s)]; | |
+ ch = 0x20; | |
+ | |
+ for (i = 0; i < ICMP_PING_DATALEN; i++) | |
+ { | |
+ *ptr++ = ch; | |
+ if (++ch > 0x7e) | |
+ { | |
+ ch = 0x20; | |
+ } | |
+ } | |
+ | |
+ start = clock(); | |
+ outsize = sizeof(struct icmp_hdr_s) + ICMP_PING_DATALEN; | |
+ nsent = sendto(info->sockfd, info->iobuffer, outsize, 0, | |
+ (FAR struct sockaddr*)&destaddr, | |
+ sizeof(struct sockaddr_in)); | |
+ if (nsent < 0) | |
+ { | |
+ fprintf(stderr, "ERROR: sendto failed at seqno %u: %d\n", | |
+ outhdr.seqno, errno); | |
+ return; | |
+ } | |
+ else if (nsent != outsize) | |
+ { | |
+ fprintf(stderr, "ERROR: sendto returned %ld, expected %lu\n", | |
+ (long)nsent, (unsigned long)outsize); | |
+ return; | |
+ } | |
+ | |
+ info->nrequests++; | |
+ | |
+ delay = info->delay; | |
+ do | |
+ { | |
+ /* Wait for a reply with a timeout */ | |
+ | |
+ retry = false; | |
+ | |
+ recvfd.fd = info->sockfd; | |
+ recvfd.events = POLLIN; | |
+ recvfd.revents = 0; | |
+ | |
+ ret = poll(&recvfd, 1, delay); | |
+ if (ret < 0) | |
+ { | |
+ fprintf(stderr, "ERROR: poll failed: %d\n", errno); | |
+ return; | |
+ } | |
+ else if (ret == 0) | |
+ { | |
+ printf("No response from %u.%u.%u.%u: icmp_seq=%u time=%u ms\n", | |
+ (info->dest.s_addr ) & 0xff, | |
+ (info->dest.s_addr >> 8 ) & 0xff, | |
+ (info->dest.s_addr >> 16) & 0xff, | |
+ (info->dest.s_addr >> 24) & 0xff, | |
+ outhdr.seqno, info->delay); | |
+ continue; | |
+ } | |
+ | |
+ /* Get the ICMP response (ignoring the sender) */ | |
+ | |
+ addrlen = sizeof(struct sockaddr_in); | |
+ nrecvd = recvfrom(info->sockfd, info->iobuffer, | |
+ ICMP_IOBUFFER_SIZE, 0, | |
+ (FAR struct sockaddr *)&fromaddr, &addrlen); | |
+ if (nrecvd < 0) | |
+ { | |
+ fprintf(stderr, "ERROR: recvfrom failed: %d\n", errno); | |
+ return; | |
+ } | |
+ else if (nrecvd < sizeof(struct icmp_hdr_s)) | |
+ { | |
+ fprintf(stderr, "ERROR: short ICMP packet: %ld\n", (long)nrecvd); | |
+ return; | |
+ } | |
+ | |
+ elapsed = (unsigned int)TICK2MSEC(clock() - start); | |
+ inhdr = (FAR struct icmp_hdr_s *)info->iobuffer; | |
+ | |
+ if (inhdr->type == ICMP_ECHO_REPLY) | |
+ { | |
+ if (inhdr->id != outhdr.id) | |
+ { | |
+ fprintf(stderr, | |
+ "WARNING: Ignoring ICMP reply with ID %u. " | |
+ "Expected %u\n", | |
+ inhdr->id, outhdr.id); | |
+ retry = true; | |
+ } | |
+ else if (inhdr->seqno > outhdr.seqno) | |
+ { | |
+ fprintf(stderr, | |
+ "WARNING: Ignoring ICMP reply to sequence %u. " | |
+ "Expected <= &u\n", | |
+ inhdr->seqno, outhdr.seqno); | |
+ retry = true; | |
+ } | |
+ else | |
+ { | |
+ bool verified = true; | |
+ int32_t pktdelay = elapsed; | |
+ | |
+ if (inhdr->seqno < outhdr.seqno) | |
+ { | |
+ fprintf(stderr, "WARNING: Received after timeout\n"); | |
+ pktdelay += info->delay; | |
+ retry = true; | |
+ } | |
+ | |
+ printf("%ld bytes from %u.%u.%u.%u: icmp_seq=%u time=%u ms\n", | |
+ nrecvd - sizeof(struct icmp_hdr_s), | |
+ (info->dest.s_addr ) & 0xff, | |
+ (info->dest.s_addr >> 8 ) & 0xff, | |
+ (info->dest.s_addr >> 16) & 0xff, | |
+ (info->dest.s_addr >> 24) & 0xff, | |
+ inhdr->seqno, pktdelay); | |
+ | |
+ /* Verify the payload data */ | |
+ | |
+ if (nrecvd != outsize) | |
+ { | |
+ fprintf(stderr, | |
+ "WARNING: Ignoring ICMP reply with different payload " | |
+ "size: %ld vs %lu\n", | |
+ (long)nrecvd, (unsigned long)outsize); | |
+ verified = false; | |
+ } | |
+ else | |
+ { | |
+ ptr = &info->iobuffer[sizeof(struct icmp_hdr_s)]; | |
+ ch = 0x20; | |
+ | |
+ for (i = 0; i < ICMP_PING_DATALEN; i++, ptr++) | |
+ { | |
+ if (*ptr != ch) | |
+ { | |
+ fprintf(stderr, "WARNING: Echoed data corrupted\n"); | |
+ verified = false; | |
+ break; | |
+ } | |
+ | |
+ if (++ch > 0x7e) | |
+ { | |
+ ch = 0x20; | |
+ } | |
+ } | |
+ } | |
+ | |
+ /* Only count the number of good replies */ | |
+ | |
+ if (verified) | |
+ { | |
+ info->nreplies++; | |
+ } | |
+ } | |
+ } | |
+ else | |
+ { | |
+ fprintf(stderr, "WARNING: ICMP packet with unknown type: %u\n", | |
+ inhdr->type); | |
+ } | |
+ | |
+ delay -= elapsed; | |
+ } | |
+ while (retry && delay > 0); | |
+ | |
+ /* Wait if necessary to preserved the requested ping rate */ | |
+ | |
+ elapsed = (unsigned int)TICK2MSEC(clock() - start); | |
+ if (elapsed < info->delay) | |
+ { | |
+ struct timespec rqt; | |
+ unsigned int remaining; | |
+ unsigned int sec; | |
+ unsigned int frac; /* In deciseconds */ | |
+ | |
+ remaining = info->delay - elapsed; | |
+ sec = remaining / MSEC_PER_SEC; | |
+ frac = remaining - MSEC_PER_SEC * sec; | |
+ | |
+ rqt.tv_sec = sec; | |
+ rqt.tv_nsec = frac * NSEC_PER_MSEC; | |
+ | |
+ (void)nanosleep(&rqt, NULL); | |
+ } | |
+ | |
+ outhdr.seqno++; | |
+ } | |
+} | |
+ | |
+/**************************************************************************** | |
+ * Name: show_usage | |
+ ****************************************************************************/ | |
+ | |
+static void show_usage(FAR const char *progname, int exitcode) noreturn_function; | |
+static void show_usage(FAR const char *progname, int exitcode) | |
+{ | |
+#if defined(CONFIG_LIBC_NETDB) && defined(CONFIG_NETDB_DNSCLIENT) | |
+ printf("\nUsage: %s [-c <count>] [-i <interval>] <hostname>\n", progname); | |
+ printf(" %s -h\n", progname); | |
+ printf("\nWhere:\n"); | |
+ printf(" <hostname> is either an IPv6 address or the name of the remote host\n"); | |
+ printf(" that is requested the ICMPv6 ECHO reply.\n"); | |
+#else | |
+ printf("\nUsage: %s [-c <count>] [-i <interval>] <ip-address>\n", progname); | |
+ printf(" %s -h\n", progname); | |
+ printf("\nWhere:\n"); | |
+ printf(" <ip-address> is the IPv4 address request the ICMP ECHO reply.\n"); | |
+#endif | |
+ printf(" -c <count> determines the number of pings. Default %u.\n", | |
+ ICMP_NPINGS); | |
+ printf(" -i <interval> is the default delay between pings (milliseconds).\n"); | |
+ printf(" Default %d.\n", ICMP_POLL_DELAY); | |
+ printf(" -h shows this text and exits.\n"); | |
+ exit(exitcode); | |
+} | |
+ | |
+/**************************************************************************** | |
+ * Public Functions | |
+ ****************************************************************************/ | |
+ | |
+#ifdef CONFIG_BUILD_KERNEL | |
+int main(int argc, FAR char *argv[]) | |
+#else | |
+int ping_main(int argc, char **argv) | |
+#endif | |
+{ | |
+ FAR struct ping_info_s *info; | |
+ FAR char *endptr; | |
+ clock_t start; | |
+ int32_t elapsed; | |
+ int exitcode; | |
+ int option; | |
+ | |
+ /* Allocate memory to hold ping information */ | |
+ | |
+ info = (FAR struct ping_info_s *)zalloc(sizeof(struct ping_info_s)); | |
+ if (info == NULL) | |
+ { | |
+ fprintf(stderr, "ERROR: Failed to allocate memory\n", argv[1]); | |
+ return EXIT_FAILURE; | |
+ } | |
+ | |
+ info->count = ICMP_NPINGS; | |
+ info->delay = ICMP_POLL_DELAY; | |
+ | |
+ /* Parse command line options */ | |
+ | |
+ exitcode = EXIT_FAILURE; | |
+ | |
+ while ((option = getopt(argc, argv, ":c:i:h")) != ERROR) | |
+ { | |
+ switch (option) | |
+ { | |
+ case 'c': | |
+ { | |
+ long count = strtol(optarg, &endptr, 10); | |
+ if (count < 1 || count > UINT16_MAX) | |
+ { | |
+ fprintf(stderr, "ERROR: <count> out of range: %ld\n", count); | |
+ goto errout_with_usage; | |
+ } | |
+ | |
+ info->count = (uint16_t)count; | |
+ } | |
+ break; | |
+ | |
+ case 'i': | |
+ { | |
+ long delay = strtol(optarg, &endptr, 10); | |
+ if (delay < 1 || delay > INT16_MAX) | |
+ { | |
+ fprintf(stderr, "ERROR: <interval> out of range: %ld\n", delay); | |
+ goto errout_with_usage; | |
+ } | |
+ | |
+ info->delay = (int16_t)delay; | |
+ } | |
+ break; | |
+ | |
+ case 'h': | |
+ exitcode = EXIT_SUCCESS; | |
+ goto errout_with_usage; | |
+ | |
+ case ':': | |
+ fprintf(stderr, "ERROR: Missing required argument\n"); | |
+ goto errout_with_usage; | |
+ | |
+ case '?': | |
+ default: | |
+ fprintf(stderr, "ERROR: Unrecognized option\n"); | |
+ goto errout_with_usage; | |
+ } | |
+ } | |
+ | |
+ /* There should be one final parameters remaining on the command line */ | |
+ | |
+ if (optind >= argc) | |
+ { | |
+ printf("ERROR: Missing required <ip-address> argument\n"); | |
+ free(info); | |
+ show_usage(argv[0], EXIT_FAILURE); | |
+ } | |
+ | |
+ if (ping_gethostip(argv[optind], info) < 0) | |
+ { | |
+ fprintf(stderr, "ERROR: ping_gethostip(%s) failed\n", argv[optind]); | |
+ goto errout_with_info; | |
+ } | |
+ | |
+ info->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP); | |
+ if (info->sockfd < 0) | |
+ { | |
+ fprintf(stderr, "ERROR: socket() failed: %d\n", errno); | |
+ goto errout_with_info; | |
+ } | |
+ | |
+ start = clock(); | |
+ icmp_ping(info); | |
+ | |
+ /* Get the total elapsed time */ | |
+ | |
+ elapsed = (int32_t)TICK2MSEC(clock() - start); | |
+ | |
+ if (info->nrequests > 0) | |
+ { | |
+ unsigned int tmp; | |
+ | |
+ /* Calculate the percentage of lost packets */ | |
+ | |
+ tmp = (100 * (info->nrequests - info->nreplies) + (info->nrequests >> 1)) / | |
+ info->nrequests; | |
+ | |
+ printf("%u packets transmitted, %u received, %u%% packet loss, time %ld ms\n", | |
+ info->nrequests, info->nreplies, tmp, (long)elapsed); | |
+ } | |
+ | |
+ close(info->sockfd); | |
+ free(info); | |
+ return EXIT_SUCCESS; | |
+ | |
+errout_with_usage: | |
+ free(info); | |
+ show_usage(argv[0], exitcode); | |
+ return exitcode; /* Not reachable */ | |
+ | |
+errout_with_info: | |
+ free(info); | |
+ return EXIT_FAILURE; | |
+} | |
diff --git a/system/mqttc/reconnect_subscriber.c b/system/mqttc/reconnect_subscriber.c | |
new file mode 100644 | |
index 00000000..04748abe | |
--- /dev/null | |
+++ b/system/mqttc/reconnect_subscriber.c | |
@@ -0,0 +1,195 @@ | |
+ | |
+/** | |
+ * @file | |
+ * A simple subscriber program that performs automatic reconnections. | |
+ */ | |
+#include <unistd.h> | |
+#include <stdlib.h> | |
+#include <stdio.h> | |
+ | |
+#include <mqtt.h> | |
+#include "templates/posix_sockets.h" | |
+ | |
+/** | |
+ * @brief A structure that I will use to keep track of some data needed | |
+ * to setup the connection to the broker. | |
+ * | |
+ * An instance of this struct will be created in my \c main(). Then, whenever | |
+ * \ref reconnect_client is called, this instance will be passed. | |
+ */ | |
+struct reconnect_state_t { | |
+ const char* hostname; | |
+ const char* port; | |
+ const char* topic; | |
+ uint8_t* sendbuf; | |
+ size_t sendbufsz; | |
+ uint8_t* recvbuf; | |
+ size_t recvbufsz; | |
+}; | |
+ | |
+ | |
+/** | |
+ * @brief My reconnect callback. It will reestablish the connection whenever | |
+ * an error occurs. | |
+ */ | |
+void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr); | |
+ | |
+/** | |
+ * @brief The function will be called whenever a PUBLISH message is received. | |
+ */ | |
+void publish_callback(void** unused, struct mqtt_response_publish *published); | |
+ | |
+/** | |
+ * @brief The client's refresher. This function triggers back-end routines to | |
+ * handle ingress/egress traffic to the broker. | |
+ * | |
+ * @note All this function needs to do is call \ref __mqtt_recv and | |
+ * \ref __mqtt_send every so often. I've picked 100 ms meaning that | |
+ * client ingress/egress traffic will be handled every 100 ms. | |
+ */ | |
+void* client_refresher(void* client); | |
+ | |
+/** | |
+ * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. | |
+ */ | |
+void exit_example(int status, int sockfd, pthread_t *client_daemon); | |
+ | |
+ | |
+int main(int argc, const char *argv[]) | |
+{ | |
+ const char* addr; | |
+ const char* port; | |
+ const char* topic; | |
+ | |
+ /* get address (argv[1] if present) */ | |
+ if (argc > 1) { | |
+ addr = argv[1]; | |
+ } else { | |
+ addr = "test.mosquitto.org"; | |
+ } | |
+ | |
+ /* get port number (argv[2] if present) */ | |
+ if (argc > 2) { | |
+ port = argv[2]; | |
+ } else { | |
+ port = "1883"; | |
+ } | |
+ | |
+ /* get the topic name to publish */ | |
+ if (argc > 3) { | |
+ topic = argv[3]; | |
+ } else { | |
+ topic = "datetime"; | |
+ } | |
+ | |
+ /* build the reconnect_state structure which will be passed to reconnect */ | |
+ struct reconnect_state_t reconnect_state; | |
+ reconnect_state.hostname = addr; | |
+ reconnect_state.port = port; | |
+ reconnect_state.topic = topic; | |
+ uint8_t sendbuf[2048]; | |
+ uint8_t recvbuf[1024]; | |
+ reconnect_state.sendbuf = sendbuf; | |
+ reconnect_state.sendbufsz = sizeof(sendbuf); | |
+ reconnect_state.recvbuf = recvbuf; | |
+ reconnect_state.recvbufsz = sizeof(recvbuf); | |
+ | |
+ /* setup a client */ | |
+ struct mqtt_client client; | |
+ | |
+ mqtt_init_reconnect(&client, | |
+ reconnect_client, &reconnect_state, | |
+ publish_callback | |
+ ); | |
+ | |
+ /* start a thread to refresh the client (handle egress and ingree client traffic) */ | |
+ pthread_t client_daemon; | |
+ if(pthread_create(&client_daemon, NULL, client_refresher, &client)) { | |
+ fprintf(stderr, "Failed to start client daemon.\n"); | |
+ exit_example(EXIT_FAILURE, -1, NULL); | |
+ | |
+ } | |
+ | |
+ /* start publishing the time */ | |
+ printf("%s listening for '%s' messages.\n", argv[0], topic); | |
+ printf("Press ENTER to inject an error.\n"); | |
+ printf("Press CTRL-D to exit.\n\n"); | |
+ | |
+ /* block */ | |
+ while(fgetc(stdin) != EOF) { | |
+ printf("Injecting error: \"MQTT_ERROR_SOCKET_ERROR\"\n"); | |
+ client.error = MQTT_ERROR_SOCKET_ERROR; | |
+ } | |
+ | |
+ /* disconnect */ | |
+ printf("\n%s disconnecting from %s\n", argv[0], addr); | |
+ sleep(1); | |
+ | |
+ /* exit */ | |
+ exit_example(EXIT_SUCCESS, client.socketfd, &client_daemon); | |
+} | |
+ | |
+void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr) | |
+{ | |
+ struct reconnect_state_t *reconnect_state = *((struct reconnect_state_t**) reconnect_state_vptr); | |
+ | |
+ /* Close the clients socket if this isn't the initial reconnect call */ | |
+ if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { | |
+ close(client->socketfd); | |
+ } | |
+ | |
+ /* Perform error handling here. */ | |
+ if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { | |
+ printf("reconnect_client: called while client was in error state \"%s\"\n", | |
+ mqtt_error_str(client->error) | |
+ ); | |
+ } | |
+ | |
+ /* Open a new socket. */ | |
+ int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port); | |
+ if (sockfd == -1) { | |
+ perror("Failed to open socket: "); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ } | |
+ | |
+ /* Reinitialize the client. */ | |
+ mqtt_reinit(client, sockfd, | |
+ reconnect_state->sendbuf, reconnect_state->sendbufsz, | |
+ reconnect_state->recvbuf, reconnect_state->recvbufsz | |
+ ); | |
+ | |
+ /* Send connection request to the broker. */ | |
+ mqtt_connect(client, "subscribing_client", NULL, NULL, 0, NULL, NULL, 0, 400); | |
+ | |
+ /* Subscribe to the topic. */ | |
+ mqtt_subscribe(client, reconnect_state->topic, 0); | |
+} | |
+ | |
+void exit_example(int status, int sockfd, pthread_t *client_daemon) | |
+{ | |
+ if (sockfd != -1) close(sockfd); | |
+ if (client_daemon != NULL) pthread_cancel(*client_daemon); | |
+ exit(status); | |
+} | |
+ | |
+void publish_callback(void** unused, struct mqtt_response_publish *published) | |
+{ | |
+ /* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */ | |
+ char* topic_name = (char*) malloc(published->topic_name_size + 1); | |
+ memcpy(topic_name, published->topic_name, published->topic_name_size); | |
+ topic_name[published->topic_name_size] = '\0'; | |
+ | |
+ printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message); | |
+ | |
+ free(topic_name); | |
+} | |
+ | |
+void* client_refresher(void* client) | |
+{ | |
+ while(1) | |
+ { | |
+ mqtt_sync((struct mqtt_client*) client); | |
+ usleep(100000U); | |
+ } | |
+ return NULL; | |
+} | |
\ No newline at end of file | |
diff --git a/system/mqttc/simple_publisher.c b/system/mqttc/simple_publisher.c | |
new file mode 100644 | |
index 00000000..1368cfe3 | |
--- /dev/null | |
+++ b/system/mqttc/simple_publisher.c | |
@@ -0,0 +1,163 @@ | |
+ | |
+/** | |
+ * @file | |
+ * A simple program to that publishes the current time whenever ENTER is pressed. | |
+ */ | |
+#include <unistd.h> | |
+#include <stdlib.h> | |
+#include <stdio.h> | |
+ | |
+#include "netutils/mqtt.h" | |
+#include "templates/posix_sockets.h" | |
+ | |
+ | |
+/** | |
+ * @brief The function that would be called whenever a PUBLISH is received. | |
+ * | |
+ * @note This function is not used in this example. | |
+ */ | |
+void publish_callback(void** unused, struct mqtt_response_publish *published); | |
+ | |
+/** | |
+ * @brief The client's refresher. This function triggers back-end routines to | |
+ * handle ingress/egress traffic to the broker. | |
+ * | |
+ * @note All this function needs to do is call \ref __mqtt_recv and | |
+ * \ref __mqtt_send every so often. I've picked 100 ms meaning that | |
+ * client ingress/egress traffic will be handled every 100 ms. | |
+ */ | |
+void* client_refresher(void* client); | |
+ | |
+/** | |
+ * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. | |
+ */ | |
+void exit_example(int status, int sockfd, pthread_t *client_daemon); | |
+ | |
+/** | |
+ * A simple program to that publishes the current time whenever ENTER is pressed. | |
+ */ | |
+#ifdef CONFIG_BUILD_KERNEL | |
+int main(int argc, FAR char *argv[]) | |
+#else | |
+int mqttc_main(int argc, char **argv) | |
+#endif | |
+{ | |
+ const char* addr; | |
+ const char* port; | |
+ const char* topic; | |
+ char application_message[256]; | |
+ char stat[256]; | |
+ int i; | |
+ | |
+ /* get address (argv[1] if present) */ | |
+ if (argc > 1) { | |
+ addr = argv[1]; | |
+ } else { | |
+ addr = "test.mosquitto.org"; | |
+ } | |
+ | |
+ /* get port number (argv[2] if present) */ | |
+ if (argc > 2) { | |
+ port = argv[2]; | |
+ } else { | |
+ port = "1883"; | |
+ } | |
+ | |
+ /* get the topic name to publish */ | |
+ if (argc > 3) { | |
+ topic = argv[3]; | |
+ } else { | |
+ topic = "datetime"; | |
+ } | |
+ | |
+ /* open the non-blocking TCP socket (connecting to the broker) */ | |
+ int sockfd = open_nb_socket(addr, port); | |
+ | |
+ if (sockfd == -1) { | |
+ perror("Failed to open socket: "); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ } | |
+ | |
+ /* setup a client */ | |
+ struct mqtt_client client; | |
+ uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */ | |
+ uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */ | |
+ mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback); | |
+ mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400); | |
+ | |
+ /* check that we don't have any errors */ | |
+ if (client.error != MQTT_OK) { | |
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ } | |
+ | |
+ /* start a thread to refresh the client (handle egress and ingree client traffic) */ | |
+ pthread_t client_daemon; | |
+ if(pthread_create(&client_daemon, NULL, client_refresher, &client)) { | |
+ fprintf(stderr, "Failed to start client daemon.\n"); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ | |
+ } | |
+ | |
+ /* start publishing the time */ | |
+ printf("%s is ready to begin publishing the time.\n", argv[0]); | |
+ printf("Press ENTER to publish the current time.\n"); | |
+ printf("Press CTRL-D (or any other key) to exit.\n\n"); | |
+ i=0; | |
+ while(fgetc(stdin) == '\r') { | |
+ | |
+ /* get the current time */ | |
+ time_t timer; | |
+ time(&timer); | |
+ struct tm* tm_info = localtime(&timer); | |
+ char timebuf[26]; | |
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info); | |
+ | |
+ /* print a message */ | |
+ | |
+ snprintf(application_message, sizeof(application_message), "The time is %s", timebuf); | |
+ printf("%s published : \"%s\"", argv[0], application_message); | |
+ | |
+ /* publish the time */ | |
+ mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_0); | |
+ | |
+ /* check for errors */ | |
+ if (client.error != MQTT_OK) { | |
+ printf("error: %s\n", mqtt_error_str(client.error)); | |
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); | |
+ exit_example(EXIT_FAILURE, sockfd, &client_daemon); | |
+ } | |
+ i+=1; | |
+ } | |
+ | |
+ /* disconnect */ | |
+ printf("\n%s disconnecting from %s\n", argv[0], addr); | |
+ sleep(1); | |
+ | |
+ /* exit */ | |
+ exit_example(EXIT_SUCCESS, sockfd, &client_daemon); | |
+} | |
+ | |
+void exit_example(int status, int sockfd, pthread_t *client_daemon) | |
+{ | |
+ if (sockfd != -1) close(sockfd); | |
+ if (client_daemon != NULL) pthread_cancel(*client_daemon); | |
+ exit(status); | |
+} | |
+ | |
+ | |
+ | |
+void publish_callback(void** unused, struct mqtt_response_publish *published) | |
+{ | |
+ /* not used in this example */ | |
+} | |
+ | |
+void* client_refresher(void* client) | |
+{ | |
+ while(1) | |
+ { | |
+ mqtt_sync((struct mqtt_client*) client); | |
+ usleep(100000U); | |
+ } | |
+ return NULL; | |
+} | |
diff --git a/system/mqttc/simple_subscriber.c b/system/mqttc/simple_subscriber.c | |
new file mode 100644 | |
index 00000000..f385086a | |
--- /dev/null | |
+++ b/system/mqttc/simple_subscriber.c | |
@@ -0,0 +1,137 @@ | |
+ | |
+/** | |
+ * @file | |
+ * A simple program that subscribes to a topic. | |
+ */ | |
+#include <unistd.h> | |
+#include <stdlib.h> | |
+#include <stdio.h> | |
+ | |
+#include <mqtt.h> | |
+#include "templates/posix_sockets.h" | |
+ | |
+ | |
+/** | |
+ * @brief The function will be called whenever a PUBLISH message is received. | |
+ */ | |
+void publish_callback(void** unused, struct mqtt_response_publish *published); | |
+ | |
+/** | |
+ * @brief The client's refresher. This function triggers back-end routines to | |
+ * handle ingress/egress traffic to the broker. | |
+ * | |
+ * @note All this function needs to do is call \ref __mqtt_recv and | |
+ * \ref __mqtt_send every so often. I've picked 100 ms meaning that | |
+ * client ingress/egress traffic will be handled every 100 ms. | |
+ */ | |
+void* client_refresher(void* client); | |
+ | |
+/** | |
+ * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. | |
+ */ | |
+void exit_example(int status, int sockfd, pthread_t *client_daemon); | |
+ | |
+int main(int argc, const char *argv[]) | |
+{ | |
+ const char* addr; | |
+ const char* port; | |
+ const char* topic; | |
+ | |
+ /* get address (argv[1] if present) */ | |
+ if (argc > 1) { | |
+ addr = argv[1]; | |
+ } else { | |
+ addr = "test.mosquitto.org"; | |
+ } | |
+ | |
+ /* get port number (argv[2] if present) */ | |
+ if (argc > 2) { | |
+ port = argv[2]; | |
+ } else { | |
+ port = "1883"; | |
+ } | |
+ | |
+ /* get the topic name to publish */ | |
+ if (argc > 3) { | |
+ topic = argv[3]; | |
+ } else { | |
+ topic = "datetime"; | |
+ } | |
+ | |
+ /* open the non-blocking TCP socket (connecting to the broker) */ | |
+ int sockfd = open_nb_socket(addr, port); | |
+ | |
+ if (sockfd == -1) { | |
+ perror("Failed to open socket: "); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ } | |
+ | |
+ /* setup a client */ | |
+ struct mqtt_client client; | |
+ uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */ | |
+ uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */ | |
+ mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback); | |
+ mqtt_connect(&client, "subscribing_client", NULL, NULL, 0, NULL, NULL, 0, 400); | |
+ | |
+ /* check that we don't have any errors */ | |
+ if (client.error != MQTT_OK) { | |
+ fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ } | |
+ | |
+ /* start a thread to refresh the client (handle egress and ingree client traffic) */ | |
+ pthread_t client_daemon; | |
+ if(pthread_create(&client_daemon, NULL, client_refresher, &client)) { | |
+ fprintf(stderr, "Failed to start client daemon.\n"); | |
+ exit_example(EXIT_FAILURE, sockfd, NULL); | |
+ | |
+ } | |
+ | |
+ /* subscribe */ | |
+ mqtt_subscribe(&client, topic, 0); | |
+ | |
+ /* start publishing the time */ | |
+ printf("%s listening for '%s' messages.\n", argv[0], topic); | |
+ printf("Press CTRL-D to exit.\n\n"); | |
+ | |
+ /* block */ | |
+ while(fgetc(stdin) != EOF); | |
+ | |
+ /* disconnect */ | |
+ printf("\n%s disconnecting from %s\n", argv[0], addr); | |
+ sleep(1); | |
+ | |
+ /* exit */ | |
+ exit_example(EXIT_SUCCESS, sockfd, &client_daemon); | |
+} | |
+ | |
+void exit_example(int status, int sockfd, pthread_t *client_daemon) | |
+{ | |
+ if (sockfd != -1) close(sockfd); | |
+ if (client_daemon != NULL) pthread_cancel(*client_daemon); | |
+ exit(status); | |
+} | |
+ | |
+ | |
+ | |
+void publish_callback(void** unused, struct mqtt_response_publish *published) | |
+{ | |
+ /* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */ | |
+ char* topic_name = (char*) malloc(published->topic_name_size + 1); | |
+ memcpy(topic_name, published->topic_name, published->topic_name_size); | |
+ topic_name[published->topic_name_size] = '\0'; | |
+ | |
+ printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message); | |
+ | |
+ free(topic_name); | |
+} | |
+ | |
+void* client_refresher(void* client) | |
+{ | |
+ while(1) | |
+ { | |
+ mqtt_sync((struct mqtt_client*) client); | |
+ usleep(100000U); | |
+ } | |
+ return NULL; | |
+} | |
\ No newline at end of file | |
diff --git a/system/mqttc/templates/bio_sockets.h b/system/mqttc/templates/bio_sockets.h | |
new file mode 100644 | |
index 00000000..78acd606 | |
--- /dev/null | |
+++ b/system/mqttc/templates/bio_sockets.h | |
@@ -0,0 +1,28 @@ | |
+#ifndef __BIO_SOCKET_TEMPLATE_H__ | |
+#define __BIO_SOCKET_TEMPLATE_H__ | |
+ | |
+#include <openssl/bio.h> | |
+#include <openssl/ssl.h> | |
+#include <openssl/err.h> | |
+ | |
+/* | |
+ A template for opening a non-blocking BIO socket. | |
+*/ | |
+BIO* open_nb_socket(const char* addr, const char* port) { | |
+ BIO* bio = BIO_new_connect(addr); | |
+ BIO_set_nbio(bio, 1); | |
+ BIO_set_conn_port(bio, port); | |
+ | |
+ /* timeout after 10 seconds */ | |
+ int start_time = time(NULL); | |
+ while(BIO_do_connect(bio) == 0 && (int)time(NULL) - start_time < 10); | |
+ | |
+ if (BIO_do_connect(bio) <= 0) { | |
+ fprintf(stderr, "Failed to open socket: BIO_do_connect returned <= 0\n"); | |
+ return NULL; | |
+ } | |
+ | |
+ return bio; | |
+} | |
+ | |
+#endif | |
\ No newline at end of file | |
diff --git a/system/mqttc/templates/openssl_sockets.h b/system/mqttc/templates/openssl_sockets.h | |
new file mode 100644 | |
index 00000000..7e86857a | |
--- /dev/null | |
+++ b/system/mqttc/templates/openssl_sockets.h | |
@@ -0,0 +1,49 @@ | |
+#ifndef __OPENSSL_SOCKET_TEMPLATE_H__ | |
+#define __OPENSSL_SOCKET_TEMPLATE_H__ | |
+ | |
+#include <openssl/bio.h> | |
+#include <openssl/ssl.h> | |
+#include <openssl/err.h> | |
+ | |
+/* | |
+ A template for opening a non-blocking OpenSSL connection. | |
+*/ | |
+void open_nb_socket(BIO** bio, SSL_CTX** ssl_ctx, const char* addr, const char* port, const char* ca_file, const char* ca_path) { | |
+ *ssl_ctx = SSL_CTX_new(SSLv23_client_method()); | |
+ SSL* ssl; | |
+ | |
+ /* load certificate */ | |
+ if (!SSL_CTX_load_verify_locations(*ssl_ctx, ca_file, ca_path)) { | |
+ printf("error: failed to load certificate\n"); | |
+ exit(1); | |
+ } | |
+ | |
+ /* open BIO socket */ | |
+ *bio = BIO_new_ssl_connect(*ssl_ctx); | |
+ BIO_get_ssl(*bio, &ssl); | |
+ SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY); | |
+ BIO_set_conn_hostname(*bio, addr); | |
+ BIO_set_nbio(*bio, 1); | |
+ BIO_set_conn_port(*bio, port); | |
+ | |
+ /* wait for connect with 10 second timeout */ | |
+ int start_time = time(NULL); | |
+ while(BIO_do_connect(*bio) <= 0 && (int)time(NULL) - start_time < 10); | |
+ if (BIO_do_connect(*bio) <= 0) { | |
+ printf("error: %s\n", ERR_reason_error_string(ERR_get_error())); | |
+ BIO_free_all(*bio); | |
+ SSL_CTX_free(*ssl_ctx); | |
+ *bio = NULL; | |
+ *ssl_ctx=NULL; | |
+ return; | |
+ } | |
+ | |
+ /* verify certificate */ | |
+ if (SSL_get_verify_result(ssl) != X509_V_OK) { | |
+ /* Handle the failed verification */ | |
+ printf("error: x509 certificate verification failed\n"); | |
+ exit(1); | |
+ } | |
+} | |
+ | |
+#endif | |
\ No newline at end of file | |
diff --git a/system/mqttc/templates/posix_sockets.h b/system/mqttc/templates/posix_sockets.h | |
new file mode 100644 | |
index 00000000..dcd172ac | |
--- /dev/null | |
+++ b/system/mqttc/templates/posix_sockets.h | |
@@ -0,0 +1,50 @@ | |
+#ifndef __POSIX_SOCKET_TEMPLATE_H__ | |
+#define __POSIX_SOCKET_TEMPLATE_H__ | |
+ | |
+#include <stdio.h> | |
+#include <sys/types.h> | |
+#include <sys/socket.h> | |
+#include <netdb.h> | |
+#include <fcntl.h> | |
+ | |
+/* | |
+ A template for opening a non-blocking POSIX socket. | |
+*/ | |
+int open_nb_socket(const char* addr, const char* port) { | |
+ struct addrinfo hints = {0}; | |
+ | |
+ hints.ai_family = AF_UNSPEC; /* IPv4 or IPv6 */ | |
+ hints.ai_socktype = SOCK_STREAM; /* Must be TCP */ | |
+ int sockfd = -1; | |
+ int rv; | |
+ struct addrinfo *p, *servinfo; | |
+ | |
+ /* get address information */ | |
+ rv = getaddrinfo(addr, port, &hints, &servinfo); | |
+ if(rv != 0) { | |
+ fprintf(stderr, "Failed to open socket (getaddrinfo): %s\n", gai_strerror(rv)); | |
+ return -1; | |
+ } | |
+ | |
+ /* open the first possible socket */ | |
+ for(p = servinfo; p != NULL; p = p->ai_next) { | |
+ sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); | |
+ if (sockfd == -1) continue; | |
+ | |
+ /* connect to server */ | |
+ rv = connect(sockfd, servinfo->ai_addr, servinfo->ai_addrlen); | |
+ if(rv == -1) continue; | |
+ break; | |
+ } | |
+ | |
+ /* free servinfo */ | |
+ freeaddrinfo(servinfo); | |
+ | |
+ /* make non-blocking */ | |
+ if (sockfd != -1) fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK); | |
+ | |
+ /* return the new socket fd */ | |
+ return sockfd; | |
+} | |
+ | |
+#endif | |
\ No newline at end of file |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment