#!/usr/bin/python3
import fcntl
import json
import logging
import logging.handlers
import os
import subprocess
import sys
import time

import requests
from blinkstick import blinkstick
# --- PRTG connection settings -------------------------------------------
PrtgUser = 'API User Name'
PrtgPass = 'API Password'  # sent as the `passhash` query parameter below
PrtgServer = 'https://myprtgserver.com'
# Table API request listing sensors filtered by status 5 (treated by this
# script as "down"); credentials are embedded in the query string.
PrtgSensorDownQuery = (
    f'{PrtgServer}/api/table.json'
    '?content=sensors&columns=device,sensor,status&filter_status=5'
    f'&username={PrtgUser}&passhash={PrtgPass}'
)

# --- Logging ---------------------------------------------------------------
LogFilePath = '/var/log/prtg_led.log'
LogLevel = 'DEBUG'

# --- Timing and LED behaviour ----------------------------------------------
PollingDelay = 60     # seconds slept between polls
LEDBrightness = 40    # percentage applied to each colour channel
MorphDelay = 2000     # passed as `duration` to bstick.morph (presumably ms)
StartUpPause = 30     # seconds slept before the first poll

# ioctl request number used to reset the USB device: 'U' in the high byte,
# 20 in the low byte (0x5514). NOTE(review): this matches Linux
# USBDEVFS_RESET, i.e. _IO('U', 20) - confirm against usbdevice_fs.h.
USB_RESET = (ord('U') << 8) | 20
def setup_logging(log_level):
    """Create the service logger that writes to ``LogFilePath``.

    Args:
        log_level: Name of a standard logging level (``'DEBUG'``,
            ``'info'``, ...), matched case-insensitively. Anything that
            does not resolve to a level constant falls back to ``DEBUG``.

    Returns:
        The ``'prtg_led_logger'`` logger with a watched-file handler
        attached.
    """
    log = logging.getLogger('prtg_led_logger')
    # The logger lets everything through; filtering happens on the handler.
    log.setLevel(logging.DEBUG)

    # A plain getattr() on the logging module can hand back functions or
    # submodules (e.g. 'info', 'handlers'), which setLevel() rejects. Only
    # accept genuine integer level constants.
    level = getattr(logging, str(log_level).upper(), None)
    if not isinstance(level, int):
        level = logging.DEBUG

    file_logger = logging.handlers.WatchedFileHandler(LogFilePath)
    file_logger.setLevel(level)
    file_logger.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    )
    log.addHandler(file_logger)
    return log
def query_prtg(api_query, timeout=30):
    """Poll PRTG once and classify the outcome.

    Args:
        api_query: Full URL of the PRTG table API request.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the polling loop forever.

    Returns:
        ``'Yes'`` when the response reports one or more sensors,
        ``'No'`` when it reports none, ``'Bad'`` on HTTP 400,
        ``'UnAuth'`` on HTTP 401, and ``'Error'`` for any other status
        code or any failure while polling.
    """
    try:
        logger.debug(f'Polling {PrtgServer}')
        response = requests.get(api_query, timeout=timeout)
        status = response.status_code
        logger.debug(f'Result Code is {status}')

        # Got a good response from PRTG?
        if status == 200:
            payload = json.loads(response.content.decode('utf-8'))
            down_count = payload['treesize']
            # Any PRTG sensors down?
            if down_count > 0:
                logger.info(f'There are {down_count} PRTG sensor errors')
                return 'Yes'
            logger.debug('There are No PRTG sensor errors')
            return 'No'

        # Bad Request returned by PRTG?
        if status == 400:
            logger.error('API Query Error - Bad Request')
            return 'Bad'

        # Unauthorized returned by PRTG?
        if status == 401:
            logger.error('API Query Error - Unauthorized')
            return 'UnAuth'

        # Something else messed up?
        logger.error('Uncaught API Query Error')
        return 'Error'
    except (requests.exceptions.ConnectionError,
            requests.exceptions.Timeout) as e:
        # NOTE(review): the exception text may contain the request URL,
        # which carries the credentials - consider redacting before logging.
        logger.error('Polling error encountered')
        logger.error(f'{e.args}')
        return 'Error'
    except Exception:
        # Deliberately not a bare except: KeyboardInterrupt/SystemExit must
        # reach the caller so the service can shut down.
        logger.exception('Exception raised during polling')
        return 'Error'
def morph_color(hexcode, brightness, duration, index):
    """Morph one LED to a colour scaled by a brightness percentage.

    Args:
        hexcode: Colour as a hex string such as ``"#ff0000"``.
        brightness: Percentage (0-100) applied to each colour channel.
        duration: Passed through to ``bstick.morph`` as ``duration``.
        index: Index of the LED to change.

    On any device error the failure is logged and a USB reset of the
    BlinkStick is attempted; the error is not re-raised.
    """
    # NOTE(review): _hex_to_rgb is a private BlinkStick helper and may
    # change between library versions.
    r, g, b = bstick._hex_to_rgb(hexcode)
    scale = brightness / 100.0
    r, g, b = scale * r, scale * g, scale * b

    # Short pause before talking to the device.
    time.sleep(0.2)
    try:
        logger.debug(f'Setting LED {index} to R-{r} G-{g} B-{b} with a Duration of {duration}')
        bstick.morph(red=r, green=g, blue=b, duration=duration, index=index)
        logger.debug('Done')
    except blinkstick.BlinkStickException as e:
        logger.error(f'Error encountered while lighting up LED {index}')
        logger.error(f'{e.args}')
        time.sleep(1)
        reset_blink_usb()
    except Exception:
        # Not a bare except: Ctrl-C / SystemExit must propagate instead of
        # being turned into a USB reset.
        logger.exception(f'Exception raised while lighting up LED {index}')
        time.sleep(1)
        reset_blink_usb()
def led_control(action, brightness, morphdelay, led_index):
    """Light one LED in the colour that represents a poll result.

    Args:
        action: Result code: 'Yes', 'No', 'Bad', 'UnAuth' or 'Error'.
            Any other value leaves the LED untouched.
        brightness: Brightness percentage forwarded to ``morph_color``.
        morphdelay: Morph duration forwarded to ``morph_color``.
        led_index: Index of the LED to change.
    """
    palette = (
        ('Yes', "#ff0000"),     # Red
        ('No', "#00ff00"),      # Lime
        ('Bad', "#ffff00"),     # Yellow
        ('UnAuth', "#800080"),  # Purple
        ('Error', "#00ffff"),   # Aqua
    )
    for code, hexcode in palette:
        if action == code:
            morph_color(hexcode, brightness, morphdelay, led_index)
            break
def led_off():
    """Set LEDs 0-7 to black, pausing briefly after each write.

    On any device error the failure is logged and a USB reset of the
    BlinkStick is attempted; the remaining LEDs are not processed and the
    error is not re-raised.
    """
    # Pre-bind so the handlers can always format their message, even if the
    # failure happens before the loop assigns an index.
    led = None
    try:
        logger.debug('Turning off all LEDs')
        for led in range(8):
            bstick.set_color(name='black', index=led)
            time.sleep(0.1)
    except blinkstick.BlinkStickException as e:
        logger.error(f'Error encountered while blacking out LED {led}')
        logger.error(f'{e.args}')
        time.sleep(1)
        reset_blink_usb()
    except Exception:
        # Not a bare except: Ctrl-C / SystemExit must propagate.
        logger.exception(f'Exception raised while blacking out LED {led}')
        time.sleep(1)
        reset_blink_usb()
def get_blink_usb():
    """Locate the BlinkStick's USB device node by parsing ``lsusb`` output.

    The first output line containing ``Clay Logic`` is used; its second
    field is taken as the bus number and its fourth field (minus the
    trailing colon) as the device number.

    Returns:
        A path of the form ``/dev/bus/usb/<bus>/<dev>``, or ``None`` when
        no matching line is found or ``lsusb`` cannot be run or parsed.
    """
    try:
        listing = subprocess.run(['lsusb'], stdout=subprocess.PIPE).stdout
        for line in listing.splitlines():
            if b'Clay Logic' not in line:
                continue
            parts = line.split()
            bus = parts[1].decode("utf-8")
            # The device field is printed as e.g. b'004:'; drop the colon.
            dev = parts[3].rstrip(b':').decode("utf-8")
            blink_path = f'/dev/bus/usb/{bus}/{dev}'
            logger.info(f'Found Blinkstick at {blink_path} for reset')
            return blink_path
    except Exception:
        # Not a bare except: Ctrl-C / SystemExit must propagate.
        logger.exception('Exception finding USB path of Blinkstick')
    return None
def send_usb_reset(dev_path):
    """Issue the ``USB_RESET`` ioctl on a USB device node.

    Args:
        dev_path: Path of the device node, e.g. ``/dev/bus/usb/001/004``.

    Failures are logged, never raised.
    """
    # Open separately from the ioctl: if the open itself fails there is no
    # descriptor to close, and the cleanup below must not run.
    try:
        fd = os.open(dev_path, os.O_WRONLY)
    except Exception:
        logger.exception(f'Exception occured while resetting USB device at {dev_path}')
        return

    try:
        logger.info(f'Attempting USB device reset at {dev_path}')
        fcntl.ioctl(fd, USB_RESET, 0)
        logger.info(f'Succesfully reset USB device at {dev_path}')
    except Exception:
        logger.exception(f'Exception occured while resetting USB device at {dev_path}')
    finally:
        try:
            os.close(fd)
        except OSError:
            logger.exception('USB reset has gone horribly wrong')
def reset_blink_usb():
    """Find the BlinkStick's USB device node and reset it.

    Logs an error and does nothing else when no device path is found.
    """
    usb_path = get_blink_usb()
    if usb_path is not None:
        send_usb_reset(usb_path)
        return
    logger.error('Cant find a Blinkstick to Reset')
if __name__ == '__main__':
    # `logger` and `bstick` are module globals used by the functions above.
    logger = setup_logging(LogLevel)
    logger.info(f'Log Level set to {LogLevel}')
    logger.info('PRTG API LED Service Starting.....')

    try:
        bstick = blinkstick.find_first()
    except Exception:
        logger.exception('Exception raised during BlinkStick discovery')
        # sys.exit() rather than the interactive-only exit() helper, which
        # is injected by the site module and is not always available.
        sys.exit()
    if bstick is None:
        logger.error('No BlinkSticks found, exiting')
        sys.exit()

    blink_desc = bstick.get_description()
    blink_serial = bstick.get_serial()
    logger.info(f'Found a {blink_desc} with Serial {blink_serial}')
    logger.info(f'Polling {PrtgServer}')
    logger.info(f'Quick {StartUpPause} second pause before we start')
    time.sleep(StartUpPause)
    logger.info('Commencing Polling')

    # Each poll lights the next LED; once all eight have been used they are
    # cleared and the sequence restarts from LED 0.
    led_index = 0
    try:
        while True:
            AnyPRTGErrors = query_prtg(PrtgSensorDownQuery)
            logger.debug(f'PRTG Code is {AnyPRTGErrors}')
            if led_index == 8:
                led_index = 0
                led_off()
                time.sleep(0.2)
            led_control(AnyPRTGErrors, LEDBrightness, MorphDelay, led_index)
            time.sleep(PollingDelay)
            led_index = led_index + 1
    finally:
        logger.info('Terminating Process')
        logger.info('Shutting down LEDs')
        for led in range(8):
            # Best effort: one failing LED must not leave the rest lit or
            # hide the exception that triggered the shutdown.
            try:
                bstick.set_color(name='black', index=led)
            except Exception:
                logger.exception(f'Exception raised while blacking out LED {led}')
            time.sleep(0.3)
        logger.info('Shutdown Completed')
# (GitHub page footer captured along with the script; not part of the program:
# "Sign up for free to join this conversation on GitHub. Already have an
# account? Sign in to comment")