Created
December 22, 2016 14:38
-
-
Save jamesbulpin/1fc922bddfd933f1935f423925cffb1c to your computer and use it in GitHub Desktop.
Quick and dirty script to get X-Y coordinates of WS2811 LEDs using a webcam
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import threading | |
import sys | |
import os | |
import shutil | |
import datetime | |
import subprocess | |
import signal | |
import time | |
import math | |
from neopixel import * | |
from BaseHTTPServer import BaseHTTPRequestHandler,HTTPServer | |
import skimage.io | |
import skimage.segmentation | |
import numpy as np | |
# ---------------------------------------------------------------------------
# LED strip configuration (passed to Adafruit_NeoPixel in the main section).
# ---------------------------------------------------------------------------
LED_COUNT = 100         # Number of LED pixels.
LED_PIN = 18            # GPIO pin connected to the pixels (must support PWM!).
LED_FREQ_HZ = 800000    # LED signal frequency in hertz (usually 800khz)
LED_DMA = 5             # DMA channel to use for generating signal (try 5)
LED_BRIGHTNESS = 255    # Set to 0 for darkest and 255 for brightest
LED_INVERT = False      # True to invert the signal (when using NPN transistor level shift)

# ---------------------------------------------------------------------------
# Webcam / web server configuration.
# ---------------------------------------------------------------------------
cfgResolution = "960x720"           # handed to fswebcam via "-r"
cfgRotation = "270"                 # handed to fswebcam via "--rotate"
cfgPortNumber = 8000                # TCP port the snapshot HTTP server listens on
cfgCameraName = "UnnamedCamera"     # prefix of the captured image file names

# The NeoPixel strip object; created in the main section.
strip = None

# One [x, y] entry per LED. Both coordinates stay None until the LED has
# been located in a webcam image.
pixels = [[None, None] for _ in range(LED_COUNT)]
def log(s):
    """Append a timestamped message to the agent log and echo it to stderr.

    The line written is ``[<UTC timestamp>] <s>`` followed by a newline.
    The log file is ``/tmp/rpiagent.log`` and is opened in append mode on
    every call.

    Args:
        s: The message; it is interpolated with ``%s``.
    """
    m = "[%s] %s\n" % (datetime.datetime.utcnow(), s)
    # ``with`` guarantees the handle is closed even when a write raises;
    # open() replaces the Python-2-only file() builtin.
    with open("/tmp/rpiagent.log", 'a') as f:
        # The extra newline leaves a blank line between entries on stderr,
        # exactly as before.
        sys.stderr.write(m + "\n")
        f.write(m)
class Watcher:
    """Work around two signal-handling problems in multithreaded Python.

    (1) A signal may be delivered to any thread (a malfeature), and
    (2) if the thread that receives the signal is waiting, the signal is
    ignored (a bug).

    The watcher is a concurrent *process* (not a thread): constructing it
    forks. The child returns from the constructor and carries on running
    the program, threads included. The parent stays behind waiting on the
    child and kills it when a KeyboardInterrupt arrives.

    See Appendix A of The Little Book of Semaphores,
    http://greenteapress.com/semaphores/

    Only tested on Linux by the recipe's author, who expected it to work
    on the Macintosh and not on Windows.

    From: http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/
    """

    def __init__(self):
        """Fork; the child returns, the parent watches and never returns."""
        self.child = os.fork()
        if self.child:
            # Non-zero pid: this is the parent. watch() ends in sys.exit().
            self.watch()

    def watch(self):
        """Wait for a child to exit (or for SIGINT), then exit this process."""
        try:
            os.wait()
        except KeyboardInterrupt:
            # The capital B in "KeyBoardInterrupt" makes it possible to
            # tell when it was the Watcher that received the SIGINT.
            print('KeyBoardInterrupt')
            self.kill()
        sys.exit()

    def kill(self):
        """SIGKILL the child process, ignoring OSError from os.kill()."""
        try:
            os.kill(self.child, signal.SIGKILL)
        except OSError:
            pass
class WebCam(threading.Thread):
    """Thread that captures one still image by running ``fswebcam``.

    Attributes:
        filebasename: Base name of the image file, with ``%DATETIME%``
            already replaced by a UTC ``YYYYMMDD-HHMMSS`` stamp.
        unlink: When true the image is deleted right after capture.
        pngfile: Path of the captured image. Stays ``None`` until a capture
            succeeds with ``unlink`` false, so callers can test it after
            ``join()`` instead of hitting an AttributeError.
        returncode: Exit status of ``fswebcam``, or ``None`` if it has not
            run (or could not be started).
    """

    def __init__(self, filebasename, unlink=True):
        """Prepare a capture.

        Args:
            filebasename: Image base name; any ``%DATETIME%`` placeholder is
                expanded to the current UTC time.
            unlink: Delete the image after capturing it (default True).
        """
        super(WebCam, self).__init__()
        stamp = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S")
        self.filebasename = filebasename.replace("%DATETIME%", stamp)
        self.unlink = unlink
        # Defined up front so a failed capture is detectable by callers.
        self.pngfile = None
        self.returncode = None

    def run(self):
        """Run ``fswebcam`` and record the outcome on the instance.

        Returns:
            The non-zero ``fswebcam`` exit status on failure, otherwise
            ``None``. ``Thread.start()`` discards this value, so the status
            is also stored in ``self.returncode``.
        """
        pngfile = "%s.png" % (self.filebasename)
        pngpath = "/tmp/" + pngfile
        # --no-timestamp / --no-banner: capture the bare image without the
        # overlay fswebcam would otherwise draw onto it.
        cmd = ["fswebcam", "--no-timestamp", "--no-banner", "-r", cfgResolution, "--rotate", cfgRotation, pngpath]
        log("Executing local command: " + " ".join(cmd))
        try:
            rc = subprocess.call(cmd)
        except OSError as e:
            # e.g. fswebcam is not installed; pngfile stays None.
            log("fswebcam could not be started: %s" % (e))
            return None
        self.returncode = rc
        if rc != 0:
            log("fswebcam failed with error %d" % (rc))
            return rc
        if self.unlink:
            os.unlink(pngpath)
        else:
            self.pngfile = pngpath
class myHandler(BaseHTTPRequestHandler):
    """HTTP request handler that answers every GET with a fresh webcam PNG."""

    # Handler for the GET requests
    def do_GET(self):
        """Capture an image and stream it back as ``image/png``.

        The capture is completed *before* any status line is sent, so a
        failed capture yields a 500 error rather than a 200 response with
        no body. The temporary image is removed whether or not the
        transfer succeeds.
        """
        w = WebCam(cfgCameraName + "_%DATETIME%", unlink=False)
        w.start()
        w.join()

        # pngfile is only set by a successful capture.
        pngpath = getattr(w, "pngfile", None)
        if not pngpath:
            self.send_error(500, "Webcam capture failed")
            return

        try:
            # PNG is binary data: open in "rb".
            with open(pngpath, "rb") as f:
                self.send_response(200)
                self.send_header('Content-type', 'image/png')
                self.end_headers()
                shutil.copyfileobj(f, self.wfile)
        finally:
            os.unlink(pngpath)
        return
class WebServer(threading.Thread):
    """Thread that runs the snapshot HTTP server."""

    def __init__(self):
        """Create the HTTP server on port ``cfgPortNumber`` (not yet serving)."""
        threading.Thread.__init__(self)
        listen_on = ('', cfgPortNumber)
        self.server = HTTPServer(listen_on, myHandler)

    def run(self):
        """Thread body: handle requests with ``myHandler`` until shut down."""
        httpd = self.server
        httpd.serve_forever()
def _findBrightSpot(im):
    """Return the (x, y) centroid of the bright spot in *im*, or None.

    Spot finding based on
    http://stackoverflow.com/questions/30081725/how-to-locate-the-center-of-a-bright-spot-in-an-image

    The image is thresholded, regions touching the border are cleared and
    the mean coordinate of the remaining pixels is taken. The threshold
    starts at 200/255 and is lowered in steps of 10/255 down to 0 until
    some pixels survive.

    Args:
        im: Greyscale image as returned by ``skimage.io.imread(path, True)``
            (intensities are floating point in [0, 1]).
    """
    for th in range(200, -1, -10):
        im_clear = skimage.segmentation.clear_border(im > (th / 255.0))
        y, x = np.nonzero(im_clear)
        # An empty selection is what made the means NaN before; test for it
        # directly instead of taking the mean of an empty array.
        if x.size:
            return x.mean(), y.mean()
    return None


def doPixel(pixel):
    """Light one LED, photograph it and record its image position.

    Every LED is set to Color(0,0,0), LED *pixel* is set to full white and
    a webcam image is captured. On success the centroid of the bright spot
    is stored in ``pixels[pixel]`` as ``[x, y]`` and printed.

    If the capture or the image processing fails, ``pixels[pixel]`` is left
    untouched (still ``[None, None]``) so the caller can retry. If the
    image was read but no spot is found at any threshold, NaN coordinates
    are stored: the LED then counts as processed and is not retried.

    Args:
        pixel: Index of the LED to locate.
    """
    pngpath = None
    try:
        for i in range(strip.numPixels()):
            strip.setPixelColor(i, Color(0, 0, 0))
        strip.setPixelColor(pixel, Color(255, 255, 255))
        strip.show()
        time.sleep(1)

        w = WebCam(cfgCameraName + "_%DATETIME%", unlink=False)
        w.start()
        w.join()
        # pngfile is only set by a successful capture.
        pngpath = getattr(w, "pngfile", None)
        if not pngpath:
            log("Webcam capture failed for pixel %s" % (pixel))
            return

        # Read in the image
        # Note - intensities are floating point from [0,1]
        im = skimage.io.imread(pngpath, True)

        spot = _findBrightSpot(im)
        if spot is None:
            # Give up on this LED rather than retrying it for ever.
            spot = (float("nan"), float("nan"))
        xmean, ymean = spot
        pixels[pixel][0] = xmean
        pixels[pixel][1] = ymean
        print("%s %s %s" % (pixel, xmean, ymean))
        time.sleep(1)
    except Exception as e:
        # Best effort: the caller retries LEDs that have no position yet.
        # Unlike a bare "except:", this lets KeyboardInterrupt and
        # SystemExit propagate, and the failure is logged instead of hidden.
        log("doPixel(%s) failed: %s" % (pixel, e))
    finally:
        # Remove the captured image so repeated runs do not fill /tmp.
        if pngpath:
            try:
                os.unlink(pngpath)
            except OSError:
                pass
def normalisePixels(points, scale=100.0):
    """Rescale ``[x, y]`` points so each axis spans 0..*scale*.

    Each axis is normalised independently: its smallest known value maps to
    0 and its largest to *scale*.

    Args:
        points: Sequence of ``[x, y]`` pairs. A coordinate that is ``None``
            or NaN is treated as unknown.
        scale: Size of the target grid (default 100.0).

    Returns:
        A new list of ``[x, y]`` lists, in the same order as *points*.
        Unknown coordinates come back as NaN. When all known values on an
        axis are equal, that axis maps to 0.0 instead of dividing by zero.
    """
    def known(v):
        return v is not None and not math.isnan(v)

    def bounds(values):
        # NaN makes min()/max() order-dependent, so only known values count.
        good = [v for v in values if known(v)]
        if not good:
            return None
        lo = min(good)
        return lo, max(good) - lo

    def rescale(v, axis):
        if axis is None or not known(v):
            return float("nan")
        lo, span = axis
        if span == 0:
            return 0.0
        return scale * (v - lo) / span

    xaxis = bounds([p[0] for p in points])
    yaxis = bounds([p[1] for p in points])
    return [[rescale(p[0], xaxis), rescale(p[1], yaxis)] for p in points]


if __name__ == "__main__":
    # Fork the signal watcher first; only the child continues past here.
    Watcher()

    www = WebServer()
    www.start()

    # Create NeoPixel object with appropriate configuration.
    strip = Adafruit_NeoPixel(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS)
    # Initialize the library (must be called once before other functions).
    strip.begin()

    for i in range(strip.numPixels()):
        strip.setPixelColor(i, Color(255, 255, 255))
    strip.show()

    # Iterate over the LEDs until we have a position for all of them.
    # Sometimes the webcam capture fails, hence the retrying.
    while True:
        missing = [i for i, p in enumerate(pixels) if p[0] is None]
        if not missing:
            break
        for i in missing:
            doPixel(i)

    # Normalise to a 100x100 grid
    pixelsnorm = normalisePixels(pixels)

    print(pixels)
    print(pixelsnorm)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment