Simple Python script (v2.7x) that subscribes to a MQTT broker topic and inserts the data into a mysql database for later querying. This is designed to be used with the
#!/usr/bin/env python
# September 2013
# by Matthew Bordignon, @bordignon on Twitter
# Simple Python script (v2.7x) that subscribes to a MQTT broker topic and inserts the topic into a mysql database
# This is designed for the project backend
import MySQLdb
import mosquitto
import json
import time
#mosquitto broker config
broker = 'mqtt.localdomain'
broker_port = 1883
broker_topic = '/test/location/#'
#broker_clientid = 'mqttuide2mysqlScript'
#mysql config
mysql_server = 'thebeast.localdomain'
mysql_username = 'root'
mysql_passwd = ''
mysql_db = 'mqtt'
#change table below.
# Open database connection
db = MySQLdb.connect(mysql_server, mysql_username, mysql_passwd, mysql_db)
# prepare a cursor object using cursor() method
cursor = db.cursor()
def on_connect(mosq, obj, rc):
print("rc: "+str(rc))
def on_message(mosq, obj, msg):
print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
vars_to_sql = []
keys_to_sql = []
list = []
list = json.loads(msg.payload)
for key,value in list.iteritems():
print ("")
print key, value
if key == 'tst':
print "time found"
print value
value = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(value)))
print value
value_type = type(value)
if value_type is not dict:
print "value_type is not dict"
if value_type is unicode:
print "value_type is unicode"
vars_to_sql.append(value.encode('ascii', 'ignore'))
keys_to_sql.append(key.encode('ascii', 'ignore'))
print "value_type is not unicode"
#add the msg.topic to the list as well
print "topic", msg.topic
addtopic = 'topic'
vars_to_sql.append(msg.topic.encode('ascii', 'ignore'))
keys_to_sql.append(addtopic.encode('ascii', 'ignore'))
keys_to_sql = ', '.join(keys_to_sql)
# Execute the SQL command
# change locations to the table you are using
queryText = "INSERT INTO locations(%s) VALUES %r"
queryArgs = (keys_to_sql, tuple(vars_to_sql))
cursor.execute(queryText % queryArgs)
print('Successfully Added record to mysql')
except MySQLdb.Error, e:
print "MySQL Error [%d]: %s" % (e.args[0], e.args[1])
except IndexError:
print "MySQL Error: %s" % str(e)
# Rollback in case there is any error
print('ERROR adding record to MYSQL')
def on_publish(mosq, obj, mid):
print("mid: "+str(mid))
def on_subscribe(mosq, obj, mid, granted_qos):
print("Subscribed: "+str(mid)+" "+str(granted_qos))
def on_log(mosq, obj, level, string):
# If you want to use a specific client id, use
#mqttc = mosquitto.Mosquitto(broker_clientid)
# but note that the client id must be unique on the broker. Leaving the client
# id parameter empty will generate a random id for you.
mqttc = mosquitto.Mosquitto()
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
# Uncomment to enable debug messages
mqttc.on_log = on_log
mqttc.connect(broker, broker_port, 60)
mqttc.subscribe(broker_topic, 0)
rc = 0
while rc == 0:
rc = mqttc.loop()
print("rc: "+str(rc))
# disconnect from server
print ('Disconnected, done.')
matbor commented Sep 12, 2013

note I haven't added the MQTT subscribe to topic part yet as that is easy, just trying to get it to add all the information to MYSQL correctly first.

jpmens commented Sep 12, 2013

Not much time at the moment, but maybe these things help:


import datetime
dt_epoch = datetime.datetime.fromtimestamp(list['tst'])  # now you have a datetime object


str_now = dt_epoch.isoformat()


import json
list = json.loads(payload)


You'll get the topic when you do the subscribe; it's delivered to you as a UTF-8 string which you just grab. Shout if you need help

matbor commented Sep 12, 2013

Just saw your msg, had fixed the json part, working on the time part (thx for the example), have updated the code above and added the broker part and all seems to be working atm!

Current Issues are; date and time 'tst' field need to convert it to mysql datetime stamp, also need to add the mqtt topic eg'/test/location' to the database.

matbor commented Sep 12, 2013

Fixed it. Time using mysql time and not epoch and topic will be saved in the database, probably not pretty code and I have a lot of print statements for debugging atm!

Still need to add clean exits.

im getting a constant error if i run the code.It says "attribute error float has no object iteritems".Can any one help me out?as soon as possible

I got this error after subscribing the data "Caught exception in on_message: No JSON object could be decoded". Could anyone please suggest a solution
Thank you

matbor commented Jul 25, 2019

I got this error after subscribing the data "Caught exception in on_message: No JSON object could be decoded". Could anyone please suggest a solution
Thank you

Hi, this isn’t maintained anymore, please use

