Skip to content

Instantly share code, notes, and snippets.

@bassdread
Last active February 19, 2016 07:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bassdread/b860dda2dcf92c12b5e3 to your computer and use it in GitHub Desktop.
Save bassdread/b860dda2dcf92c12b5e3 to your computer and use it in GitHub Desktop.
Read data using pysensors and save it to a file.
#!/usr/bin/python
import collections
from datetime import datetime
import json
# https://pypi.python.org/pypi/PySensors/
# tl;dr: "sudo pip install pysensors"
import sensors
# you can install these modules with pip if needed
try:
import redis
REDIS_CONNECTION = redis.StrictRedis(host='localhost', port=6379, db=0)
except ImportError:
pass
try:
from influxdb import InfluxDBClient
INFLUX_CLIENT = InfluxDBClient('localhost', 8086, 'USERNAME', 'PASSWORD', 'DATABASE')
except ImportError:
pass
IGNORE_READINGS = ['fan1', 'fan3', 'fan4', 'fan5']
DATA_FILE_LOCATION = '/tmp/sensors_output.json'
USE_INFLUX = False
USE_REDIS = False
def fetch_sensor_data():
try:
sensors.init()
data = {}
for chip in sensors.iter_detected_chips():
for feature in chip:
# log stuff we care about
if feature.label not in IGNORE_READINGS:
data[feature.label] = round(feature.get_value(), 3)
sorted_data = collections.OrderedDict(sorted(data.items()))
write_data_file(sorted_data)
if USE_REDIS:
write_data_redis(sorted_data)
if USE_INFLUX:
write_data_influx(sorted_data)
for name, reading in sorted_data.iteritems():
print "{0}: {1}".format(name, reading)
except Exception as exception:
print_exc()
print "Failed to get sensor data: {0}".format(exception.message)
def write_data_influx(data):
json_payload = []
time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
try:
for name, value in data.iteritems():
payload = {
"measurement": name,
"tags": {
"host": "jarvis",
"location": "garage"
},
"time": time,
"fields": {
"value": value
}
}
json_payload.append(payload)
INFLUX_CLIENT.write_points(json_payload)
except Exception as exception:
print_exc()
print "Failed to write to Influx. Error {}".format(exception.message)
def write_data_redis(data):
try:
for name, value in data.iteritems():
REDIS_CONNECTION.set(name, value)
except Exception as exception:
print_exc()
print "Failed to write to Redis. Error {}".format(exception.message)
def write_data_file(data):
try:
data['time'] = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
with open(DATA_FILE_LOCATION, 'w') as data_file:
data_file.write(json.dumps(data))
data_file.close()
return True
except Exception as exception:
print_exc()
print "Failed to write to data file. Error {}".format(
exception.message)
if __name__ == "__main__":
fetch_sensor_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment