Created
December 2, 2019 01:54
-
-
Save shreddedbacon/d9205d431755076c352ad7f13af416a7 to your computer and use it in GitHub Desktop.
Light visualisation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# NeoPixel library strandtest example | |
# Author: Chris Davis (cdchris12@gmail.com) | |
# | |
# Showcases various animations on a strip of NeoPixels. | |
import time | |
from rpi_ws281x import Adafruit_NeoPixel, Color | |
# from collections import OrderedDict | |
import argparse | |
import random | |
import pika | |
import json | |
from multiprocessing import Process, Manager, Queue | |
from elasticsearch import Elasticsearch | |
# ---------------------------------------------------------------------------
# LED strip configuration
# ---------------------------------------------------------------------------
# Alternative strip length kept for reference:
# LED_COUNT = 144
LED_COUNT = 720          # Number of LED pixels.

# Alternative data pin kept for reference (10 uses SPI /dev/spidev0.0):
# LED_PIN = 10
LED_PIN = 18             # GPIO pin connected to the pixels (18 uses PWM!).

LED_FREQ_HZ = 1_000_000  # LED signal frequency in hertz (usually 800khz)
LED_DMA = 10             # DMA channel to use for generating signal (try 10)
LED_BRIGHTNESS = 155     # Set to 0 for darkest and 255 for brightest
LED_INVERT = False       # True to invert the signal (when using NPN transistor level shift)
LED_CHANNEL = 0          # Set to '1' for GPIOs 13, 19, 41, 45 or 53

# Number of pixels painted for the head of each flare (see setPixels); the
# faded trail length is also derived from it.
HIT_SIZE = 10
# Colour with red, green and blue all set to 0.
black = Color(0, 0, 0)
def getRGBfromI(RGBint):
    """Split a packed 0xRRGGBB integer into its three 8-bit channels.

    Args:
        RGBint: Integer whose bits 16-23 hold red, 8-15 green and 0-7 blue.
            Any higher bits are ignored.

    Returns:
        A dict with the keys "red", "green" and "blue", each in 0..255.
    """
    # (channel name, right-shift that brings the channel into the low byte)
    channel_shifts = (("red", 16), ("green", 8), ("blue", 0))
    return {name: (RGBint >> shift) & 0xFF for name, shift in channel_shifts}
def setPixels(strip, position, color, speed):
    """Draw one flare on the strip: dim its trail, then paint its head.

    Args:
        strip: Strip object providing numPixels(), getPixelColorRGB() and
            setPixelColorRGB().
        position: Pixel index of the first (rearmost) head pixel.
        color: Dict with "red", "green" and "blue" channel values.
        speed: Flare speed; the trail length scales with it.

    Pixel indices that fall outside 0..numPixels()-1 are skipped. A flare
    starts at position 0 (so its trail indices are negative) and keeps moving
    past the last pixel, so both ends are reached during normal operation.
    """
    pixel_count = strip.numPixels()

    # Fade old pixels: each pixel behind the head is dimmed by a factor of 1.3
    # every time the flare is drawn.
    trail_length = int(HIT_SIZE * (speed / 1.25))
    for offset in range(trail_length, 0, -1):
        index = position - offset
        # NOTE(review): the rpi_ws281x binding appears to range-check only the
        # upper bound, so negative indices must not reach it -- confirm.
        if not 0 <= index < pixel_count:
            continue
        current = strip.getPixelColorRGB(index)
        strip.setPixelColorRGB(
            index,
            int(current.r / 1.3),
            int(current.g / 1.3),
            int(current.b / 1.3),
        )

    # Set new pixels: the head gets brighter towards its leading edge
    # (divisor 1.5 at offset 0 for HIT_SIZE = 10, approaching 1 at the front).
    for offset in range(HIT_SIZE):
        index = position + offset
        if not 0 <= index < pixel_count:
            continue
        divisor = 1 + (HIT_SIZE - offset) * 0.05
        strip.setPixelColorRGB(
            index,
            int(color['red'] / divisor),
            int(color['green'] / divisor),
            int(color['blue'] / divisor),
        )
def updateFlares(flares):
    """Advance every flare along the strip, forever.

    Args:
        flares: Shared mapping of flare id -> {'position', 'color', 'speed'}.
            Each pass moves every flare forward by its speed and drops flares
            whose position has reached LED_COUNT + 30.

    Never returns; sleeps 20 ms between passes.
    """
    while True:
        # Iterate over a snapshot of the keys so entries can be removed while
        # looping, whatever kind of mapping `flares` is.
        for key in list(flares.keys()):
            value = flares[key]
            if value['position'] >= LED_COUNT + 30:
                # We are at the end of the strip, remove the current element.
                # The 30-pixel margin presumably lets the trail fade out past
                # the last LED -- TODO confirm.
                del flares[key]
            else:
                # Overwrite in a single assignment. Deleting first would leave
                # a moment in which readers of the shared mapping do not see
                # this flare at all.
                flares[key] = {
                    'position': int(value['position'] + value['speed']),
                    'color': value['color'],
                    'speed': value['speed'],
                }
        time.sleep(0.02)
def showLights(flares):
    """Render the shared flares onto the LED strip in an endless loop.

    Args:
        flares: Mapping of flare id -> {'position', 'color', 'speed'}; only
            the values are used here.

    Never returns.
    """
    strip = Adafruit_NeoPixel(
        LED_COUNT,
        LED_PIN,
        LED_FREQ_HZ,
        LED_DMA,
        LED_INVERT,
        LED_BRIGHTNESS,
        LED_CHANNEL,
    )
    # Initialize the library (must be called once before other functions).
    strip.begin()

    while True:
        for flare in flares.values():
            setPixels(strip, flare['position'], flare['color'], flare['speed'])
        # Brightness is forced to 255 on every frame, which overrides the
        # LED_BRIGHTNESS value passed to the constructor above.
        strip.setBrightness(255)
        strip.show()
def colorWipe(strip, color, wait_ms=50):
    """Fill the whole strip with one colour, push the frame, then pause.

    Args:
        strip: Strip object providing numPixels(), setPixelColor() and show().
        color: Colour value handed unchanged to setPixelColor().
        wait_ms: Pause after the frame is shown, in milliseconds.
    """
    for pixel in range(strip.numPixels()):
        strip.setPixelColor(pixel, color)

    # The frame is shown once, after every pixel has been set.
    strip.show()
    time.sleep(wait_ms / 1000.0)
# this isn't used anymore | |
# def workerRabbitMQ(flares): | |
# connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) | |
# channel = connection.channel() | |
# channel.queue_declare(queue='hits') | |
# while True: | |
# method_frame, header_frame, body = channel.basic_get(queue='hits') | |
# if method_frame: | |
# print("Handling %r" % body) | |
# hits = json.loads(body) | |
# start_time = time.time() | |
# max_time = time.time() + 10 | |
# elapsed = 0 | |
# while time.time() < max_time: | |
# elapsed = time.time() - start_time | |
# sent = False | |
# for key, value in hits.items(): | |
# if value['time'] < elapsed and 'consumed' not in value: | |
# if sent == False: | |
# # print("added to flares") | |
# flares[key] = {'position': 0, 'color': {'red': value['color']['red'], 'green': value['color']['green'], 'blue': value['color']['blue']}, 'speed': value['speed']} | |
# sent=True | |
# # else: | |
# # print("skipped") | |
# hits[key]['consumed'] = True | |
# time.sleep(0.1) | |
# channel.basic_ack(method_frame.delivery_tag) | |
# print("Finished") | |
# else: | |
# print("No Data, waiting") | |
# sleep(0.2) | |
def workerHandleESResult(elasticResultQueue, flares):
    """Replay each queued batch of hits as flares over a 10 second window.

    Args:
        elasticResultQueue: Queue delivering dicts of
            hit id -> {'time', 'color', 'speed', ...}.
        flares: Shared mapping that receives a new flare (position 0) for hits
            whose 'time' offset has elapsed.

    The batch is scanned every 250 ms. On each scan at most one due hit is
    turned into a flare; every other hit that is due on the same scan is
    marked consumed without being shown. Never returns.
    """
    while True:
        hits = elasticResultQueue.get()
        if len(hits) == 0:
            print("No Data, waiting")
            time.sleep(0.2)
            continue

        print("Handling %r" % hits)
        start_time = time.time()
        max_time = time.time() + 10
        while time.time() < max_time:
            elapsed = time.time() - start_time
            sent = False
            for key, value in hits.items():
                due = value['time'] < elapsed and 'consumed' not in value
                if not due:
                    continue
                if not sent:
                    # print("added to flares")
                    source_color = value['color']
                    flares[key] = {
                        'position': 0,
                        'color': {
                            channel: source_color[channel]
                            for channel in ('red', 'green', 'blue')
                        },
                        'speed': value['speed'],
                    }
                    sent = True
                # else:
                #     print("skipped")
                value['consumed'] = True
            time.sleep(0.25)
        print("Finished")
# Substrings looked for in the request to pick a flare speed.
_IMAGE_EXTS = ('.jpg', '.jpeg', '.png', '.gif')
_FILE_EXTS = ('.pdf', '.doc', '.xls')
_CSS_EXTS = ('.css', '.js')

# (substring of the project name, packed 0xRRGGBB colour), checked in order.
_PROJECT_COLORS = (
    ("project1", 0xFF1100),
    ("project2", 0x00FF22),
    ("project3", 0xFF00FF),
    ("project4", 0x0000FF),
)


def _hitFromSource(source):
    """Translate one log document's `_source` into a hit dict.

    Returns None when the document's project has no colour assigned.
    Raises KeyError / TypeError / ValueError when an expected field is missing
    or not in the expected form.
    """
    project = source['openshift_project']
    for marker, rgb in _PROJECT_COLORS:
        if marker in project:
            color = getRGBfromI(rgb)
            break
    else:
        return None

    request = source['http_request']
    speed = 3
    if any(ext in request for ext in _FILE_EXTS):
        speed = 1
    elif any(ext in request for ext in _IMAGE_EXTS):
        speed = 4
    elif any(ext in request for ext in _CSS_EXTS):
        speed = 7

    # POST not actually used
    direction = 1 if source['http_verb'] == "POST" else 0

    # Offset of the request inside its 10 second window: seconds modulo 10
    # plus the millisecond part as a decimal fraction.
    # Assumes haproxy_milliseconds is a zero-padded digit string -- TODO confirm.
    item_time = (float(source['haproxy_second']) % 10) + float("0." + source['haproxy_milliseconds'])

    return {
        "project": project,
        "color": color,
        "speed": speed,
        "time": item_time,
        "direction": direction,
    }


def workerElastic(elasticResultQueue):
    """Query Elasticsearch every 10 seconds and queue the recent hits.

    Args:
        elasticResultQueue: Queue that receives one dict per query, mapping
            0, 1, 2, ... to {"project", "color", "speed", "time", "direction"}.
            An empty dict is queued when the query returns no usable hits.

    Each query asks for up to 200 documents of the last 10 seconds that have
    an `http_request` field. Query errors are reported and the loop carries
    on; documents with missing or malformed fields, or from a project without
    an assigned colour, are skipped. Never returns.

    NOTE(review): host name and credentials are hard-coded below.
    """
    # Imported here so the `except` clause can resolve the exception class:
    # the file only imports the `Elasticsearch` client class, not the package
    # name, so `elasticsearch.ElasticsearchException` was otherwise a NameError.
    import elasticsearch

    es = Elasticsearch(
        ['elasticsearch.server'],
        http_auth=('esuser', 'espass'),
        scheme="https",
        port=443,
    )
    index_pattern = "router-logs-project1-*,router-logs-project2-*,router-logs-project3-*,router-logs-project4-*"
    query = {
        "from": 0,
        "size": 200,
        "query": {
            "bool": {
                "must": [{
                    "exists": {
                        "field": "http_request"
                    }
                }],
                "filter": [{
                    "range": {
                        "@timestamp": {
                            "gte": "now-10s"
                        }
                    }
                }]
            }
        }
    }

    last_execution_time = 0
    while True:
        current_time = time.time()
        if last_execution_time >= current_time - 10:
            # Less than 10 seconds since the last query: wait and re-check.
            time.sleep(1)
            continue

        last_execution_time = time.time()
        print("starting ES query")
        try:
            res = es.search(index=index_pattern, body=query)
        except elasticsearch.ElasticsearchException as err:
            print('error connecting to or querying against Elasticsearch')
            print(repr(err))
            continue

        elasticResult = {}
        for doc in res['hits']['hits']:
            try:
                hit = _hitFromSource(doc['_source'])
            except (KeyError, TypeError, ValueError) as err:
                print("skipping malformed log document: %r" % (err,))
                continue
            if hit is None:
                continue
            # Keys are consecutive integers starting at 0.
            elasticResult[len(elasticResult)] = hit

        elasticResultQueue.put(elasticResult)
        print("finished ES query")
def main():
    """Start the four worker processes and wait for them.

    Workers: hit replay (workerHandleESResult), flare movement (updateFlares),
    rendering (showLights) and Elasticsearch polling (workerElastic). They
    share a manager dict of flares and a queue of query results.

    On Ctrl-C the workers are stopped. With `-c/--clear` the LEDs are then
    switched off.
    """
    # Process arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--clear', action='store_true', help='Clear the display on exit')
    args = parser.parse_args()

    print('Press Ctrl-C to quit.')
    if not args.clear:
        print('Use "-c" argument to clear LEDs on exit')

    workers = []
    try:
        manager = Manager()
        flares = manager.dict()
        elasticResultQueue = Queue()
        workers = [
            Process(target=workerHandleESResult, args=[elasticResultQueue, flares]),
            Process(target=updateFlares, args=[flares]),
            Process(target=showLights, args=[flares]),
            Process(target=workerElastic, args=[elasticResultQueue]),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            print(worker.pid)
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        # Stop the workers first so nothing else is driving the strip.
        # `pid` is None for a process that was never started; such a process
        # cannot be joined.
        started = [worker for worker in workers if worker.pid is not None]
        for worker in started:
            if worker.is_alive():
                worker.terminate()
        for worker in started:
            worker.join()

        if args.clear:
            # The strip used for rendering lives inside the showLights
            # process, so a strip object is created here just for the clear.
            strip = Adafruit_NeoPixel(
                LED_COUNT,
                LED_PIN,
                LED_FREQ_HZ,
                LED_DMA,
                LED_INVERT,
                LED_BRIGHTNESS,
                LED_CHANNEL,
            )
            strip.begin()
            colorWipe(strip, Color(0, 0, 0), 0)
            print()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment