Skip to content

Instantly share code, notes, and snippets.

@smartynov
Last active July 22, 2020 19:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save smartynov/c3101c9a021ff97a37cf714f67412aa1 to your computer and use it in GitHub Desktop.
Save smartynov/c3101c9a021ff97a37cf714f67412aa1 to your computer and use it in GitHub Desktop.
An example of the "service bus" approach to Arduino-based "smart home" systems
#include "ServiceBus.h"
#include "Button.h"
#include "Relay.h"
#include "HeaterControl.h"
void setup() {
sb = ServiceBus::Init();
// just a button (emits mqtt events on click / longpress)
new Button(sb, "entrance/button", CONTROLLINO_A10);
// a button-controlled relay (mqtt control applies too)
new Button(sb, "bedroom/button", CONTROLLINO_A1,
new Relay(sb, "bedroom/light", CONTROLLINO_R0));
// relay without button, only mqtt control
new Relay(sb, "toilet/fan", CONTROLLINO_R14);
// floor heating controller (1wire temperature sensor + relay pin)
OneWire *owBus = new OneWire(20);
byte sensorBathroom[] = {0x28, 0x2B, 0xB7, 0xC7, 0x16, 0x13, 0x01, 0x8D};
new HeaterControl(sb, "bathroom/floor", CONTROLLINO_D18, owBus, sensorBathroom, 26);
}
// Sketch main loop: hands every iteration to the service bus, which runs the
// due services and keeps the MQTT connection serviced (see ServiceBus::loop).
void loop() {
sb->loop();
}
#include "Service.h"
// Builds a service and immediately registers it on the given bus.
//   _bus  - bus that will schedule this service; it is dereferenced here,
//           so it must not be null
//   _name - service name; a private heap copy is made with strdup()
// The call period starts at 0 and may be changed later at runtime.
Service::Service(ServiceBus * _bus, const char * _name)
  : bus(_bus),
    name(strdup(_name)),
    period(0)
{
  // Registration also stamps lastCall (see ServiceBus::add).
  bus->add(this);
}
// Releases the heap copy of the service name made with strdup() in the
// constructor.
// NOTE(review): the destructor does not remove the service from the bus's
// linked list, so deleting a registered service would leave the bus holding
// a dangling pointer.
Service::~Service() {
free(name);
}
#ifndef SERVICE_H
#define SERVICE_H
class Service;
#include "ServiceBus.h"
// Forward declaration repeated here so the class definition does not depend
// on which of the two mutually-including headers was entered first.
class ServiceBus;

/**
 * Base class for every unit of work scheduled by a ServiceBus.
 *
 * The constructor duplicates the name with strdup() and registers the object
 * on the bus; the destructor frees that copy. Because the object owns that
 * heap string and is linked into the bus's list by address, copying is
 * disabled: an implicit copy would share `name` and free it twice.
 */
class Service {
protected:
  ServiceBus * bus;                 // bus this service was registered on

public:
  char * name = nullptr;            // heap-allocated copy of the service name
  Service * next = nullptr;         // next service in the bus's singly linked list
  unsigned long period;             // how often to call, in millis() units (may be changed in runtime)
  unsigned long lastCall = 0;       // last time service loop() was called

  // _bus: bus to register on; _name: service name (copied).
  Service(ServiceBus * _bus, const char * _name);

  // Non-copyable: see class comment.
  Service(const Service &) = delete;
  Service & operator=(const Service &) = delete;

  virtual ~Service();

  // Called by ServiceBus::loop() once more than `period` has elapsed since
  // `lastCall`. Default implementation does nothing.
  virtual void loop() {}

  // Called by the bus for every incoming MQTT message whose topic equals one
  // this service subscribed to. `_payload` is passed together with `_length`;
  // do not rely on it being NUL-terminated. Default implementation does nothing.
  virtual void callback(const char * _topic, const char * _payload, unsigned int _length) {}
};
#endif
#include "ServiceBus.h"
#include "MemoryInfo.h"
// The single bus object; stays null until ServiceBus::Init() creates it.
ServiceBus * ServiceBus::instance = nullptr;

// Returns the bus created by Init(), or null if Init() has not run yet.
ServiceBus * ServiceBus::Instance() { return instance; }
// Creates the bus on first use and (re)configures its MQTT client.
//   _client     - network client handed to PubSubClient; only used on the
//                 first call, when the PubSubClient object is created
//   _mqttServer - broker address; the port is fixed to 1883
// Every call refreshes `now` and re-applies the server and message callback.
// Returns the bus instance.
ServiceBus * ServiceBus::Init(Client& _client, IPAddress _mqttServer) {
  if (instance == nullptr) {
    instance = new ServiceBus;
    instance->mqttClient = new PubSubClient(_client);
  }

  ServiceBus * const bus = instance;
  bus->now = millis();

  PubSubClient * const mqtt = bus->mqttClient;
  mqtt->setServer(_mqttServer, 1883);
  mqtt->setCallback(_callback);

  return bus;
}
void ServiceBus::_callback(char * _topic, uint8_t * _payload, unsigned int _length) {
#ifdef DEBUG
Serial.print("ServiceBus::_callback: ");
Serial.println(_topic);
#endif
topicListNode * node = instance->topicList;
while (node) {
if (!strcmp(_topic, node->topic)) {
node->service->callback(_topic, (const char *)_payload, _length);
}
node = node->next;
}
}
// Convenience overload: publishes `_payload` on `_topic` as a non-retained
// message and returns the result of the three-argument publish().
boolean ServiceBus::publish(const char * _topic, const char * _payload) {
return publish(_topic, _payload, false);
}
// Publishes `_payload` on `_topic` through the MQTT client.
//   _retained - forwarded to the client as the message's retained flag
// Returns whatever the underlying PubSubClient::publish call reports.
boolean ServiceBus::publish(const char * _topic, const char * _payload, boolean _retained) {
#ifdef DEBUG
  Serial.print("ServiceBus::publish: ");
  Serial.println(_topic);
  Serial.println(_payload);
  Serial.println(_retained);
#endif
  const boolean sent = mqttClient->publish(_topic, _payload, _retained);
  return sent;
}
// Registers `_service` as a receiver for messages on `_topic` and subscribes
// the MQTT client to that topic.
//   _topic   - exact topic string; a private copy is stored
//   _service - service whose callback() receives matching messages
// Returns false if an argument is null or the bookkeeping entry cannot be
// allocated; otherwise returns the result of PubSubClient::subscribe. The
// entry stays in the list even when the client-side subscribe fails (e.g.
// while disconnected), so ServiceBus::loop() can resubscribe after reconnect.
boolean ServiceBus::subscribe(const char * _topic, Service * _service) {
  // A null topic or service would later be dereferenced in _callback().
  if (!_topic || !_service) {
    return false;
  }
#ifdef DEBUG
  Serial.print("ServiceBus::subscribe: ");
  Serial.println(_topic);
#endif
  topicListNode * node = new topicListNode;
  // Harmless where `new` throws; needed on toolchains where it returns null.
  if (!node) {
    return false;
  }
  node->topic = strdup(_topic);
  if (!node->topic) {
    // Out of memory: never link an entry whose topic _callback() would strcmp.
    delete node;
    return false;
  }
  node->service = _service;
  // Prepend to the subscription list.
  node->next = topicList;
  topicList = node;
  return mqttClient->subscribe(_topic);
}
// Puts `_service` at the head of the bus's service list and starts its timer:
// both the bus's `now` and the service's `lastCall` are set to the current
// millis() value. Returns the same pointer it was given.
Service * ServiceBus::add(Service * _service) {
#ifdef DEBUG
  Serial.print("ServiceBus::add: ");
  Serial.println(_service->name);
#endif
  const unsigned long stamp = millis();
  now = stamp;
  _service->lastCall = stamp;

  // Prepend: the most recently added service is visited first.
  _service->next = first;
  first = _service;

  return _service;
}
void ServiceBus::loop() {
now = millis();
Service * service = first;
while (service) {
// check next execution time and avoid millis() rollover
if (now - service->lastCall > service->period) {
service->loop();
service->lastCall = now;
}
service = service->next;
}
if (!mqttClient->connected()) {
if (now - lastReconnectAttempt > 15000) {
lastReconnectAttempt = now;
Serial.println("reconnect");
if (mqttClient->connect("controllino", NULL, NULL, "rack/controllino/status", 0, 1, "offline")) {
Serial.println("ok");
lastReconnectAttempt = 0;
mqttClient->publish("rack/controllino/status", "online", true);
// Resubscribe
topicListNode * node = topicList;
while (node) {
mqttClient->subscribe(node->topic);
node = node->next;
}
}
}
} else {
mqttClient->loop();
}
if (now - lastMemStat > 600000) {
_reportMemStat();
lastMemStat = now;
}
if (now - lastWrote > 1000 && now - lastNow > 5) {
Serial.print("loop time = ");
Serial.println(now - lastNow);
lastWrote = now;
}
lastNow = now;
}
// TODO: move stats to separate service
// Placeholder for memory statistics reporting: currently does nothing.
// ServiceBus::loop() calls it whenever more than 600000 ms have passed since
// the previous call.
void ServiceBus::_reportMemStat() {}
#ifndef SERVICEBUS_H
#define SERVICEBUS_H
class ServiceBus;
#include "Service.h"
#include <PubSubClient.h>
/**
 * Singleton that schedules Service objects and routes MQTT traffic to them.
 *
 * Obtain it with Init() (creates it on first call) or Instance() (null until
 * Init() has run). Services register themselves through add() and may
 * subscribe() to topics; loop() must be called continuously from the sketch.
 */
class ServiceBus {
private:
  // Construction is restricted to Init().
  ServiceBus() {}
  // Singleton: copying is disabled outright rather than merely hidden, so
  // accidental use from inside the class fails at compile time as well.
  ServiceBus(ServiceBus const &) = delete;
  ServiceBus & operator=(ServiceBus const &) = delete;

  static ServiceBus * instance;       // the one bus object, null before Init()

protected:
  // One (topic, subscriber) pair; entries form a singly linked list.
  struct topicListNode {
    char * topic;                     // heap copy of the topic string
    Service * service;                // receiver of matching messages
    topicListNode * next;
  };

  topicListNode * topicList = nullptr;      // head of the subscription list
  Service * first = nullptr;                // head of the service list
  PubSubClient * mqttClient = nullptr;      // created in Init()
  unsigned long lastReconnectAttempt = 0;   // millis() of last connect try (0 after success)
  unsigned long lastMemStat = 0;            // millis() of last _reportMemStat() call
  unsigned long lastWrote = 0;              // millis() of last "loop time" print
  unsigned long lastNow = 0;                // value of `now` in the previous loop() pass

  void _reportMemStat();
  // Message handler installed on the PubSubClient; dispatches to subscribers.
  static void _callback(char * _topic, uint8_t * _payload, unsigned int _length);

public:
  // Time base shared with services: refreshed by Init(), add() and loop().
  unsigned long now = 0;

  static ServiceBus * Instance();
  static ServiceBus * Init(Client& _client, IPAddress _mqttServer);

  // Prepends the service to the schedule; returns the same pointer.
  Service * add(Service * _service);

  // Publish a message; the two-argument form publishes non-retained.
  boolean publish(const char * _topic, const char * _payload);
  boolean publish(const char * _topic, const char * _payload, boolean _retained);

  // Route messages on `_topic` to `_service` and subscribe the MQTT client.
  boolean subscribe(const char * _topic, Service * _service);

  // One scheduler pass; call from the sketch's loop().
  void loop();
};
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment