Skip to content

Instantly share code, notes, and snippets.

@Breakthrough
Created February 17, 2022 02:00
Show Gist options
  • Save Breakthrough/8aed9a77fd8b9a60fb37e984e33ea596 to your computer and use it in GitHub Desktop.
Save Breakthrough/8aed9a77fd8b9a60fb37e984e33ea596 to your computer and use it in GitHub Desktop.
DVR-Scan Multithread Benchmark
import threading
import time
import cv2
import numpy as np
import queue
class EndOfVideo:
pass
num_frames = 0
def decode_thread(path: str, frame_queue):
cap = cv2.VideoCapture(path)
while True:
ret_val, frame = cap.read()
if not ret_val:
frame_queue.put(EndOfVideo())
break
frame_queue.put(frame)
cap.release()
def process_thread(frame_queue, out_queue):
global num_frames
bg_subtractor = cv2.createBackgroundSubtractorMOG2(detectShadows=False)
kernel = np.ones((7, 7), np.uint8)
while True:
frame = frame_queue.get()
if isinstance(frame, EndOfVideo) and not out_queue is None:
out_queue.put(EndOfVideo())
break
frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frame_mask = bg_subtractor.apply(frame_gray)
frame_filt = cv2.morphologyEx(frame_mask, cv2.MORPH_OPEN, kernel)
frame_score = np.sum(frame_filt) / float(frame_filt.shape[0] * frame_filt.shape[1])
if frame_score >= 0.15 and not out_queue is None:
out_queue.put(frame)
num_frames += 1
def encode_thread(out_path, frame_queue):
vid_out = cv2.VideoWriter(
out_path, cv2.VideoWriter_fourcc('X', 'V', 'I', 'D'),
23.976, (1920, 1080))
while True:
frame = frame_queue.get()
if isinstance(frame, EndOfVideo):
break
vid_out.write(frame)
vid_out.release()
def run_single_threaded(path, out_path):
global num_frames
cap = cv2.VideoCapture(path)
vid_out = cv2.VideoWriter(
out_path, cv2.VideoWriter_fourcc('X', 'V', 'I', 'D'),
23.976, (1920, 1080))
bg_subtractor = cv2.createBackgroundSubtractorMOG2(detectShadows=False)
kernel = np.ones((7, 7), np.uint8)
while True:
ret_val, frame = cap.read()
if not ret_val:
break
frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frame_mask = bg_subtractor.apply(frame_gray)
frame_filt = cv2.morphologyEx(frame_mask, cv2.MORPH_OPEN, kernel)
frame_score = np.sum(frame_filt) / float(frame_filt.shape[0] * frame_filt.shape[1])
if frame_score >= 0.15:
vid_out.write(frame)
num_frames += 1
cap.release()
vid_out.release()
def run_multithreaded(path, out_path):
frame_queue = queue.Queue(10)
encode_queue = queue.Queue(10)
decode = threading.Thread(target=decode_thread, args=(path, frame_queue))
process = threading.Thread(target=process_thread, args=(frame_queue,encode_queue))
encode = threading.Thread(target=encode_thread, args=(out_path, encode_queue,))
decode.start()
process.start()
encode.start()
decode.join()
process.join()
encode.join()
if __name__ == "__main__":
vid_path = "tests/resources/simple_movement.mp4"
vid_write_path = "threading-test.avi"
start = time.time_ns()
run_single_threaded(vid_path, vid_write_path)
orig_num_frames = num_frames
end = time.time_ns()
time_single_threaded = end - start
num_frames = 0
start = time.time_ns()
run_multithreaded(vid_path, vid_write_path)
assert num_frames == orig_num_frames
end = time.time_ns()
time_multithreaded = end - start
assert num_frames > 0, "Processed no frames!"
def get_stats(num_frames, time_ns):
fps = (1000 * 1000 * num_frames) / (time_ns / 1000.0)
ms_per_frame = (time_ns / 1000) / (num_frames * 1000)
return fps, ms_per_frame
print("Benchmark Result, No Encoding: (processed %d frames):" % num_frames)
print(" Single-Thread: %3.1f FPS (%3.2f ms/frame)" % get_stats(num_frames, time_single_threaded))
print(" Multi-Threaded: %3.1f FPS (%3.2f ms/frame)" % get_stats(num_frames, time_multithreaded))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment