-
-
Save hospitableit/363e0dce227c6677b4768553f4aea059 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import requests | |
import json | |
import time | |
import logging | |
import logging.handlers | |
import os | |
import fcntl | |
import subprocess | |
from blinkstick import blinkstick | |
# --- PRTG connection settings ---------------------------------------------
PrtgUser = 'API User Name'
PrtgPass = 'API Password'
PrtgServer = 'https://myprtgserver.com'
# Sensor-table query limited to filter_status=5 (presumably the "Down" state,
# going by the variable name - confirm against the PRTG API docs). The
# credentials travel as the username/passhash URL parameters.
PrtgSensorDownQuery = (
    f'{PrtgServer}/api/table.json'
    '?content=sensors&columns=device,sensor,status&filter_status=5'
    f'&username={PrtgUser}&passhash={PrtgPass}'
)

# --- Logging --------------------------------------------------------------
LogFilePath = '/var/log/prtg_led.log'
LogLevel = 'DEBUG'

# --- Timing and LED behaviour ---------------------------------------------
PollingDelay = 60     # seconds slept between polls (passed to time.sleep)
LEDBrightness = 40    # percentage; colour channels are scaled by value / 100
MorphDelay = 2000     # handed to bstick.morph as `duration` (presumably ms - confirm)
StartUpPause = 30     # seconds slept once before polling starts

# ioctl request code handed to fcntl.ioctl when resetting the USB device:
# 'U' in the second byte, 20 in the low byte (presumably Linux
# USBDEVFS_RESET - confirm against linux/usbdevice_fs.h).
USB_RESET = (ord('U') << 8) | 20
def setup_logging(log_level):
    """Create and return the service's file logger.

    The logger named 'prtg_led_logger' is set to DEBUG so that filtering is
    done solely by the file handler, which writes to ``LogFilePath``.

    Args:
        log_level: Level for the file handler, either a level name such as
            'DEBUG' / 'info' (case-insensitive) or a numeric logging level.
            Anything unrecognised falls back to DEBUG.

    Returns:
        The configured ``logging.Logger``.
    """
    log = logging.getLogger('prtg_led_logger')
    log.setLevel(logging.DEBUG)

    # Resolve the requested level to its numeric value. A plain
    # getattr(logging, name) would return the *function* logging.debug for a
    # lowercase name and make setLevel() raise, so upper-case the name and
    # accept the attribute only when it really is an integer level.
    if isinstance(log_level, int):
        level = log_level
    else:
        level = getattr(logging, str(log_level).upper(), None)
        if not isinstance(level, int):
            level = logging.DEBUG

    # WatchedFileHandler is used so the log file can be rotated externally.
    file_logger = logging.handlers.WatchedFileHandler(LogFilePath)
    file_logger.setLevel(level)
    file_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_logger.setFormatter(file_format)
    log.addHandler(file_logger)
    return log
def query_prtg(api_query, timeout=30):
    """Poll PRTG once and summarise the outcome as a short status code.

    Args:
        api_query: Full URL of the PRTG table query to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the polling loop
            indefinitely.

    Returns:
        'Yes'    - HTTP 200 and the response's 'treesize' is greater than 0.
        'No'     - HTTP 200 and 'treesize' is 0 or less.
        'Bad'    - HTTP 400.
        'UnAuth' - HTTP 401.
        'Error'  - any other HTTP status, a connection failure or timeout,
                   or any other exception raised while polling.
    """
    try:
        logger.debug(f'Polling {PrtgServer}')
        response = requests.get(api_query, timeout=timeout)
        status = response.status_code
        logger.debug(f'Result Code is {status}')

        # Got a good response from PRTG?
        if status == 200:
            # Load the response into a dictionary and read the match count.
            sensors_down = json.loads(response.content.decode('utf-8'))
            down_count = sensors_down['treesize']
            # Any PRTG sensors down?
            if down_count > 0:
                logger.info(f'There are {down_count} PRTG sensor errors')
                return 'Yes'
            logger.debug('There are No PRTG sensor errors')
            return 'No'

        # Bad Request returned by PRTG?
        if status == 400:
            logger.error('API Query Error - Bad Request')
            return 'Bad'

        # Unauthorized returned by PRTG?
        if status == 401:
            logger.error('API Query Error - Unauthorized')
            return 'UnAuth'

        # Something else messed up?
        logger.error('Uncaught API Query Error')
        return 'Error'
    except (requests.exceptions.ConnectionError,
            requests.exceptions.Timeout) as e:
        logger.error('Polling error encountered')
        logger.error(f'{e.args}')
        return 'Error'
    except Exception:
        # Deliberately not a bare except: KeyboardInterrupt / SystemExit must
        # propagate so the service can shut down cleanly.
        logger.exception('Exception raised during polling')
        return 'Error'
def morph_color(hexcode, brightness, duration, index):
    """Morph one LED to a colour, dimmed to the requested brightness.

    Args:
        hexcode: Colour as a hex string such as "#ff0000".
        brightness: Percentage applied to each colour channel (value / 100).
        duration: Passed straight through to ``bstick.morph`` as ``duration``.
        index: Index of the LED to change.

    Any failure while talking to the BlinkStick is logged and followed by a
    USB reset attempt; nothing is raised to the caller for those failures.
    """
    scale = brightness / 100.0
    r, g, b = (scale * channel for channel in bstick._hex_to_rgb(hexcode))
    # Short pause before talking to the device.
    time.sleep(0.2)
    try:
        logger.debug(f'Setting LED {index} to R-{r} G-{g} B-{b} with a Duration of {duration}')
        bstick.morph(red=r, green=g, blue=b, duration=duration, index=index)
        logger.debug('Done')
    except blinkstick.BlinkStickException as e:
        logger.error(f'Error encountered while lighting up LED {index}')
        logger.error(f'{e.args}')
        time.sleep(1)
        reset_blink_usb()
    except Exception:
        # Not a bare except: Ctrl-C / SystemExit must not be turned into a
        # USB reset, they have to reach the main loop's shutdown handling.
        logger.exception(f'Exception raised while lighting up LED {index}')
        time.sleep(1)
        reset_blink_usb()
def led_control(action, brightness, morphdelay, led_index):
    """Light one LED in the colour that corresponds to a poll result.

    Args:
        action: Status code from the poll ('Yes', 'No', 'Bad', 'UnAuth' or
            'Error'); any other value leaves the LED untouched.
        brightness: Brightness forwarded to ``morph_color``.
        morphdelay: Duration forwarded to ``morph_color``.
        led_index: Index of the LED to change.
    """
    status_colours = (
        ('Yes', "#ff0000"),     # Red
        ('No', "#00ff00"),      # Lime
        ('Bad', "#ffff00"),     # Yellow
        ('UnAuth', "#800080"),  # Purple
        ('Error', "#00ffff"),   # Aqua
    )
    for status, hexcode in status_colours:
        if action == status:
            morph_color(hexcode, brightness, morphdelay, led_index)
            break
def led_off(led_count=8):
    """Switch off the LEDs by setting each one to 'black'.

    Args:
        led_count: Number of LEDs to blank, indices 0 .. led_count - 1.
            Defaults to 8, the count the rest of the script assumes.

    Any failure while talking to the BlinkStick is logged and followed by a
    USB reset attempt; the remaining LEDs are then left as they are.
    """
    # Bound up front so the error messages below can always reference it.
    led = None
    try:
        logger.debug('Turning off all LEDs')
        for led in range(led_count):
            bstick.set_color(name='black', index=led)
            time.sleep(0.1)
    except blinkstick.BlinkStickException as e:
        logger.error(f'Error encountered while blacking out LED {led}')
        logger.error(f'{e.args}')
        time.sleep(1)
        reset_blink_usb()
    except Exception:
        # Not a bare except: Ctrl-C / SystemExit must propagate instead of
        # triggering a USB reset.
        logger.exception(f'Exception raised while blacking out LED {led}')
        time.sleep(1)
        reset_blink_usb()
def get_blink_usb():
    """Locate the BlinkStick's USB device node by parsing ``lsusb`` output.

    The first output line containing 'Clay Logic' is used. Its second
    whitespace-separated field is taken as the bus number and the first
    three characters of its fourth field as the device number (assumes the
    usual "Bus 001 Device 004: ID ..." layout - confirm on the target
    system).

    Returns:
        The path '/dev/bus/usb/<bus>/<dev>' as a string, or None when no
        matching line is found or the lookup fails (failures are logged).
    """
    try:
        result = subprocess.run(['lsusb'], stdout=subprocess.PIPE)
        for line in result.stdout.split(b'\n'):
            if b'Clay Logic' not in line:
                continue
            parts = line.split()
            bus = parts[1].decode("utf-8")
            # Keep only the three digits, dropping the trailing ':'.
            dev = parts[3][:3].decode("utf-8")
            blink_path = f'/dev/bus/usb/{bus}/{dev}'
            logger.info(f'Found Blinkstick at {blink_path} for reset')
            return blink_path
    except Exception:
        # Not a bare except: let KeyboardInterrupt / SystemExit propagate.
        logger.exception('Exception finding USB path of Blinkstick')
    return None
def send_usb_reset(dev_path):
    """Issue the USB_RESET ioctl against a USB device node.

    Args:
        dev_path: Path of the device node to reset, e.g. as returned by
            ``get_blink_usb``.

    Failures are logged rather than raised. The file descriptor is closed
    only if the open actually succeeded, so a failed open no longer causes
    a second, misleading error from the cleanup step.
    """
    fd = None
    try:
        fd = os.open(dev_path, os.O_WRONLY)
        logger.info(f'Attempting USB device reset at {dev_path}')
        fcntl.ioctl(fd, USB_RESET, 0)
        logger.info(f'Succesfully reset USB device at {dev_path}')
    except Exception:
        # Not a bare except: let KeyboardInterrupt / SystemExit propagate.
        logger.exception(f'Exception occured while resetting USB device at {dev_path}')
    finally:
        if fd is not None:
            try:
                os.close(fd)
            except OSError:
                logger.exception('USB reset has gone horribly wrong')
def reset_blink_usb():
    """Find the BlinkStick's USB device node and send it a reset.

    Logs an error and does nothing further when no device node is found.
    """
    device_node = get_blink_usb()
    if device_node is not None:
        send_usb_reset(device_node)
        return
    logger.error('Cant find a Blinkstick to Reset')
if __name__ == '__main__':
    # `logger` and `bstick` are deliberately module-level: the helper
    # functions above read them as globals.
    logger = setup_logging(LogLevel)
    logger.info(f'Log Level set to {LogLevel}')
    logger.info('PRTG API LED Service Starting.....')

    # Discover the BlinkStick. Start-up failures exit with a non-zero status
    # so a supervising process can tell them apart from a clean stop.
    try:
        bstick = blinkstick.find_first()
    except Exception:
        logger.exception('Exception raised during BlinkStick discovery')
        raise SystemExit(1)
    if bstick is None:
        logger.error('No BlinkSticks found, exiting')
        raise SystemExit(1)

    blink_desc = bstick.get_description()
    blink_serial = bstick.get_serial()
    logger.info(f'Found a {blink_desc} with Serial {blink_serial}')
    logger.info(f'Polling {PrtgServer}')
    logger.info(f'Quick {StartUpPause} second pause before we start')
    time.sleep(StartUpPause)
    logger.info('Commencing Polling')

    led_index = 0
    try:
        while True:
            AnyPRTGErrors = query_prtg(PrtgSensorDownQuery)
            logger.debug(f'PRTG Code is {AnyPRTGErrors}')
            # Each poll lights the next LED; after the eighth, blank the
            # strip and start again from LED 0.
            if led_index == 8:
                led_index = 0
                led_off()
                time.sleep(0.2)
            led_control(AnyPRTGErrors, LEDBrightness, MorphDelay, led_index)
            time.sleep(PollingDelay)
            led_index = led_index + 1
    finally:
        logger.info('Terminating Process')
        logger.info('Shutting down LEDs')
        for led in range(8):
            # Best effort: one LED failing must not stop the others from
            # being switched off.
            try:
                bstick.set_color(name='black', index=led)
            except Exception:
                logger.exception(f'Exception raised while blacking out LED {led}')
            time.sleep(0.3)
        logger.info('Shutdown Completed')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment