MQTT to InfluxDB
Node-RED as bridge from MQTT to InfluxDB
My Wi-Fi logger sends sensor data for TEMPERATURE, DEWPOINT, HUMIDITY and VOLT (battery monitoring) to the MQTT broker on the Raspberry Pi every hour. These measurements can be read in Node-RED by using the "mqtt in" node and subscribing to WINECELLAR/SUBTOPIC. Choose the output option "a parsed JSON object" so that the value arrives as a number (instead of a string) in the outgoing message. Here is the flow for visualizing the data in a simple Node-RED dashboard:
Getting the data into a Grafana dashboard instead requires several steps. First, we have to send the data to InfluxDB:
We would like to combine these four MQTT subtopics into one measurement with the name "winecellar" in the database "home" in InfluxDB:
pi@raspberrypi:~ $ influx
Connected to http://localhost:8086 version 1.8.10
InfluxDB shell version: 1.8.10
> use home
Using database home
> select * from winecellar
name: winecellar
time Volt dewpoint humidity temperature
---- ---- -------- -------- -----------
1651782122496481013 4.64 8.6 63.1 15.6
1651785729859770639 4.64 8.6 63.1 15.6
1651785729912447156 4.64 8.6 63 15.6
1651785729912796893 4.64 8.6 63 15.6
1651785729913761345 4.65 8.6 63 15.6
1651789348283298268 4.65 8.6 63 15.6
How can we pack these four messages together into one "measurement" for InfluxDB? I developed the following Node-RED flow to get the job done: We subscribe to the topic winecellar/# using the multilevel wildcard #. This results in four separate messages, one after the other, each carrying one of the four MQTT topics as its message topic, e.g. winecellar/temperature.
In the next step, we strip off the first part of the topic, because we want only the MQTT subtopic as the message topic. In the function node, in the field "On Message", we insert the following JavaScript:
var device = msg.topic.split("/");
msg.topic = device[1];
return msg;
Next, we join the four messages together with a join-node:
The resulting message contains a JS object with the correct format to be passed into the influxDB node:
Finally, we configure the influxDB-node as follows to send the data to the database home:
The following import data contains the whole flow:
[
{
"id": "7c0fabe1134a651a",
"type": "mqtt in",
"z": "bea4d2d8bf1128a7",
"name": "",
"topic": "winecellar/#",
"qos": "2",
"datatype": "json",
"broker": "b6f069be.1771d8",
"nl": false,
"rap": true,
"rh": 0,
"inputs": 0,
"x": 150,
"y": 480,
"wires": [
[
"97bcd43c8dd11d77",
"710a8f17dbe8e08b"
]
]
},
{
"id": "97bcd43c8dd11d77",
"type": "debug",
"z": "bea4d2d8bf1128a7",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 390,
"y": 420,
"wires": []
},
{
"id": "7186c0931b2a69c3",
"type": "join",
"z": "bea4d2d8bf1128a7",
"name": "join messages",
"mode": "custom",
"build": "object",
"property": "payload",
"propertyType": "msg",
"key": "topic",
"joiner": "\\n",
"joinerType": "str",
"accumulate": false,
"timeout": "",
"count": "4",
"reduceRight": false,
"reduceExp": "",
"reduceInit": "",
"reduceInitType": "",
"reduceFixup": "",
"x": 560,
"y": 480,
"wires": [
[
"cb7bd571fa2a6f23",
"96509d39b1e06a68"
]
]
},
{
"id": "cb7bd571fa2a6f23",
"type": "debug",
"z": "bea4d2d8bf1128a7",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 770,
"y": 420,
"wires": []
},
{
"id": "710a8f17dbe8e08b",
"type": "function",
"z": "bea4d2d8bf1128a7",
"name": "Split topic",
"func": "var device = msg.topic.split(\"/\");\n\nmsg.topic = device[1];\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 380,
"y": 480,
"wires": [
[
"7186c0931b2a69c3"
]
]
},
{
"id": "96509d39b1e06a68",
"type": "influxdb out",
"z": "bea4d2d8bf1128a7",
"influxdb": "98836e7869331dca",
"name": "winecellar",
"measurement": "winecellar",
"precision": "",
"retentionPolicy": "",
"database": "database",
"precisionV18FluxV20": "ms",
"retentionPolicyV18Flux": "",
"org": "organisation",
"bucket": "bucket",
"x": 760,
"y": 480,
"wires": []
},
{
"id": "b6f069be.1771d8",
"type": "mqtt-broker",
"name": "localhost",
"broker": "localhost",
"port": "1883",
"clientid": "",
"usetls": false,
"protocolVersion": "4",
"keepalive": "60",
"cleansession": true,
"birthTopic": "",
"birthQos": "0",
"birthPayload": "",
"birthMsg": {},
"closeTopic": "",
"closeQos": "0",
"closePayload": "",
"closeMsg": {},
"willTopic": "",
"willQos": "0",
"willPayload": "",
"willMsg": {},
"sessionExpiry": ""
},
{
"id": "98836e7869331dca",
"type": "influxdb",
"hostname": "127.0.0.1",
"port": "8086",
"protocol": "http",
"database": "home",
"name": "",
"usetls": false,
"tls": "d50d0c9f.31e858",
"influxdbVersion": "1.x",
"url": "http://localhost:8086",
"rejectUnauthorized": true
},
{
"id": "d50d0c9f.31e858",
"type": "tls-config",
"name": "",
"cert": "",
"key": "",
"ca": "",
"certname": "",
"keyname": "",
"caname": "",
"servername": "",
"verifyservercert": false
}
]
© 2022 notthemarsian