# Last active February 7, 2024 23:21
# Webcam Lightgun Tracking Script and Leap2 Lightgun Tracking Scripts
import numpy as np
import cv2
import dxcam
import win32api
import threading
import time
#from line_profiler import LineProfiler
class webcamThread(threading.Thread):
    """Background thread that keeps the most recent webcam frame available.

    Consumers poll ``new_frame``; when it is True they read ``frame`` and
    reset ``new_frame`` to False themselves.  Call ``kill()`` to ask the
    capture loop to stop.
    """

    def __init__(self):
        # Thread.__init__ must run before the thread can be started.  The
        # thread is a daemon so that a caller which never calls kill() cannot
        # keep the interpreter alive after the main loop exits.
        super().__init__(daemon=True)
        self.running = True
        self.frame = None
        self.new_frame = False
        # Passing only a CAP_* constant relies on OpenCV's legacy
        # "camera index + backend offset" form, i.e. device 0 via DirectShow.
        self.cap = cv2.VideoCapture(cv2.CAP_DSHOW)
        self.cap.set(cv2.CAP_PROP_FPS, 60)
        #self.cap.set(cv2.CAP_PROP_SETTINGS, 1)
        # NOTE(review): no caller in this file starts the thread explicitly,
        # so it starts itself here -- confirm against the original script.
        self.start()

    def run(self):
        """Grab frames until ``kill()`` is called or the device closes."""
        try:
            while self.running and self.cap.isOpened():
                ret, frame = self.cap.read()
                if ret:
                    self.frame = frame
                    self.new_frame = True
        finally:
            # Always hand the device back, even if read() raised.
            self.cap.release()

    def kill(self):
        """Request the capture loop to stop after the current frame."""
        self.running = False
class screenCaptureThread(threading.Thread):
    """Background thread that publishes a 640x480 grayscale view of the desktop.

    Each desktop frame is converted to grayscale, resized to 640x360 and
    placed into rows 60..419 of a black 480x640 image (letterboxed).  The
    result is stored in ``frame`` and ``new_frame`` is set to True.
    """

    def __init__(self):
        # Thread.__init__ must run before start(); daemon so that a missing
        # kill() cannot keep the interpreter alive.
        super().__init__(daemon=True)
        self.running = True
        self.frame = None
        self.new_frame = False
        # NOTE(review): the attribute name and the start()/is_capturing/
        # get_latest_frame() calls below are reconstructed from the dxcam
        # API -- confirm against the original script.
        self.camera = dxcam.create()
        self.camera.start(target_fps=60)
        self.start()

    def run(self):
        """Convert captured desktop frames until ``kill()`` is called."""
        try:
            while self.running and self.camera.is_capturing:
                desktop_frame = self.camera.get_latest_frame()
                if desktop_frame is None:
                    continue
                # The capture is treated as RGB (the original converted
                # RGB -> BGR -> gray); go to gray in a single step.
                gray = cv2.cvtColor(desktop_frame, cv2.COLOR_RGB2GRAY)
                #desktop_frame = cv2.resize(desktop_frame, (1920//4, 1080//4))
                canonicalImage = np.zeros((480, 640), dtype=np.uint8)
                canonicalImage[60:420, :] = cv2.resize(gray, (640, 360))
                self.frame = canonicalImage
                self.new_frame = True
        finally:
            self.camera.stop()

    def kill(self):
        """Request the capture loop to stop."""
        self.running = False
def _initial_flow_points(scale=1.0):
    """Return the corners of the screen area inside the 640x480 canonical image.

    The corners are scaled about the image centre (320, 240) by *scale* and
    returned as a list of ``[x, y]`` float lists.
    """
    centre = np.float32([[320, 240]])
    corners = np.float32([[0, 60], [640, 60], [640, 420], [0, 420]])
    return ((corners - centre) * scale + centre).tolist()


def _detect_features(image, min_distance):
    """Return up to 200 good-features-to-track of *image* as ``(x, y)`` tuples.

    ``cv2.goodFeaturesToTrack`` returns None when it finds nothing; that case
    yields an empty list instead of raising.
    """
    corners = cv2.goodFeaturesToTrack(image, 200, 0.01, min_distance)
    if corners is None:
        return []
    return [(float(x), float(y)) for x, y in corners.reshape(-1, 2)]


def _apply_deadzone(raw_pos, prev_pos, radius=5.0):
    """Return the filtered cursor position.

    Movements of at most *radius* pixels away from *prev_pos* are ignored;
    larger movements are shortened by *radius*.  With ``prev_pos`` None the
    raw position is returned unchanged.
    """
    if prev_pos is None:
        prev_pos = raw_pos
    offset = raw_pos - prev_pos
    distance = np.linalg.norm(offset)
    if distance > radius:
        offset = offset * ((distance - radius) / distance)
    else:
        offset = offset * 0.0
    return prev_pos + offset


def main():
    """Track the screen in the webcam image and move the cursor to the aim point.

    A letterboxed grayscale copy of the desktop (the "canonical" image) is
    warped by the current pose estimate and matched against the webcam frame
    with forward/backward Lucas-Kanade optical flow.  A homography fitted to
    the surviving points updates the four tracked screen corners; the webcam
    image centre (320, 240) mapped through the inverse transform becomes the
    cursor position.  After more than 20 consecutive failed frames the
    estimate is reset to the identity.  Press ``q`` in an OpenCV window to
    quit; any exception inside the loop is printed and also ends the loop.
    """
    draw_debug = True
    homographyFlowed = None
    canonicalImage = None
    warpedCanonical = None
    prevGrayFrame = None
    prev_cursor_pos = None

    lk_params = dict(winSize=(21, 21),
                     maxLevel=4,
                     criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01))

    # The first four flow points are always the screen corners.
    flowPoints = _initial_flow_points()
    canonicalCorners = np.float32(flowPoints)

    cam_cap = webcamThread()
    screen_cap = screenCaptureThread()

    frame_num = 0
    bad_frames = 0
    running = True
    try:
        while running:
            try:
                if cam_cap.new_frame:
                    t0_all = time.perf_counter()
                    frame = cam_cap.frame
                    cam_cap.new_frame = False
                    height, width = frame.shape[:2]
                    frame_num += 1

                    if screen_cap.new_frame:
                        screen_cap.new_frame = False
                        canonicalImage = screen_cap.frame.copy()
                        if warpedCanonical is None:
                            warpedCanonical = canonicalImage.copy()
                        if draw_debug:
                            cv2.imshow("Warped", canonicalImage)

                    # Periodically replace the tracked features (keeping the corners).
                    # NOTE(review): nesting of this block relative to the
                    # screen-frame branch is reconstructed -- confirm.
                    if frame_num % 60 == 0 and warpedCanonical is not None:
                        flowPoints = flowPoints[:4]
                        t0_gftt = time.perf_counter()
                        features = _detect_features(warpedCanonical, 5)
                        if draw_debug:
                            print("Good Features to Track: ", time.perf_counter() - t0_gftt)
                        flowPoints.extend(features)

                    # Flow feature points from the warped canonical image to the current frame.
                    curGrayFrame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    pointsToFlow = np.float32(flowPoints).reshape(-1, 2)
                    tracked = False
                    if prevGrayFrame is not None and warpedCanonical is not None and len(flowPoints) > 0:
                        flowedPoints, st_fwd, _ = cv2.calcOpticalFlowPyrLK(
                            warpedCanonical, curGrayFrame, pointsToFlow, None, **lk_params)
                        reversedPoints, st_bwd, _ = cv2.calcOpticalFlowPyrLK(
                            curGrayFrame, warpedCanonical, flowedPoints, None, **lk_params)
                        flowedPoints = flowedPoints.reshape(-1, 2)
                        reversedPoints = reversedPoints.reshape(-1, 2)

                        # Keep points whose forward/backward flow agrees, that were
                        # tracked in both directions, and that actually moved.
                        bidirectionalError = np.abs(pointsToFlow - reversedPoints).max(axis=-1)
                        goodMask = ((bidirectionalError < 10.0)
                                    & (st_fwd.reshape(-1) == 1)
                                    & (st_bwd.reshape(-1) == 1)
                                    & np.any(pointsToFlow != flowedPoints, axis=1))
                        goodToFlow = pointsToFlow[goodMask]
                        goodFlowed = flowedPoints[goodMask]

                        # findHomography raises with fewer than four correspondences;
                        # treat that as a bad frame instead of aborting the program.
                        homography, inliers = None, None
                        if len(goodToFlow) >= 4:
                            homography, inliers = cv2.findHomography(goodToFlow, goodFlowed)#, cv2.RANSAC, 0.5)

                        if homography is not None and len(inliers) > 12:
                            allPoints = np.float32(flowPoints).reshape(-1, 1, 2)
                            homographyFlowed = np.float32(
                                cv2.perspectiveTransform(allPoints, homography).reshape(-1, 2))

                            # Get the transform to/from the canonical image.
                            flowedCorner = homographyFlowed[0:4]
                            canonicalHomography = cv2.getPerspectiveTransform(canonicalCorners, flowedCorner)
                            canonicalHomography_Inv = cv2.getPerspectiveTransform(flowedCorner, canonicalCorners)
                            warpedCanonical = cv2.warpPerspective(canonicalImage, canonicalHomography, (width, height))

                            if draw_debug:
                                cv2.imshow("Warped", warpedCanonical)
                                unwarpedCurrent = cv2.warpPerspective(curGrayFrame, canonicalHomography_Inv, (width, height))
                                unwarpedCurrent[:60, :] = 0
                                unwarpedCurrent[420:, :] = 0
                                cv2.imshow("Diff", unwarpedCurrent)

                            # Webcam image centre expressed in canonical coordinates,
                            # then scaled to a 1920x1080 desktop.
                            raw_cursor_pos = cv2.perspectiveTransform(
                                np.float32([[[320.0, 240.0]]]), canonicalHomography_Inv).reshape(2)
                            raw_cursor_pos[0] = (raw_cursor_pos[0] / 640.0) * 1920.0
                            raw_cursor_pos[1] = (raw_cursor_pos[1] / 480.0) * 1080.0

                            # Filter the cursor position.
                            cursor_pos = _apply_deadzone(raw_cursor_pos, prev_cursor_pos)
                            prev_cursor_pos = cursor_pos.copy()
                            # NOTE(review): cursor_pos was computed but never
                            # used in the extracted source; the SetCursorPos
                            # call is reconstructed -- confirm.
                            if np.all(np.isfinite(cursor_pos)):
                                win32api.SetCursorPos((int(cursor_pos[0]), int(cursor_pos[1])))

                            flowPoints = homographyFlowed.tolist()
                            tracked = True

                    if tracked:
                        bad_frames = 0
                    else:
                        bad_frames += 1

                    # Too many failures in a row: reset to the identity pose.
                    if bad_frames > 20 and canonicalImage is not None:
                        flowPoints = _initial_flow_points()
                        canonicalCorners = np.float32(flowPoints)
                        warpedCanonical = cv2.warpPerspective(canonicalImage, np.eye(3), (width, height))
                        flowPoints.extend(_detect_features(warpedCanonical, 10))
                        bad_frames = 0

                    prevGrayFrame = curGrayFrame

                    if draw_debug and frame_num % 10 == 0:
                        drawing_frame = frame.copy()
                        if homographyFlowed is not None:
                            outline = np.int32(homographyFlowed[:4]).reshape(-1, 1, 2)
                            cv2.polylines(drawing_frame, [outline], True, (0, 255, 0), 1, cv2.LINE_AA)
                        # Display the resulting frame
                        cv2.imshow('image', drawing_frame)
                        print("All: ", time.perf_counter() - t0_all)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            except Exception as e:
                print("Exception!", e)
                running = False  # Cancel out of the infinite loop
    finally:
        # When everything is done, release the captures.
        cam_cap.kill()
        screen_cap.kill()
        cv2.destroyAllWindows()
# Script entry point: run the tracker only when executed directly.
if __name__ == "__main__":
    #lp = LineProfiler()
    main()
import cv2
import numpy as np
import threading
import win32api
class camThread(threading.Thread):
    """Background thread that reads raw frames and publishes a side-by-side pair.

    Each raw buffer is reshaped to ``frameWidth`` rows by ``2 * frameWidth``
    columns.  The left half is rotated by 90 degrees counter-clockwise and
    the right half by 270 degrees, in place.  Consumers poll ``new_frame``
    and read ``frame``.
    """

    def __init__(self):
        # Thread.__init__ must run before start(); daemon so that a missing
        # kill() cannot keep the interpreter alive.
        super().__init__(daemon=True)
        self.frameWidth = 512
        self.running = True
        self.frame = None
        self.new_frame = False
        self.cap = cv2.VideoCapture(cv2.CAP_MSMF)
        self.cap.set(cv2.CAP_PROP_FPS, 120)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH , self.frameWidth)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frameWidth)
        # Ask for the undecoded buffer; run() reshapes it itself.
        self.cap.set(cv2.CAP_PROP_CONVERT_RGB, 0)
        # NOTE(review): no caller in this file starts the thread explicitly,
        # so it starts itself here -- confirm against the original script.
        self.start()

    def run(self):
        """Read, reshape and rotate frames until ``kill()`` is called."""
        width = self.frameWidth
        try:
            while self.running and self.cap.isOpened():
                ret, frame = self.cap.read()
                if not ret or frame is None:
                    # A failed read must not reach reshape() and kill the thread.
                    continue
                #derived_framewidth = int(math.sqrt(frame.shape[1]//2))
                frame = frame.reshape(width, width * 2)
                # Copy the rotated views before writing them back over their source.
                frame[:, :width] = np.rot90(frame[:, :width], 1).copy()
                frame[:, width:] = np.rot90(frame[:, width:], 3).copy()
                self.frame = frame
                self.new_frame = True
        finally:
            self.cap.release()

    def kill(self):
        """Request the capture loop to stop."""
        self.running = False
# Leap2 tracker: find bright blobs in the left camera image, take the four
# blobs nearest the image corners as the screen corners, and place the cursor
# where the image centre falls inside that quad.
cap = camThread()

params = cv2.SimpleBlobDetector_Params()
params.minThreshold = 200
params.maxThreshold = 255
params.filterByArea = True
params.minArea = 15
params.filterByCircularity = True
params.minCircularity = 0.1
params.filterByConvexity = True
params.minConvexity = 0.87
params.filterByInertia = True
params.minInertiaRatio = 0.01
detector = cv2.SimpleBlobDetector_create(params)


def _nearest_to(points, target):
    """Return the row of *points* with the smallest Euclidean distance to *target*."""
    return points[np.argmin(np.linalg.norm(points - np.float32(target), axis=1))]


try:
    while not (cv2.waitKey(1) & 0xFF == ord('q')):
        if not cap.new_frame:
            continue
        raw_im = cap.frame
        cap.new_frame = False

        # Take only the left image
        raw_im = raw_im[:, :raw_im.shape[1] // 2]
        height, width = raw_im.shape[:2]

        keypoints = detector.detect(cv2.threshold(raw_im, 200, 255, cv2.THRESH_BINARY_INV)[1])

        # Draw detected blobs as red circles.
        # cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS ensures the size of the circle corresponds to the size of blob
        im_with_keypoints = cv2.drawKeypoints(raw_im, keypoints, np.array([]), (0,0,255), cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)

        # Sort out the keypoints closest to the image corners
        points = np.float32(cv2.KeyPoint_convert(keypoints)).reshape(-1, 2)
        if len(points) >= 4:
            top_left_keypoint = _nearest_to(points, (0, 0))
            top_right_keypoint = _nearest_to(points, (width, 0))
            bottom_left_keypoint = _nearest_to(points, (0, height))
            bottom_right_keypoint = _nearest_to(points, (width, height))

            # Debug: draw the quad TL -> TR -> BR -> BL
            outline = np.int32([top_left_keypoint, top_right_keypoint,
                                bottom_right_keypoint, bottom_left_keypoint]).reshape(-1, 1, 2)
            im_with_keypoints = cv2.polylines(im_with_keypoints, [outline], True, (0, 0, 255), 2)

            # Figure out roughly where the crosshairs sit between the four corners
            base_corners = np.float32([[0, 0], [width, 0], [width, height], [0, height]])
            canonicalHomography_Inv = cv2.getPerspectiveTransform(
                np.float32([top_left_keypoint, top_right_keypoint, bottom_right_keypoint, bottom_left_keypoint]),
                base_corners)
            raw_cursor_pos = cv2.perspectiveTransform(
                np.float32([[[width / 2.0, height / 2.0]]]), canonicalHomography_Inv).reshape(2)
            raw_cursor_pos = raw_cursor_pos / np.float32([width, height])
            # A degenerate quad can produce NaN/inf, which int() cannot convert.
            if np.all(np.isfinite(raw_cursor_pos)):
                win32api.SetCursorPos((int(raw_cursor_pos[0] * 1920.0),
                                       int(raw_cursor_pos[1] * 1080.0)))

        # NOTE(review): shown for every frame so blobs stay visible even when
        # fewer than four are found; the original nesting is unknown.
        cv2.imshow('Keypoints', im_with_keypoints)
finally:
    # Stop the capture thread and close the windows on exit.
    cap.kill()
    cv2.destroyAllWindows()
import cv2
import time
import win32api
import threading
import numpy as np
from multiprocessing import Process, Queue
class interpolationProcess():
    """Moves the cursor at 240 Hz by interpolating between timestamped samples.

    Producers put ``(timestamp, position)`` tuples on ``sampleQueue``.
    ``timestamp`` is compared against ``time.perf_counter()`` inside the
    worker and ``position`` is a 2-element array of screen coordinates.
    ``kill()`` stops the worker by putting a token on ``runningQueue``.
    """

    def __init__(self):
        self.sampleQueue = Queue()
        self.runningQueue = Queue()
        # Daemon process: a caller that never calls kill() cannot block
        # interpreter exit.
        self.interpolation_process = Process(target=self.interpolation_process_func,
                                             args=(self.sampleQueue, self.runningQueue),
                                             daemon=True)
        # NOTE(review): no caller in this file starts the process explicitly,
        # so it starts itself here -- confirm against the original script.
        self.interpolation_process.start()

    @staticmethod
    def _interpolate_position(older, newer, at_time):
        """Linearly interpolate (or extrapolate) the position at *at_time*.

        *older* and *newer* are ``(timestamp, position)`` samples with
        distinct timestamps.
        """
        alpha = (at_time - older[0]) / (newer[0] - older[0])
        return older[1] * (1.0 - alpha) + newer[1] * alpha

    def interpolation_process_func(self, sample_queue : Queue, running_queue : Queue):
        '''Run Interpolation Thread in another process so it is unburdened by the GIL in the main thread'''
        from queue import Empty  # raised by Queue.get(block=False)

        latestSample = (0.01, np.array([0.0, 0.0]))
        secondLatestSample = (0.00, np.array([0.0, 0.0]))
        lastTime = time.perf_counter()
        avg_delay = 0.0
        period = 1.0 / 240.0

        # Run until kill() puts a token on the running queue.
        # NOTE(review): loop condition reconstructed from kill() -- confirm.
        while running_queue.empty():
            # Dequeue the latest samples
            while not sample_queue.empty():
                try:
                    sample = sample_queue.get(block=False)
                except Empty:
                    # empty() is only a hint for multiprocessing queues.
                    break
                # Strictly newer only, so the two kept timestamps always differ.
                if sample[0] > latestSample[0]:
                    secondLatestSample = latestSample
                    latestSample = sample

            cur_time = time.perf_counter()
            if cur_time - lastTime > period:
                # Track the average sample age, ignoring stale/out-of-range samples.
                # (The original wrote abs(a - b < 0.8), taking abs() of a boolean.)
                if abs(cur_time - latestSample[0]) < 0.8:
                    avg_delay = (avg_delay * 0.995) + ((cur_time - latestSample[0]) * 0.005)
                interpolated_pos = self._interpolate_position(
                    secondLatestSample, latestSample, cur_time - avg_delay)
                win32api.SetCursorPos((int(interpolated_pos[0]), int(interpolated_pos[1])))
                lastTime += period

    def kill(self):
        self.runningQueue.put(1) # Terminates the Interpolation Process
class camThread(threading.Thread):
    """Background webcam reader that also keeps a smoothed frame timestamp.

    ``frame`` / ``new_frame`` carry the latest image.  ``cur_time`` advances
    by one nominal frame period per captured frame, is nudged 0.5% toward
    ``time.perf_counter()`` each frame, and is reset to the wall clock when
    the two differ by more than 0.2 s.
    """

    def __init__(self):
        # Thread.__init__ must run before start(); daemon so that a missing
        # kill() cannot keep the interpreter alive.
        super().__init__(daemon=True)
        self.running = True
        self.frame = None
        self.new_frame = False
        self.cap = cv2.VideoCapture(cv2.CAP_DSHOW)
        self.cap.set(cv2.CAP_PROP_FPS, 60.0)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        #self.cap.set(cv2.CAP_PROP_FPS, 120.0)
        #self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        #self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 800)
        #self.cap.set(cv2.CAP_PROP_EXPOSURE, -10)
        #self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0)
        self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter.fourcc('M','J','P','G'))
        self.cur_time = time.perf_counter()
        self.framerate = float(self.cap.get(cv2.CAP_PROP_FPS))
        if self.framerate <= 0.0:
            # A driver that reports no FPS would cause a division by zero in
            # run(); fall back to the rate requested above.
            self.framerate = 60.0
        print("CAMERA FPS", self.framerate)
        # NOTE(review): no caller in this file starts the thread explicitly,
        # so it starts itself here -- confirm against the original script.
        self.start()

    def run(self):
        """Read frames and maintain ``cur_time`` until ``kill()`` is called."""
        try:
            while self.running and self.cap.isOpened():
                ret, frame = self.cap.read()
                temp_curtime = time.perf_counter()
                if ret:
                    #self.cur_time = temp_curtime
                    # Hard resync when the smoothed clock has drifted too far.
                    if abs(self.cur_time - temp_curtime) > 0.2:
                        print(temp_curtime - self.cur_time)
                        self.cur_time = temp_curtime
                    self.cur_time += 1.0 / self.framerate
                    self.cur_time += ((time.perf_counter() - self.cur_time) * 0.005) # To prevent them from getting too misaligned
                    self.frame = frame
                    self.new_frame = True
        finally:
            self.cap.release()

    def kill(self):
        """Request the capture loop to stop."""
        self.running = False
def set_exposure(val):
    """Trackbar callback: set the camera exposure property to ``-val``."""
    # `cap` is only read (never rebound) here, so no `global` statement is needed.
    cap.cap.set(cv2.CAP_PROP_EXPOSURE, -val)
# Gray level passed to cv2.threshold; updated by the "Threshold" trackbar.
threshold = 128
def set_threshold(val):
    """Trackbar callback: store *val* as the binarisation threshold."""
    global threshold
    threshold = val
# approxPolyDP tolerance as a fraction of the contour perimeter; updated by
# the "Epsilon" trackbar, whose integer value is in thousandths.
epsilon = 0.1
def set_epsilon(val):
    """Trackbar callback: store ``val / 1000`` as the polygon tolerance."""
    global epsilon
    epsilon = val / 1000.0
# Side length of the square kernel used for morphological closing; updated by
# the "Closing" trackbar.
closing = 15
def set_closing(val):
    """Trackbar callback: store *val* as the closing kernel size (at least 1)."""
    global closing
    # The slider can reach 0, which would produce an empty (0x0) kernel.
    closing = max(1, val)
def find_biggest_quad(contours, eps_fraction, min_area=3000):
    """Return the largest four-sided contour approximation, or None.

    Each contour is simplified with ``cv2.approxPolyDP`` using a tolerance of
    ``eps_fraction`` times its perimeter.  Only approximations with exactly
    four vertices and a contour area above *min_area* are considered.
    """
    best_quad = None
    best_area = min_area
    for contour in contours:
        area = cv2.contourArea(contour=contour, oriented=False)
        approx = cv2.approxPolyDP(contour, eps_fraction * cv2.arcLength(contour, True), True)
        if len(approx) == 4 and area > best_area:
            best_quad = approx
            # Raise the bar so that a later, smaller quad cannot replace this one.
            best_area = area
    return best_quad


def order_quad_corners(points, width, height):
    """Return the points nearest the image corners, ordered TL, TR, BR, BL.

    *points* is any array reshapeable to ``(-1, 2)``.  The result is a
    ``(4, 2)`` float32 array holding, for each corner of a *width* x *height*
    image, the candidate point closest to it.
    """
    pts = np.asarray(points, dtype=np.float32).reshape(-1, 2)
    targets = np.float32([[0, 0], [width, 0], [width, height], [0, height]])
    # distances[i, j] = distance from image corner i to candidate point j
    distances = np.linalg.norm(targets[:, None, :] - pts[None, :, :], axis=2)
    return pts[np.argmin(distances, axis=1)]


if __name__ == '__main__':
    cap = camThread()
    interpolation = interpolationProcess()

    cv2.namedWindow("Settings", cv2.WINDOW_NORMAL)
    # NOTE(review): the initial slider positions (80 / 1 / 1) differ from the
    # module defaults (threshold=128, epsilon=0.1, closing=15), which stay in
    # effect until a slider is moved -- confirm which values are intended.
    cv2.createTrackbar("Exposure" , "Settings", 10, 15, set_exposure)
    cv2.createTrackbar("Threshold", "Settings", 80, 255, set_threshold)
    cv2.createTrackbar("Epsilon"  , "Settings", 1, 500, set_epsilon)
    cv2.createTrackbar("Closing"  , "Settings", 1, 15, set_closing)

    try:
        while not (cv2.waitKey(1) & 0xFF == ord('q')):
            if not cap.running:
                break
            if not cap.new_frame:
                continue
            frame = cap.frame
            cap.new_frame = False
            current_time = cap.cur_time

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            height, width = gray.shape[:2]
            #gray = cv2.remap(gray, undistortMap[0], undistortMap[1], cv2.INTER_LINEAR)
            _, thresh = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY)
            kernel_size = max(1, closing)  # a 0x0 kernel is never intended
            thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, np.ones((kernel_size, kernel_size), np.uint8))
            cv2.imshow('Thresh', cv2.resize(thresh, (width // 2, height // 2)))

            # [-2] selects the contour list for both the 2- and 3-value return forms.
            contours = cv2.findContours(thresh, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)[-2]
            cv2.imshow('Contours', cv2.resize(
                cv2.drawContours(np.zeros_like(gray), contours, -1, (255,255,0), 1),
                (width // 2, height // 2)))

            # Get the biggest quad of points
            biggest_quad = find_biggest_quad(contours, epsilon)
            canvas = np.zeros_like(gray)
            if biggest_quad is not None:
                canvas = cv2.drawContours(canvas, [biggest_quad], -1, (255,0,0), 2)
            cv2.imshow('Rectangles', cv2.resize(canvas, (width // 2, height // 2)))

            if biggest_quad is not None:
                corners = order_quad_corners(biggest_quad, width, height)

                # Debug: draw the quad TL -> TR -> BR -> BL
                rect_canvas = np.zeros_like(gray)
                outline = np.int32(corners).reshape(-1, 1, 2)
                rect_canvas = cv2.polylines(rect_canvas, [outline], True, (255, 0, 255), 2)
                cv2.imshow('Keypoints', cv2.resize(rect_canvas, (width // 2, height // 2)))

                # Figure out roughly where the crosshairs sit between the four corners
                base_corners = np.float32([[0, 0], [width, 0], [width, height], [0, height]])
                canonicalHomography_Inv = cv2.getPerspectiveTransform(corners, base_corners)
                raw_cursor_pos = cv2.perspectiveTransform(
                    np.float32([[[width / 2, height / 2]]]), canonicalHomography_Inv).reshape(2)
                raw_cursor_pos[0] = (raw_cursor_pos[0] / width) * 1920.0
                raw_cursor_pos[1] = (raw_cursor_pos[1] / height) * 1080.0

                # A degenerate quad can produce NaN/inf, which int() cannot convert.
                if np.all(np.isfinite(raw_cursor_pos)):
                    interpolation.sampleQueue.put((current_time, raw_cursor_pos), False)
                    # NOTE(review): the cursor is also driven by the
                    # interpolation process; confirm both writers are intended.
                    win32api.SetCursorPos((int(raw_cursor_pos[0]), int(raw_cursor_pos[1])))

            # Display the camera frame at half size
            cv2.imshow('Frame', cv2.resize(frame, (frame.shape[1] // 2, frame.shape[0] // 2)))
    finally:
        # Stop the capture thread and the interpolation process on exit.
        cap.kill()
        interpolation.kill()
        cv2.destroyAllWindows()
