@Eugeny
Last active March 7, 2023 11:40
Frame accurate video reader - OpenCV VideoCapture replacement

OpenCV's VideoCapture does not seek frame-accurately, and this hasn't been fixed for the last 5 years: opencv/opencv#9053

This is a PyAV-based replacement. Unlike other implementations, it can seek at any time.

How to use:

reader = VideoReader('video.mp4')
reader.seek(reader.total_frames - 100)  # frame number
while True:
  frame = reader.read()
  if frame is None:  # read() returns None at the end of the stream
    break

  # frame is an ndarray - do something with it
  print(f'frame {reader.position}: {frame}')

reader.close()

import av


class VideoReader:
    def __init__(self, path):
        self.container = av.container.open(path)
        self.position = 0
        self.stream = self.container.streams.video[0]
        self.total_frames = self.stream.frames
        self.seek(0)

    def iter_frames(self):
        # Decode every video packet, skipping packets without a dts.
        for packet in self.container.demux(self.stream):
            if packet.dts is None:
                continue
            for frame in packet.decode():
                yield frame

    def close(self):
        self.container.close()

    def read(self):
        # Return the next frame as a BGR ndarray, or None at end of stream.
        try:
            frame = next(self.iter)
        except StopIteration:
            self.end = True
            return None
        self.position += 1
        return frame.to_rgb().to_ndarray(format='bgr24')

    def seek(self, frame):
        # Convert the frame number to a pts in stream time_base units.
        pts = int(frame * self.stream.duration / self.stream.frames)
        self.container.seek(pts, stream=self.stream)
        # Decode forward from the seek point until the target pts is reached.
        for j, f in enumerate(self.iter_frames()):
            if j > 100:
                raise RuntimeError('Did not find target within 100 frames of seek')
            if f.pts >= pts - 1:
                break
        self.end = False
        self.position = frame
        self.iter = iter(self.iter_frames())
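
To make the seek arithmetic concrete, here is a minimal sketch of the frame-to-timestamp mapping that seek() performs. The helper name frame_to_pts and the sample stream numbers are illustrative assumptions, not part of the gist. The mapping assumes a constant frame rate and that both stream.duration and stream.frames are populated, which may not hold for every container.

```python
def frame_to_pts(frame_index, stream_duration, total_frames):
    """Map a frame index to a pts in stream time_base units.

    Mirrors the arithmetic in VideoReader.seek(); assumes a constant frame rate.
    """
    if total_frames <= 0:
        raise ValueError('total_frames must be positive')
    return int(frame_index * stream_duration / total_frames)


# Hypothetical stream: 10 s at 30 fps with a time base of 1/15360,
# so duration = 153600 ticks, 300 frames, and 512 ticks per frame.
example_pts = frame_to_pts(30, 153600, 300)
```

If stream.frames is 0 (some containers do not report a frame count), the division in seek() fails, which is why the sketch rejects a non-positive count up front.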
@angelcarro

This class was quite useful for me. However, I think I found some bugs.
seek() consumes the requested frame while searching for it, so that frame never appears in the output of read(). In particular, when a file is opened, the call to self.seek(0) skips the first frame.
One solution is to prepend the last frame read to self.iter, so that the next call to read() returns the correct frame:

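# Requires `import itertools` at module level and a self.max_seek_search
# attribute (for example, set to 100 in __init__).
def seek(self, frame):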
    pts = int(frame * self.stream.duration / self.stream.frames)
    self.container.seek(pts, stream=self.stream)
    self.iter = self.iter_frames()
    for j, f in enumerate(self.iter):
        if j > self.max_seek_search:
            raise RuntimeError(f'Did not find target within {self.max_seek_search} frames of seek')
        if f.pts >= pts - 1:
            self.end = False
            self.position = frame
            self.iter = itertools.chain([f], self.iter)
            return
    self.end = True
    self.position = -1
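
The push-back idea can be tried without a video file. The sketch below applies the same pattern to a plain iterator of integers; the function name seek_in_iterator and the max_search parameter are hypothetical stand-ins for seek() and self.max_seek_search.

```python
import itertools


def seek_in_iterator(items, target, max_search=100):
    """Advance to the first value >= target and return an iterator
    that still yields that value first."""
    it = iter(items)
    for j, item in enumerate(it):
        if j > max_search:
            raise RuntimeError(f'Did not find target within {max_search} items')
        if item >= target:
            # The loop already consumed `item`; chain() puts it back in front.
            return itertools.chain([item], it)
    return iter(())  # ran out of items before reaching the target


frames = seek_in_iterator(range(10), 4)
first_three = [next(frames) for _ in range(3)]
```

Here the first value read after the seek is the target itself (4), which is the behaviour the modified seek() aims for.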

The other problem I found was the discarding of packets whose dts is None. In some videos I tested, the last frame was lost: the last packet had dts set to None but still contained the last frame of the stream. I changed iter_frames() so that no frame is discarded:

def iter_frames(self):
    for frame in self.container.decode(self.stream):
        yield frame
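
As a toy model of the situation described above, the following sketch uses a stand-in packet class (FakePacket is hypothetical and not a PyAV type) whose final packet has dts None but still decodes to a frame. It only illustrates the difference between the two loops; real PyAV packet behaviour may vary by file and version.

```python
class FakePacket:
    """Stand-in for a demuxed packet; not a PyAV class."""

    def __init__(self, dts, frames):
        self.dts = dts
        self._frames = frames

    def decode(self):
        return list(self._frames)


def frames_skipping_none_dts(packets):
    # Original behaviour: ignore packets without a dts.
    for packet in packets:
        if packet.dts is None:
            continue
        yield from packet.decode()


def frames_keeping_all(packets):
    # Behaviour after the change: decode every packet.
    for packet in packets:
        yield from packet.decode()


packets = [FakePacket(0, ['f0']), FakePacket(1, ['f1']), FakePacket(None, ['f2'])]
skipped = list(frames_skipping_none_dts(packets))
kept = list(frames_keeping_all(packets))
```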

@petered
petered commented Dec 21, 2022

Thanks for this, @Eugeny, and also @whizmo for your modifications. I've used it to make a version that lets you request frames directly, without thinking about the current state of the reader. For example:

reader = VideoReader(path='path/to/video.mp4')
frame = reader.request_frame(20)  # Ask for the 20th frame
cv2.imshow('frame', frame.image)
cv2.waitKey(1)

It should be fast when frames are requested in sequence, or when the same frame was requested recently.

Gist: https://gist.github.com/petered/db8e334c7aefdf367af1b11e6eefe733
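
One way to get the "requested recently" speed-up is a small least-recently-used cache in front of the decoder. The sketch below is an illustration of that general technique only, not the code from the linked gist; FrameCache, decode_frame, and fake_decode are hypothetical names, and it does not cover the sequential-read case.

```python
from collections import OrderedDict


class FrameCache:
    """Small LRU cache in front of a frame-decoding function."""

    def __init__(self, decode_frame, capacity=32):
        self.decode_frame = decode_frame  # hypothetical callable: index -> frame
        self.capacity = capacity
        self.cache = OrderedDict()

    def request_frame(self, index):
        if index in self.cache:
            self.cache.move_to_end(index)  # mark as most recently used
            return self.cache[index]
        frame = self.decode_frame(index)
        self.cache[index] = frame
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict the least recently used
        return frame


decoded = []  # records which indices actually reached the decoder


def fake_decode(index):
    decoded.append(index)
    return f'frame-{index}'


cache = FrameCache(fake_decode, capacity=2)
results = [cache.request_frame(i) for i in (20, 20, 21, 20, 22, 21)]
```

With a capacity of 2, the repeated requests for frame 20 are served from the cache, while frame 21 has to be decoded again after it is evicted.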
