@matbor
Last active September 3, 2022 13:32
Simple Python script (v2.7.x) that subscribes to an MQTT broker topic and inserts the data into a MySQL database for later querying. This is designed to be used with the http://mqttitude.org/ project.
#!/usr/bin/env python
# September 2013
# by Matthew Bordignon, @bordignon on Twitter
#
# Simple Python script (v2.7.x) that subscribes to an MQTT broker topic and inserts the message data into a MySQL database
# This is designed for the http://mqttitude.org/ project backend
#
import MySQLdb
import mosquitto
import json
import time
#mosquitto broker config
broker = 'mqtt.localdomain'
broker_port = 1883
broker_topic = '/test/location/#'
#broker_clientid = 'mqttuide2mysqlScript'
#mysql config
mysql_server = 'thebeast.localdomain'
mysql_username = 'root'
mysql_passwd = ''
mysql_db = 'mqtt'
#change table below.
# Open database connection
db = MySQLdb.connect(mysql_server, mysql_username, mysql_passwd, mysql_db)
# prepare a cursor object using cursor() method
cursor = db.cursor()
def on_connect(mosq, obj, rc):
    print("rc: "+str(rc))

def on_message(mosq, obj, msg):
    print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
    vars_to_sql = []
    keys_to_sql = []
    # decode the JSON payload into a dict of field names and values
    list = json.loads(msg.payload)
    for key,value in list.iteritems():
        print ("")
        print key, value
        if key == 'tst':
            # convert the epoch timestamp to a MySQL DATETIME string (local time)
            print "time found"
            print value
            value = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(value)))
            print value
        value_type = type(value)
        # nested objects (dicts) are skipped, only flat values become columns
        if value_type is not dict:
            print "value_type is not dict"
            if value_type is unicode:
                print "value_type is unicode"
                vars_to_sql.append(value.encode('ascii', 'ignore'))
                keys_to_sql.append(key.encode('ascii', 'ignore'))
            else:
                print "value_type is not unicode"
                vars_to_sql.append(value)
                keys_to_sql.append(key)
    #add the msg.topic to the list as well
    print "topic", msg.topic
    addtopic = 'topic'
    vars_to_sql.append(msg.topic.encode('ascii', 'ignore'))
    keys_to_sql.append(addtopic.encode('ascii', 'ignore'))
    keys_to_sql = ', '.join(keys_to_sql)
    try:
        # Execute the SQL command
        # change locations to the table you are using
        # values are passed as query parameters so the driver escapes them;
        # column names still come from the payload keys and are not escaped
        placeholders = ', '.join(['%s'] * len(vars_to_sql))
        queryText = "INSERT INTO locations(%s) VALUES (%s)" % (keys_to_sql, placeholders)
        cursor.execute(queryText, vars_to_sql)
        print('Successfully Added record to mysql')
        db.commit()
    except MySQLdb.Error, e:
        try:
            print "MySQL Error [%d]: %s" % (e.args[0], e.args[1])
        except IndexError:
            print "MySQL Error: %s" % str(e)
        # Rollback in case there is any error
        db.rollback()
        print('ERROR adding record to MYSQL')

def on_publish(mosq, obj, mid):
    print("mid: "+str(mid))

def on_subscribe(mosq, obj, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_log(mosq, obj, level, string):
    print(string)

# If you want to use a specific client id, use
#mqttc = mosquitto.Mosquitto(broker_clientid)
# but note that the client id must be unique on the broker. Leaving the client
# id parameter empty will generate a random id for you.
mqttc = mosquitto.Mosquitto()
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
# Comment out the next line to disable debug messages
mqttc.on_log = on_log
mqttc.connect(broker, broker_port, 60)
mqttc.subscribe(broker_topic, 0)
rc = 0
while rc == 0:
    rc = mqttc.loop()
print("rc: "+str(rc))
# disconnect from server
print ('Disconnected, done.')
db.close()
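
For readers without a Python 2.7 setup, the row-building step in on_message can be tried in isolation. The following is a minimal Python 3 sketch, not the author's code: `build_insert` is a hypothetical helper, the sample payload is made up, the `locations` table name is taken from the script, and the `%s` placeholders assume a MySQLdb-style driver. It needs neither a broker nor a database.

```python
import json
import time


def build_insert(payload, topic, table="locations"):
    """Return (sql, params) for one location message.

    Hypothetical helper mirroring on_message: 'tst' is converted from epoch
    seconds to a 'YYYY-MM-DD HH:MM:SS' string (local time), nested objects
    are skipped, and the topic is appended as an extra column.
    """
    data = json.loads(payload)
    columns, params = [], []
    for key, value in data.items():
        if key == "tst":
            value = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(float(value)))
        if isinstance(value, dict):
            continue  # nested objects have no matching column
        columns.append(key)
        params.append(value)
    columns.append("topic")
    params.append(topic)
    placeholders = ", ".join(["%s"] * len(params))
    # Column names cannot be passed as query parameters; they are interpolated
    # here, so they should only come from a trusted source.
    sql = "INSERT INTO {}({}) VALUES ({})".format(table, ", ".join(columns), placeholders)
    return sql, params


# Made-up sample payload and topic, for illustration only.
sql, params = build_insert(
    '{"lat": "48.85", "lon": "2.29", "tst": "1378900000"}',
    "/test/location/phone",
)
print(sql)
print(params)
```

The returned pair is meant to be passed as `cursor.execute(sql, params)`, which leaves the escaping of values to the database driver.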
@matbor
Author

matbor commented Sep 12, 2013

note I haven't added the MQTT subscribe to topic part yet as that is easy, just trying to get it to add all the information to MYSQL correctly first.

@jpmens

jpmens commented Sep 12, 2013

Not much time at the moment, but maybe these things help:

date

import datetime
dt_epoch = datetime.datetime.fromtimestamp(list['tst'])  # now you have a datetime object

example:

str_now = dt_epoch.isoformat()
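
As a sketch of how the datetime approach above could produce the string a MySQL DATETIME column expects: the `strftime` format is the same one the script uses. Passing an explicit UTC timezone is an assumption made here so the result does not depend on the machine's local timezone (the script itself uses local time). The sample timestamp is made up, and the snippet targets Python 3.

```python
from datetime import datetime, timezone

tst = "1378900000"  # made-up epoch seconds, given as a string like in a JSON payload

# float() accepts both numeric and string timestamps
dt_epoch = datetime.fromtimestamp(float(tst), tz=timezone.utc)
mysql_datetime = dt_epoch.strftime("%Y-%m-%d %H:%M:%S")
print(mysql_datetime)
```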

JSON

import json
list = json.loads(payload)

Topic

You'll get the topic when you do the subscribe; it's delivered to you as a UTF-8 string which you just grab. Shout if you need help

@matbor
Author

matbor commented Sep 12, 2013

Just saw your msg, had fixed the json part, working on the time part (thx for the example), have updated the code above and added the broker part and all seems to be working atm!

Current issues: the 'tst' date and time field needs to be converted to a MySQL datetime value, and the MQTT topic (e.g. '/test/location') still needs to be added to the database.

@matbor
Author

matbor commented Sep 12, 2013

Fixed it. The time is now stored as a MySQL datetime rather than an epoch value, and the topic is saved in the database. It's probably not pretty code, and I have a lot of print statements in there for debugging atm!

Still need to add clean exits.

@bk28051994

I'm getting the same error every time I run the code. It says "AttributeError: 'float' object has no attribute 'iteritems'". Can anyone help me out, as soon as possible?

@shiva8688

I got this error after subscribing the data "Caught exception in on_message: No JSON object could be decoded". Could anyone please suggest a solution
Thank you
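
Both errors reported above are consistent with the broker delivering a payload that is not a JSON object: an empty or non-JSON payload makes `json.loads` fail, and a bare number such as `21.5` decodes to a float, which has no `iteritems`. This is a guess from the error text, not a confirmed diagnosis. A minimal Python 3 sketch of a guard follows; `parse_location` is a hypothetical helper name and the sample payloads are made up.

```python
import json


def parse_location(payload):
    """Return the decoded payload if it is a JSON object, otherwise None."""
    try:
        data = json.loads(payload)
    except ValueError:
        # raised for empty or malformed JSON (JSONDecodeError subclasses ValueError)
        return None
    if not isinstance(data, dict):
        # valid JSON, but a bare number, string, or list rather than an object
        return None
    return data


for sample in ('{"lat": "48.85"}', "21.5", "hello", ""):
    print(repr(sample), "->", parse_location(sample))
```

In on_message, a `None` result would mean returning early, before any query is built.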

@matbor
Author

matbor commented Jul 25, 2019

> I got this error after subscribing the data "Caught exception in on_message: No JSON object could be decoded". Could anyone please suggest a solution
> Thank you

Hi, this isn’t maintained anymore, please use https://github.com/owntracks/recorder
