Skip to content

Instantly share code, notes, and snippets.

@maleadt
Created August 11, 2011 18:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maleadt/1140303 to your computer and use it in GitHub Desktop.
Save maleadt/1140303 to your computer and use it in GitHub Desktop.
Webcam-based power monitor
#!/usr/bin/python2
#
# Configuration
#
# System modules
import sys
import argparse # python-argparse
import logging
import time
# Image processing
import cv
# Calibration data
segment = [
215, 78, # x, y
150, 47 ]
segment_offsets = [
[1, -10], # bottom left
[10, -1], # bottom
[19, -10], # bottom right
[11, -20], # center
[3, -30], # top left
[12, -37], # top
[21, -28] ] # top right
digit_positions = [
[115, 41],
[79, 41],
[43, 41] ]
segment_threshold = 5
number_segments = [
[1, 1, 1, 0, 1, 1, 1],
[0, 0, 1, 0, 0, 0, 1],
[1, 1, 0, 1, 0, 1, 1],
[0, 1, 1, 1, 0, 1, 1],
[0, 0, 1, 1, 1, 0, 1],
[0, 1, 1, 1, 1, 1, 0],
[1, 1, 1, 1, 1, 1, 0],
[0, 0, 1, 0, 0, 1, 1],
[1, 1, 1, 1, 1, 1, 1],
[0, 1, 1, 1, 1, 1, 1] ]
#
# Auxiliary
#
class PowerMonitor:
# Member data
logger = logging.getLogger('PowerMonitor')
# Constructor
def __init__(self):
# Open and configure device
self.capture = cv.CaptureFromCAM(-1)
if not self.capture:
raise Exception("could not open device")
cv.SetCaptureProperty(self.capture, cv.CV_CAP_PROP_FRAME_HEIGHT, 480)
cv.SetCaptureProperty(self.capture, cv.CV_CAP_PROP_FRAME_WIDTH, 640)
# Auxiliary
def _getFrame(self):
frame = cv.QueryFrame(self.capture)
if not frame:
raise Exception("could not get a frame")
cv.SaveImage("/tmp/webcam_input.jpg", frame)
return frame
def _preprocess(self, frame):
# Convert to grayscale
grayscale = cv.CreateImage(cv.GetSize(frame), cv.IPL_DEPTH_8U, 1)
cv.CvtColor(frame, grayscale, cv.CV_RGB2GRAY)
# Crop to relevant region
crop = cv.GetSubRect(grayscale, (segment[0], segment[1], segment[2], segment[3]))
# Perform a binary adaptive threshold
cv.AdaptiveThreshold(crop, crop, 255, cv.CV_THRESH_BINARY, cv.CV_ADAPTIVE_THRESH_MEAN_C, 53, 20)
#cv.SaveImage("threshold.jpg", crop)
return crop
def _recognize(self, image):
final_number = 0
order = 0.1
debug = cv.CreateImage(cv.GetSize(image), cv.IPL_DEPTH_8U, 3)
cv.CvtColor(image, debug, cv.CV_GRAY2RGB)
for digit in range (0, 3):
# Get segments
segments = []
for segment in range(0, 7):
x_origin = digit_positions[digit][0] + segment_offsets[segment][0]
y_origin = digit_positions[digit][1] + segment_offsets[segment][1]
# Calculate
count = 0
for x in range(x_origin-1, x_origin+2):
for y in range(y_origin-1, y_origin+2):
pixel = cv.Get2D(image, y, x)
if pixel[0] == 0:
count = count + 1
# Save
segments.append(count >= segment_threshold)
# Debug output
for x in range(x_origin-1, x_origin+2):
for y in range(y_origin-1, y_origin+2):
if (count >= segment_threshold):
cv.Set2D(debug, y, x, cv.RGB(0, 255, 0))
else:
cv.Set2D(debug, y, x, cv.RGB(255, 0, 0))
# Calculate number
guess = -1
for number in range(0, 10):
matches = 0
for segment in range(0, 7):
if segments[segment] == number_segments[number][segment]:
matches = matches + 1
if matches == 7:
guess = number
break;
if guess == -1:
cv.SaveImage("/tmp/webcam_segments_failure.jpg", debug)
raise Exception("could not guess digit %i" % digit)
final_number = final_number + guess * order
order = order * 10
cv.SaveImage("/tmp/webcam_segments.jpg", debug)
return final_number
# Getters
def getConsumption(self):
# Acquire and preprocess a frame
frame = self._getFrame()
preprocessed = self._preprocess(frame)
# Perform the recognition
number = self._recognize(preprocessed)
return number
def getConsumptionRobustly(self):
consumption = None
tries = 0
while consumption is None and tries < 5:
try:
consumption = self.getConsumption()
except Exception, err:
tries = tries + 1
time.sleep(0.1)
if consumption is None:
raise Exception("could not fetch power consumption")
return consumption
#
# Main
#
def main(args):
# Configure logging
rootlogger = logging.getLogger()
if args.verbose:
rootlogger.setLevel(logging.DEBUG)
else:
rootlogger.setLevel(logging.INFO)
monitor = PowerMonitor()
# Different execution modi
if args.mode == "single":
print monitor.getConsumptionRobustly()
sys.exit(0)
elif args.mode == "watch":
while True:
print monitor.getConsumptionRobustly()
time.sleep(args.sleep)
parser = argparse.ArgumentParser(description='Power monitor.')
parser.add_argument('--verbose', '-v', help='print more information', action='store_true')
parser.add_argument('--mode', '-m', help='mode of execution', choices=['single', 'watch', 'fetch'], default='single')
parser.add_argument('--sleep', '-s', help='amount of time between checks', type=int, default=5)
main(parser.parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment