Skip to content

Instantly share code, notes, and snippets.

@colinl
Last active August 29, 2023 16:23
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save colinl/05e6d61f14ef6af763ec4cfd1049ab61 to your computer and use it in GitHub Desktop.
Guaranteed delivery of data (upload, email, MQTT etc) across a network

These flows demonstrates how to achieve guaranteed delivery of data across a network, where it may not always be possible to send the data, due to network outage or server not available. When it is not possible the data will be buffered in a queue and transmitted later. If persistent context is configured then the queue can be retained there so that the queue will survive a node-red restart or power down.

The first example in the flow below shows how to use it for sending email, but it can be easily modified to handle other similar processes. The 'guaranteed delivery' in this case is only that it got to the email server of course, not that it actually got to the recipient. It can be passed messages suitable for sending to the email node by sending to the link labelled Send email messages to this link.

The second example in the flow shows how to use it to guarantee that MQTT publish messages are all sent.

To try the first example, edit the Send test email node and enter an appropriate email recipient in msg.to. Edit the email node and enter your email server and credentials. If the email delivery (to the server) is successful then it will wait for more messages. If the delivery fails then it will retry approximately every 60 seconds until it succeeds. In the meantime any additional requests will be queued and will be sent when possible.

Double click the Guaranteed delivery node to see the configuration options available.

The subflow has to be informed when the delivery is successful or fails. This is done by passing in a message with a property set to a value to indicate success or failure. The property name and the success and fail values are configurable. The default property is control and the default values are OK and FAIL Choose a property name which will not conflict with any properties in messages to be sent. Look in the OK and FAIL nodes in the flow and it can be seen how the propery is setup ready for passing back to the subflow.

Following a failure the attempt is retried at a configurable Retry Period, configurable in the subflow properties. The default is 60 seconds.

If a persistent config store is available then this may also be selected in the properties. If this is done then if node-red is restarted or the machine reboots then messages currently in the queue will not be lost.

The maximum number of messages may be specified in the subflow properties. Once the queue is full then the oldest messages are discarded as new ones are passed in. If the max is set to zero then there will be no limit.

To modify the flow to deliver to something other than email then remove the nodes below the Guaranteed Delivery node and replace them with nodes to perform the required delivery action. When the delivery succeeds send an OK control message back to the node and if it fails then send a FAIL message back in the same way as is done in the sample flow. It is imperative that either an OK or a FAIL message is sent, and not both.

For the second example, configure the MQTT node as required and send the messages to be sent in via the top left link node. It uses a Complete node attached to the MQTT node to know when the message has been sent successfully but there is no way to know when it has failed, so it uses a 5 second timeout to indicate that. When it fails it retries every 60 seconds. That can be configured in the subflow settings. Be aware though that you may get repeated messages when MQTT reconnects as the MQTT node may buffer up some messages and send them when the connection recovers. If repeated messages are a problem for you then you will have to deal with that at the receiving end. Also you may get warning messages about unexpected OK messages after a recovery. You can safely ignore those (or edit the function in the subflow to stop the warning).

[{"id":"72e9a4edb56a84a7","type":"group","z":"db2da7ec2cdcbc9c","name":"Sending emails","style":{"label":true},"nodes":["40032b86.16b0dc","9d7e52ff.9373a","98d72bd3.c5c98","78bcff9d.8c29c","eada5d9a.8c60e","87580f77.38e0a","75675144.54ee98","26ef1404.562c74","52c5b999.776288","a246571e.945ae8","406fad44.dfcbdc","be8e3686.b3db4"],"x":34,"y":39,"w":722,"h":362},{"id":"149380c1.63e107","type":"subflow","name":"Delivery subflow","info":"","category":"","in":[{"x":50,"y":81,"wires":[{"id":"62560ae6ab8737dc"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"62560ae6ab8737dc","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"ed779289.25b5d8","port":0}]}},{"id":"602725f6.15eee4","type":"inject","z":"149380c1.63e107","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":100,"y":181,"wires":[["62560ae6ab8737dc"]]},{"id":"ed779289.25b5d8","type":"status","z":"149380c1.63e107","name":"","scope":["62560ae6ab8737dc"],"x":300,"y":160,"wires":[[]]},{"id":"62560ae6ab8737dc","type":"function","z":"149380c1.63e107","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n handleFAIL(stat)\n} else if (control === \"__trigger\") {\n handleTrigger(stat)\n} else {\n // no valid control value so must be incoming message\n handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n case \"initial\":\n case \"waitingForMsg\":\n sendMsg(stat) // send next message if any\n break;\n \n case \"waitingForTrigger\":\n case \"waitingForOKFail\":\n // do nothing\n break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n //node.warn(\"handleMessage\")\n // push a clone onto the queue\n stat.queue.push(RED.util.cloneMessage(msg))\n // limit number in queue\n const max = Number(env.get(\"maxQueue\"))\n if (!isNaN(max) && max > 0) {\n // max length hit, remove oldest\n if (stat.queue.length > max) stat.queue.shift()\n }\n // Simulate a trigger event to handle any state change needed\n handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n //node.warn(\"sendMsg\")\n let thisMsg = stat.queue[0]\n if (thisMsg) {\n // send a clone\n //node.warn(\"sending\")\n node.send(RED.util.cloneMessage(thisMsg))\n stat.state = \"waitingForOKFail\"\n } else {\n // nothing in queue\n stat.state = \"waitingForMsg\"\n }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n //node.warn(\"handleOK\")\n // ignore if in wrong state\n if (stat.state === \"waitingForOKFail\") {\n // OK received so drop the top message \n stat.queue.shift()\n // set the state to waiting for message, which will allow the next one to be sent\n stat.state = \"waitingForMsg\"\n } else {\n node.warn(\"Ignoring unnexpected OK\")\n }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n //node.warn(\"handleFAIL\")\n // ignore if in wrong state\n if (stat.state === \"waitingForOKFail\") {\n // FAIL received so go to waiting for trigger state\n stat.state = \"waitingForTrigger\"\n } else {\n node.warn(\"Ignoring unnexpected FAIL\")\n }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n //node.warn(\"handleTrigger\")\n if (stat.state === \"waitingForTrigger\") {\n //node.warn(\"state to waiting\")\n // set it to watitingForMsg in order to trigger send \n stat.state = \"waitingForMsg\"\n }\n // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"40032b86.16b0dc","type":"link out","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"","links":["eada5d9a.8c60e"],"x":715,"y":320,"wires":[]},{"id":"9d7e52ff.9373a","type":"change","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":550,"y":320,"wires":[["40032b86.16b0dc"]]},{"id":"98d72bd3.c5c98","type":"change","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":550,"y":360,"wires":[["40032b86.16b0dc"]]},{"id":"78bcff9d.8c29c","type":"inject","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"Send test email","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"to","v":"someone@somewhere.com","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"email subject","payload":"This is the email contents","payloadType":"str","x":340,"y":80,"wires":[["52c5b999.776288"]]},{"id":"eada5d9a.8c60e","type":"link in","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"","links":["40032b86.16b0dc"],"x":175,"y":280,"wires":[["406fad44.dfcbdc"]]},{"id":"87580f77.38e0a","type":"complete","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"","scope":["be8e3686.b3db4"],"uncaught":false,"x":370,"y":320,"wires":[["9d7e52ff.9373a"]]},{"id":"75675144.54ee98","type":"catch","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"","scope":["be8e3686.b3db4"],"uncaught":false,"x":350,"y":360,"wires":[["98d72bd3.c5c98"]]},{"id":"26ef1404.562c74","type":"link in","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"Email delivery","links":["52c5b999.776288"],"x":95,"y":180,"wires":[["406fad44.dfcbdc"]]},{"id":"52c5b999.776288","type":"link out","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"","links":["26ef1404.562c74"],"x":455,"y":80,"wires":[]},{"id":"a246571e.945ae8","type":"comment","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"Send email messages to this link","info":"","x":190,"y":140,"wires":[]},{"id":"406fad44.dfcbdc","type":"subflow:149380c1.63e107","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","name":"Guaranteed delivery","env":[],"x":360,"y":180,"wires":[["be8e3686.b3db4"]]},{"id":"be8e3686.b3db4","type":"e-mail","z":"db2da7ec2cdcbc9c","g":"72e9a4edb56a84a7","server":"smtp.gmail.com","port":"465","authtype":"BASIC","saslformat":false,"token":"","secure":true,"tls":true,"name":"","dname":"","x":350,"y":260,"wires":[]},{"id":"1034d23a3ba16675","type":"group","z":"db2da7ec2cdcbc9c","name":"Publishing to MQTT","style":{"label":true},"nodes":["8bed187d18f1775d","d94bd0a592a3ad72","04f71eed39690546","7f547cec7932ba41","27fcfce0cae62640","7dca73f85662f315","fb17d270bf0b4b9a","0866631e9e6da383","1348e3d206d0cc83","cdd4ffb09922fef1","430d018a776903b1","34b76192fa82bed4"],"x":34,"y":439,"w":812,"h":322},{"id":"8bed187d18f1775d","type":"change","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":670,"y":620,"wires":[["04f71eed39690546"]]},{"id":"d94bd0a592a3ad72","type":"change","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":670,"y":720,"wires":[["04f71eed39690546"]]},{"id":"04f71eed39690546","type":"link out","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"","links":["7f547cec7932ba41"],"x":795,"y":660,"wires":[]},{"id":"7f547cec7932ba41","type":"link in","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"","links":["04f71eed39690546"],"x":155,"y":580,"wires":[["34b76192fa82bed4"]]},{"id":"27fcfce0cae62640","type":"link in","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"Email delivery","links":["2090b139.480446","3e9920770783be4e"],"x":95,"y":520,"wires":[["34b76192fa82bed4"]]},{"id":"7dca73f85662f315","type":"comment","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"Send MQTT messages to this link","info":"","x":190,"y":480,"wires":[]},{"id":"fb17d270bf0b4b9a","type":"mqtt out","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"b11cbe817c4d2a5a","x":770,"y":540,"wires":[]},{"id":"0866631e9e6da383","type":"trigger","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"Timeout 5 secs","op1":"","op2":"0","op1type":"nul","op2type":"num","duration":"5","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":520,"y":620,"wires":[["8bed187d18f1775d"]]},{"id":"1348e3d206d0cc83","type":"comment","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"Adjust timeout time if necessary","info":"","x":570,"y":580,"wires":[]},{"id":"cdd4ffb09922fef1","type":"complete","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"MQTT Complete","scope":["fb17d270bf0b4b9a"],"uncaught":false,"x":200,"y":720,"wires":[["430d018a776903b1","d94bd0a592a3ad72"]]},{"id":"430d018a776903b1","type":"change","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":290,"y":640,"wires":[["0866631e9e6da383"]]},{"id":"34b76192fa82bed4","type":"subflow:149380c1.63e107","z":"db2da7ec2cdcbc9c","g":"1034d23a3ba16675","name":"Guaranteed delivery","x":320,"y":540,"wires":[["0866631e9e6da383","fb17d270bf0b4b9a"]]},{"id":"b11cbe817c4d2a5a","type":"mqtt-broker","name":"","broker":"localhost","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment