@Paraphraser
Last active February 21, 2024 21:54
Efficient recipe for getting MQTT data into an InfluxDB database using Node-Red

Recipe: from MQTT to InfluxDB via Node-Red

  • 2023-12-02 revise graphics to correspond with InfluxDB-in node, and explain the pros and cons of InfluxDB 1.8 vs InfluxDB 2.

Introduction

Getting data produced by IoT sensors into a database is practically a mandatory step before effective visualisation (eg dashboards).

This recipe shows you how to get an MQTT payload into an InfluxDB database using three Node-Red nodes. It makes the following assumptions:

  • A client device of some kind publishing data to a topic via the MQTT protocol;
  • Mosquitto (MQTT broker);
  • Node-Red subscribing to the topic; and
  • InfluxDB running and accessible to Node-Red.

This recipe also shows how to map between JSON keys and database field and tag names.

Some words about InfluxDB

At the time of writing (December 2023), InfluxData supports both InfluxDB 1.8 and InfluxDB 2.

InfluxDB 3 has been announced but is not yet generally available.

If you are beginning your IoT journey, you may wonder which version you should choose. Absent useful guidance, you may well reason like this:

Version 2.0 is obviously later than 1.8 therefore 2.0 must be "better".

In some ways, InfluxDB 2 can be seen as a natural progression from InfluxDB 1.8. In other ways, the two database systems are like chalk and cheese. From a user perspective, one of the most obvious differences between the two systems is the query language:

  • InfluxDB 1.8 uses InfluxQL natively with Flux being an experimental add-on which needs to be enabled;
  • InfluxDB 2 uses Flux natively with partial support for InfluxQL.

One consequence of the difference in query language affects Grafana. Its GUI has native point-and-click support for constructing InfluxQL queries that will run against InfluxDB 1.8 but has no equivalent support for constructing Flux queries that run against InfluxDB 2.

There is a workaround. It involves using the InfluxDB 2 web GUI to construct queries in Flux, which you then copy and paste into Grafana. This definitely works but the lack of native support for Flux in Grafana means that, if anything goes wrong, you often have to start from scratch.

This workaround may seem like an acceptable compromise but, eventually, you will encounter situations where you will want to manipulate the data in your databases. Common examples are:

  • deleting junk that crept in when a sensor went berserk; and
  • reformatting, splitting or merging tables to better reflect how your sensors operate and/or to fix poor design decisions (we all make those).

To manipulate your data, you will need to use the query language.

If you are familiar with SQL then you will likely be comfortable with InfluxQL. While InfluxQL is not the same as SQL and does have its own peculiarities, it has sufficient syntactic similarities with SQL that you can usually figure out how to make it do what you need it to do. Plus, if you get stuck, you can generally find a close example by Googling and go from there.

Flux, on the other hand, is its own language which … how can I put this politely … defies description. In my experience the official documentation is quite short on useful examples and Googling mostly fares no better.

To put this problem more succinctly: you are far more likely to be able to get help with InfluxQL than you are with Flux. Whether that matters is something only you will know.

The last point is that InfluxData has announced InfluxDB 3. The announcement includes:

  1. This statement which appears in a popup overlay in the Flux documentation:

    The future of Flux

    Flux is going into maintenance mode. You can continue using it as you currently are without any changes to your code.

  2. This statement which appears in the InfluxDB Clustered documentation:

    … brings with it native SQL support and improved InfluxQL performance.

I take those two statements to mean that Flux is a technological dead-end and, because of its dependence on Flux, so too is InfluxDB 2.

If you have a good reason for preferring InfluxDB 2 then it's your system and your rules. But please don't say you weren't warned.

The remainder of this gist focuses on InfluxDB 1.8 exclusively.

Task goal

Given this MQTT message structure:

  topic: /site/topic
message: {"b": true, "i": 123, "r": 456.78, "s": "hello world", "t": "tagValue"}

the task is to configure the Node-Red instance to:

  1. Listen for the topic "/site/topic";
  2. Parse the JSON-format message payload; and
  3. Add a row to a measurement (table) named "example" in an InfluxDB database named "test".

This recipe also demonstrates how to use abbreviated keys in a JSON message and map those to more meaningful field names (attributes) in the InfluxDB database:

JSON Key   Influx Field   Influx Tag   Expected Value   Implied type
--------   ------------   ----------   --------------   ------------
b          flag                        true             Boolean
i          discrete                    123              Integer
r          continuous                  456.78           Real
s          message                     "hello world"    String
t                         identity     "tagValue"       String

There is nothing magical about either these JSON keys, or the InfluxDB field or tag names. JSON keys do not have to be single characters, and InfluxDB names do not have to be long, meaningful or descriptive. It is just that, in general, IoT devices are memory-constrained so short strings are to be preferred over longer strings; while "meaningful and descriptive" are considered best practice in a database environment.

There is also nothing magical about the choice of these implied data types. Booleans, numbers (both integers and reals) and strings are just what you usually need to pass between a client device and a database.

Preparation

client device (simulated)

During testing you will find it useful to have a computer with an MQTT client installed. Explaining the full how-to of this is beyond the scope of this recipe so you should start at mosquitto.org/download.

On Linux, it is not immediately obvious that you need either or both of the following:

$ sudo apt install mosquitto-clients
$ sudo apt install mosquitto

The first command installs the MQTT publishing and subscribing command-line clients. You will need this package to get the mosquitto_pub command.

The second command installs the MQTT broker. You will only need this if you do not have another MQTT broker running somewhere (eg in a Docker container).

InfluxDB

If your copy of InfluxDB is running inside a Docker container, consider adding this alias statement to your .profile or .bashrc:

alias influx='docker exec -it influxdb influx -precision=rfc3339'

That alias allows you to connect to the "influx" command line interface (CLI) simply by typing:

$ influx

By default, "influx" displays time as nanoseconds since 1970-01-01 UTC. The -precision=rfc3339 argument displays time in human-readable form.
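To make the difference concrete, here is a minimal JavaScript sketch of the conversion that -precision=rfc3339 saves you from doing by hand. The helper name nanosToRfc3339 is invented for this illustration (it is not part of InfluxDB or Node-Red), and the sample value is the timestamp from the query output later in this recipe, expressed as nanoseconds. The value is passed as a string because it is too large for an ordinary JavaScript number.

```javascript
// Hypothetical helper: nanoseconds since 1970-01-01 UTC -> RFC 3339 text.
function nanosToRfc3339(nanoseconds) {
    const total = BigInt(nanoseconds);
    const seconds = total / 1000000000n;      // whole seconds
    const fraction = total % 1000000000n;     // leftover nanoseconds
    // toISOString() always ends in ".000Z" here because we pass whole seconds.
    const iso = new Date(Number(seconds) * 1000).toISOString();
    return iso.replace(".000Z", "." + fraction.toString().padStart(9, "0") + "Z");
}

console.log(nanosToRfc3339("1581479767844235334"));
// → 2020-02-12T03:56:07.844235334Z
```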

The "test" database must be created by hand. If you omit this step you will get an error from Node-Red. You can initialise the database like this:

$ influx
> CREATE DATABASE test
> quit

Node-Red

  • Launch your browser and connect to your Node-Red server.
  • Use the main menu (three horizontal bars "≡" at the top, right of the Node-Red window) to open the Palette Manager:
    • select the "Nodes" tab and check whether node-red-contrib-influxdb is already installed. If it is not installed,
    • select the "Install" tab, then search for and install node-red-contrib-influxdb. If you prefer to install contributions from the command line, do that.

A three-node flow

Back in the Node-Red main window, click the "+" button to add a new empty flow. The default title will be something like "Flow 1". Double-click the title and rename the flow with something more descriptive, like "Influx Test". The actual name is not important.

1. "mqtt in" node

Drag an "mqtt in" node onto the canvas. Double-click to open and configure as follows:

mqtt-in-node configuration

  1. Select the option to add a new mqtt-broker.
  2. Click the pencil icon to open the server properties panel.
  3. Give the server a meaningful name (eg "Docker MQTT").
  4. Supply the network path to the host running Mosquitto:
    • In a Docker environment, this will be the name of the container running Mosquitto (eg "mosquitto").
    • In a non-Docker environment where Node-Red and Mosquitto are running on the same host, this will be the loopback address 127.0.0.1.
    • If Node-Red and Mosquitto are running on different hosts then this will be a static IP address or the fully-qualified domain name of the host running Mosquitto.
  5. Click "Add".
  6. Enter the topic string ("/site/topic").
  7. Set the "Output" popup to "a parsed JSON object".
  8. Enter a name for the node. This appears in the schematic and it is good practice to repeat the topic string.
  9. Click "Done" to complete the node setup.

All other fields can either be left at their default settings or changed to suit your requirements.

The purpose of this node is to:

  1. Listen to MQTT messages directed to "/site/topic"; and
  2. Convert the JSON string in the MQTT message body to a JavaScript object representation.

In other words, given an input of the JSON string specified in the task goal, the output from the node will be:

msg.payload = {
    b: true,
    i: 123,
    r: 456.78,
    s: "hello world",
    t: "tagValue"
}
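As a plain JavaScript sketch of what "a parsed JSON object" means (this is an illustration, not code taken from Node-Red, and the variable names are arbitrary): downstream nodes receive the result of parsing the message body rather than the raw string.

```javascript
// The raw MQTT message body is just a string...
const body = '{"b": true, "i": 123, "r": 456.78, "s": "hello world", "t": "tagValue"}';

// ...and parsing it yields an object whose values carry JSON's types.
const payload = JSON.parse(body);

// Print each key with the JavaScript type of its value.
for (const [key, value] of Object.entries(payload)) {
    console.log(key, typeof value, value);
}
```

JavaScript reports both 123 and 456.78 as "number", so the Integer and Real distinction in the task-goal table is implied by the value rather than by the type.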

tip - avoiding a common mistake

One common mistake is skipping step 7 above. Please go back and double-check that you have set the "Output" popup to "a parsed JSON object".

2. "change" node

Drag a "change" node onto the canvas.

Connect the outlet of the "mqtt in" node to the inlet of the "change" node.

Double-click the "change" node to open and configure as follows:

change-node configuration

  1. Enter a name for the node. This appears in the schematic and it is good practice to summarise the purpose of the node.

  2. A new change node contains a single rule to "Set msg.payload" but where the data type of the "to" field defaults to a string. Change the popup menu to "expression" (a JSONata expression).

  3. Click the ellipsis ("…") to open the expression editor.

  4. Copy the expression below and paste it into this window.

    [
        {
            "flag": msg.payload.b,
            "discrete": msg.payload.i,
            "continuous": msg.payload.r,
            "message": msg.payload.s
        },{
            "identity": msg.payload.t
        }
    ]
    
  5. Click "Done".

  6. Click "Done" to complete the node setup.

The purpose of this node is to provide a cross-walk between the JSON keys ("b", "i", "r", "s" and "t"), and the field and tag names you need in the InfluxDB database. The basic pattern is:

[
   {
      fieldName : msg.payload.key,
      ...
   },{
      tagName : msg.payload.key,
      ...
   }
]

If you only want to pass fields, then omit the square brackets and the elements that describe the tag(s), like this:

{
   fieldName : msg.payload.key,
   ...
}
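As a rough model for reasoning about the pattern, the same cross-walk can be written as an ordinary JavaScript function. This is only a sketch: the function name crosswalk is invented here, and in the flow itself the "change" node evaluates the expression for you.

```javascript
// Illustrative only: models the fields-plus-tags pattern.
function crosswalk(msg) {
    const fields = {
        flag: msg.payload.b,
        discrete: msg.payload.i,
        continuous: msg.payload.r,
        message: msg.payload.s
    };
    const tags = {
        identity: msg.payload.t
    };
    // Two-element array: fields first, tags second.
    return [fields, tags];
}

const msg = { payload: { b: true, i: 123, r: 456.78, s: "hello world", t: "tagValue" } };
console.log(JSON.stringify(crosswalk(msg)));
```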

Note that it is feasible to omit this "change" node entirely. If you do that the JSON keys in the Node-Red "payload" variable will become the field names in the database. Before you take that shortcut, consider:

  • If an MQTT client starts providing unexpected JSON keys, those can easily pollute your InfluxDB database. Using a cross-walk between the expected JSON keys and the field and tag names you want in your database provides an effective barrier against unexpected keys.

  • You may wish to include keys in your MQTT payload that you do not want to wind up in your database. Good examples are millis() uptime, free heap and version numbers. Such values are usually ephemeral and only of interest at the moment when they are produced (and might only be produced when you are actively debugging your MQTT client). You can always see such values by subscribing to the MQTT feed or attaching a Debug node to the "mqtt in" node.

  • A "change" node simplifies the process of adding new tags and fields. You can:

    • Add a new key+value pair to the JSON payload being produced by your MQTT client, then
    • Attach a Debug node to the "mqtt in" node to confirm that you are receiving the expected data, then
    • Change the cross-walk when you are ready to start importing the data into your database.
  • If you want to include both tags and fields in your database, you really only have two options, either:

    • your MQTT client has to format the JSON payload correctly before transmission, or
    • you need a "change" node to implement a cross-walk.
  • Opting to do the work in your MQTT client effectively rules out the tactical use of ephemeral values and a step-wise refinement approach to development if you need to add new fields.
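The "barrier" effect described in the first two bullets can be sketched in plain JavaScript. The mapping tables, the helper name pick and the extra keys uptime and heap are all made up for this illustration; the point is only that keys which are not named in the cross-walk never reach the database.

```javascript
// Hypothetical mapping tables: JSON key -> database name.
const fieldMap = { b: "flag", i: "discrete", r: "continuous", s: "message" };
const tagMap = { t: "identity" };

// Copy only the keys listed in the map; anything else is ignored.
function pick(payload, map) {
    const out = {};
    for (const [key, name] of Object.entries(map)) {
        if (key in payload) {
            out[name] = payload[key];
        }
    }
    return out;
}

// A payload carrying two ephemeral debugging values alongside the real data.
const payload = {
    b: true, i: 123, r: 456.78, s: "hello world", t: "tagValue",
    uptime: 86400123, heap: 41232
};

const result = [pick(payload, fieldMap), pick(payload, tagMap)];
console.log(JSON.stringify(result));
```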

Given the output from the "mqtt in" node, the JSONata expression in the "change" node will result in:

msg.payload = [
   {
      flag: true,
      discrete: 123,
      continuous: 456.78,
      message: "hello world"
   },{
      identity: "tagValue"
   }
]

3. "influxdb out" node

Drag an "influxdb out" node onto the canvas.

Can't find the "influxdb out" node in the palette? Double-check that you installed "node-red-contrib-influxdb" as described above.

Connect the outlet of the "change" node to the inlet of the "influxdb out" node.

Double-click the "influxdb out" node to open and configure as follows:

influxdb-out-node configuration

  1. Enter a name for the node. This appears in the schematic. It is good practice to summarise the purpose of the node.
  2. From the "Server" popup menu, choose "Add new influxdb...".
  3. Click the pencil icon to open the server properties panel.
  4. I recommend leaving the "Name" field blank. If you do then the "Server" field in the previous panel will take on the "host:port/database" appearance shown at the end of the dotted line in the figure. You lose that valuable synopsis by supplying a name in this field.
  5. Set the Version popup to "1.x" (this gist does not cover InfluxDB 2; please see Some words about InfluxDB if you want to understand why).
  6. Supply the network path to the host running InfluxDB:
    • In a Docker environment, this will be the name of the container running InfluxDB (eg "influxdb").
    • In a non-Docker environment where Node-Red and InfluxDB are running on the same host, this will be the loopback address 127.0.0.1.
    • If Node-Red and InfluxDB are running on different hosts then this will be a static IP address or the fully-qualified domain name of the host running InfluxDB.
  7. Enter the name of the InfluxDB database. This must match the database you created earlier ("CREATE DATABASE test").
  8. Click "Add".
  9. Supply the name of the measurement you want to write to. This is analogous to a "table" in SQL parlance. The recommended name for this tutorial is "example".
  10. Click "Done" to complete the node setup.

All other fields can be left at their default settings or changed to suit your requirements.

Warning: InfluxDB database connections are global to Node-Red. Suppose you have an existing flow connected to the "test" database. When you create a new flow for a new database, it is very tempting to copy the "influxdb out" node from the old flow, paste it into the new flow, open it, click the pencil icon, and just change the database name. If you do that, you will break your old flow because it will refer to the new database. Always start from scratch by dragging a new "influxdb out" node onto the canvas.

Given the output from the "change" node, the practical effect of this node is:

$ influx
> USE test
> INSERT example,identity=tagValue flag=true,discrete=123,continuous=456.78,message="hello world"
> quit
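To see how the two-element payload relates to that INSERT statement, here is a simplified JavaScript sketch that builds the same text from the fields and tags. It is an illustration under stated assumptions, not the code the "influxdb out" node actually runs: the function name toInsert is invented, and escaping of special characters in names and tag values is ignored.

```javascript
// Hypothetical helper: build an influx CLI INSERT statement from [fields, tags].
function toInsert(measurement, [fields, tags]) {
    // Tags follow the measurement name, each introduced by a comma.
    const tagPart = Object.entries(tags)
        .map(([name, value]) => `,${name}=${value}`)
        .join("");
    // Strings are double-quoted; numbers and booleans are written bare.
    const fieldPart = Object.entries(fields)
        .map(([name, value]) =>
            `${name}=${typeof value === "string" ? JSON.stringify(value) : value}`)
        .join(",");
    return `INSERT ${measurement}${tagPart} ${fieldPart}`;
}

const payload = [
    { flag: true, discrete: 123, continuous: 456.78, message: "hello world" },
    { identity: "tagValue" }
];
console.log(toInsert("example", payload));
// → INSERT example,identity=tagValue flag=true,discrete=123,continuous=456.78,message="hello world"
```

In InfluxDB line protocol a bare number such as 123 is treated as a float, which is consistent with "discrete" being reported as a float in the field keys listing in the testing section.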

Saving your work

Click the Deploy button near the top, right of the canvas.

Testing your work

Add two "debug" nodes to the canvas. Double-click each in turn and set its Output to "complete msg object". Connect the outlet of the "mqtt in" node to the first "debug" node, and the outlet of the "change" node to the second "debug" node. The final result should look something like this:

Node-Red test flow

Select the Debug panel (the controls below "Deploy").

Click "Deploy" to activate. Any errors will show up in the Debug panel.

Copy the following text to the clipboard then paste it into a text editor.

mosquitto_pub -h host -t '/site/topic' -m '{"b": true, "i": 123, "r": 456.78, "s": "hello world", "t": "tagValue"}'

Edit the "host" field to point to the server running your Mosquitto broker. This might be an IP address or a fully-qualified domain name.

Paste the text into a Terminal window on your client device and press return.

If all goes well, you will get two debug messages from Node-Red. The first is from the "mqtt in" node confirming receipt of the JSON payload:

▿ object
  topic: "/site/topic"
▿ payload: object
   b: true
   i: 123
   r: 456.78
   s: "hello world"
   t: "tagValue"

and the second is from the "change" node showing the effect of the cross-walk:

▿ object
  topic: "/site/topic"
▿ payload: array[2]
  ▿0: object
   flag: true
   discrete: 123
   continuous: 456.78
   message: "hello world"
  ▿1: object
   identity: "tagValue"

To confirm that the data made it all the way to the InfluxDB database:

$ influx
> USE test

> show measurements
name: measurements
name
----
example

> show series
key
---
example,identity=tagValue

> show tag keys
name: example
tagKey
------
identity

> show field keys
name: example
fieldKey   fieldType
--------   ---------
continuous float
discrete   float
flag       boolean
message    string

> select * from example
name: example
time                           continuous discrete flag identity message
----                           ---------- -------- ---- -------- -------
2020-02-12T03:56:07.844235334Z 456.78     123      true tagValue hello world

> quit

Cleaning up

You can either delete or deactivate the "debug" nodes in the Node-Red flow.

When you no longer need the test database, you can remove it like this:

$ influx
> DROP DATABASE test
> quit
@Paraphraser
Author

Paraphraser commented Jul 13, 2021 via email

@Numex106

@Paraphraser
I had two silly issues that are both fixed and it's working perfectly now, thanks!

  1. I forgot to select "complete msg object" in the debug node, leading to my initial question.
  2. I also didn't realize I had a pesky space at the end of my msg payload keys causing the change node to come up empty.

Thanks again for the fantastic write-up.

@NewHopeApiary

Thanks for the write-up! This was very helpful for a newb.

(I did have to add a JSON node in between to get things to work right.)

@Paraphraser
Author

@NewHopeApiary you probably missed a step when you first set up the MQTT-in node. If you look at "7" in the diagram you'll see the output needs to be set to "parsed JSON object".

Your way isn't "wrong" so there's no need to change it unless you want to. The result is exactly the same as you are getting by using the extra JSON node. It's just that "MQTT-in to JSON to rest of flow" is such a common pattern that the MQTT-in node lets you do it in a single step. Make sense?

@NewHopeApiary

You are correct of course. :) I already had built out a flow and was just modifying it, so I missed that. Appreciate the reply, really wasn't expecting it. Again, thanks for sharing your knowledge!

@jpalecrim

Hello @Paraphraser, thanks a lot for your commitment and effort on this! It has been helping me a lot.
I'm facing a problem where I have 5 weather stations (IDs = WS1, WS2, WS3, WS4, WS5) sending readings from simple sensors.
I want to feed InfluxDB with them and have a single dashboard page for each of them. This project could grow further, so I'm having trouble working out how to build a flow that can differentiate the IDs and send the data to the correct gauges and database.

I saw above that you provided a simple flow in which you extract the sensor ID and prepare the Influx insert. Could you explain a little more how I can write the Change node JSON for that? I'm not experienced with this language, to be honest, but it does seem to be a simple solution.

@Paraphraser
Author

Basically, if the station identifier (WS1, WS2, etc) is carried in the message payload, you can use it directly. The "Prepare influx insert" Change node is "set msg.payload to the JSONata expression", where the expression field is something like:

[
    {
        "temperature": msg.payload.temp
    },{
        "station": msg.payload.station
    }
]

remembering that this double-array structure tells the Influx insert node that temperature is a field, and station is a tag.

However, if the station identifier is carried in the topic string like this:

/home/weatherstation/WS1

it seems that you need to use a separate Change node ahead of the "Prepare influx insert". I usually call that separate Change node "extract station". It is set up as "set msg.station to the JSONata expression" where the expression field is something like:

$split(msg.topic,'/')[3]

Then, the expression in the "Prepare influx insert" node becomes:

[
    {
        "temperature": msg.payload.temp
    },{
        "station": msg.station
    }
]

I assume you see what's going on in:

$split(msg.topic,'/')[3]

It's saying "crack the topic string about the slash". If you use a leading slash on your topic (as in this example of "/home/weatherstation/WS1") you wind up with four elements in the array where the zeroth element is an empty string so element 1 is "home" and element 3 is "WS1". If you don't use a leading slash (and opinions vary on whether you should or shouldn't) then a topic like "home/weatherstation/WS1" would see you aiming at element [2] for WS1. Make sense?
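The indexing is easy to check outside Node-Red. This sketch uses JavaScript's own split, on the assumption that it behaves like $split for a simple single-character separator such as this one; the variable names are arbitrary.

```javascript
// With a leading slash, element 0 is an empty string.
const withSlash = "/home/weatherstation/WS1".split("/");
console.log(withSlash);        // [ '', 'home', 'weatherstation', 'WS1' ]
console.log(withSlash[3]);     // WS1

// Without a leading slash, everything shifts down by one.
const withoutSlash = "home/weatherstation/WS1".split("/");
console.log(withoutSlash);     // [ 'home', 'weatherstation', 'WS1' ]
console.log(withoutSlash[2]);  // WS1
```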

I also assume you can see that the "extract station" node is only setting the field "msg.station" and isn't interfering with "msg.payload", so msg.payload persists unchanged into the "Prepare influx insert" node where "msg.station" is also then available.

Last point. The MQTT in node should be a wildcard match. It should be looking for:

/home/weatherstation/+

Of course, it doesn't really matter where the station is:

/home/+/weatherstation

will match on "/home/WS1/weatherstation". If the device reports against topics like this:

/home/WS1/weatherstation/temp
/home/WS1/weatherstation/humidity

then you'll probably be running two flows, one for temperature, one for humidity, with the temperature flow matching on:

/home/+/weatherstation/temp

I'm really making the point that you can adapt to whatever the sensor provides.
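If you want to experiment with wildcard patterns before wiring them into a flow, the matching rule can be approximated in a few lines of JavaScript. This is a simplified sketch, not the broker's implementation: the function name topicMatches is invented, "+" is taken to match exactly one level and "#" everything from that level on, and special cases such as topics beginning with "$" are ignored.

```javascript
// Simplified MQTT topic-filter matcher (illustrative only).
function topicMatches(filter, topic) {
    const f = filter.split("/");
    const t = topic.split("/");
    for (let i = 0; i < f.length; i++) {
        if (f[i] === "#") return true;          // multi-level wildcard: accept the rest
        if (i >= t.length) return false;        // topic has fewer levels than the filter
        if (f[i] !== "+" && f[i] !== t[i]) return false;  // literal level must match
    }
    // No "#" seen, so the level counts must agree.
    return f.length === t.length;
}

console.log(topicMatches("/home/weatherstation/+", "/home/weatherstation/WS1"));
// → true
console.log(topicMatches("/home/+/weatherstation/temp", "/home/WS1/weatherstation/temp"));
// → true
console.log(topicMatches("/home/+/weatherstation/temp", "/home/WS1/weatherstation/humidity"));
// → false
```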

I hope that all makes sense.


I really do not understand why the whole thing can't be done in a single Change node. Something like:

[
    {
        "temperature": msg.payload.temp
    },{
        "station": $split(msg.topic,'/')[3]
    }
]

I tried a few variations on that theme but Node-RED always either grizzled about syntax errors or accepted the expression as valid syntax but it then didn't work. I gave up and took it to be a rule that I needed two nodes.

If you, or anyone else reading this, can nut-out how to do it all in a single node, please point me in the right direction. I use this double-Change-node pattern all over the place and it would be great if it could be reduced to a single Change node.

@tferrin

tferrin commented Dec 29, 2022

@Paraphraser, just a tip-of-the-hat "Thank You" for your gist. Your explanations of MQTT-relevant Node-Red nodes and answers to several questions have been extremely helpful. I especially value your tip about taking the time to figure out what you may not fully understand about InfluxDB databases prior to saving lots of data. This caused me to re-configure my data flows into ones that are now more efficient. My only advice to others is to read (and re-read) this gist carefully because there are lots of details included here and it may take a few reads to fully comprehend all the subtle details.

@theDiverDK

OMG this is an excellent description :)

After finding this, it was very easy to get it to actually work :)

Thanks

@F4Kbov

F4Kbov commented Sep 28, 2023

Excellent gist. Some great tips in there. Thanks
