Skip to content

Instantly share code, notes, and snippets.

@tomislacker
Created December 19, 2014 23:55
Show Gist options
  • Save tomislacker/1517c6de1551b1b88067 to your computer and use it in GitHub Desktop.
Save tomislacker/1517c6de1551b1b88067 to your computer and use it in GitHub Desktop.
Hacky Motion Tracking Example
#!/usr/bin/env python
import cv
import sys
import argparse
import os.path
import time
import math
class MovementEvent:
subjectFile = False
startTime = False
startStamp = False
lastActTime = False
actFrames = 0
idleFrames = 0
endTime = False
endStamp = False
threshold = False
timeoutSecs = False
timeoutFrames = False
def __init__(self, subjectFile = False, threshold = False, timeoutFrames = False, timeoutSecs = False):
if threshold == False:
raise ValueError('Requires threshold')
if timeoutFrames == False and timeoutSecs == False:
raise ValueError('Requires timeoutFrames OR timeoutSecs parameter')
if timeoutFrames != False and timeoutSecs != False:
raise ValueError('Requires timeoutFrames OR timeoutSecs parameter -- not both')
self.subjectFile = subjectFile
self.threshold = threshold
self.timeoutFrames = timeoutFrames
self.timeoutSecs = timeoutSecs
def getMicrotime(self, getAsFloat = True):
if getAsFloat:
return time.time()
else:
return '%f %d' % math.modf(time.time())
def putFrame(self, movement, timestamp):
# print 'MovementEvent::putFrame(' + repr(movement) + ', ' + repr(timestamp) + ')'
if movement >= self.threshold:
if self.startTime == False:
# print 'MovementEvent::putFrame -- NEW'
self.startTime = self.getMicrotime()
self.startStamp = timestamp
# elif self.actFrames == 0:
# print 'MovementEvent::putFrame -- Active'
self.lastActTime = self.getMicrotime()
self.actFrames += 1
self.idleFrames = 0
elif self.startTime != False:
# if self.idleFrames == 0:
# print 'MovementEvent::putFrame -- idle'
self.actFrames = 0
self.idleFrames += 1
if self.timeoutSecs == True and self.timeoutSecs <= ( self.getMicrotime() - self.lastActTime):
# Expire motion (seconds)
self.endStamp = timestamp
self.endOfMovement('Seconds', self.timeoutSecs)
elif self.timeoutFrames == True and self.timeoutFrames <= self.idleFrames:
# Expire motion (frame count)
self.endStamp = timestamp
self.endOfMovement('Frames', self.timeoutFrames)
def endOfMovement(self, cause = 'Unknown', value = False):
# print 'MovementEvent::endOfMovement("' + repr(cause) + '", ' + repr(value) + ')'
self.endTime = self.getMicrotime()
print repr(self.subjectFile) + ',' + repr(self.startStamp) + ',' + repr(self.endStamp)
self.resetEvent()
def resetEvent(self):
#print 'MovementEvent::resetEvent()'
self.startTime = False
self.lastActTime = False
self.actFrames = 0
self.idleFrames = 0
self.endTime = False
class Target:
inputFile = False
# outputFile = False
# outputForce = False
threshold = 1500
# threshframes = 5
timeoutSecs = 30
xOutput = False
movementEvent = False
def __init__(self):
###
# Define an argument parser for handling any parameters passed via
# command line to our program
###
parser = argparse.ArgumentParser(
description='Detect motion in recorded video and output a summary of interesting timestamps'
)
# Add parameter for specifying what file to read
parser.add_argument(
'-i'
,'--input'
,metavar='FILE'
,default=False
,help='Path to video file'
)
# Add parameter for specifying what file to output to
# parser.add_argument(
# '-o'
# ,'--output'
# ,metavar='FILE'
# ,default=False
# ,help='Path to CSV output file'
# )
# Add parameter for specifying how many sqpx should be our threshold
# at which to decide a video frame is interesting
parser.add_argument(
'-t'
,'--threshold'
,metavar='INT'
,type=int
,default=1500
,help='Square pixel area that is modified that we decide is interesting (Default: 1500)'
)
# Add parameter for specifying how many frames in a row we should have
# our threshold (specified above) surpassed before officially declaring
# the starting time as interesting -- and queuing that for later output
# parser.add_argument(
# '-f'
# ,'--threshframes'
# ,metavar='INT'
# ,type=int
# ,default=5
# ,help='Number of frames that our threshold is consecutively surpassed in to declare a time segment interesting (Default: 5)'
# )
parser.add_argument(
'-T'
,'--timeoutsecs'
,metavar='INT'
,type=int
,default=30
,help='Number of seconds before (Default: 30)'
)
parser.add_argument(
'-X'
,action='store_true'
,help='Enable X output'
)
parser.add_argument(
'--force'
,action='store_true'
,help='Force output (Removes output file if exists)'
)
# Allow the argparse library to crunch away at the command line arguments
args = parser.parse_args()
# Ensure input file exists
try:
if os.path.isfile(args.input) == False:
raise ValueError('Input file does exist', args.input)
except TypeError:
raise ValueError('No input file specified')
else:
self.inputFile = args.input
# See if we want to remove the existing output file
# self.outputForce = args.force
# Ensure output file does not exist
# try:
# if args.output == False:
# inputFilepath, inputFileext = os.path.splitext(self.inputFile)
# args.output = inputFilepath + '.csv'
#
# if os.path.isfile(args.output) == True and self.outputForce == False:
# raise ValueError('Output file exists', args.output)
# except TypeError:
# print 'NOTICE: Not saving output of "interesting" timeslices'
# self.outputFile = False
# else:
# self.outputFile = args.output
# Ensure our threshold value is within range
if args.threshold < 1:
raise ValueError('Threshold cannot be less than 1', args.threshold)
self.threshold = args.threshold
# Ensure our threshframes value is within range
# if args.threshframes < 1:
# raise ValueError('Threshframes cannot be less than 1', args.threshframes)
# self.threshframes = args.threshframes
if args.timeoutsecs < 1:
raise ValueError('timeoutsecs cannot be less than 1', args.timeoutsecs)
self.timeoutSecs = args.timeoutsecs
# Initiate the capture processing from the input file and spin up a
# visual window
self.xOutput = args.X
print ' inputFile: ' + repr(self.inputFile)
# print ' outputFile: ' + repr(self.outputFile)
# print ' outputForce: ' + repr(self.outputForce)
print ' threshold: ' + repr(self.threshold)
print ' timeoutSecs: ' + repr(self.timeoutSecs)
# print 'threshframes: ' + repr(self.threshframes)
print ' xOutput: ' + repr(self.xOutput)
self.capture = cv.CaptureFromFile(self.inputFile)
if self.xOutput == True:
cv.NamedWindow("Target", 1)
def run(self):
# Capture first frame to get size
frame = cv.QueryFrame(self.capture)
frame_size = cv.GetSize(frame)
color_image = cv.CreateImage(cv.GetSize(frame), 8, 3)
grey_image = cv.CreateImage(cv.GetSize(frame), cv.IPL_DEPTH_8U, 1)
moving_average = cv.CreateImage(cv.GetSize(frame), cv.IPL_DEPTH_32F, 3)
first = True
# Initialize our MovementEvent tracking class
self.movementEvent = MovementEvent(
subjectFile = self.inputFile
,threshold = self.threshold
,timeoutSecs = self.timeoutSecs
)
while True:
closest_to_left = cv.GetSize(frame)[0]
closest_to_right = cv.GetSize(frame)[1]
color_image = cv.QueryFrame(self.capture)
# Smooth to get rid of false positives
cv.Smooth(color_image, color_image, cv.CV_GAUSSIAN, 3, 0)
if first:
difference = cv.CloneImage(color_image)
temp = cv.CloneImage(color_image)
cv.ConvertScale(color_image, moving_average, 1.0, 0.0)
first = False
else:
cv.RunningAvg(color_image, moving_average, 0.020, None)
# Convert the scale of the moving average.
cv.ConvertScale(moving_average, temp, 1.0, 0.0)
# Minus the current frame from the moving average.
cv.AbsDiff(color_image, temp, difference)
# Convert the image to grayscale.
cv.CvtColor(difference, grey_image, cv.CV_RGB2GRAY)
# Convert the image to black and white.
cv.Threshold(grey_image, grey_image, 70, 255, cv.CV_THRESH_BINARY)
# Dilate and erode to get people blobs
cv.Dilate(grey_image, grey_image, None, 18)
cv.Erode(grey_image, grey_image, None, 10)
storage = cv.CreateMemStorage(0)
contour = cv.FindContours(grey_image, storage, cv.CV_RETR_CCOMP, cv.CV_CHAIN_APPROX_SIMPLE)
points = []
movementArea = 0
while contour:
bound_rect = cv.BoundingRect(list(contour))
contour = contour.h_next()
# Compute the bounding points to the boxes that will be drawn
# on the screen
pt1 = (bound_rect[0], bound_rect[1])
pt2 = (bound_rect[0] + bound_rect[2], bound_rect[1] + bound_rect[3])
# Add this latest bounding box to the overall area that is being
# detected as movement
movementArea += ( ( pt2[0] - pt1[0] ) * ( pt2[1] - pt1[1] ) );
points.append(pt1)
points.append(pt2)
cv.Rectangle(color_image, pt1, pt2, cv.CV_RGB(255,0,0), 1)
self.movementEvent.putFrame(
movement = movementArea
,timestamp = cv.GetCaptureProperty(self.capture, cv.CV_CAP_PROP_POS_MSEC)
)
#if movementArea > self.movementThreshold:
# print 'MA: ' + repr(movementArea) + ' @ ' + repr(cv.GetCaptureProperty(self.capture, cv.CV_CAP_PROP_POS_MSEC))
if len(points):
center_point = reduce(lambda a, b: ((a[0] + b[0]) / 2, (a[1] + b[1]) / 2), points)
cv.Circle(color_image, center_point, 40, cv.CV_RGB(255, 255, 255), 1)
cv.Circle(color_image, center_point, 30, cv.CV_RGB(255, 100, 0), 1)
cv.Circle(color_image, center_point, 20, cv.CV_RGB(255, 255, 255), 1)
cv.Circle(color_image, center_point, 10, cv.CV_RGB(255, 100, 0), 1)
if self.xOutput == True:
cv.ShowImage("Target", color_image)
# Listen for ESC key
c = cv.WaitKey(7) % 0x100
if c == 27:
break
if __name__=="__main__":
t = Target()
try:
t.run()
#print 'TEST ONLY'
except KeyboardInterrupt:
print ''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment