-
-
Save phodal/fd1be9ea3cc13cd48ffa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Copyright (c) 2008-2012 Nicholas O'Leary

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Copyright (c) 2012 Matteo Collina

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 PubSubClient.cpp - A simple client for MQTT.
  Nicholas O'Leary
  http://knolleary.net
*/
#include "PubSubClient.h" | |
#include <EthernetClient.h> | |
#include <string.h> | |
/*
 * Default constructor.
 *
 * Builds a client with no server address, port or message callback.
 * Every scalar member is given a known value so that later calls never
 * read an indeterminate pointer (loop() tests `callback`, connect() reads
 * `ip` and `port`).
 */
PubSubClient::PubSubClient()
    : _client(),
      nextMsgId(0),
      lastOutActivity(0),
      lastInActivity(0),
      pingOutstanding(false),
      callback(NULL),
      ip(NULL),
      port(0) {
}
/*
 * Construct a client that connects to a server given by raw IP address.
 *
 * ip       - pointer to the address bytes; only the pointer is stored.
 * port     - TCP port of the server.
 * callback - function invoked by loop() for each received PUBLISH
 *            (topic, payload pointer, payload length).
 */
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,int))
    : _client(), callback(callback), ip(ip), port(port) {
}
/*
 * Construct a client that connects to a server given by host name.
 *
 * domain   - host name of the server; a copy is stored.
 * port     - TCP port of the server.
 * callback - function invoked by loop() for each received PUBLISH
 *            (topic, payload pointer, payload length).
 *
 * `ip` is explicitly set to NULL: connect() falls back to the IP address
 * when the domain is empty, and must not see an indeterminate pointer.
 */
PubSubClient::PubSubClient(String domain, uint16_t port, void (*callback)(char*,uint8_t*,int))
    : _client(), callback(callback), ip(NULL), domain(domain), port(port) {
}
int PubSubClient::connect(char *id) { | |
return connect(id,0,0,0,0); | |
} | |
int PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) { | |
if (!connected()) { | |
int result = 0; | |
if (domain != NULL) { | |
char c[40]; | |
this->domain.toCharArray(c, 40); | |
result = _client.connect(c, this->port); | |
} else { | |
result = _client.connect(this->ip, this->port); | |
} | |
if (result) { | |
nextMsgId = 1; | |
uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; | |
uint8_t length = 0; | |
int j; | |
for (j = 0;j<9;j++) { | |
buffer[length++] = d[j]; | |
} | |
if (willTopic) { | |
buffer[length++] = 0x06|(willQos<<3)|(willRetain<<5); | |
} else { | |
buffer[length++] = 0x02; | |
} | |
buffer[length++] = 0; | |
buffer[length++] = (KEEPALIVE/1000); | |
length = writeString(id,buffer,length); | |
if (willTopic) { | |
length = writeString(willTopic,buffer,length); | |
length = writeString(willMessage,buffer,length); | |
} | |
write(MQTTCONNECT,buffer,length); | |
lastOutActivity = millis(); | |
lastInActivity = millis(); | |
while (!_client.available()) { | |
long t= millis(); | |
if (t-lastInActivity > KEEPALIVE) { | |
_client.stop(); | |
return 0; | |
} | |
} | |
uint8_t len = readPacket(); | |
if (len == 4 && buffer[3] == 0) { | |
lastInActivity = millis(); | |
pingOutstanding = false; | |
return 1; | |
} | |
} | |
_client.stop(); | |
} | |
return 0; | |
} | |
uint8_t PubSubClient::readByte() { | |
while(!_client.available()) {} | |
return _client.read(); | |
} | |
uint8_t PubSubClient::readPacket() { | |
uint8_t len = 0; | |
buffer[len++] = readByte(); | |
uint8_t multiplier = 1; | |
uint8_t length = 0; | |
uint8_t digit = 0; | |
do { | |
digit = readByte(); | |
buffer[len++] = digit; | |
length += (digit & 127) * multiplier; | |
multiplier *= 128; | |
} while ((digit & 128) != 0); | |
for (int i = 0;i<length;i++) | |
{ | |
if (len < MAX_PACKET_SIZE) { | |
buffer[len++] = readByte(); | |
} else { | |
readByte(); | |
len = 0; // This will cause the packet to be ignored. | |
} | |
} | |
return len; | |
} | |
int PubSubClient::loop() { | |
if (connected()) { | |
long t = millis(); | |
if ((t - lastInActivity > KEEPALIVE) || (t - lastOutActivity > KEEPALIVE)) { | |
if (pingOutstanding) { | |
_client.stop(); | |
return 0; | |
} else { | |
_client.write(MQTTPINGREQ); | |
_client.write((uint8_t)0); | |
lastOutActivity = t; | |
lastInActivity = t; | |
pingOutstanding = true; | |
} | |
} | |
if (_client.available()) { | |
uint8_t len = readPacket(); | |
if (len > 0) { | |
lastInActivity = t; | |
uint8_t type = buffer[0]&0xF0; | |
if (type == MQTTPUBLISH) { | |
if (callback) { | |
uint8_t tl = (buffer[2]<<3)+buffer[3]; | |
char topic[tl+1]; | |
for (int i=0;i<tl;i++) { | |
topic[i] = buffer[4+i]; | |
} | |
topic[tl] = 0; | |
// ignore msgID - only support QoS 0 subs | |
uint8_t *payload = buffer+4+tl; | |
callback(topic,payload,len-4-tl); | |
} | |
} else if (type == MQTTPINGREQ) { | |
_client.write(MQTTPINGRESP); | |
_client.write((uint8_t)0); | |
} else if (type == MQTTPINGRESP) { | |
pingOutstanding = false; | |
} | |
} | |
} | |
return 1; | |
} | |
return 0; | |
} | |
int PubSubClient::publish(char* topic, char* payload) { | |
return publish(topic,(uint8_t*)payload,strlen(payload)); | |
} | |
int PubSubClient::publish(char* topic, uint8_t* payload, uint8_t plength) { | |
return publish(topic, payload, plength, 0); | |
} | |
int PubSubClient::publish(char* topic, uint8_t* payload, uint8_t plength, uint8_t retained) { | |
if (connected()) { | |
uint8_t length = writeString(topic,buffer,0); | |
int i; | |
for (i=0;i<plength;i++) { | |
buffer[length++] = payload[i]; | |
} | |
uint8_t header = MQTTPUBLISH; | |
if (retained != 0) { | |
header |= 1; | |
} | |
write(header,buffer,length); | |
return 1; | |
} | |
return 0; | |
} | |
/*
 * Send one MQTT packet: the fixed header byte, the remaining length and
 * then `length` bytes from `buf`. Updates lastOutActivity.
 *
 * The remaining length uses the MQTT variable-length encoding (7 bits
 * per byte, high bit set when another byte follows). For lengths below
 * 128 this is the single raw byte; for 128..255 a second byte is needed,
 * since a raw byte with the high bit set would be read as a continuation.
 *
 * Always returns 0.
 */
int PubSubClient::write(uint8_t header, uint8_t* buf, uint8_t length) {
    _client.write(header);

    uint8_t remaining = length;
    do {
        uint8_t digit = remaining % 128;
        remaining = remaining / 128;
        if (remaining > 0) {
            digit |= 0x80;
        }
        _client.write(digit);
    } while (remaining > 0);

    _client.write(buf, length);
    lastOutActivity = millis();
    return 0;
}
void PubSubClient::subscribe(char* topic) { | |
if (connected()) { | |
uint8_t length = 2; | |
nextMsgId++; | |
buffer[0] = nextMsgId >> 8; | |
buffer[1] = nextMsgId - (buffer[0]<<8); | |
length = writeString(topic, buffer,length); | |
buffer[length++] = 0; // Only do QoS 0 subs | |
write(MQTTSUBSCRIBE,buffer,length); | |
} | |
} | |
void PubSubClient::disconnect() { | |
_client.write(MQTTDISCONNECT); | |
_client.write((uint8_t)0); | |
_client.stop(); | |
lastInActivity = millis(); | |
lastOutActivity = millis(); | |
} | |
/*
 * Append `string` to `buf` starting at `pos` as a length-prefixed field:
 * two length bytes (the first always 0, the second the character count)
 * followed by the characters without the terminating NUL.
 *
 * Returns the position just past the last byte written. No bounds
 * checking is performed here.
 */
uint8_t PubSubClient::writeString(char* string, uint8_t* buf, uint8_t pos) {
    uint8_t count = 0;

    // Leave room for the two length bytes, filled in once the count is known.
    pos += 2;
    for (char* cursor = string; *cursor; ++cursor) {
        buf[pos++] = *cursor;
        ++count;
    }

    buf[pos - count - 2] = 0;
    buf[pos - count - 1] = count;
    return pos;
}
int PubSubClient::connected() { | |
int rc = (int)_client.connected(); | |
if (!rc) _client.stop(); | |
return rc; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 PubSubClient.h - A simple client for MQTT.
  Nicholas O'Leary
  http://knolleary.net
*/
#ifndef PubSubClient_h | |
#define PubSubClient_h | |
#include "Ethernet.h" | |
#include "EthernetClient.h" | |
// Size in bytes of the packet buffer held by each client.
#define MAX_PACKET_SIZE 128
// Keepalive interval in milliseconds.
#define KEEPALIVE 15000 // max value = 255000

// from mqtt-v3r1
#define MQTTPROTOCOLVERSION 3

// Packet types, already shifted into the high nibble of the fixed header.
// Each value is parenthesized so it expands safely inside any expression
// (for example MQTTPUBLISH + 1 would otherwise become 3 << 5).
#define MQTTCONNECT     (1 << 4)  // Client request to connect to Server
#define MQTTCONNACK     (2 << 4)  // Connect Acknowledgment
#define MQTTPUBLISH     (3 << 4)  // Publish message
#define MQTTPUBACK      (4 << 4)  // Publish Acknowledgment
#define MQTTPUBREC      (5 << 4)  // Publish Received (assured delivery part 1)
#define MQTTPUBREL      (6 << 4)  // Publish Release (assured delivery part 2)
#define MQTTPUBCOMP     (7 << 4)  // Publish Complete (assured delivery part 3)
#define MQTTSUBSCRIBE   (8 << 4)  // Client Subscribe request
#define MQTTSUBACK      (9 << 4)  // Subscribe Acknowledgment
#define MQTTUNSUBSCRIBE (10 << 4) // Client Unsubscribe request
#define MQTTUNSUBACK    (11 << 4) // Unsubscribe Acknowledgment
#define MQTTPINGREQ     (12 << 4) // PING Request
#define MQTTPINGRESP    (13 << 4) // PING Response
#define MQTTDISCONNECT  (14 << 4) // Client is Disconnecting
#define MQTTReserved    (15 << 4) // Reserved
class PubSubClient { | |
private: | |
EthernetClient _client; | |
uint8_t buffer[MAX_PACKET_SIZE]; | |
uint8_t nextMsgId; | |
long lastOutActivity; | |
long lastInActivity; | |
bool pingOutstanding; | |
void (*callback)(char*,uint8_t*,int); | |
uint8_t readPacket(); | |
uint8_t readByte(); | |
int write(uint8_t header, uint8_t* buf, uint8_t length); | |
uint8_t writeString(char* string, uint8_t* buf, uint8_t pos); | |
uint8_t *ip; | |
String domain; | |
uint16_t port; | |
public: | |
PubSubClient(); | |
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,int)); | |
PubSubClient(String, uint16_t, void(*)(char*,uint8_t*,int)); | |
int connect(char *); | |
int connect(char*, char*, uint8_t, uint8_t, char*); | |
void disconnect(); | |
int publish(char *, char *); | |
int publish(char *, uint8_t *, uint8_t); | |
int publish(char *, uint8_t *, uint8_t, uint8_t); | |
void subscribe(char *); | |
int loop(); | |
int connected(); | |
}; | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment