Skip to content

Instantly share code, notes, and snippets.

@crystalin
Last active August 12, 2021 13:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save crystalin/eda405260e8903966898fd2836a625ee to your computer and use it in GitHub Desktop.
Save crystalin/eda405260e8903966898fd2836a625ee to your computer and use it in GitHub Desktop.
Playing with Adafruit lights on a Raspberry Pi with a Python server
<head>
<script>
// Pending light changes as one flat list: setLight() appends four numbers per
// light (position plus three color values); show() sends the list and resets it.
let actions = [];
/**
 * Returns a promise that fulfills (with no value) once `ms` milliseconds
 * have passed on a timer.
 *
 * @param {number} ms - Delay handed to setTimeout.
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise(function (done) {
    setTimeout(done, ms);
  });
}
/**
 * Queues a change for one light. The four arguments are appended to the
 * shared `actions` list in the order given; nothing is sent until show()
 * is called.
 *
 * NOTE(review): rainbow() passes its colors as (red, green, blue) while the
 * parameters here are named (red, blue, green) - confirm which channel order
 * the server expects.
 *
 * @param {number} position - Index of the light to change.
 * @param {number} red - First color value forwarded to the server.
 * @param {number} blue - Second color value forwarded to the server.
 * @param {number} green - Third color value forwarded to the server.
 */
function setLight(position, red, blue, green) {
  for (const entry of [position, red, blue, green]) {
    actions.push(entry);
  }
}
/**
 * Sends every queued light change to the Raspberry Pi in a single GET
 * request and empties the queue.
 *
 * Each queued number becomes one `p[]` query parameter (`p%5B%5D` is the
 * URL-encoded spelling of `p[]`).
 *
 * @returns {Promise<void>} Settles once the request has completed or failed.
 *   Failures are logged with console.warn instead of rejecting, so callers
 *   can simply `await show()` to wait for the request to finish.
 */
function show() {
  const query = actions.map((value) => `p%5B%5D=${value}`).join("&");
  // The URL is already built, so the queue can be reset before the request
  // completes; changes queued in the meantime go into the next request.
  actions = [];
  // Returning the chain is what makes `await show()` in the callers actually
  // wait for the request.
  return fetch(`http://raspberrypi.local:5555/set_lights?${query}`, {
    method: "GET",
  })
    .then(function (response) {
      // Logs whether the server answered with a success status.
      console.log(response.ok);
    })
    .catch(function (err) {
      // Network or CORS failure: report it and keep the page running.
      console.warn("Something went wrong.", err);
    });
}
// Timeout handles of the animation loops, keyed by loop name
// ("two_snakes", "rainbow"); stop() clears every handle and resets this object.
let timers = {};
// Upper bound two_snakes() uses for its color values.
const MAX_LIGHT_INTENSITY = 100; // 255 for max
// Number of lights the functions in this page loop over.
const MAX_LIGHTS = 60;
// Number of lights in each snake drawn by two_snakes().
const SNAKE_LENGTH = 10;
/**
 * Queues the color values (0, 0, 255) for every light from 0 to
 * MAX_LIGHTS - 1, then sends the queued changes with show().
 *
 * @returns {Promise<void>}
 */
async function all_blue() {
  let position = 0;
  while (position < MAX_LIGHTS) {
    setLight(position, 0, 0, 255);
    position += 1;
  }
  // Push the queued colors to the strip.
  await show();
}
/**
 * Queues the color values (0, 255, 0) for every light from 0 to
 * MAX_LIGHTS - 1, then sends the queued changes with show().
 *
 * @returns {Promise<void>}
 */
async function all_red() {
  let position = 0;
  while (position < MAX_LIGHTS) {
    setLight(position, 0, 255, 0);
    position += 1;
  }
  // Push the queued colors to the strip.
  await show();
}
/**
 * Draws one frame of two snakes of SNAKE_LENGTH lights each, the second one
 * starting 20 lights after the first, then schedules the next frame.
 *
 * @param {number} loop_count - Frame number; it sets where the snakes start
 *   and feeds the color values.
 * @returns {Promise<void>}
 */
async function two_snakes(loop_count) {
  // Color value shared by both snakes; it shrinks as loop_count grows and
  // wraps around with the modulo.
  const fade = MAX_LIGHT_INTENSITY - ((loop_count * 2) % MAX_LIGHT_INTENSITY);
  const firstSnakeLevel = loop_count % MAX_LIGHT_INTENSITY;

  let segment = 0;
  while (segment < SNAKE_LENGTH) {
    // First snake, starting at light `loop_count`.
    setLight((loop_count + segment) % MAX_LIGHTS, firstSnakeLevel, fade, 0);
    // Second snake, starting 20 lights further along.
    setLight(
      (loop_count + segment + 20) % MAX_LIGHTS,
      0,
      fade,
      MAX_LIGHT_INTENSITY
    );
    segment += 1;
  }

  // Switch off the light just behind each snake so no trail is left.
  const behindFirst = (loop_count + MAX_LIGHTS - 1) % MAX_LIGHTS;
  const behindSecond = (loop_count + MAX_LIGHTS + 19) % MAX_LIGHTS;
  setLight(behindFirst, 0, 0, 0);
  setLight(behindSecond, 0, 0, 0);

  // Send this frame before scheduling the next one.
  await show();

  // Next frame in 30 milliseconds; the handle is kept so stop() can cancel it.
  timers["two_snakes"] = setTimeout(function () {
    return two_snakes(loop_count + 1);
  }, 30);
}
/**
 * Draws one frame of a rainbow across all MAX_LIGHTS lights, then schedules
 * the next frame.
 *
 * @param {number} loop_count - Frame number; it sets how quickly the colors
 *   change from one light to the next.
 * @returns {Promise<void>}
 */
async function rainbow(loop_count) {
  // Grows with loop_count and wraps back to 0 every MAX_LIGHTS frames.
  const frequency = 0.01 * (loop_count % MAX_LIGHTS);
  for (let i = 0; i < MAX_LIGHTS; i++) {
    // Three sine waves shifted by 0, 2 and 4 radians; each one stays within
    // 1..255 because sin() is in [-1, 1].
    // Declared with const so they stay local: assigning them undeclared
    // creates globals and throws a ReferenceError in strict/module code.
    const red = Math.sin(frequency * i + 0) * 127 + 128;
    const green = Math.sin(frequency * i + 2) * 127 + 128;
    const blue = Math.sin(frequency * i + 4) * 127 + 128;
    // Math.floor turns the float (ex: 53.23942) into an integer (ex: 53).
    setLight(i, Math.floor(red), Math.floor(green), Math.floor(blue));
  }
  // Send this frame before scheduling the next one.
  await show();
  // Next frame in 30 milliseconds; the handle is kept so stop() can cancel it.
  timers["rainbow"] = setTimeout(() => rainbow(loop_count + 1), 30);
}
/**
 * Cancels every timeout recorded in `timers` and replaces `timers` with a
 * fresh empty object.
 */
function stop() {
  Object.values(timers).forEach((handle) => {
    clearTimeout(handle);
  });
  timers = {};
}
</script>
</head>
<body>
<!-- One-shot colors: each button queues a color for every light and sends it once. -->
<button type="button" onClick="all_red()">All red</button>
<button type="button" onClick="all_blue()">All blue</button>
<br />
<hr />
<!-- Animations: each button starts at frame 0 and re-schedules itself with setTimeout;
     STOP calls stop(), which clears the timeouts recorded in `timers`. -->
LOOPS:
<br />
<button type="button" onClick="two_snakes(0)">Two Snakes</button>
<button type="button" onClick="rainbow(0)">Rainbow</button>
<button type="button" onClick="stop()">STOP</button>
</body>
from rpi_ws281x import *
from atexit import register as addtoexit
from queue import Queue
from threading import Thread
@addtoexit
def close():
    """Print a shutdown notice; registered to run when the interpreter exits."""
    print("\nshutting down")
# LED strip configuration:
LED_COUNT = 60 # Number of LED pixels.
LED_PIN = 18 # GPIO pin connected to the pixels (18 uses PWM!).
#LED_PIN = 10 # GPIO pin connected to the pixels (10 uses SPI /dev/spidev0.0).
LED_FREQ_HZ = 800000 # LED signal frequency in hertz (usually 800khz)
LED_DMA = 10 # DMA channel to use for generating signal (try 10)
LED_BRIGHTNESS = 255 # Set to 0 for darkest and 255 for brightest
LED_INVERT = False # True to invert the signal (when using NPN transistor level shift)
LED_CHANNEL = 0 # set to '1' for GPIOs 13, 19, 41, 45 or 53
# Strip object built from the settings above; strip.begin() is called in the
# __main__ section before the consumer thread starts using it.
strip = Adafruit_NeoPixel(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS, LED_CHANNEL)
# Hand-off queue for light updates: set_lights() puts argument lists on it and
# consumer() takes them off.
q = Queue()
def consumer(in_q):
    """Apply queued light updates to the LED strip, forever.

    Each item taken from ``in_q`` is a flat sequence read in groups of four:
    a pixel index followed by the three arguments passed to ``Color``. Once
    every group of an item has been applied, ``strip.show()`` is called once.

    A malformed item (a value ``int()`` rejects, or a length that is not a
    multiple of four) is reported and skipped as a whole. Without this check
    one bad request would raise inside this thread and end all further
    light updates.

    Args:
        in_q: Queue-like object whose ``get()`` blocks until an item arrives.
    """
    while True:
        args = in_q.get()
        # Convert everything up front so a bad value cannot leave the strip
        # half-updated.
        try:
            values = [int(value) for value in args]
        except (TypeError, ValueError) as err:
            print("ignoring light update with non-integer values:", err)
            continue
        if len(values) % 4 != 0:
            print("ignoring light update: expected groups of 4 values, got", len(values))
            continue
        for i in range(0, len(values), 4):
            strip.setPixelColor(values[i], Color(values[i + 1], values[i + 2], values[i + 3]))
        strip.show()
def set_lights(args):
    """Hand ``args`` to the consumer thread through the queue ``q``.

    The update is only queued here; it is applied later by ``consumer``.

    Returns:
        The string ``"ok"``.
    """
    acknowledgement = "ok"
    q.put(args)
    return acknowledgement
if __name__ == '__main__':
    # Initialise the strip before the consumer thread starts using it.
    strip.begin()
    # consumer() never returns, so the thread must be a daemon: a non-daemon
    # thread would keep the interpreter alive after the main thread exits
    # (for example on Ctrl-C) and the process would hang instead of stopping.
    t1 = Thread(target=consumer, args=(q,), daemon=True)
    t1.start()
# Create a cross domain request handler (with OPTIONS request) that serves
# JSON on post and JSONp + files on GET
from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCRequestHandler
from http.server import SimpleHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from json import dumps, loads
class Handler(SimpleHTTPRequestHandler, SimpleJSONRPCRequestHandler):
    """Request handler that answers OPTIONS preflights and serves GET requests.

    GET ``/set_lights?p[]=...`` forwards the ``p[]`` values to ``set_lights``;
    any other path, or a request without ``p[]``, gets a JSON error object.
    Every reply is sent with status 200 and permissive CORS headers.
    """

    def _send_cors_headers(self):
        """Emit the CORS headers shared by the OPTIONS and GET replies."""
        cors_headers = (
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type'),
            ('Access-Control-Allow-Methods', 'POST, GET, OPTIONS'),
            ('Access-Control-Allow-Headers', 'X-Requested-With'),
        )
        for header_name, header_value in cors_headers:
            self.send_header(header_name, header_value)

    def do_OPTIONS(self):
        """Answer a preflight request with status 200 and the CORS headers."""
        self.send_response(200, 'ok')
        self._send_cors_headers()
        self.end_headers()

    def do_GET(self):
        """Dispatch on the request path and reply with a JSON-encoded result."""
        url = urlparse(self.path)
        query = parse_qs(url.query)

        if url.path != '/set_lights':
            data = {'error': 'Did not understand ' + url.path}
        else:
            try:
                # parse_qs collects repeated "p[]" parameters into one list.
                data = set_lights(query['p[]'])
            except (KeyError, ValueError):
                data = {'error': 'Wrong parameters', 'query': query}

        # The reply always uses status 200, error payloads included.
        self.send_response(200)
        self._send_cors_headers()
        self.send_header('Content-type', 'application/javascript')
        self.end_headers()
        self.wfile.write(dumps(data).encode('UTF-8'))
# Build the JSON-RPC server on every interface, port 5555, using Handler for
# plain HTTP requests.
from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
server = SimpleJSONRPCServer(('0.0.0.0', 5555), Handler)

# Expose the introspection functions and set_lights over JSON-RPC.
server.register_introspection_functions()
server.register_function(set_lights, 'set_lights')

# Ask the server to shut down when the interpreter exits.
addtoexit(server.shutdown)

sockname = server.socket.getsockname()
print("\nServing JSON, JSONp and HTTP on", sockname[0], "port", sockname[1], "...")

# Serve requests until interrupted; Ctrl-C exits with status 0.
try:
    server.serve_forever()
except KeyboardInterrupt:
    raise SystemExit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment