Created
March 6, 2019 20:14
-
-
Save chrishannam/aa47f263a1193d08138fc94a56a3fb96 to your computer and use it in GitHub Desktop.
Read sensor data from python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Takes output from "sensors" command and writes it to | |
a file by default. Can also connect to Redis and Influxdb | |
""" | |
#!/usr/bin/python | |
import subprocess | |
import json | |
from datetime import datetime | |
from traceback import print_exc | |
# you can install these modules with pip if needed | |
try: | |
import redis | |
REDIS_CONNECTION = redis.StrictRedis(host='localhost', port=6379, db=0) | |
except ImportError: | |
pass | |
try: | |
from influxdb import InfluxDBClient | |
INFLUX_CLIENT = InfluxDBClient('localhost', 8086, 'USERNAME', 'PASSWORD', 'DATABASE') | |
except ImportError: | |
pass | |
IGNORE_FANS = ['fan1', 'fan3', 'fan4', 'fan5'] | |
DATA_FILE_LOCATION = '/tmp/fan_speeds.json' | |
# make True to update a influxdb server | |
USE_INFLUX = False | |
# make True to update a redis server | |
USE_REDIS = False | |
def fetch_sensor_data(): | |
try: | |
proc = subprocess.Popen(['sensors'], stdout=subprocess.PIPE, | |
close_fds=True) | |
results = proc.communicate()[0] | |
fans = {} | |
for line in results.split("\n"): | |
fan = fetch_fan(line) | |
if fan: | |
fans[fan.get('name', 'NO_NAME')] = float(fan.get('value', 0)) | |
for fan, speed in fans.iteritems(): | |
print("{0} currently at {1}RPM".format(fan, speed)) | |
write_data_file(fans) | |
if USE_REDIS: | |
write_data_redis(fans) | |
if USE_INFLUX: | |
write_data_influx(fans) | |
except Exception as exception: | |
print_exc() | |
print("Failed to get sensor data: {0}".format(exception.message)) | |
def write_data_influx(fans): | |
json_payload = [] | |
time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") | |
try: | |
for fan, speed in fans.iteritems(): | |
payload = { | |
"measurement": fan, | |
"tags": { | |
"host": "jarvis", | |
"location": "garage" | |
}, | |
"time": time, | |
"fields": { | |
"value": speed | |
} | |
} | |
json_payload.append(payload) | |
INFLUX_CLIENT.write_points(json_payload) | |
except Exception as exception: | |
print_exc() | |
print("Failed to write to Influx. Error {}".format(exception.message)) | |
def write_data_redis(fans): | |
try: | |
for fan, speed in fans.iteritems(): | |
REDIS_CONNECTION.set(fan, speed) | |
except Exception as exception: | |
print_exc() | |
print("Failed to write to Redis. Error {}".format(exception.message)) | |
def write_data_file(fans): | |
try: | |
fans['time'] = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") | |
with open(DATA_FILE_LOCATION, 'w') as fan_speed_file: | |
fan_speed_json = fan_speed_file.write(json.dumps(fans)) | |
fan_speed_file.close() | |
except Exception as exception: | |
print_exc() | |
print("Failed to write to data file. Error {}".format( | |
exception.message)) | |
def fetch_fan(line): | |
if not line.startswith('fan'): | |
return False | |
result = {} | |
try: | |
name, speed = line.split(':') | |
if name not in IGNORE_FANS: | |
result['name'] = name | |
result['value'] = speed.split('RPM')[0].strip() | |
except Exception as exception: | |
print_exc() | |
print ("Failed to get fan data from sensor output") | |
return False | |
return result | |
if __name__ == "__main__": | |
fetch_sensor_data() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment