Skip to content

Instantly share code, notes, and snippets.

@sysarcher
Created June 27, 2019 16:40
Show Gist options
  • Save sysarcher/769934edc179902d7206d165d18ea2a5 to your computer and use it in GitHub Desktop.
Save sysarcher/769934edc179902d7206d165d18ea2a5 to your computer and use it in GitHub Desktop.
A simple bridge I was testing
"""A MQTT to InfluxDB Bridge
This script receives MQTT data and saves it to InfluxDB.
"""
#import re
import json
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
# --- InfluxDB settings ---
# Host of the InfluxDB server.
INFLUXDB_ADDRESS = 'localhost'
# Credentials are disabled: the client below is created without user/password.
#INFLUXDB_USER = 'root'
#INFLUXDB_PASSWORD = 'root'
# Database that _init_influxdb_database() creates if needed and then selects.
INFLUXDB_DATABASE = 'family'
# --- MQTT settings ---
# Host of the MQTT broker that main() connects to.
MQTT_ADDRESS = 'localhost'
# Not used while the username_pw_set() call in main() stays commented out.
MQTT_USER = 'mqttuser'
#MQTT_PASSWORD = 'mqttpassword'
# Topic that on_connect() subscribes to.
# NOTE(review): the previous comment here described a
# '[bme280|mijia]/[temperature|humidity|battery|status]' layout, which does not
# match this single topic - it looks stale; confirm before relying on it.
MQTT_TOPIC = 'family'
#MQTT_REGEX = 'home/([^/]+)/([^/]+)'
# Client id handed to mqtt.Client() in main().
MQTT_CLIENT_ID = 'MQTTInfluxDBBridge'
# Module-level InfluxDB client shared by the helper functions below; 8086 is
# passed as the port. No database is chosen here - that happens in
# _init_influxdb_database().
influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, 8086)
def on_connect(client, userdata, flags, rc):
    """Handle the broker's CONNACK response and subscribe on success.

    Args:
        client: The MQTT client instance that received the CONNACK.
        userdata: User data attached to the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: Connection result code; 0 means the connection was accepted.
    """
    print('Connected with result code ' + str(rc))
    if rc != 0:
        # A non-zero result code means the broker refused the connection,
        # so there is no session to subscribe on.
        print('Connection refused; not subscribing')
        return
    # Subscribing from this callback re-issues the subscription every time
    # the callback fires, i.e. after each successful (re)connect.
    client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
    """Handle a PUBLISH message: decode it, parse it and store the result.

    A payload that is not valid UTF-8, or that the parser rejects as
    malformed JSON, is reported and dropped so that a single bad message
    does not raise out of the callback.

    Args:
        client: The MQTT client instance that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: The received message; its ``topic`` and ``payload`` (bytes)
            attributes are read.
    """
    print(msg.topic + ' ' + str(msg.payload))
    try:
        payload = msg.payload.decode('utf-8')
    except UnicodeDecodeError as err:
        print('Ignoring payload that is not valid UTF-8: ' + str(err))
        return
    try:
        sensor_data = _parse_mqtt_message(msg.topic, payload)
    except ValueError as err:
        # Malformed JSON surfaces as json.JSONDecodeError, a ValueError
        # subclass, whenever the parser lets it propagate.
        print('Ignoring payload that is not valid JSON: ' + str(err))
        return
    if sensor_data is not None:
        _send_sensor_data_to_influxdb(sensor_data)
def _parse_mqtt_message(topic, payload):
return json.loads(payload)
def _send_sensor_data_to_influxdb(sensor_data):
json_body = [
{
'measurement': 'push events',
'tags': {
'event': 'family day'
},
'fields': {
'color': sensor_data['color'],
'reaction_time': sensor_data['reaction']
}
}
]
influxdb_client.write_points(json_body)
def _init_influxdb_database():
    """Create INFLUXDB_DATABASE if the server does not list it, then select it."""
    databases = influxdb_client.get_list_database()
    # Each entry is looked up by its 'name' key; stop at the first match.
    if not any(db['name'] == INFLUXDB_DATABASE for db in databases):
        influxdb_client.create_database(INFLUXDB_DATABASE)
    # Select the database whether it already existed or was just created,
    # so that later writes through influxdb_client go to it.
    influxdb_client.switch_database(INFLUXDB_DATABASE)
def main():
    """Prepare InfluxDB, then connect to the MQTT broker and process messages."""
    _init_influxdb_database()

    bridge = mqtt.Client(MQTT_CLIENT_ID)
    # Broker authentication is switched off; the disabled call was
    # username_pw_set(MQTT_USER, MQTT_PASSWORD).
    bridge.on_message = on_message
    bridge.on_connect = on_connect

    bridge.connect(MQTT_ADDRESS, 1883)
    # Hands control to the client's network loop, which invokes the
    # callbacks registered above.
    bridge.loop_forever()
if __name__ == '__main__':
    # Start the bridge only when this file is executed as a script,
    # not when it is imported as a module.
    print('MQTT to InfluxDB bridge')
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment