Created
December 19, 2014 23:55
-
-
Save tomislacker/1517c6de1551b1b88067 to your computer and use it in GitHub Desktop.
Hacky Motion Tracking Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import cv | |
import sys | |
import argparse | |
import os.path | |
import time | |
import math | |
class MovementEvent:
    """Track a single stretch of movement across frames and report it when it ends.

    Frames are fed in one at a time through putFrame(). An event opens on the
    first frame whose movement value reaches ``threshold`` and closes once the
    subject has been idle for the configured timeout (either a number of
    consecutive idle frames or a number of wall-clock seconds). When an event
    closes, one CSV-style line ``subjectFile,startStamp,endStamp`` (each value
    passed through repr()) is printed and the tracker is reset so the next
    event can be detected.

    ``False`` is used throughout as the "not set" sentinel.
    """

    subjectFile = False     # label printed as the first column of each report line
    startTime = False       # wall-clock time (time.time()) at which the event opened
    startStamp = False      # caller-supplied timestamp of the frame that opened the event
    lastActTime = False     # wall-clock time of the most recent frame at/above threshold
    actFrames = 0           # consecutive frames at/above threshold
    idleFrames = 0          # consecutive frames below threshold while an event is open
    endTime = False         # wall-clock time at which the event was closed
    endStamp = False        # caller-supplied timestamp of the frame that closed the event
    threshold = False       # minimum movement value that counts as activity
    timeoutSecs = False     # idle wall-clock seconds after which an event closes
    timeoutFrames = False   # idle frame count after which an event closes

    def __init__(self, subjectFile=False, threshold=False, timeoutFrames=False, timeoutSecs=False):
        """Configure the tracker.

        Exactly one of ``timeoutFrames`` / ``timeoutSecs`` must be supplied.

        Raises:
            ValueError: if ``threshold`` is missing, or if neither or both of
                the timeout parameters are given.
        """
        if not self._isSet(threshold):
            raise ValueError('Requires threshold')

        haveFrames = self._isSet(timeoutFrames)
        haveSecs = self._isSet(timeoutSecs)
        if not haveFrames and not haveSecs:
            raise ValueError('Requires timeoutFrames OR timeoutSecs parameter')
        if haveFrames and haveSecs:
            raise ValueError('Requires timeoutFrames OR timeoutSecs parameter -- not both')

        self.subjectFile = subjectFile
        self.threshold = threshold
        self.timeoutFrames = timeoutFrames
        self.timeoutSecs = timeoutSecs

    @staticmethod
    def _isSet(value):
        """Return True when ``value`` is a supplied setting rather than the sentinel.

        ``False == 0`` in Python, so this treats both the ``False`` sentinel
        and a literal zero as "not supplied", which is the rule the
        constructor has always applied.
        """
        return value != 0

    def getMicrotime(self, getAsFloat=True):
        """Return the current wall-clock time.

        With ``getAsFloat`` true the result is the float from time.time();
        otherwise it is a string ``'<fraction> <whole seconds>'`` built from
        math.modf().
        """
        if getAsFloat:
            return time.time()
        return '%f %d' % math.modf(time.time())

    def putFrame(self, movement, timestamp):
        """Feed one frame's movement measurement into the tracker.

        Args:
            movement: movement value for the frame, compared against threshold.
            timestamp: caller-defined position of the frame; stored verbatim as
                the start/end stamp of the event and printed in the report.
        """
        if movement >= self.threshold:
            if self.startTime is False:
                # First active frame: open a new event.
                self.startTime = self.getMicrotime()
                self.startStamp = timestamp
            self.lastActTime = self.getMicrotime()
            self.actFrames += 1
            self.idleFrames = 0
        elif self.startTime is not False:
            # Idle frame while an event is open: see whether it has timed out.
            self.actFrames = 0
            self.idleFrames += 1

            # The timeouts hold numbers, so they must be tested for "configured"
            # rather than compared with True (30 == True is False, which used to
            # keep every event open forever).
            if self._isSet(self.timeoutSecs) and self.timeoutSecs <= (self.getMicrotime() - self.lastActTime):
                # Expire motion (seconds)
                # NOTE(review): this measures wall-clock time between putFrame
                # calls, not elapsed time in the caller's timestamps -- confirm
                # that is the intent when frames are not processed in real time.
                self.endStamp = timestamp
                self.endOfMovement('Seconds', self.timeoutSecs)
            elif self._isSet(self.timeoutFrames) and self.timeoutFrames <= self.idleFrames:
                # Expire motion (frame count)
                self.endStamp = timestamp
                self.endOfMovement('Frames', self.timeoutFrames)

    def endOfMovement(self, cause='Unknown', value=False):
        """Close the current event, print its report line and reset the tracker.

        ``cause`` and ``value`` describe why the event ended; they are accepted
        for callers' benefit but are not part of the printed output.
        """
        self.endTime = self.getMicrotime()
        # Single parenthesised argument: prints identically on Python 2 and 3.
        print(repr(self.subjectFile) + ',' + repr(self.startStamp) + ',' + repr(self.endStamp))
        self.resetEvent()

    def resetEvent(self):
        """Clear the per-event state so a new event can be detected.

        startStamp and endStamp are left in place, so the stamps of the most
        recently reported event stay readable until the next event overwrites
        them.
        """
        self.startTime = False
        self.lastActTime = False
        self.actFrames = 0
        self.idleFrames = 0
        self.endTime = False
class Target:
    """Command-line driver: scan a recorded video for movement and report events.

    The constructor parses the command line, validates the options and opens
    the input file for capture. run() then walks the video frame by frame,
    measures how much of each frame differs from a running average of the
    previous frames, and feeds that measurement into a MovementEvent, which
    prints one line per detected stretch of movement.
    """

    inputFile = False       # path of the video being analysed
    threshold = 1500        # movement area at or above which a frame counts as active
    timeoutSecs = 30        # idle seconds after which a movement event is closed
    xOutput = False         # show the annotated frames in a window while processing
    movementEvent = False   # MovementEvent instance, created by run()

    def __init__(self):
        """Parse and validate the command line, then open the capture.

        Raises:
            ValueError: if no input file is given, the input file does not
                exist, or threshold / timeoutsecs is less than 1.
        """
        parser = argparse.ArgumentParser(
            description='Detect motion in recorded video and output a summary of interesting timestamps'
        )
        parser.add_argument(
            '-i', '--input',
            metavar='FILE',
            default=False,
            help='Path to video file',
        )
        parser.add_argument(
            '-t', '--threshold',
            metavar='INT',
            type=int,
            default=1500,
            help='Square pixel area that is modified that we decide is interesting (Default: 1500)',
        )
        parser.add_argument(
            '-T', '--timeoutsecs',
            metavar='INT',
            type=int,
            default=30,
            help='Number of seconds without movement before an event is considered over (Default: 30)',
        )
        parser.add_argument(
            '-X',
            action='store_true',
            help='Enable X output',
        )
        # Accepted for command-line compatibility; nothing in this class reads it.
        parser.add_argument(
            '--force',
            action='store_true',
            help='Force output (Removes output file if exists)',
        )
        args = parser.parse_args()

        # Check for a missing path explicitly instead of relying on
        # os.path.isfile() raising TypeError for the False default.
        if not args.input:
            raise ValueError('No input file specified')
        if not os.path.isfile(args.input):
            raise ValueError('Input file does not exist', args.input)
        self.inputFile = args.input

        if args.threshold < 1:
            raise ValueError('Threshold cannot be less than 1', args.threshold)
        self.threshold = args.threshold

        if args.timeoutsecs < 1:
            raise ValueError('timeoutsecs cannot be less than 1', args.timeoutsecs)
        self.timeoutSecs = args.timeoutsecs

        self.xOutput = args.X

        # Echo the effective settings (single-argument print works on Python 2 and 3).
        print(' inputFile: ' + repr(self.inputFile))
        print(' threshold: ' + repr(self.threshold))
        print(' timeoutSecs: ' + repr(self.timeoutSecs))
        print(' xOutput: ' + repr(self.xOutput))

        # Open the input file for capture and, if requested, a preview window.
        self.capture = cv.CaptureFromFile(self.inputFile)
        if self.xOutput == True:
            cv.NamedWindow("Target", 1)

    def run(self):
        """Process the video frame by frame until it ends or ESC is pressed.

        Raises:
            ValueError: if not even one frame can be read from the input.
        """
        # reduce is a builtin on Python 2 but lives only in functools on Python 3.
        from functools import reduce

        # The first frame is used only to size the working images.
        frame = cv.QueryFrame(self.capture)
        if frame is None:
            raise ValueError('Unable to read a frame from input file', self.inputFile)
        frame_size = cv.GetSize(frame)
        grey_image = cv.CreateImage(frame_size, cv.IPL_DEPTH_8U, 1)
        moving_average = cv.CreateImage(frame_size, cv.IPL_DEPTH_32F, 3)
        difference = None   # scratch images, cloned from the first processed frame
        temp = None

        # Initialize our MovementEvent tracking class
        self.movementEvent = MovementEvent(
            subjectFile=self.inputFile,
            threshold=self.threshold,
            timeoutSecs=self.timeoutSecs,
        )
        lastStamp = False   # timestamp of the most recent frame handed to the tracker

        while True:
            color_image = cv.QueryFrame(self.capture)
            if color_image is None:
                # No more frames: stop instead of passing None to cv.Smooth.
                break

            # Smooth to get rid of false positives
            cv.Smooth(color_image, color_image, cv.CV_GAUSSIAN, 3, 0)

            if difference is None:
                # First processed frame seeds the running average.
                difference = cv.CloneImage(color_image)
                temp = cv.CloneImage(color_image)
                cv.ConvertScale(color_image, moving_average, 1.0, 0.0)
            else:
                cv.RunningAvg(color_image, moving_average, 0.020, None)

            # Convert the scale of the moving average.
            cv.ConvertScale(moving_average, temp, 1.0, 0.0)
            # Minus the current frame from the moving average.
            cv.AbsDiff(color_image, temp, difference)
            # Convert the image to grayscale.
            cv.CvtColor(difference, grey_image, cv.CV_RGB2GRAY)
            # Convert the image to black and white.
            cv.Threshold(grey_image, grey_image, 70, 255, cv.CV_THRESH_BINARY)
            # Dilate and erode to get people blobs
            cv.Dilate(grey_image, grey_image, None, 18)
            cv.Erode(grey_image, grey_image, None, 10)

            storage = cv.CreateMemStorage(0)
            contour = cv.FindContours(grey_image, storage, cv.CV_RETR_CCOMP, cv.CV_CHAIN_APPROX_SIMPLE)

            points = []
            movementArea = 0
            while contour:
                bound_rect = cv.BoundingRect(list(contour))
                contour = contour.h_next()

                # Corners of the bounding box that will be drawn on the frame
                pt1 = (bound_rect[0], bound_rect[1])
                pt2 = (bound_rect[0] + bound_rect[2], bound_rect[1] + bound_rect[3])

                # Add this bounding box to the total area detected as movement
                # (width * height of the box).
                movementArea += bound_rect[2] * bound_rect[3]
                points.append(pt1)
                points.append(pt2)
                cv.Rectangle(color_image, pt1, pt2, cv.CV_RGB(255, 0, 0), 1)

            lastStamp = cv.GetCaptureProperty(self.capture, cv.CV_CAP_PROP_POS_MSEC)
            self.movementEvent.putFrame(
                movement=movementArea,
                timestamp=lastStamp,
            )

            if points:
                # Pairwise running midpoint of all box corners (later points
                # weigh more than earlier ones); floor division keeps the
                # coordinates integral on Python 3 as well.
                center_point = reduce(lambda a, b: ((a[0] + b[0]) // 2, (a[1] + b[1]) // 2), points)
                cv.Circle(color_image, center_point, 40, cv.CV_RGB(255, 255, 255), 1)
                cv.Circle(color_image, center_point, 30, cv.CV_RGB(255, 100, 0), 1)
                cv.Circle(color_image, center_point, 20, cv.CV_RGB(255, 255, 255), 1)
                cv.Circle(color_image, center_point, 10, cv.CV_RGB(255, 100, 0), 1)

            if self.xOutput == True:
                cv.ShowImage("Target", color_image)

            # Listen for ESC key
            c = cv.WaitKey(7) % 0x100
            if c == 27:
                break

        # An event that is still open when processing stops would otherwise
        # never be reported; close it at the last processed frame.
        event = self.movementEvent
        if event.startTime is not False and lastStamp is not False:
            event.endStamp = lastStamp
            event.endOfMovement('EndOfInput', lastStamp)
if __name__ == "__main__":
    # Build the tracker from the command line; argument errors surface here,
    # before any processing starts.
    tracker = Target()
    try:
        tracker.run()
    except KeyboardInterrupt:
        # Ctrl-C during processing: finish the terminal line and exit quietly.
        print('')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment