Skip to content

Instantly share code, notes, and snippets.

@jagin
Last active February 24, 2023 13:19
Show Gist options
  • Save jagin/caaca4171e88318f471c5b4244dceda7 to your computer and use it in GitHub Desktop.
Save jagin/caaca4171e88318f471c5b4244dceda7 to your computer and use it in GitHub Desktop.
Pipeline generator task capturing video frames (see: https://medium.com/deepvisionguru/video-processing-pipeline-with-opencv-ac10187d75b)
import cv2
from pipeline.pipeline import Pipeline
class CaptureVideo(Pipeline):
def __init__(self, src=0):
self.cap = cv2.VideoCapture(src)
if not self.cap.isOpened():
raise IOError(f"Cannot open video {src}")
self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
super(CaptureVideo, self).__init__()
def generator(self):
image_idx = 0
while self.has_next():
ret, image = self.cap.read()
if not ret:
# no frames has been grabbed
break
data = {
"image_id": f"{image_idx:05d}",
"image": image,
}
if self.filter(data):
image_idx += 1
yield self.map(data)
def cleanup(self):
# Closes video file or capturing device
self.cap.release()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment