Skip to content

Instantly share code, notes, and snippets.

@dkmonaghan
Last active May 1, 2022 21:48
Show Gist options
  • Save dkmonaghan/66c9a10dc71986604f9dfcd7d078264c to your computer and use it in GitHub Desktop.
Save dkmonaghan/66c9a10dc71986604f9dfcd7d078264c to your computer and use it in GitHub Desktop.
Ingest Zigbee2MQTT readings into InfluxDB
#!/usr/bin/python3
from typing import NamedTuple
import os
import json
from pprint import pprint
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
# --- InfluxDB connection settings ---
INFLUXDB_ADDRESS = 'localhost'
INFLUXDB_DATABASE = 'metrics'

# --- MQTT broker settings ---
MQTT_ADDRESS = 'localhost'
MQTT_USER = 'sensor'
MQTT_PASSWORD = 'XXXXXXX'
# '+' is the MQTT single-level wildcard: one topic level below zigbee2mqtt/.
MQTT_TOPIC = 'zigbee2mqtt/+'
MQTT_CLIENT_ID = 'MQTTInfluxDBBridge'

# Shared InfluxDB client, created at import time without credentials or a
# default database; main() selects INFLUXDB_DATABASE before the MQTT loop
# starts delivering messages.
influxdb_client = InfluxDBClient(
    host=INFLUXDB_ADDRESS,
    port=8086,
    username=None,
    password=None,
    database=None,
)
def on_connect(client, userdata, flags, rc):
    """Handle the broker's CONNACK by reporting it and subscribing.

    Args:
        client: The MQTT client instance this callback is attached to.
        userdata: User data set on the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: The connection result code; printed as-is.
    """
    print('Connected with result code ' + str(rc))
    # Subscribing inside the callback means the subscription is issued every
    # time this callback fires, not just once at start-up.
    client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
    """Handle a PUBLISH from the broker by storing its readings in InfluxDB.

    The payload is parsed as a JSON object. Every key except ``voltage`` is
    written as its own point: the key becomes the measurement name, the MQTT
    topic becomes the ``device`` tag and the reading becomes the ``value``
    field. ``temperature`` and ``humidity`` are stored as floats and
    ``battery`` as an int; other readings are passed through unchanged.

    Empty payloads, payloads that are not a JSON object, and individual
    readings that cannot be converted are skipped instead of raising.

    Args:
        client: The MQTT client instance that received the message (unused).
        userdata: User data set on the client (unused).
        msg: The received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    print(msg.topic + ' ' + str(msg.payload))

    # Truthiness covers both b'' and ''. The earlier `== ""` comparison never
    # matched a bytes payload, so empty messages fell through to json.loads.
    if not msg.payload:
        return

    try:
        payload = json.loads(msg.payload)
    except ValueError as err:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError.
        print('Ignoring non-JSON payload on ' + msg.topic + ': ' + str(err))
        return

    if not isinstance(payload, dict):
        # Only JSON objects carry named readings to iterate over.
        return

    # Load into InfluxDB, one point per reading.
    for key, value in payload.items():
        if key == "voltage":
            continue

        try:
            if key in ("temperature", "humidity"):
                value = float(value)
            elif key == "battery":
                value = int(value)
        except (TypeError, ValueError):
            # e.g. a null or non-numeric reading: drop this reading only and
            # keep processing the rest of the message.
            print('Skipping unconvertible ' + key + ' reading: ' + repr(value))
            continue

        json_body = [{
            'measurement': key,
            'tags': {
                'device': msg.topic
            },
            'fields': {
                'value': value
            }
        }]
        pprint(json_body)
        influxdb_client.write_points(json_body)
def main():
    """Select the InfluxDB database, connect to the broker and loop forever.

    Wires ``on_connect`` and ``on_message`` onto a new MQTT client, connects
    to ``MQTT_ADDRESS`` on port 1883 and then hands control to the client's
    network loop, so this function does not return in normal operation.
    """
    influxdb_client.switch_database(INFLUXDB_DATABASE)

    # The client ID is passed by keyword so it cannot be taken for a different
    # leading positional parameter.
    # NOTE(review): paho-mqtt 2.x places callback_api_version first — confirm
    # against the installed version.
    mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID)
    mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message

    mqtt_client.connect(MQTT_ADDRESS, 1883)
    mqtt_client.loop_forever()
# Run the bridge only when executed as a script, not when imported.
if __name__ == '__main__':
    print('MQTT to InfluxDB bridge')
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment