Skip to content

Instantly share code, notes, and snippets.

@hospitableit
Last active May 21, 2020 02:57
Show Gist options
  • Save hospitableit/363e0dce227c6677b4768553f4aea059 to your computer and use it in GitHub Desktop.
Save hospitableit/363e0dce227c6677b4768553f4aea059 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
import requests
import json
import time
import logging
import logging.handlers
import os
import fcntl
import subprocess
from blinkstick import blinkstick
# --- PRTG API settings -----------------------------------------------------
# Placeholder credentials; replace with a real API user before deploying.
# NOTE(review): PrtgPass is sent as the `passhash` query parameter below --
# confirm whether a passhash (rather than a plain password) is expected.
PrtgUser = 'API User Name'
PrtgPass = 'API Password'
PrtgServer = 'https://myprtgserver.com'
# Sensor table query filtered on status 5 (the "down" sensors this script
# counts); credentials are appended to the query string as-is.
PrtgSensorDownQuery = (
    f'{PrtgServer}/api/table.json'
    '?content=sensors&columns=device,sensor,status&filter_status=5'
    f'&username={PrtgUser}&passhash={PrtgPass}'
)

# --- Logging ---------------------------------------------------------------
LogFilePath = '/var/log/prtg_led.log'
LogLevel = 'DEBUG'

# --- Timing and LED behaviour ----------------------------------------------
PollingDelay = 60    # seconds slept between polls
LEDBrightness = 40   # percentage applied to each RGB channel
MorphDelay = 2000    # morph duration; presumably milliseconds -- TODO confirm
StartUpPause = 30    # seconds to wait before the first poll

# ioctl request number used to reset the USB device: 'U' in the high byte,
# 20 in the low byte (matches Linux USBDEVFS_RESET, i.e. _IO('U', 20)).
USB_RESET = (ord('U') << 8) | 20
def setup_logging(log_level):
    """Create the 'prtg_led_logger' logger that writes to LogFilePath.

    Args:
        log_level: Name of a standard logging level ('DEBUG', 'INFO', ...).
            Matching is case-insensitive. Anything that does not resolve to
            a numeric logging level falls back to DEBUG.

    Returns:
        The configured logging.Logger instance.
    """
    log = logging.getLogger('prtg_led_logger')
    # The logger itself lets everything through; the handler does the filtering.
    log.setLevel(logging.DEBUG)

    # getattr() on the logging module can hand back non-level attributes
    # (e.g. 'info' -> the logging.info function), which setLevel() rejects.
    # Only accept a real numeric level.
    level = getattr(logging, str(log_level).upper(), None)
    if not isinstance(level, int):
        level = logging.DEBUG

    # WatchedFileHandler reopens the file if it is rotated underneath us.
    file_logger = logging.handlers.WatchedFileHandler(LogFilePath)
    file_logger.setLevel(level)
    file_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_logger.setFormatter(file_format)
    log.addHandler(file_logger)
    return log
def query_prtg(api_query, timeout=30):
    """Poll the PRTG API once and classify the outcome.

    Args:
        api_query: Full URL of the PRTG table.json query to run.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout a stalled connection would block the polling loop
            forever.

    Returns:
        'Yes'    - HTTP 200 and the reported 'treesize' is greater than 0.
        'No'     - HTTP 200 and 'treesize' is 0.
        'Bad'    - HTTP 400.
        'UnAuth' - HTTP 401.
        'Error'  - any other status code, or any failure while polling
                   (connection error, timeout, malformed response).
    """
    try:
        logger.debug(f'Polling {PrtgServer}')
        response = requests.get(api_query, timeout=timeout)
        logger.debug(f'Result Code is {response.status_code}')

        # Got a good response from PRTG?
        if response.status_code == 200:
            # Load the response into a dictionary
            sensors_down = json.loads(response.content.decode('utf-8'))
            down_count = sensors_down['treesize']
            # Any PRTG sensors down?
            if down_count > 0:
                logger.info(f'There are {down_count} PRTG sensor errors')
                return 'Yes'
            logger.debug('There are No PRTG sensor errors')
            return 'No'

        # Bad Request returned by PRTG?
        if response.status_code == 400:
            logger.error('API Query Error - Bad Request')
            return 'Bad'

        # Unauthorized returned by PRTG?
        if response.status_code == 401:
            logger.error('API Query Error - Unauthorized')
            return 'UnAuth'

        # Something else messed up?
        logger.error('Uncaught API Query Error')
        return 'Error'
    except requests.exceptions.ConnectionError as e:
        logger.error('Polling error encountered')
        logger.error(f'{e.args}')
        return 'Error'
    except Exception:
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit still stop the service. Timeouts and bad JSON land here.
        logger.exception('Exception raised during polling')
        return 'Error'
def morph_color(hexcode, brightness, duration, index):
    """Morph one LED to a colour scaled by a brightness percentage.

    Args:
        hexcode: Colour as a '#rrggbb' string.
        brightness: Percentage (0-100) applied to each RGB channel.
        duration: Morph duration passed straight through to bstick.morph.
        index: Index of the LED to change.

    Any failure while driving the LED is logged and followed by a USB reset
    of the BlinkStick; nothing is raised to the caller for those failures.
    """
    r, g, b = bstick._hex_to_rgb(hexcode)
    scale = brightness / 100.0
    r, g, b = scale * r, scale * g, scale * b
    # Short pause before talking to the device.
    time.sleep(0.2)
    try:
        logger.debug(f'Setting LED {index} to R-{r} G-{g} B-{b} with a Duration of {duration}')
        bstick.morph(red=r, green=g, blue=b, duration=duration, index=index)
        logger.debug('Done')
    except blinkstick.BlinkStickException as e:
        logger.error(f'Error encountered while lighting up LED {index}')
        logger.error(f'{e.args}')
        time.sleep(1)
        reset_blink_usb()
    except Exception:
        # Exception (not a bare except) so Ctrl-C / SystemExit during the
        # morph still terminates the service instead of triggering a reset.
        logger.exception(f'Exception raised while lighting up LED {index}')
        time.sleep(1)
        reset_blink_usb()
def led_control(action, brightness, morphdelay, led_index):
    """Light one LED in the colour that corresponds to a poll result.

    Args:
        action: Result code from the poll: 'Yes', 'No', 'Bad', 'UnAuth' or
            'Error'. Any other value leaves the LED untouched.
        brightness: Brightness percentage forwarded to morph_color.
        morphdelay: Morph duration forwarded to morph_color.
        led_index: Index of the LED to change.
    """
    colour_for_action = {
        'Yes': "#ff0000",     # Red
        'No': "#00ff00",      # Lime
        'Bad': "#ffff00",     # Yellow
        'UnAuth': "#800080",  # Purple
        'Error': "#00ffff",   # Aqua
    }
    colour = colour_for_action.get(action)
    if colour is None:
        return
    morph_color(colour, brightness, morphdelay, led_index)
def led_off(led_count=8):
    """Set every LED to black, one at a time.

    Args:
        led_count: Number of LEDs to switch off (defaults to 8, the count
            the rest of this script assumes).

    On the first failure the error is logged, the BlinkStick is reset over
    USB and the remaining LEDs are left as they were.
    """
    # Bound up front so the handlers can always format their message.
    led = None
    try:
        logger.debug('Turning off all LEDs')
        for led in range(led_count):
            bstick.set_color(name='black', index=led)
            time.sleep(0.1)
    except blinkstick.BlinkStickException as e:
        logger.error(f'Error encountered while blacking out LED {led}')
        logger.error(f'{e.args}')
        time.sleep(1)
        reset_blink_usb()
    except Exception:
        # Exception (not a bare except) so KeyboardInterrupt / SystemExit
        # are not mistaken for a device fault.
        logger.exception(f'Exception raised while blacking out LED {led}')
        time.sleep(1)
        reset_blink_usb()
def get_blink_usb():
    """Locate the BlinkStick's USB device node by parsing `lsusb` output.

    Returns:
        The '/dev/bus/usb/<bus>/<dev>' path of the first lsusb line that
        mentions 'Clay Logic', or None when no such line exists or the
        lookup fails (failures are logged, not raised).
    """
    try:
        listing = subprocess.run(['lsusb'], stdout=subprocess.PIPE).stdout
        for line in listing.split(b'\n'):
            if b'Clay Logic' not in line:
                continue
            # Fields 1 and 3 of the line are the bus and device numbers;
            # the device field carries a trailing ':' which [:3] drops.
            parts = line.split()
            bus = parts[1].decode("utf-8")
            dev = parts[3][:3].decode("utf-8")
            blink_path = '/dev/bus/usb/%s/%s' % (bus, dev)
            logger.info(f'Found Blinkstick at {blink_path} for reset')
            return blink_path
    except Exception:
        # Exception (not a bare except) so KeyboardInterrupt / SystemExit
        # still propagate.
        logger.exception('Exception finding USB path of Blinkstick')
    return None
def send_usb_reset(dev_path):
    """Issue a USB reset ioctl against a device node.

    Args:
        dev_path: Path of the USB device node, e.g. '/dev/bus/usb/001/004'.

    Failures are logged rather than raised. The file descriptor is closed
    only if it was actually opened.
    """
    # Start as None so the cleanup below can tell "open failed" apart from
    # "open succeeded"; otherwise a failed open would surface as a second,
    # misleading error when closing an unbound descriptor.
    fd = None
    try:
        fd = os.open(dev_path, os.O_WRONLY)
        logger.info(f'Attempting USB device reset at {dev_path}')
        fcntl.ioctl(fd, USB_RESET, 0)
        logger.info(f'Succesfully reset USB device at {dev_path}')
    except Exception:
        logger.exception(f'Exception occured while resetting USB device at {dev_path}')
    finally:
        if fd is not None:
            try:
                os.close(fd)
            except OSError:
                logger.exception('USB reset has gone horribly wrong')
def reset_blink_usb():
    """Find the BlinkStick's USB device node and reset it.

    Logs an error and does nothing else when no device node is found.
    """
    device_node = get_blink_usb()
    if device_node is not None:
        send_usb_reset(device_node)
        return
    logger.error('Cant find a Blinkstick to Reset')
if __name__ == '__main__':
    # `logger` and `bstick` are assigned at module level on purpose: the
    # functions in this file read them as globals.
    logger = setup_logging(LogLevel)
    logger.info(f'Log Level set to {LogLevel}')
    logger.info('PRTG API LED Service Starting.....')

    # Discover the BlinkStick. On failure, exit with a non-zero status so a
    # supervisor can tell that start-up failed.
    try:
        bstick = blinkstick.find_first()
    except Exception:
        logger.exception('Exception raised during BlinkStick discovery')
        raise SystemExit(1)
    if bstick is None:
        logger.error('No BlinkSticks found, exiting')
        raise SystemExit(1)

    blink_desc = bstick.get_description()
    blink_serial = bstick.get_serial()
    logger.info(f'Found a {blink_desc} with Serial {blink_serial}')
    logger.info(f'Polling {PrtgServer}')
    logger.info(f'Quick {StartUpPause} second pause before we start')
    time.sleep(StartUpPause)
    logger.info('Commencing Polling')

    # Each poll lights the next LED; after the eighth LED the strip is
    # cleared and the sequence starts again from LED 0.
    led_index = 0
    try:
        while True:
            AnyPRTGErrors = query_prtg(PrtgSensorDownQuery)
            logger.debug(f'PRTG Code is {AnyPRTGErrors}')
            if led_index == 8:
                led_index = 0
                led_off()
                time.sleep(0.2)
            led_control(AnyPRTGErrors, LEDBrightness, MorphDelay, led_index)
            time.sleep(PollingDelay)
            led_index = led_index + 1
    finally:
        logger.info('Terminating Process')
        logger.info('Shutting down LEDs')
        # Best-effort blackout: a device error here must not replace the
        # exception that is already ending the process.
        try:
            for led in range(8):
                bstick.set_color(name='black', index=led)
                time.sleep(0.3)
        except Exception:
            logger.exception('Exception raised while shutting down LEDs')
        else:
            logger.info('Shutdown Completed')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment