Skip to content

Instantly share code, notes, and snippets.

@LarsBergqvist
Last active October 27, 2020 02:14
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LarsBergqvist/5f3d5a738220a242d35f1113dbc6a277 to your computer and use it in GitHub Desktop.
Save LarsBergqvist/5f3d5a738220a242d35f1113dbc6a277 to your computer and use it in GitHub Desktop.
A service that subscribes to MQTT topics and stores the received data in an InfluxDB database
#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import datetime
import time
from influxdb import InfluxDBClient
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("Home/#")
def on_message(client, userdata, msg):
print("Received a message on topic: " + msg.topic)
# Use utc as timestamp
receiveTime=datetime.datetime.utcnow()
message=msg.payload.decode("utf-8")
isfloatValue=False
try:
# Convert the string to a float so that it is stored as a number and not a string in the database
val = float(message)
isfloatValue=True
except:
print("Could not convert " + message + " to a float value")
isfloatValue=False
if isfloatValue:
print(str(receiveTime) + ": " + msg.topic + " " + str(val))
json_body = [
{
"measurement": msg.topic,
"time": receiveTime,
"fields": {
"value": val
}
}
]
dbclient.write_points(json_body)
print("Finished writing to InfluxDB")
# Set up a client for InfluxDB
dbclient = InfluxDBClient('192.168.1.16', 8086, 'root', 'root', 'sensordata')
# Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
try:
client.connect("192.168.1.16", 1883, 60)
connOK = True
except:
connOK = False
time.sleep(2)
# Blocking loop to the Mosquitto broker
client.loop_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment