Skip to content

Instantly share code, notes, and snippets.

@koma5
Created January 10, 2017 11:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save koma5/499bf51442ebf23bac718c3c041c15a7 to your computer and use it in GitHub Desktop.
Save koma5/499bf51442ebf23bac718c3c041c15a7 to your computer and use it in GitHub Desktop.
import pymongo, mosquitto, os
from pymongo import MongoClient
from datetime import datetime, time
# Host name of the MQTT broker this recorder connects to.
mqttHost = 'mqtt'
# Address of the MongoDB server passed to MongoClient.
mongoHost = '127.0.0.1'
# File holding the PID of the running recorder; read at start-up to avoid
# launching a second instance and removed again on shutdown.
pidFile = '/var/run/record2MongodbPid'
def isRunning(pid):
    """Return True if a process with the given PID currently exists.

    ``pid`` may be an int or a string (for example the raw contents of a
    PID file). Anything that is not a positive integer is reported as
    "not running".
    """
    import errno  # local import so the block does not rely on a file-level one

    try:
        pid = int(pid)
    except (TypeError, ValueError):
        return False
    if pid <= 0:
        # For kill(2), zero and negative values address process groups, not
        # a single process, so they can never identify another instance.
        return False
    try:
        os.kill(pid, 0)  # signal 0 only performs the existence/permission check
    except OverflowError:
        # Too large to be a real PID (e.g. garbage in the PID file).
        return False
    except OSError as err:
        # EPERM: the process exists but belongs to another user.
        # Everything else (typically ESRCH) means there is no such process.
        return err.errno == errno.EPERM
    return True
def castValue(value):
    """Convert a message payload to a number when possible.

    A value that can already be added to an int is returned unchanged.
    Otherwise it is parsed as an int first and as a float second; when
    neither parse succeeds the original value is returned as is.
    """
    try:
        42 + value  # probe: succeeds for values that are already numeric
        return value
    except TypeError:
        pass

    for convert in (int, float):
        try:
            return convert(value)
        except (TypeError, ValueError):
            # ValueError: the text is not a number in this format.
            # TypeError: the value is not something int()/float() accept
            # at all (e.g. None or a list) -- fall through and return it.
            continue
    return value
def _addToAverage(topic, bucketStart, rate, bucketField, bucketValue, value):
    """Add one sample to the running count/sum of a single averaging bucket."""
    query = {
        'tpc': topic,
        'time': bucketStart,
        'rate': rate,
        bucketField: bucketValue
    }
    increments = {'$inc': {'msgCount': 1, 'msgSum': value}}
    # Third positional argument is PyMongo's ``upsert`` flag: the bucket
    # document is created on the first sample that falls into it.
    collAverages.update(query, increments, True)


def on_message(a, b, msg):
    """MQTT message callback: store the message and update its averages.

    Every message whose topic is not "vw/gps" is inserted into
    ``collMessages``. When its payload casts to a number, the hourly and
    daily buckets for that topic in ``collAverages`` are incremented too.
    Each received message is also printed to stdout.

    ``a`` and ``b`` are the first two arguments the MQTT client passes to
    the callback; they are not used here.
    """
    import numbers  # local import so the block does not rely on a file-level one

    jetzt = datetime.now()

    # NOTE(review): the indentation of this function was lost in the source.
    # This layout assumes the "vw/gps" guard covers both the insert and the
    # averages, and that the print runs for every message -- please confirm.
    if msg.topic != "vw/gps":
        value = castValue(msg.payload)  # cast once, reuse for both collections

        # NOTE(review): insert()/update() are PyMongo's legacy write methods
        # (deprecated since PyMongo 3.0); kept to match the installed driver.
        collMessages.insert({
            'time': jetzt,
            'tpc': msg.topic,
            'msg': value
        })

        # TODO: 5/10/15/30 minute averages are not implemented yet.

        # Only real numbers can be summed; this also keeps bytes/unicode
        # payloads (which are not ``str``) away from ``$inc``.
        if isinstance(value, numbers.Real):
            # hourly average: bucket starts at the top of the current hour
            currentHour = datetime.combine(jetzt, time(jetzt.hour))
            _addToAverage(msg.topic, currentHour, 'hourly',
                          'currentHour', jetzt.hour, value)

            # daily average: bucket starts at midnight of the current day
            midnight = datetime.combine(jetzt, time(0))
            _addToAverage(msg.topic, midnight, 'daily',
                          'dayOfWeek', jetzt.isoweekday(), value)

    # %-formatting instead of "+" so a non-str payload cannot raise here.
    print("%s : %s: %s" % (jetzt, msg.topic, msg.payload))
# Script body: start the recorder unless another instance is still alive.
mqttc = mosquitto.Mosquitto('mqttRecord-' + str(os.getpid()))
mqttc.on_message = on_message

# Read the PID left behind by a previous run, if there is one.
try:
    with open(pidFile, 'r') as f:
        runningPid = f.read()
    pidFileExists = True
except IOError:
    pidFileExists = False

if pidFileExists and isRunning(runningPid):
    print("not supposed to run, allready running here: " + runningPid)
    raise SystemExit(0)

# Not already running: connect to the broker and subscribe to every topic.
mqttc.connect(mqttHost, 1883, 60)
topics = ['#']
for t in topics:
    mqttc.subscribe(t, 0)

# Collections used by on_message().
mongo = MongoClient(mongoHost)
db = mongo.mqttRecord
collMessages = db.messages
collAverages = db.averages

# create pid file
with open(pidFile, 'w') as f:
    f.write(str(os.getpid()))

try:
    while True:
        rc = mqttc.loop()
        if rc:
            # NOTE(review): per the client library docs loop() returns a
            # non-zero code on error (e.g. lost connection) -- confirm.
            # Stop instead of calling it again in a tight loop.
            print("mqtt loop failed with code " + str(rc) + ", giving up")
            raise SystemExit(1)
except KeyboardInterrupt:
    print("\ntime to die..")
finally:
    # Clean up on every exit path, not only on Ctrl-C, so no stale PID
    # file or open broker connection is left behind.
    try:
        mqttc.disconnect()
    finally:
        os.remove(pidFile)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment