Recipe: from MQTT to InfluxDB via Node-Red
Introduction
Getting data produced by IoT sensors into a database is practically a mandatory step before effective visualisation (eg dashboards).
This recipe shows you how to get an MQTT payload into an InfluxDB database using three Node-Red nodes. It makes the following assumptions:
- A client device of some kind publishing data to a topic via the MQTT protocol;
- Mosquitto (MQTT broker);
- Node-Red subscribing to the topic; and
- InfluxDB running and accessible to Node-Red.
This recipe also shows how to map between JSON keys and database field and tag names.
Task goal
Given this MQTT message structure:
topic: /site/topic
message: {"b": true, "i": 123, "r": 456.78, "s": "hello world", "t": "tagValue"}
the task is to configure the Node-Red instance to:
- Listen for the topic "/site/topic";
- Parse the JSON-format message payload; and
- Add a row to a measurement (table) named "example" in an InfluxDB database named "test".
This recipe also demonstrates how to use abbreviated keys in a JSON message and map those to more meaningful field names (attributes) in the InfluxDB database:
JSON Key | Influx Field | Influx Tag | Expected Value | Implied type
---|---|---|---|---
b | flag | | true | Boolean
i | discrete | | 123 | Integer
r | continuous | | 456.78 | Real
s | message | | "hello world" | String
t | | identity | "tagValue" | String
There is nothing magical about either these JSON keys, or the InfluxDB field or tag names. JSON keys do not have to be single characters, and InfluxDB names do not have to be long, meaningful or descriptive. It is just that, in general, IoT devices are memory-constrained so short strings are to be preferred over longer strings; while "meaningful and descriptive" are considered best practice in a database environment.
There is also nothing magical about the choice of these implied data types. They just happen to be the ones you are most likely to need to pass between a client device and a database.
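As a rough illustration of the implied types, the sketch below parses the example message in plain JavaScript (the language Node-Red runs on) and reports the type of each value. It relies only on the standard `JSON.parse`; the variable names are illustrative and not part of the recipe.

```javascript
// The example message from the task goal, exactly as it travels over MQTT.
const exampleMessage =
  '{"b": true, "i": 123, "r": 456.78, "s": "hello world", "t": "tagValue"}';

// JSON.parse turns the string into a plain JavaScript object.
const parsedExample = JSON.parse(exampleMessage);

// Collect the JavaScript type of each value, keyed by JSON key.
const impliedTypes = Object.fromEntries(
  Object.entries(parsedExample).map(([key, value]) => [key, typeof value])
);

console.log(impliedTypes);

// JavaScript has a single "number" type, so "i" and "r" share a type after
// parsing even though only one of them holds a whole number.
console.log(Number.isInteger(parsedExample.i), Number.isInteger(parsedExample.r));
```

Because both numeric values arrive as ordinary JavaScript numbers, it should not be a surprise if the database later reports the "discrete" field as a float rather than an integer.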
Preparation
client device (simulated)
During testing you will find it useful to have a computer with an MQTT client installed. Explaining the full how-to of this is beyond the scope of this recipe so you should start at mosquitto.org/download.
On Linux, it is not immediately obvious that you need either or both of the following:
$ sudo apt install mosquitto-clients
$ sudo apt install mosquitto
The first command installs the MQTT publishing and subscribing command-line clients. You will need this package to get the mosquitto_pub command.
The second command installs the MQTT broker. You will only need this if you do not have another MQTT broker running somewhere (eg in a Docker container).
InfluxDB
If your copy of InfluxDB is running inside a Docker container, consider adding this alias statement to your .profile or .bashrc:
$ alias influx='docker exec -it influxdb influx -precision=rfc3339'
That alias allows you to connect to the "influx" command line interface (CLI) simply by typing:
$ influx
By default, "influx" displays time as nanoseconds since 1970-01-01 UTC. The -precision=rfc3339 argument displays time in human-readable form.
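To make the difference between the two time formats concrete, here is a minimal JavaScript sketch that converts a nanosecond timestamp into RFC 3339 form. The function name `nanosToRfc3339` is made up for this illustration, and the sketch assumes a non-negative timestamp; it is not how the "influx" CLI does the conversion.

```javascript
// Convert nanoseconds since 1970-01-01 UTC into an RFC 3339 string.
// BigInt is used because nanosecond timestamps are larger than the range in
// which ordinary JavaScript numbers can represent integers exactly.
function nanosToRfc3339(nanos) {
  const NANOS_PER_SECOND = 1000000000n;
  const seconds = nanos / NANOS_PER_SECOND;   // whole seconds (BigInt division truncates)
  const fraction = nanos % NANOS_PER_SECOND;  // leftover nanoseconds
  // Date handles the calendar arithmetic for the whole-second part.
  const wholeSeconds = new Date(Number(seconds) * 1000).toISOString().slice(0, 19);
  return `${wholeSeconds}.${fraction.toString().padStart(9, "0")}Z`;
}

console.log(nanosToRfc3339(1581479767844235334n));
// prints 2020-02-12T03:56:07.844235334Z
```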
The "test" database must be created by hand. If you omit this step you will get an error from Node-Red. You can initialise the database like this:
$ influx
> CREATE DATABASE test
> quit
Node-Red
- Launch your browser and connect to your Node-Red server.
- Use the main menu (three horizontal bars "≡" at the top, right of the Node-Red window) to open the Palette Manager:
- select the "Nodes" tab and check whether node-red-contrib-influxdb is already installed.
- If it is not installed, select the "Install" tab, then search for and install node-red-contrib-influxdb. If you prefer to install contributions from the command line, do that.
A three-node flow
Back in the Node-Red main window, click the "+" button to add a new empty flow. The default title will be something like "Flow 1". Double-click the title and rename the flow with something more descriptive, like "Influx Test". The actual name is not important.
1. "mqtt in" node
Drag an "mqtt in" node onto the canvas. Double-click to open and configure as follows:
- Select the option to add a new mqtt-broker.
- Click the pencil icon to open the server properties panel.
- Give the server a meaningful name (eg "Docker MQTT").
- Supply the network path to the host running Mosquitto:
- In a Docker environment, this will be the name of the container running Mosquitto (eg "mosquitto").
- In a non-Docker environment where Node-Red and Mosquitto are running on the same host, this will be the loopback address 127.0.0.1.
- If Node-Red and Mosquitto are running on different hosts then this will be a static IP address or the fully-qualified domain name of the host running Mosquitto.
- Click "Add".
- Enter the topic string ("/site/topic").
- Set the "Output" popup to "a parsed JSON object".
- Enter a name for the node. This appears in the schematic and it is good practice to repeat the topic string.
- Click "Done" to complete the node setup.
All other fields can either be left at their default settings or changed to suit your requirements.
The purpose of this node is to:
- Listen to MQTT messages directed to "/site/topic"; and
- Convert the JSON string in the MQTT message body to a JavaScript object representation.
In other words, given an input of the JSON string specified in the task goal, the output from the node will be:
msg.payload = {
b: true,
i: 123,
r: 456.78,
s: "hello world",
t: "tagValue"
}
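The conversion can be approximated in plain JavaScript. The sketch below is not Node-Red's internal code; `parsePayload` is a hypothetical helper that shows the same string-to-object step, plus one way to cope with a message body that is not a JSON object.

```javascript
// Parse an MQTT message body. Returns the parsed object, or null when the
// body is not valid JSON or is valid JSON but not an object.
function parsePayload(body) {
  try {
    const value = JSON.parse(body);
    const isObject =
      value !== null && typeof value === "object" && !Array.isArray(value);
    return isObject ? value : null;
  } catch (err) {
    // JSON.parse throws a SyntaxError on malformed input.
    return null;
  }
}

const goodPayload = parsePayload(
  '{"b": true, "i": 123, "r": 456.78, "s": "hello world", "t": "tagValue"}'
);
console.log(goodPayload.s);               // prints hello world
console.log(parsePayload("not json"));    // prints null
```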
2. "change" node
Drag a "change" node onto the canvas.
Connect the outlet of the "mqtt in" node to the inlet of the "change" node.
Double-click the "change" node to open and configure as follows:
- Enter a name for the node. This appears in the schematic and it is good practice to summarise the purpose of the node.
- A new change node contains a single rule to "Set msg.payload" but where the data type of the "to" field defaults to a string. Change the popup menu to a JSONata expression (shown as "expression" in the popup).
- Click the ellipsis ("…") to open the expression editor.
- Copy the expression below and paste it into this window.
[ { "flag": msg.payload.b, "discrete": msg.payload.i, "continuous": msg.payload.r, "message": msg.payload.s },{ "identity": msg.payload.t } ]
- Click "Done".
- Click "Done" to complete the node setup.
The purpose of this node is to provide a cross-walk between the JSON keys ("b", "i", "r", "s" and "t"), and the field and tag names you need in the InfluxDB database. The basic pattern is:
[
{
fieldName : msg.payload.key,
...
},{
tagName : msg.payload.key,
...
}
]
If you only want to pass fields, then omit the square brackets and the elements that describe the tag(s), like this:
{
fieldName : msg.payload.key,
...
}
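For readers who think more easily in JavaScript than in expression syntax, the same cross-walk can be sketched as an ordinary function. The name `crossWalk` is an assumption made for this example; the recipe itself uses the "change" node, not this code.

```javascript
// Mirror of the "change" node expression: element 0 of the result holds the
// fields and element 1 holds the tags.
function crossWalk(payload) {
  const fields = {
    flag: payload.b,
    discrete: payload.i,
    continuous: payload.r,
    message: payload.s,
  };
  const tags = { identity: payload.t };
  return [fields, tags];
}

const walked = crossWalk({ b: true, i: 123, r: 456.78, s: "hello world", t: "tagValue" });
console.log(JSON.stringify(walked));
```

If you preferred a "function" node over a "change" node, something along the lines of `msg.payload = crossWalk(msg.payload); return msg;` in the function body would be one way to wire it in.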
Note that it is feasible to omit this "change" node entirely. If you do that the JSON keys in the Node-Red "payload" variable will become the field names in the database. Before you take that shortcut, consider:
- If an MQTT client starts providing unexpected JSON keys, those can easily pollute your InfluxDB database. Using a cross-walk between the expected JSON keys and the field and tag names you want in your database provides an effective barrier against unexpected keys.
- You may wish to include keys in your MQTT payload that you do not want to wind up in your database. Good examples are millis() uptime, free heap and version numbers. Such values are usually ephemeral and only of interest at the moment when they are produced (and might only be produced when you are actively debugging your MQTT client). You can always see such values by subscribing to the MQTT feed or attaching a Debug node to the "mqtt in" node.
- A "change" node simplifies the process of adding new tags and fields. You can:
- Add a new key+value pair to the JSON payload being produced by your MQTT client, then
- Attach a Debug node to the "mqtt in" node to confirm that you are receiving the expected data, then
- Change the cross-walk when you are ready to start importing the data into your database.
- If you want to include both tags and fields in your database, you really only have two options, either:
- your MQTT client has to format the JSON payload correctly before transmission, or
- you need a "change" node to implement a cross-walk.
- Opting to do the work in your MQTT client effectively rules out the tactical use of ephemeral values and a step-wise refinement approach to development if you need to add new fields.
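The "barrier" effect described above can be demonstrated with a table-driven variant of the cross-walk. This is a sketch in plain JavaScript; `FIELD_MAP`, `TAG_MAP`, `pick` and `filteredCrossWalk` are names invented for the example, and the extra keys "uptime" and "heap" stand in for the ephemeral values mentioned earlier.

```javascript
// Expected JSON keys and the database names they map to.
const FIELD_MAP = { b: "flag", i: "discrete", r: "continuous", s: "message" };
const TAG_MAP = { t: "identity" };

// Copy only the keys listed in the map, renaming them on the way through.
// Keys missing from the payload are skipped instead of becoming undefined.
function pick(payload, map) {
  const result = {};
  for (const [jsonKey, dbName] of Object.entries(map)) {
    if (Object.prototype.hasOwnProperty.call(payload, jsonKey)) {
      result[dbName] = payload[jsonKey];
    }
  }
  return result;
}

function filteredCrossWalk(payload) {
  return [pick(payload, FIELD_MAP), pick(payload, TAG_MAP)];
}

// "uptime" and "heap" are not in either map, so they never reach the output.
const filtered = filteredCrossWalk({
  b: true, i: 123, r: 456.78, s: "hello world", t: "tagValue",
  uptime: 987654, heap: 20480,
});
console.log(JSON.stringify(filtered));
```

Adding a new field then becomes a one-line change to `FIELD_MAP`, which matches the step-wise approach described in the list above.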
Given the output from the "mqtt in" node, the expression in the "change" node will result in:
msg.payload = [
{
flag: true,
discrete: 123,
continuous: 456.78,
message: "hello world"
},{
identity: "tagValue"
}
]
3. "influxdb out" node
Drag an "influxdb out" node onto the canvas.
Can't find the "influxdb out" node in the palette? Double-check that you installed "node-red-contrib-influxdb" as described above.
Connect the outlet of the "change" node to the inlet of the "influxdb out" node.
Double-click the "influxdb out" node to open and configure as follows:
- From the "Server" popup menu, choose "Add new influxdb...".
- Click the pencil icon to open the server properties panel.
- Supply the network path to the host running InfluxDB:
- In a Docker environment, this will be the name of the container running InfluxDB (eg "influxdb").
- In a non-Docker environment where Node-Red and InfluxDB are running on the same host, this will be the loopback address 127.0.0.1.
- If Node-Red and InfluxDB are running on different hosts then this will be a static IP address or the fully-qualified domain name of the host running InfluxDB.
- Supply the name of the InfluxDB database. This needs to be the same as the one you created earlier ("CREATE DATABASE test").
- I recommend leaving the name field blank. If you do then the "Server" field in the previous panel will take on the "host:port/database" appearance shown at the end of the dotted line in the figure. You lose that valuable synopsis by supplying a name in this field.
- Click "Add".
- Supply the name of the measurement you want to write to. This is analogous to a "table" in SQL parlance. The recommended name at this stage is "example".
- Enter a name for the node. This appears in the schematic. It is good practice to summarise the purpose of the node.
- Click "Done" to complete the node setup.
All other fields can be left at their default settings or changed to suit your requirements.
Warning: InfluxDB database connections are global to Node-Red. Suppose you have an existing flow connected to the "test" database. When you create a new flow for a new database, it is very tempting to copy the "influxdb out" node from the old flow, paste it into the new flow, open it, click the pencil icon, and just change the database name. If you do that, you will break your old flow because it will refer to the new database. Always start from scratch by dragging a new "influxdb out" node onto the canvas.
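The reason for the breakage is easier to see with a toy model. The sketch below is a deliberately simplified, hypothetical picture of a flow in plain JavaScript (the property names `server`, `measurement` and the id `cfg1` are made up, and 8086 is simply InfluxDB's usual default port); the point it illustrates is that nodes hold a reference to a shared connection, not a private copy.

```javascript
// Shared connection definitions, keyed by id.
const configNodes = {
  cfg1: { host: "influxdb", port: 8086, database: "test" },
};

// An "influxdb out" node in the old flow refers to the connection by id.
const oldFlowNode = { type: "influxdb out", measurement: "example", server: "cfg1" };

// Copy-and-paste duplicates the node but keeps the same connection id.
const pastedNode = { ...oldFlowNode, measurement: "other" };

// Editing the connection through the pasted node changes the shared object...
configNodes[pastedNode.server].database = "newdb";

// ...so the old flow now points at the new database too.
console.log(configNodes[oldFlowNode.server].database);
// prints newdb
```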
Given the output from the "change" node, the practical effect of this node is:
$ influx
> USE test
> INSERT example,identity=tagValue flag=true,discrete=123,continuous=456.78,message="hello world"
> quit
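The text after INSERT in the example above is InfluxDB line protocol. As a hedged sketch of how the "change" node output relates to that text, the JavaScript below builds the same line from a fields-and-tags array. It is not what node-red-contrib-influxdb does internally, the function names are invented for this example, and timestamps and integer suffixes are left out.

```javascript
// Escape characters that are special in measurement names.
const escapeMeasurement = (s) => s.replace(/([, ])/g, "\\$1");
// Escape characters that are special in tag keys, tag values and field keys.
const escapeKey = (s) => String(s).replace(/([,= ])/g, "\\$1");

// Render a field value: strings are double-quoted (with embedded quotes and
// backslashes escaped); numbers and booleans are written as-is.
function renderFieldValue(value) {
  if (typeof value === "string") {
    return '"' + value.replace(/(["\\])/g, "\\$1") + '"';
  }
  return String(value);
}

// Build one line: measurement[,tag=value...] field=value[,field=value...]
function toLineProtocol(measurement, [fields, tags = {}]) {
  const tagPart = Object.entries(tags)
    .map(([k, v]) => `,${escapeKey(k)}=${escapeKey(v)}`)
    .join("");
  const fieldPart = Object.entries(fields)
    .map(([k, v]) => `${escapeKey(k)}=${renderFieldValue(v)}`)
    .join(",");
  return `${escapeMeasurement(measurement)}${tagPart} ${fieldPart}`;
}

const line = toLineProtocol("example", [
  { flag: true, discrete: 123, continuous: 456.78, message: "hello world" },
  { identity: "tagValue" },
]);
console.log(line);
// prints example,identity=tagValue flag=true,discrete=123,continuous=456.78,message="hello world"
```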
Saving your work
Click the Deploy button near the top, right of the canvas.
Testing your work
Add two "debug" nodes to the canvas. Double-click each in turn and set its Output to "complete msg object". Connect the outlet of the "mqtt in" node to the first "debug" node, and the outlet of the "change" node to the second "debug" node. The final result should look something like this:
Select the Debug panel (the controls below "Deploy").
Click "Deploy" to activate. Any errors will show up in the Debug panel.
Copy the following text to the clipboard then paste it into a text editor.
mosquitto_pub -h host -t '/site/topic' -m '{"b": true, "i": 123, "r": 456.78, "s": "hello world", "t": "tagValue"}'
Edit the "host" field to point to the server running your Mosquitto broker. This might be an IP address or a fully-qualified domain name.
Paste the text into a Terminal window on your client device and press return.
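If you would rather generate the test message than type it, the arguments for mosquitto_pub can be assembled from a JavaScript object. This is a minimal sketch: `buildPublishArgs` is a hypothetical helper, and "host" is the same placeholder used in the command above.

```javascript
// Build the argument list for mosquitto_pub from a JavaScript object.
// JSON.stringify ensures the message body is well-formed JSON.
function buildPublishArgs(host, topic, payload) {
  return ["-h", host, "-t", topic, "-m", JSON.stringify(payload)];
}

const publishArgs = buildPublishArgs("host", "/site/topic", {
  b: true, i: 123, r: 456.78, s: "hello world", t: "tagValue",
});
console.log(publishArgs);

// To actually publish (needs mosquitto_pub on the PATH and a reachable broker):
//   require("child_process").execFileSync("mosquitto_pub", publishArgs);
```

Passing the arguments as an array sidesteps shell quoting, which is the usual source of trouble when a JSON string contains both single and double quotes.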
If all goes well, you will get two debug messages from Node-Red. The first is from the "mqtt in" node confirming receipt of the JSON payload:
▿ object
topic: "/site/topic"
▿ payload: object
b: true
i: 123
r: 456.78
s: "hello world"
t: "tagValue"
and the second is from the "change" node showing the effect of the cross-walk:
▿ object
topic: "/site/topic"
▿ payload: array[2]
▿0: object
flag: true
discrete: 123
continuous: 456.78
message: "hello world"
▿1: object
identity: "tagValue"
To confirm that the data made it all the way to the InfluxDB database:
$ influx
> USE test
> show measurements
name: measurements
name
----
example
> show series
key
---
example,identity=tagValue
> show tag keys
name: example
tagKey
------
identity
> show field keys
name: example
fieldKey fieldType
-------- ---------
continuous float
discrete float
flag boolean
message string
> select * from example
name: example
time continuous discrete flag identity message
---- ---------- -------- ---- -------- -------
2020-02-12T03:56:07.844235334Z 456.78 123 true tagValue hello world
> quit
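The same check can be scripted against InfluxDB's HTTP "/query" endpoint instead of the CLI. The sketch below only builds the request URL; it assumes an InfluxDB 1.x server (the version that understands "CREATE DATABASE") reachable at localhost on port 8086 with no authentication, and `buildQueryUrl` is a name invented for this example.

```javascript
// Build a URL for the InfluxDB 1.x HTTP "/query" endpoint.
function buildQueryUrl(baseUrl, database, query) {
  const url = new URL("/query", baseUrl);
  url.searchParams.set("db", database); // database to run the query against
  url.searchParams.set("q", query);     // the InfluxQL statement
  return url.toString();
}

const queryUrl = buildQueryUrl("http://localhost:8086", "test", "SELECT * FROM example");
console.log(queryUrl);

// With Node.js 18 or later, the result could be fetched like this:
//   fetch(queryUrl).then((res) => res.json()).then((body) => console.log(JSON.stringify(body)));
```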
Cleaning up
You can either delete or deactivate the "debug" nodes in the Node-Red flow.
When you no longer need the test database, you can remove it like this:
$ influx
> DROP DATABASE test
> quit
Hi Phill, this guide is amazing! Thanks for sharing your knowledge!!! I'm trying to get data from ShellyEM. I'm not a programmer, but I have a general understanding of MQTT, NodeRed and InfluxDB. Could you help me adjust the expression in the Change node for ShellyEM? I need to get the following topics: Power, Reactive Power and Voltage. Thanks for your support!
