Skip to content

Instantly share code, notes, and snippets.

@jamesbulpin
Created December 22, 2016 14:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jamesbulpin/1fc922bddfd933f1935f423925cffb1c to your computer and use it in GitHub Desktop.
Save jamesbulpin/1fc922bddfd933f1935f423925cffb1c to your computer and use it in GitHub Desktop.
Quick and dirty script to get X-Y coordinates of WS2811 LEDs using a webcam
#!/usr/bin/python
import threading
import sys
import os
import shutil
import datetime
import subprocess
import signal
import time
import math
from neopixel import *
from BaseHTTPServer import BaseHTTPRequestHandler,HTTPServer
import skimage.io
import skimage.segmentation
import numpy as np
# LED strip configuration (these values are handed to Adafruit_NeoPixel):
LED_COUNT = 100  # Number of LED pixels.
LED_PIN = 18  # GPIO pin connected to the pixels (must support PWM!).
LED_FREQ_HZ = 800000  # LED signal frequency in hertz (usually 800khz)
LED_DMA = 5  # DMA channel to use for generating signal (try 5)
LED_BRIGHTNESS = 255  # Set to 0 for darkest and 255 for brightest
LED_INVERT = False  # True to invert the signal (when using NPN transistor level shift)

# Webcam capture and web server settings.
cfgResolution = "960x720"  # passed to fswebcam as the -r argument
cfgRotation = "270"  # passed to fswebcam as the --rotate argument
cfgPortNumber = 8000  # port the HTTP server is created on
cfgCameraName = "UnnamedCamera"  # prefix of the capture file names

# The NeoPixel strip object; assigned when the script runs as __main__.
strip = None

# One [x, y] slot per LED. Both entries stay None until a position has been
# recorded for that LED.
pixels = [[None, None] for i in range(LED_COUNT)]
def log(s, path="/tmp/rpiagent.log"):
    """Append a UTC-timestamped message to the log file and echo it to stderr.

    s    -- the message; formatted with %s, so any object is accepted.
    path -- log file to append to (defaults to the original fixed location).
    """
    m = "[%s] %s\n" % (datetime.datetime.utcnow(), s)
    # NOTE: m already ends in a newline, so the stderr copy is followed by a
    # blank line. Kept unchanged so existing console output stays the same.
    sys.stderr.write(m + "\n")
    # "with" closes the handle even when the write raises.
    with open(path, 'a') as f:
        f.write(m)
class Watcher:
    """Work around two signal problems in multithreaded Python programs:
    (1) a signal might be delivered to any thread (which is just a
    malfeature) and (2) if the thread that gets the signal is waiting,
    the signal is ignored (which is a bug).

    The watcher is a concurrent process (not thread) that waits for a
    signal and for the process that contains the threads. See Appendix A
    of The Little Book of Semaphores.
    http://greenteapress.com/semaphores/

    The original author only tested this on Linux and expected it to
    work on the Macintosh and not work on Windows.

    From: http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/
    """

    def __init__(self):
        """Fork. The child process returns from the constructor and carries
        on; the parent process stays in watch(), which ends by exiting.
        """
        self.child = os.fork()
        if self.child != 0:
            # Parent process: block here until the child ends or Ctrl-C.
            self.watch()

    def watch(self):
        """Wait for the child; on KeyboardInterrupt kill it. Then exit."""
        try:
            os.wait()
        except KeyboardInterrupt:
            # The capital B in KeyBoardInterrupt makes it possible to tell
            # when it was the Watcher that received the SIGINT.
            print('KeyBoardInterrupt')
            self.kill()
        sys.exit()

    def kill(self):
        """Send SIGKILL to the child process, ignoring OSError."""
        try:
            os.kill(self.child, signal.SIGKILL)
        except OSError:
            pass
class WebCam(threading.Thread):
    """Thread that captures one still image from the webcam using fswebcam.

    Constructor arguments:
      filebasename -- base name of the PNG written under /tmp; any
                      "%DATETIME%" placeholder is replaced by the current
                      UTC time formatted as YYYYmmdd-HHMMSS.
      unlink       -- if true, the image is deleted right after capture.

    Attributes available once the thread has been joined:
      pngfile -- path of the captured PNG, or None when the capture failed
                 or the file was deleted because unlink was true.
      rc      -- exit status of fswebcam (None if the command never ran).
    """

    def __init__(self, filebasename, unlink=True):
        super(WebCam, self).__init__()
        stamp = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S")
        self.filebasename = filebasename.replace("%DATETIME%", stamp)
        self.unlink = unlink
        # Always defined, so callers can test for failure instead of hitting
        # an AttributeError when no image was produced.
        self.pngfile = None
        self.rc = None

    def run(self):
        """Run fswebcam once; on success publish the image path in pngfile."""
        pngfile = "%s.png" % (self.filebasename)
        pngpath = "/tmp/" + pngfile
        # Alternative that stamps the camera name on the image:
        #cmd = ["fswebcam", "--subtitle", cfgCameraName, "-r", cfgResolution, "--rotate", cfgRotation, pngpath]
        cmd = ["fswebcam", "--no-timestamp", "--no-banner", "-r", cfgResolution, "--rotate", cfgRotation, pngpath]
        log("Executing local command: " + " ".join(cmd))
        # Thread.run()'s return value is discarded, so keep the status on the
        # instance as well as returning it.
        self.rc = subprocess.call(cmd)
        if self.rc != 0:
            log("fswebcam failed with error %d" % (self.rc))
            return self.rc
        # A zero exit status alone does not prove the file was written, so
        # check before unlinking or publishing the path.
        if not os.path.exists(pngpath):
            log("fswebcam did not produce " + pngpath)
            return
        if self.unlink:
            os.unlink(pngpath)
        else:
            self.pngfile = pngpath
class myHandler(BaseHTTPRequestHandler):
    """HTTP handler that answers every GET with a freshly captured PNG."""

    # Handler for the GET requests
    def do_GET(self):
        """Capture a webcam frame and send it; reply 500 if capture fails."""
        w = WebCam(cfgCameraName + "_%DATETIME%", unlink=False)
        w.start()
        # Wait for the capture BEFORE sending any headers, so that a failed
        # capture can still be reported with a proper error status.
        w.join()
        pngpath = getattr(w, "pngfile", None)
        if not pngpath or not os.path.exists(pngpath):
            self.send_error(500, "Webcam capture failed")
            return
        try:
            # PNG is binary data: open in "rb" so no newline translation or
            # text decoding is ever applied.
            with open(pngpath, "rb") as f:
                self.send_response(200)
                self.send_header('Content-type', 'image/png')
                self.send_header('Content-Length', str(os.path.getsize(pngpath)))
                self.end_headers()
                shutil.copyfileobj(f, self.wfile)
        finally:
            # The capture is a one-off temp file; remove it even when the
            # client disconnects part-way through the transfer.
            os.unlink(pngpath)
        return
class WebServer(threading.Thread):
    """Thread that serves HTTP requests using myHandler."""

    def __init__(self):
        """Create the HTTP server on cfgPortNumber; serving starts in run()."""
        threading.Thread.__init__(self)
        address = ('', cfgPortNumber)
        self.server = HTTPServer(address, myHandler)

    def run(self):
        """Thread body: handle requests until the server is shut down."""
        self.server.serve_forever()
def _findBrightSpot(im, start=200.0, step=10.0):
    """Return the (x, y) centroid of the bright region of greyscale image im.

    The image is thresholded at start/255 and regions touching the image
    border are cleared. If nothing is left, the threshold is lowered by
    step and the search repeated. Once the threshold has dropped below 0
    without a hit, (nan, nan) is returned.

    Spot finding based on
    http://stackoverflow.com/questions/30081725/how-to-locate-the-center-of-a-bright-spot-in-an-image
    """
    th = start
    while True:
        # Threshold the image first then clear the border.
        # Note - intensities are floating point from [0,1].
        im_clear = skimage.segmentation.clear_border(im > (th / 255.0))
        # Coordinates of every pixel that survived the threshold.
        y, x = np.nonzero(im_clear)
        # Testing the size first avoids numpy's empty-mean warning.
        if x.size:
            return x.mean(), y.mean()
        th = th - step
        if th < 0:
            return float("nan"), float("nan")


def doPixel(pixel):
    """Light only LED number pixel, photograph it and record its position.

    On success pixels[pixel] is set to the [x, y] centroid of the bright
    spot in the captured image and "pixel x y" is printed. If no spot is
    found at any threshold, NaN coordinates are stored, which also stops
    the caller's "still None" retry check for this LED.

    Any failure (for example a failed webcam capture) is logged and leaves
    pixels[pixel] untouched so the caller can try again.
    """
    pngfile = None
    try:
        # Everything off except the LED being located.
        for i in range(strip.numPixels()):
            strip.setPixelColor(i, Color(0, 0, 0))
        strip.setPixelColor(pixel, Color(255, 255, 255))
        strip.show()
        # Presumably gives the LED and camera time to settle before capture.
        time.sleep(1)
        w = WebCam(cfgCameraName + "_%DATETIME%", unlink=False)
        w.start()
        w.join()
        pngfile = getattr(w, "pngfile", None)
        if not pngfile:
            log("doPixel(%s): webcam capture failed" % (pixel,))
            return
        # Read in the image as greyscale.
        im = skimage.io.imread(pngfile, True)
        xmean, ymean = _findBrightSpot(im)
        pixels[pixel][0] = xmean
        pixels[pixel][1] = ymean
        print("%s %s %s" % (pixel, xmean, ymean))
        time.sleep(1)
    except Exception as e:
        # Best effort: the caller retries LEDs that are still unset. Catching
        # Exception (not a bare except) lets Ctrl-C / SystemExit through.
        log("doPixel(%s) failed: %s" % (pixel, e))
    finally:
        # Do not leave one capture file in /tmp per attempt.
        if pngfile:
            try:
                os.unlink(pngfile)
            except OSError:
                pass
def _axisScaler(values, scale):
    """Return a function mapping one coordinate onto the range 0..scale.

    None and NaN entries in values are ignored when working out the
    extent; the returned function maps None/NaN inputs to NaN.
    """
    valid = [v for v in values if v is not None and not math.isnan(v)]
    if not valid:
        return lambda v: float("nan")
    low = min(valid)
    span = max(valid) - low

    def rescale(v):
        if v is None or math.isnan(v):
            return float("nan")
        if span == 0:
            # Every point shares this coordinate: avoid dividing by zero.
            return 0.0
        return scale * (v - low) / span

    return rescale


def normalisePixels(points, scale=100.0):
    """Rescale a list of [x, y] positions onto a scale-by-scale grid.

    Each axis is stretched independently so its smallest value maps to 0
    and its largest to scale. Positions that are None or NaN do not affect
    the extent and come back as NaN. A new list is returned; points is not
    modified.
    """
    scale_x = _axisScaler([p[0] for p in points], scale)
    scale_y = _axisScaler([p[1] for p in points], scale)
    return [[scale_x(p[0]), scale_y(p[1])] for p in points]


if __name__ == "__main__":
    # Fork the watcher process; only the child continues past this call.
    Watcher()
    www = WebServer()
    www.start()
    # Create NeoPixel object with appropriate configuration.
    strip = Adafruit_NeoPixel(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS)
    # Initialize the library (must be called once before other functions).
    strip.begin()
    # Start with every LED lit white.
    for i in range(strip.numPixels()):
        strip.setPixelColor(i, Color(255, 255, 255))
    strip.show()
    # Iterate over the LEDs until we have a position for all of them.
    # Sometimes the webcam capture fails, hence the retrying.
    while True:
        remaining = 0
        for i, position in enumerate(pixels):
            # "is None": stored positions may be numpy floats, for which
            # "== None" is not a reliable test.
            if position[0] is None:
                remaining = remaining + 1
                doPixel(i)
        if remaining == 0:
            break
    # Normalise to a 100x100 grid
    pixelsnorm = normalisePixels(pixels)
    print(pixels)
    print(pixelsnorm)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment