Mqtt to InfluxDB

Node-RED as bridge from MQTT to InfluxDB

My Wifi logger sends every hour sensor data for TEMPERATURE, DEWPOINT, HUMIDITY and VOLT (battery monitoring) to the MQTT broker on the Raspberry Pi. These measurements can be read in Node-RED by using the mqtt-node and subscribing to WINECELLAR/SUBTOPIC. Choose output option "a parsed JSON object" to get number formatting (instead of string) in the outgoing message. Here is the flow for visualizing the data in a simple Node-RED dashboard:

To get the data into a Grafana dashboard instead requires several steps. First, we have to send the data to InfluxDB:

We would like to combine these four mqqt-subtopics into one measurement with the name "winecellar" in the database "home" in InfluxDB:

pi@raspberrypi:~ $ influx

Connected to http://localhost:8086 version 1.8.10

InfluxDB shell version: 1.8.10

> use home

Using database home

> select * from winecellar

name: winecellar

time Volt dewpoint humidity temperature

---- ---- -------- -------- -----------

1651782122496481013 4.64 8.6 63.1 15.6

1651785729859770639 4.64 8.6 63.1 15.6

1651785729912447156 4.64 8.6 63 15.6

1651785729912796893 4.64 8.6 63 15.6

1651785729913761345 4.65 8.6 63 15.6

1651789348283298268 4.65 8.6 63 15.6

Get started with InfluxDB.

How to pack these four messages together in one "measurement" for InfluxDB? I developed the following Node-RED flow to get the job done: We subscribe to the topic winecellar/# using the multilevel wildcard #. This will result in four separate messages one after the other with the four mqtt topics as message topics, e.g. winecellar/temperature.

In the next step, we split off the first part of the topic. We want only the mqtt-subtopic as message-topic. In the function, in the field "On Message", we insert the following javascript:

var device = msg.topic.split("/");

msg.topic = device[1];

return msg;

Next, we join the four messages together with a join-node:

The resulting message contains a JS object with the correct format to be passed into the influxDB node:

Finally, we configure the influxDB-node as follows to send the data to the database home:

The following import data contains the whole flow:

[

{

"id": "7c0fabe1134a651a",

"type": "mqtt in",

"z": "bea4d2d8bf1128a7",

"name": "",

"topic": "winecellar/#",

"qos": "2",

"datatype": "json",

"broker": "b6f069be.1771d8",

"nl": false,

"rap": true,

"rh": 0,

"inputs": 0,

"x": 150,

"y": 480,

"wires": [

[

"97bcd43c8dd11d77",

"710a8f17dbe8e08b"

]

]

},

{

"id": "97bcd43c8dd11d77",

"type": "debug",

"z": "bea4d2d8bf1128a7",

"name": "",

"active": true,

"tosidebar": true,

"console": false,

"tostatus": false,

"complete": "payload",

"targetType": "msg",

"statusVal": "",

"statusType": "auto",

"x": 390,

"y": 420,

"wires": []

},

{

"id": "7186c0931b2a69c3",

"type": "join",

"z": "bea4d2d8bf1128a7",

"name": "join messages",

"mode": "custom",

"build": "object",

"property": "payload",

"propertyType": "msg",

"key": "topic",

"joiner": "\\n",

"joinerType": "str",

"accumulate": true,

"timeout": "",

"count": "4",

"reduceRight": false,

"reduceExp": "",

"reduceInit": "",

"reduceInitType": "",

"reduceFixup": "",

"x": 560,

"y": 480,

"wires": [

[

"cb7bd571fa2a6f23",

"96509d39b1e06a68"

]

]

},

{

"id": "cb7bd571fa2a6f23",

"type": "debug",

"z": "bea4d2d8bf1128a7",

"name": "",

"active": true,

"tosidebar": true,

"console": false,

"tostatus": false,

"complete": "payload",

"targetType": "msg",

"statusVal": "",

"statusType": "auto",

"x": 770,

"y": 420,

"wires": []

},

{

"id": "710a8f17dbe8e08b",

"type": "function",

"z": "bea4d2d8bf1128a7",

"name": "Split topic",

"func": "var device = msg.topic.split(\"/\");\n\nmsg.topic = device[1];\n\nreturn msg;",

"outputs": 1,

"noerr": 0,

"initialize": "",

"finalize": "",

"libs": [],

"x": 380,

"y": 480,

"wires": [

[

"7186c0931b2a69c3"

]

]

},

{

"id": "96509d39b1e06a68",

"type": "influxdb out",

"z": "bea4d2d8bf1128a7",

"influxdb": "98836e7869331dca",

"name": "winecellar",

"measurement": "winecellar",

"precision": "",

"retentionPolicy": "",

"database": "database",

"precisionV18FluxV20": "ms",

"retentionPolicyV18Flux": "",

"org": "organisation",

"bucket": "bucket",

"x": 760,

"y": 480,

"wires": []

},

{

"id": "b6f069be.1771d8",

"type": "mqtt-broker",

"name": "localhost",

"broker": "localhost",

"port": "1883",

"clientid": "",

"usetls": false,

"protocolVersion": "4",

"keepalive": "60",

"cleansession": true,

"birthTopic": "",

"birthQos": "0",

"birthPayload": "",

"birthMsg": {},

"closeTopic": "",

"closeQos": "0",

"closePayload": "",

"closeMsg": {},

"willTopic": "",

"willQos": "0",

"willPayload": "",

"willMsg": {},

"sessionExpiry": ""

},

{

"id": "98836e7869331dca",

"type": "influxdb",

"hostname": "127.0.0.1",

"port": "8086",

"protocol": "http",

"database": "home",

"name": "",

"usetls": false,

"tls": "d50d0c9f.31e858",

"influxdbVersion": "1.x",

"url": "http://localhost:8086",

"rejectUnauthorized": true

},

{

"id": "d50d0c9f.31e858",

"type": "tls-config",

"name": "",

"cert": "",

"key": "",

"ca": "",

"certname": "",

"keyname": "",

"caname": "",

"servername": "",

"verifyservercert": false

}

]

© 2022 notthemarsian