@tusing
Last active December 10, 2016 07:58
"""CameraParser - designed to parse data from a camera such as the Microsoft
Kinect, in order to translate the data into intuitively understandable input,
such as haptic feedback, for blind individuals.

The ideal use case is as follows:
    1) We have a vest with haptic feedback motors. We have a Kinect, or some
       camera capable of giving us meaningful data for blind individuals, e.g.
       depth.
    2) We parse data from the Kinect into subdivided segments - one
       for every motor. For example, a vest with 4 motors would mean
       subdividing our image into 4 squares (likely in a 2x2 formation).
    3) We extract features from every subdivision (such as depth and change
       in depth over time), and map these features onto different motors.

Written by Tushar Singal.
"""
import freenect
import numpy as np
from threading import Thread, RLock


class CameraParser:
    """Designed to parse output from the camera into various subdivided segments.

    A use case involves haptic feedback vests, in which we can subdivide
    the camera's output into distinct segments, which we can then map onto
    vibration motors throughout the vest.
    """

    def __init__(self, x, y, subdivFunc=None, cameraFunc=None):
        """Initializes the parser with a subdivision function and a camera function.

        Subdivides output from the camera into distinct segments with
        resolution (x, y). Applies a function to every subdivision, and
        stores the results in a matrix of tuples.

        Args:
            x: Desired number of subdivisions along the x-axis.
            y: Desired number of subdivisions along the y-axis.
            subdivFunc: The function to apply to every subdivided array.
                Called as subdivFunc(window, x, y). Should return a tuple.
                (default: {self.decayingAverage})
            cameraFunc: The function that obtains our snapshot from a camera.
                Should return a 2D array. (default: {self._getKinectDepth})
        """
        self.x = x
        self.y = y
        if subdivFunc is None:
            self.subdivFunc = self.decayingAverage
        else:
            self.subdivFunc = subdivFunc
        if cameraFunc is None:
            self.cameraFunc = self._getKinectDepth
        else:
            self.cameraFunc = cameraFunc
        # Object array with one cell per subdivision. Every cell holds 0
        # until the first frame has been parsed.
        self.subdivisions = np.zeros((y, x), dtype=object)
        self.run()

    def getValue(self, x, y):
        """Gets computed value after subdividing and applying ``self.subdivFunc``.

        Args:
            x: Width index of desired value.
            y: Height index of desired value.

        Returns:
            Values returned by ``self.subdivFunc`` when applied
            to the subdivision.
            [tuple]
        """
        # The with-statement releases the lock even though we return inside it.
        with self.lock:
            return self.subdivisions[y, x]

    def getValues(self):
        """Gets latest snapshot of entire matrix of computed values.

        Allows you to obtain the full matrix of values for the entire image
        once subdivided and processed with ``self.subdivFunc``.

        Returns:
            A matrix of tuples, where the tuples represent the result of
            ``self.subdivFunc`` applied to every subdivision of the image
            output from the camera.
            [numpy array]
        """
        with self.lock:
            # Return a copy so the caller's snapshot is not overwritten by
            # the parsing thread.
            return self.subdivisions.copy()

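    def _getKinectDepth(self):
        """Obtains a snapshot of depth data from the Microsoft Kinect.

        Returns:
            Array of depth data values, ranging from [0-255], where
            255 is close to the Kinect, and 0 is far.
            [numpy array]
        """
        array, _ = freenect.sync_get_depth()
        # Assumes the default 11-bit depth format, where larger raw values
        # are farther away. Clip to 10 bits and drop the two low bits so the
        # result fits in 8 bits; a bare astype(np.uint8) would wrap around.
        array = np.clip(array, 0, 2 ** 10 - 1) >> 2
        # Invert so that 255 is close and 0 is far, as documented above.
        return (255 - array).astype(np.uint8)
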
    def _setValues(self, image):
        """Subdivide and update values with input from camera.

        Subdivides the image into an x-by-y grid of segments, and applies
        ``self.subdivFunc`` to each subdivision, storing the resulting
        tuple in a matrix which can be obtained by the getter functions.

        Args:
            image: Two-dimensional array of image to parse.
        """
        with self.lock:
            ny, nx = self.subdivisions.shape
            heightStrides = image.shape[0] // ny
            widthStrides = image.shape[1] // nx
            for yi in range(ny):
                # Rows belonging to grid row yi.
                yWindow = image[heightStrides * yi: heightStrides * (yi + 1)]
                for xi in range(nx):
                    # Slice columns (second axis), not rows.
                    xyWindow = yWindow[:, widthStrides * xi:
                                       widthStrides * (xi + 1)]
                    self.subdivisions[yi, xi] = self.subdivFunc(xyWindow, xi, yi)

    def decayingAverage(self, xyWindow, x, y, numSamples=4):
        """Subdivision function. Returns depth, and difference in depth over time.

        Performs a decay-based low-pass filter over depth measurements.
        Calculates the difference in depth from the old average. This results
        in two-dimensional data: "depth" and "depth derivative", which can be
        mapped to, say, "motor intensity" and "motor frequency" in our
        haptic-feedback vest example.

        Args:
            xyWindow: The subdivision we are calculating over.
            x: The width index of the current subdivision.
            y: The height index of the current subdivision.
            numSamples: Strength of the low-pass filter in number of frames.
                (default: {4})

        Returns:
            (filtered depth, change in depth from the old average)
            [tuple]
        """
        windowAverage = int(np.average(xyWindow))
        # Previous filtered depth for this cell. A cell that has not been
        # filled with a tuple yet counts as depth 0.
        previous = self.subdivisions[y, x]
        oldDistance = previous[0] if isinstance(previous, tuple) else 0
        newDistChange = windowAverage - oldDistance
        # Float weights avoid integer division under Python 2.
        weight = 1.0 / numSamples
        newDistValue = windowAverage * weight + oldDistance * (1.0 - weight)
        return (newDistValue, newDistChange)

    def run(self):
        """Threads the parser.

        Starts continually accepting and parsing frames from the camera
        in a separate thread.
        Thread-safe implementation; the getter functions take the same lock
        as the parser, so they never observe a partially updated matrix.
        """
        self.lock = RLock()

        def startParsing():
            while True:
                # Call the camera function; _setValues expects a frame,
                # not the function itself.
                self._setValues(self.cameraFunc())

        Thread(target=startParsing).start()


def main():
    # Use the default subdivision and camera functions on an 8x4 grid.
    parser = CameraParser(8, 4)
    # Valid indices are x in [0, 7] and y in [0, 3].
    print(parser.getValue(4, 3))


if __name__ == "__main__":
    main()
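
The subdivision and decaying-average steps described in the docstrings can be tried without a Kinect. The following is a minimal sketch, not part of the module above: `subdivide_means`, `decay_step`, and the synthetic `frame` are hypothetical names introduced for illustration, and the grid is computed with a NumPy reshape rather than the nested loops used in `_setValues`.

```python
import numpy as np


def subdivide_means(image, nx, ny):
    """Return an (ny, nx) array holding the mean of each grid cell."""
    h = image.shape[0] // ny  # rows per cell
    w = image.shape[1] // nx  # columns per cell
    # Drop any remainder rows/columns, then give each cell its own axes.
    cells = image[:h * ny, :w * nx].reshape(ny, h, nx, w)
    return cells.mean(axis=(1, 3))


def decay_step(old, new, num_samples=4):
    """One step of the decaying average: blend the new frame into the old."""
    weight = 1.0 / num_samples
    return new * weight + old * (1.0 - weight)


# Hypothetical 4x8 "depth frame": left half far (0), right half near (200).
frame = np.zeros((4, 8))
frame[:, 4:] = 200

means = subdivide_means(frame, nx=2, ny=2)

# Feed the same frame twice; the filtered value approaches 200 gradually.
smoothed = np.zeros((2, 2))
for _ in range(2):
    smoothed = decay_step(smoothed, means)
```

With `num_samples=4`, each new frame contributes a quarter of the filtered value, so a sudden change in depth takes several frames to show up fully. That is the low-pass behaviour `decayingAverage` relies on.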