Last active
October 10, 2023 11:42
-
-
Save shrimo/f7dd9decb33f04766f8755445eac655a to your computer and use it in GitHub Desktop.
An attempt to speed up feature-descriptor computation for video frames, developed in a conversation with ChatGPT.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# read, akaze and show | |
import cv2 | |
import multiprocessing | |
# Producer stage: read raw frames from the video file into a queue.
def read_video_stream(video_file, frame_queue):
    """Read frames from *video_file* and push them onto *frame_queue*.

    A trailing ``None`` is always enqueued as an end-of-stream sentinel —
    including when the file cannot be opened — so the downstream detector
    process never blocks forever on ``frame_queue.get()``.
    """
    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        print("Error: Could not open video file.")
        frame_queue.put(None)  # still signal end-of-stream on failure
        return
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_queue.put(frame)  # Put the frame in the queue
    cap.release()
    frame_queue.put(None)  # Put None to signal the end of frames
# Worker stage: detect AKAZE feature points on each queued frame.
def detect_feature_points(frame_queue, keypoints_queue):
    """Consume frames from *frame_queue*, detect AKAZE keypoints, and
    forward ``(frame, coordinates)`` pairs on *keypoints_queue*.

    Only plain (x, y) coordinates are forwarded because ``cv2.KeyPoint``
    objects are not picklable across the process boundary. A final
    ``(None, [])`` pair propagates the end-of-stream sentinel so the
    display process can terminate (otherwise it blocks forever and the
    final ``join()`` never returns).
    """
    akaze = cv2.AKAZE_create()
    while True:
        frame = frame_queue.get()
        if frame is None:
            break
        frame = cv2.resize(frame, (1280, 720))
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        keypoints = akaze.detect(gray_frame)
        # Extracting only the coordinates of keypoints
        keypoints_coordinates = [(kp.pt[0], kp.pt[1]) for kp in keypoints]
        keypoints_queue.put((frame, keypoints_coordinates))
    keypoints_queue.put((None, []))  # propagate sentinel downstream
# Consumer stage: render incoming frames with their detected keypoints.
def display_frames_with_keypoints(keypoints_queue):
    """Pop ``(frame, coordinates)`` pairs off *keypoints_queue* and show them.

    Rebuilds ``cv2.KeyPoint`` objects from the plain coordinate pairs and
    draws them onto the frame. Stops when the frame is ``None`` or when
    the user presses 'q'.
    """
    window_name = "Video Stream with Keypoints"
    cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
    while True:
        frame, coords = keypoints_queue.get()
        if frame is None:
            break
        # Reconstruct KeyPoint objects (size 1) from the transported coordinates.
        restored = [cv2.KeyPoint(x, y, 1) for x, y in coords]
        annotated = cv2.drawKeypoints(
            frame, restored, outImage=frame, flags=cv2.DRAW_MATCHES_FLAGS_DEFAULT
        )
        cv2.imshow(window_name, annotated)
        if cv2.waitKey(25) & 0xFF == ord('q'):
            break
    cv2.destroyAllWindows()
if __name__ == '__main__':
    video_file = '..//video/road.mp4'  # Replace with the path to your video file
    frame_queue = multiprocessing.Queue()
    keypoints_queue = multiprocessing.Queue()

    # One process per pipeline stage: read -> detect (AKAZE) -> display.
    workers = [
        multiprocessing.Process(target=read_video_stream,
                                args=(video_file, frame_queue)),
        multiprocessing.Process(target=detect_feature_points,
                                args=(frame_queue, keypoints_queue)),
        multiprocessing.Process(target=display_frames_with_keypoints,
                                args=(keypoints_queue,)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# read and show | |
import cv2 | |
import concurrent.futures | |
from queue import Queue | |
from typing import List, Tuple, Optional | |
import numpy as np | |
class VideoReader:
    """Producer: reads frames from a video file into a shared queue."""

    def __init__(self, video_file: str, frame_queue: Queue[Optional[np.ndarray]]):
        # Path of the input video and the queue frames are written to.
        self.video_file = video_file
        self.frame_queue = frame_queue

    def read_video_stream(self) -> None:
        """Push resized frames onto the queue.

        A trailing ``None`` is always enqueued as an end-of-stream
        sentinel — including when the file cannot be opened — so the
        detector thread never blocks forever on ``frame_queue.get()``.
        """
        cap = cv2.VideoCapture(self.video_file)
        if not cap.isOpened():
            print("Error: Could not open video file.")
            self.frame_queue.put(None)  # still signal end-of-stream on failure
            return
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (1280, 720))
            self.frame_queue.put(frame)  # Put the frame in the queue
        cap.release()
        self.frame_queue.put(None)  # Put None to signal the end of frames
class FeatureDetector:
    """Worker: detects AKAZE feature points on frames pulled from a queue."""

    def __init__(self, frame_queue: Queue[Optional[np.ndarray]],
                 keypoints_queue: Queue[Tuple[Optional[np.ndarray], List[Tuple[float, float]]]]):
        # Input frames arrive on frame_queue; (frame, coords) pairs leave on keypoints_queue.
        self.frame_queue = frame_queue
        self.keypoints_queue = keypoints_queue

    def detect_feature_points(self) -> None:
        """Detect keypoints per frame and forward ``(frame, coords)`` pairs.

        Only plain (x, y) coordinates are forwarded (``cv2.KeyPoint`` is
        rebuilt on the display side). A final ``(None, [])`` pair
        propagates the end-of-stream sentinel so the displayer thread can
        terminate — without it the ``ThreadPoolExecutor`` context never
        exits and the program hangs after the video ends.
        """
        akaze = cv2.AKAZE_create()  # type: ignore
        while True:
            frame = self.frame_queue.get()
            if frame is None:
                break
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            keypoints, _ = akaze.detectAndCompute(gray_frame, None)
            # Extracting only the coordinates of keypoints
            keypoints_coordinates = [(kp.pt[0], kp.pt[1]) for kp in keypoints]
            self.keypoints_queue.put((frame, keypoints_coordinates))
        self.keypoints_queue.put((None, []))  # propagate sentinel downstream
class FrameDisplayer:
    """Consumer: displays frames with their detected keypoints drawn on top."""

    def __init__(self, keypoints_queue: Queue[Tuple[Optional[np.ndarray], List[Tuple[float, float]]]]):
        # Queue delivering (frame, keypoint-coordinates) pairs to render.
        self.keypoints_queue = keypoints_queue

    def display_frames_with_keypoints(self) -> None:
        """Render queued frames until a ``None`` frame arrives or 'q' is pressed."""
        title = "Video Stream with Keypoints"
        cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
        while True:
            frame, coords = self.keypoints_queue.get()
            if frame is None:
                break
            # Rebuild KeyPoint objects (size 1) from the plain coordinate pairs.
            restored = [cv2.KeyPoint(x, y, 1) for x, y in coords]
            annotated = cv2.drawKeypoints(frame, restored, outImage=frame)
            cv2.imshow(title, annotated)
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break
        cv2.destroyAllWindows()
def start() -> None:
    """Wire up reader, detector and displayer and run them on three threads."""
    video_file: str = '..//video/road.mp4'  # Replace with the path to your video file
    frame_queue: Queue[Optional[np.ndarray]] = Queue()
    keypoints_queue: Queue[Tuple[Optional[np.ndarray], List[Tuple[float, float]]]] = Queue()

    # One object per pipeline stage, all sharing the two queues above.
    reader = VideoReader(video_file, frame_queue)
    detector = FeatureDetector(frame_queue, keypoints_queue)
    displayer = FrameDisplayer(keypoints_queue)

    # I/O-bound stages overlap under the GIL on a small thread pool.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.submit(reader.read_video_stream)
        executor.submit(detector.detect_feature_points)
        executor.submit(displayer.display_frames_with_keypoints)
    print("All tasks completed.")
# start() | |
if __name__ == '__main__':
    source: str = '..//video/road.mp4'  # Replace with the path to your video file
    frames: Queue[Optional[np.ndarray]] = Queue()
    detections: Queue[Tuple[Optional[np.ndarray], List[Tuple[float, float]]]] = Queue()

    # One object per pipeline stage, all sharing the two queues above.
    reader: VideoReader = VideoReader(source, frames)
    detector: FeatureDetector = FeatureDetector(frames, detections)
    displayer: FrameDisplayer = FrameDisplayer(detections)

    # Run the three pipeline stages concurrently on a small thread pool.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.submit(reader.read_video_stream)
        executor.submit(detector.detect_feature_points)
        executor.submit(displayer.display_frames_with_keypoints)
    print("All tasks completed.")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
import numpy as np | |
from multiprocessing import Pool | |
# Worker task: detect ORB features in one tile and return the annotated tile.
def detect_and_draw_orb(frame, tile_x, tile_y, tile_width, tile_height, tile_color):
    """Detect ORB keypoints in one tile of *frame* and return the annotated tile.

    The tile is drawn twice: plain keypoint dots in blue, then rich markers
    (size/orientation circles) in *tile_color*, plus a 1px red border marking
    the tile boundary. The caller stitches the returned tile back into the
    frame; the original write-back into *frame* here was dead code, since
    this runs in a worker process on a pickled copy of the frame.
    """
    tile = frame[tile_y:tile_y + tile_height, tile_x:tile_x + tile_width]
    # Initialize ORB detector
    orb = cv2.ORB_create()
    # Find the keypoints in the tile (descriptors are unused).
    kp, _ = orb.detectAndCompute(tile, None)
    # Draw keypoints in blue and rich markers in the specified tile_color.
    tile_with_keypoints = cv2.drawKeypoints(tile, kp, None, color=(255, 0, 0), flags=0)
    tile_with_descriptors = cv2.drawKeypoints(tile_with_keypoints, kp, None, color=tile_color,
                                              flags=cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
    # cv2.rectangle draws in place (and returns the same array).
    cv2.rectangle(tile_with_descriptors, (0, 0), (tile_width - 1, tile_height - 1), (0, 0, 255), 1)
    return tile_with_descriptors
# Reusable worker pool, created lazily on the first frame. The original code
# forked and tore down a fresh 9-process Pool for EVERY frame, which dominates
# the runtime; pool workers are daemonic and exit with the interpreter.
_tile_pool = None


def process_frame(frame):
    """Split *frame* into a 3x3 grid, annotate each tile with ORB features
    in parallel worker processes, and stitch the tiles back into the frame.

    Returns *frame* with all tiles replaced by their annotated versions.
    """
    global _tile_pool
    frame_height, frame_width, _ = frame.shape
    num_tiles_x = 3  # Number of tiles horizontally
    num_tiles_y = 3  # Number of tiles vertically
    tile_width = frame_width // num_tiles_x
    tile_height = frame_height // num_tiles_y

    if _tile_pool is None:
        _tile_pool = Pool(processes=num_tiles_x * num_tiles_y)

    # Fan out one task per tile; apply_async results keep submission order.
    tiles = []
    for y in range(num_tiles_y):
        for x in range(num_tiles_x):
            tile_x = x * tile_width
            tile_y = y * tile_height
            # Assign a unique random color for each tile's rich markers.
            tile_color = tuple(int(255 * c) for c in np.random.rand(3))
            tiles.append(_tile_pool.apply_async(
                detect_and_draw_orb,
                (frame, tile_x, tile_y, tile_width, tile_height, tile_color)))

    # Combine the processed tiles back into the frame (submission order).
    for idx, tile_result in enumerate(tiles):
        tile_x = (idx % num_tiles_x) * tile_width
        tile_y = (idx // num_tiles_x) * tile_height
        frame[tile_y:tile_y + tile_height, tile_x:tile_x + tile_width] = tile_result.get()
    return frame
# Open the input video (replace with your own file path).
video_capture = cv2.VideoCapture('/home/mo/GitHub/OpticalCore/video/road.mp4')  # Replace with your video file
while True:
    grabbed, frame = video_capture.read()
    if not grabbed:  # end of stream
        break
    cv2.imshow('Video with ORB Detection', process_frame(frame))
    # 'q' quits early.
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
video_capture.release()
cv2.destroyAllWindows()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment