Skip to content

Instantly share code, notes, and snippets.

@chrishannam
Created March 6, 2019 20:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chrishannam/aa47f263a1193d08138fc94a56a3fb96 to your computer and use it in GitHub Desktop.
Save chrishannam/aa47f263a1193d08138fc94a56a3fb96 to your computer and use it in GitHub Desktop.
Read sensor data from python
"""Take the output of the "sensors" command and write the fan speeds
to a JSON file by default. Can optionally also send them to Redis and InfluxDB.
"""
#!/usr/bin/python
import subprocess
import json
from datetime import datetime
from traceback import print_exc
# you can install these modules with pip if needed
# Optional backends: each client is created only when its library is
# installed (e.g. via pip). When the import fails the name is bound to None
# so that it always exists at module level instead of being left undefined.
try:
    import redis
except ImportError:
    REDIS_CONNECTION = None
else:
    REDIS_CONNECTION = redis.StrictRedis(host='localhost', port=6379, db=0)

try:
    from influxdb import InfluxDBClient
except ImportError:
    INFLUX_CLIENT = None
else:
    INFLUX_CLIENT = InfluxDBClient('localhost', 8086, 'USERNAME', 'PASSWORD', 'DATABASE')

# Fan names whose readings are skipped when parsing the sensor output.
IGNORE_FANS = ['fan1', 'fan3', 'fan4', 'fan5']
# File that receives the latest readings as JSON.
DATA_FILE_LOCATION = '/tmp/fan_speeds.json'
# make True to update an InfluxDB server
USE_INFLUX = False
# make True to update a Redis server
USE_REDIS = False
def fetch_sensor_data():
    """Read fan speeds from the ``sensors`` command and store them.

    Runs ``sensors``, parses every output line with ``fetch_fan``, prints
    each reading and writes the readings to the data file. The readings are
    also sent to Redis and/or InfluxDB when ``USE_REDIS`` / ``USE_INFLUX``
    are true.

    Returns:
        None. Any exception is printed (with traceback) and swallowed.
    """
    try:
        # universal_newlines=True makes communicate() return text instead of
        # bytes on Python 3, so the lines can be compared against str values.
        proc = subprocess.Popen(['sensors'], stdout=subprocess.PIPE,
                                close_fds=True, universal_newlines=True)
        results = proc.communicate()[0]

        fans = {}
        for line in results.splitlines():
            fan = fetch_fan(line)
            # fetch_fan returns False or an empty dict for lines to skip.
            if fan:
                fans[fan.get('name', 'NO_NAME')] = float(fan.get('value', 0))

        # items() works on both Python 2 and 3 (iteritems() is 2-only).
        for fan, speed in fans.items():
            print("{0} currently at {1}RPM".format(fan, speed))

        write_data_file(fans)
        if USE_REDIS:
            write_data_redis(fans)
        if USE_INFLUX:
            write_data_influx(fans)
    except Exception as exception:
        print_exc()
        # Exceptions have no .message attribute on Python 3; format the
        # exception itself so the handler cannot fail.
        print("Failed to get sensor data: {0}".format(exception))
def write_data_influx(fans):
    """Send every fan speed to InfluxDB as its own measurement.

    Args:
        fans: mapping of fan name to speed.

    Returns:
        None. Any exception is printed (with traceback) and swallowed.
    """
    # NOTE(review): this is a naive local-time timestamp with no UTC offset;
    # confirm how the InfluxDB server interprets it.
    timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    try:
        # One point per fan, all sharing the same timestamp and tags.
        # items() works on both Python 2 and 3 (iteritems() is 2-only).
        points = [
            {
                "measurement": fan,
                "tags": {
                    "host": "jarvis",
                    "location": "garage",
                },
                "time": timestamp,
                "fields": {
                    "value": speed,
                },
            }
            for fan, speed in fans.items()
        ]
        INFLUX_CLIENT.write_points(points)
    except Exception as exception:
        print_exc()
        # Exceptions have no .message attribute on Python 3.
        print("Failed to write to Influx. Error {}".format(exception))
def write_data_redis(fans):
    """Store every fan speed in Redis under the fan's name.

    Args:
        fans: mapping of fan name to speed.

    Returns:
        None. Any exception is printed (with traceback) and swallowed.
    """
    try:
        # items() works on both Python 2 and 3 (iteritems() is 2-only).
        for fan, speed in fans.items():
            REDIS_CONNECTION.set(fan, speed)
    except Exception as exception:
        print_exc()
        # Exceptions have no .message attribute on Python 3.
        print("Failed to write to Redis. Error {}".format(exception))
def write_data_file(fans):
    """Write the fan speeds plus a timestamp to DATA_FILE_LOCATION as JSON.

    Args:
        fans: mapping of fan name to speed. It is not modified.

    Returns:
        None. Any exception is printed (with traceback) and swallowed.
    """
    try:
        # Work on a copy: adding 'time' to the caller's dict would make any
        # later consumer of that dict treat the timestamp as a fan.
        payload = dict(fans)
        payload['time'] = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        # The with-block closes the file; no explicit close() is needed.
        with open(DATA_FILE_LOCATION, 'w') as fan_speed_file:
            fan_speed_file.write(json.dumps(payload))
    except Exception as exception:
        print_exc()
        # Exceptions have no .message attribute on Python 3.
        print("Failed to write to data file. Error {}".format(exception))
def fetch_fan(line):
    """Parse one line of ``sensors`` output into a fan reading.

    Args:
        line: a single line of text from the sensor output.

    Returns:
        False when the line does not start with ``fan`` or has no ``:``
        separator; an empty dict when the fan is listed in IGNORE_FANS;
        otherwise a dict with ``name`` (text before the first colon) and
        ``value`` (text between that colon and ``RPM``, stripped).
    """
    if not line.startswith('fan'):
        return False

    # Split on the first colon only, so a later colon in the line does not
    # make the whole reading unparseable.
    name, separator, reading = line.partition(':')
    if not separator:
        print("Failed to get fan data from sensor output")
        return False

    if name in IGNORE_FANS:
        return {}

    return {
        'name': name,
        'value': reading.split('RPM')[0].strip(),
    }
# Take a single reading when the file is executed as a script.
if __name__ == "__main__":
    fetch_sensor_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment