@shrimo
Last active October 10, 2023 11:42
An attempt to speed up descriptor computation, worked out in conversation with ChatGPT. Three variants follow: a multiprocessing pipeline (reader, AKAZE detector, display), the same pipeline on a thread pool, and a tile-based ORB version that fans each frame out to a process pool.
#!/usr/bin/python3
# read, akaze and show: reader, detector and display each run in their own process
import cv2
import multiprocessing


# Function to read video frames
def read_video_stream(video_file, frame_queue):
    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        print("Error: Could not open video file.")
        frame_queue.put(None)  # Signal downstream stages so they do not block forever
        return
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_queue.put(frame)  # Put the frame in the queue
    cap.release()
    frame_queue.put(None)  # Put None to signal the end of frames


# Function to detect feature points using AKAZE
def detect_feature_points(frame_queue, keypoints_queue):
    akaze = cv2.AKAZE_create()
    while True:
        frame = frame_queue.get()
        if frame is None:
            break
        frame = cv2.resize(frame, (1280, 720))
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        keypoints = akaze.detect(gray_frame)
        # Extract only the coordinates of the keypoints (cv2.KeyPoint objects do not pickle)
        keypoints_coordinates = [(kp.pt[0], kp.pt[1]) for kp in keypoints]
        keypoints_queue.put((frame, keypoints_coordinates))  # Put frame and keypoints in the queue
    keypoints_queue.put((None, []))  # Forward the end-of-stream signal to the display process


# Function to display frames with detected keypoints
def display_frames_with_keypoints(keypoints_queue):
    cv2.namedWindow("Video Stream with Keypoints", cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
    while True:
        frame, keypoints_coordinates = keypoints_queue.get()
        if frame is None:
            break
        # Convert coordinates back to cv2.KeyPoint objects
        keypoints = [cv2.KeyPoint(x, y, 1) for x, y in keypoints_coordinates]
        frame_with_keypoints = cv2.drawKeypoints(frame, keypoints, outImage=frame,
                                                 flags=cv2.DRAW_MATCHES_FLAGS_DEFAULT)
        cv2.imshow("Video Stream with Keypoints", frame_with_keypoints)
        if cv2.waitKey(25) & 0xFF == ord('q'):
            break
    cv2.destroyAllWindows()


if __name__ == '__main__':
    video_file = '../video/road.mp4'  # Replace with the path to your video file
    frame_queue = multiprocessing.Queue()
    keypoints_queue = multiprocessing.Queue()
    # Create a separate process for reading the video stream
    video_process = multiprocessing.Process(target=read_video_stream, args=(video_file, frame_queue))
    # Create a separate process for detecting feature points using AKAZE
    akaze_process = multiprocessing.Process(target=detect_feature_points, args=(frame_queue, keypoints_queue))
    # Create a separate process for displaying frames with keypoints
    display_process = multiprocessing.Process(target=display_frames_with_keypoints, args=(keypoints_queue,))
    # Start all three processes
    video_process.start()
    akaze_process.start()
    display_process.start()
    # Wait for all three processes to finish
    video_process.join()
    akaze_process.join()
    display_process.join()
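A note on the pipeline above: each frame is pickled through the multiprocessing.Queue at full capture resolution and only resized inside the detector. Resizing in the reader instead (which the threaded variant below already does) shrinks the payload crossing the process boundary. A minimal sketch, using a hypothetical read_video_stream_resized name:

# Sketch: resize in the reader so the smaller frame is what gets pickled through the queue.
# read_video_stream_resized is a hypothetical drop-in replacement for read_video_stream above.
def read_video_stream_resized(video_file, frame_queue, size=(1280, 720)):
    cap = cv2.VideoCapture(video_file)
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_queue.put(cv2.resize(frame, size))  # queue the downscaled frame
    cap.release()
    frame_queue.put(None)  # end-of-stream sentinel, same protocol as above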
#!/usr/bin/python3
# read, akaze and show: the same pipeline, but on threads instead of processes
import cv2
import concurrent.futures
from queue import Queue
from typing import List, Tuple, Optional
import numpy as np


class VideoReader:
    def __init__(self, video_file: str, frame_queue: Queue[Optional[np.ndarray]]):
        self.video_file = video_file
        self.frame_queue = frame_queue

    def read_video_stream(self) -> None:
        cap = cv2.VideoCapture(self.video_file)
        if not cap.isOpened():
            print("Error: Could not open video file.")
            self.frame_queue.put(None)  # Signal downstream stages so they do not block forever
            return
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (1280, 720))
            self.frame_queue.put(frame)  # Put the frame in the queue
        cap.release()
        self.frame_queue.put(None)  # Put None to signal the end of frames


class FeatureDetector:
    def __init__(self, frame_queue: Queue[Optional[np.ndarray]],
                 keypoints_queue: Queue[Tuple[Optional[np.ndarray], List[Tuple[float, float]]]]):
        self.frame_queue = frame_queue
        self.keypoints_queue = keypoints_queue

    def detect_feature_points(self) -> None:
        akaze = cv2.AKAZE_create()  # type: ignore
        while True:
            frame = self.frame_queue.get()
            if frame is None:
                break
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            keypoints, _ = akaze.detectAndCompute(gray_frame, None)
            # Extract only the coordinates of the keypoints
            keypoints_coordinates = [(kp.pt[0], kp.pt[1]) for kp in keypoints]
            self.keypoints_queue.put((frame, keypoints_coordinates))  # Put frame and keypoints in the queue
        self.keypoints_queue.put((None, []))  # Forward the end-of-stream signal to the displayer


class FrameDisplayer:
    def __init__(self, keypoints_queue: Queue[Tuple[Optional[np.ndarray], List[Tuple[float, float]]]]):
        self.keypoints_queue = keypoints_queue

    def display_frames_with_keypoints(self) -> None:
        cv2.namedWindow("Video Stream with Keypoints", cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
        while True:
            frame, keypoints_coordinates = self.keypoints_queue.get()
            if frame is None:
                break
            # Convert coordinates back to cv2.KeyPoint objects
            keypoints = [cv2.KeyPoint(x, y, 1) for x, y in keypoints_coordinates]
            frame_with_keypoints = cv2.drawKeypoints(frame, keypoints, outImage=frame)
            cv2.imshow("Video Stream with Keypoints", frame_with_keypoints)
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break
        cv2.destroyAllWindows()


def start() -> None:
    video_file: str = '../video/road.mp4'  # Replace with the path to your video file
    frame_queue: Queue[Optional[np.ndarray]] = Queue()
    keypoints_queue: Queue[Tuple[Optional[np.ndarray], List[Tuple[float, float]]]] = Queue()
    # Create instances of each class
    video_reader = VideoReader(video_file, frame_queue)
    feature_detector = FeatureDetector(frame_queue, keypoints_queue)
    frame_displayer = FrameDisplayer(keypoints_queue)
    # Use ThreadPoolExecutor for multithreading
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # Submit tasks to the thread pool; leaving the with-block waits for them to finish
        executor.submit(video_reader.read_video_stream)
        executor.submit(feature_detector.detect_feature_points)
        executor.submit(frame_displayer.display_frames_with_keypoints)
    print("All tasks completed.")
if __name__ == '__main__':
    start()
#!/usr/bin/python3
# Tile-based variant: split each frame into a grid and run ORB on the tiles in parallel
import cv2
import numpy as np
from multiprocessing import Pool


# Function to detect ORB features and draw keypoints and descriptors in a tile with separate colors
def detect_and_draw_orb(frame, tile_x, tile_y, tile_width, tile_height, tile_color):
    tile = frame[tile_y:tile_y + tile_height, tile_x:tile_x + tile_width]
    # Initialize ORB detector
    orb = cv2.ORB_create()
    # Find the keypoints and descriptors in the tile
    kp, des = orb.detectAndCompute(tile, None)
    # Draw keypoints in blue and rich (size/orientation) markers in the specified tile_color
    tile_with_keypoints = cv2.drawKeypoints(tile, kp, None, color=(255, 0, 0), flags=0)
    tile_with_descriptors = cv2.drawKeypoints(tile_with_keypoints, kp, None, color=tile_color,
                                              flags=cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
    # Outline the tile border in red so the grid stays visible in the assembled frame
    cv2.rectangle(tile_with_descriptors, (0, 0), (tile_width - 1, tile_height - 1), (0, 0, 255), 1)
    return tile_with_descriptors


def process_frame(frame):
    frame_height, frame_width, _ = frame.shape
    num_tiles_x = 3  # Number of tiles horizontally
    num_tiles_y = 3  # Number of tiles vertically
    tile_width = frame_width // num_tiles_x
    tile_height = frame_height // num_tiles_y
    tiles = []
    # Create a pool of worker processes for parallel processing
    with Pool(processes=num_tiles_x * num_tiles_y) as pool:
        for y in range(num_tiles_y):
            for x in range(num_tiles_x):
                tile_x = x * tile_width
                tile_y = y * tile_height
                # Assign a unique color for each tile
                tile_color = tuple(int(255 * c) for c in np.random.rand(3))
                # Use multiprocessing to detect ORB features and draw descriptors in each tile
                result = pool.apply_async(detect_and_draw_orb,
                                          (frame, tile_x, tile_y, tile_width, tile_height, tile_color))
                tiles.append(result)
        pool.close()
        pool.join()
    # Combine the processed tiles back into the frame
    for idx, tile_result in enumerate(tiles):
        tile_x = (idx % num_tiles_x) * tile_width
        tile_y = (idx // num_tiles_x) * tile_height
        frame[tile_y:tile_y + tile_height, tile_x:tile_x + tile_width] = tile_result.get()
    return frame


if __name__ == '__main__':
    # Open a video capture
    video_capture = cv2.VideoCapture('/home/mo/GitHub/OpticalCore/video/road.mp4')  # Replace with your video file
    while True:
        ret, frame = video_capture.read()
        if not ret:
            break
        processed_frame = process_frame(frame)
        cv2.imshow('Video with ORB Detection', processed_frame)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    video_capture.release()
    cv2.destroyAllWindows()
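Since the whole point of the gist is speeding up descriptor computation, one thing worth flagging in the tiled version: process_frame creates and tears down a fresh Pool for every frame, and that setup cost usually dwarfs the ORB detection itself. A minimal sketch of keeping one long-lived pool for the whole run; process_frame_with_pool is a hypothetical variant of process_frame that receives the pool instead of creating it, and the main loop below would replace the one above:

# Sketch: one long-lived pool for the whole video instead of a new Pool per frame.
def process_frame_with_pool(frame, pool, num_tiles_x=3, num_tiles_y=3):
    frame_height, frame_width, _ = frame.shape
    tile_width = frame_width // num_tiles_x
    tile_height = frame_height // num_tiles_y
    results = []
    for y in range(num_tiles_y):
        for x in range(num_tiles_x):
            tile_x, tile_y = x * tile_width, y * tile_height
            tile_color = tuple(int(255 * c) for c in np.random.rand(3))
            results.append((tile_x, tile_y, pool.apply_async(
                detect_and_draw_orb, (frame, tile_x, tile_y, tile_width, tile_height, tile_color))))
    # Stitch the processed tiles back into the frame
    for tile_x, tile_y, result in results:
        frame[tile_y:tile_y + tile_height, tile_x:tile_x + tile_width] = result.get()
    return frame


if __name__ == '__main__':
    video_capture = cv2.VideoCapture('/home/mo/GitHub/OpticalCore/video/road.mp4')  # Replace with your video file
    with Pool(processes=9) as pool:  # created once, reused for every frame
        while True:
            ret, frame = video_capture.read()
            if not ret:
                break
            cv2.imshow('Video with ORB Detection', process_frame_with_pool(frame, pool))
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
    video_capture.release()
    cv2.destroyAllWindows()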