@dceejay
Last active February 14, 2019 15:35
Simple Status gated queue

This simple flow shows how you could use the status node as a gate for a simple function node that acts as a queue for messages being sent to an mqtt node.

While the mqtt node's status is not connected, the function node stores incoming messages up to a maximum depth (currently set to 10 in the function node). Once that depth is exceeded, the oldest stored message is discarded.

When the mqtt node status changes to connected, all the stored messages are sent, oldest first, to the mqtt node for delivery.
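As a rough illustration of the queue-and-flush behaviour described above, here is a minimal standalone sketch that runs outside Node-RED. The names `createGatedQueue`, `onStatus`, `onMessage` and `depth` are made up for this example and are not Node-RED APIs, and the status string passed in is only an assumed example value. Unlike the function node in the flow, which waits 5 ms before flushing, this sketch flushes synchronously.

```javascript
// Standalone sketch of the gate logic. createGatedQueue, onStatus,
// onMessage and depth are illustrative names, not Node-RED APIs.
function createGatedQueue(maxDepth, send) {
    const queue = [];   // messages held while the gate is closed
    let pass = false;   // gate stays closed until a "connected" status is seen

    return {
        // Call with the text of each status update from the watched node.
        onStatus(text) {
            if (String(text).indexOf(".connected") !== -1) {
                // Flush in arrival order, then open the gate.
                while (queue.length > 0) { send(queue.shift()); }
                pass = true;
            } else {
                pass = false;
            }
        },
        // Call with each ordinary message.
        onMessage(msg) {
            if (pass) { send(msg); return; }
            queue.push(msg);
            // Over the limit: discard the oldest message.
            if (queue.length > maxDepth) { queue.shift(); }
        },
        // Number of messages currently held.
        depth() { return queue.length; }
    };
}

// Usage: offer three messages to a queue with room for two, then connect.
const sent = [];
const gate = createGatedQueue(2, (m) => sent.push(m));
gate.onMessage("a");
gate.onMessage("b");
gate.onMessage("c");                         // "a" is dropped here
gate.onStatus("common.status.connected");    // assumed example status text
gate.onMessage("d");                         // gate open, passes straight through
console.log(sent);                           // [ 'b', 'c', 'd' ]
```

The leading dot in `".connected"` matters: it keeps a status text ending in `.disconnected` from being treated as connected.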

Obviously this technique could be extended for use by other node types.

Note: the mqtt node may only detect a broken connection after its keepalive fails (typically 60 seconds; this flow sets it to 20). During that period the status still reads connected, so the function node passes messages straight through instead of saving them. You therefore need to ensure the mqtt node is set to manage this itself, by using QoS 2 delivery and setting clean-session to false. That way the mqtt client will hang onto those messages until they have been delivered.
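To make the two settings from the note concrete, the sketch below scans an exported flow for them. `checkDeliverySettings` is a hypothetical helper written for this example, not part of Node-RED. The field names (`qos`, `cleansession`, `keepalive`) and node ids are taken from the flow in this gist, trimmed to the relevant properties.

```javascript
// Sketch: report flow settings that would stop the mqtt client from holding
// undelivered messages. checkDeliverySettings is a hypothetical helper.
function checkDeliverySettings(flow) {
    const problems = [];
    for (const node of flow) {
        if (node.type === "mqtt out" && String(node.qos) !== "2") {
            problems.push("mqtt out " + node.id + ": qos is '" + node.qos + "', expected 2");
        }
        if (node.type === "mqtt-broker" && node.cleansession !== false) {
            problems.push("mqtt-broker " + node.id + ": cleansession should be false");
        }
    }
    return problems;
}

// The two relevant nodes from this flow, trimmed to the fields that matter here.
const flow = [
    { id: "d25c0e58.e75d7", type: "mqtt out", qos: "2", broker: "5085b198.064c4" },
    { id: "5085b198.064c4", type: "mqtt-broker", keepalive: "20", cleansession: false }
];
console.log(checkDeliverySettings(flow));   // []
```

An empty result means both settings match what the note recommends. It says nothing about whether the broker itself keeps session state for the client.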

[{"id":"f9ec7a29.852b18","type":"inject","z":"82738787.0e0338","name":"","topic":"test","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":220,"y":600,"wires":[["8e6485b.fc3ce78"]]},{"id":"d25c0e58.e75d7","type":"mqtt out","z":"82738787.0e0338","name":"","topic":"","qos":"2","retain":"","broker":"5085b198.064c4","x":630,"y":600,"wires":[]},{"id":"ccb84c90.3f15e","type":"debug","z":"82738787.0e0338","name":"","active":false,"console":"false","complete":"false","x":650,"y":660,"wires":[]},{"id":"8e6485b.fc3ce78","type":"function","z":"82738787.0e0338","name":"Status gated queue","func":"\nvar MAX_DEPTH = 10;\n\n// if queue doesn't exist, create it\ncontext.queue = context.queue || [];\ncontext.pass = context.pass || false;\n\n// Use MQTT connected status to gate flow\nif (msg.hasOwnProperty(\"status\")) {\n if (msg.status.text.indexOf(\".connected\") !== -1) { \n setTimeout(function() { \n while (context.queue.length > 0) {\n var m = context.queue.shift();\n node.send(m); \n }\n context.pass = true; \n node.status({});\n },5);\n }\n else { context.pass = false; }\n}\nelse {\n if (context.pass) { return msg; }\n else { \n context.queue.push(msg); \n if (context.queue.length > MAX_DEPTH) { context.queue.shift(); }\n node.status({text:context.queue.length});\n }\n}\nreturn null;","outputs":1,"noerr":0,"x":430,"y":600,"wires":[["ccb84c90.3f15e","d25c0e58.e75d7"]]},{"id":"147525f9.9ff5ba","type":"status","z":"82738787.0e0338","name":"","scope":["d25c0e58.e75d7"],"x":220,"y":660,"wires":[["8e6485b.fc3ce78"]]},{"id":"5085b198.064c4","type":"mqtt-broker","z":"","name":"","broker":"myremotebroker","port":"1883","clientid":"test","usetls":false,"compatmode":true,"keepalive":"20","cleansession":false,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]