Skip to content

Instantly share code, notes, and snippets.

@tttamaki
Created August 4, 2022 23:24
Show Gist options
  • Save tttamaki/33770f5b4bb93dc99d9c3ac13cdc325c to your computer and use it in GitHub Desktop.
Save tttamaki/33770f5b4bb93dc99d9c3ac13cdc325c to your computer and use it in GitHub Desktop.
pyav decode: seek and clip by seconds
import av
import numpy as np
def _print_stream_info(container, stream):
    """Print container-level and video-stream-level metadata to stdout."""
    cnt = stream.codec_context

    print("filename:", container.name)
    if container.size > 1024 * 1024:
        print("filesize [MB]:", container.size // 1024 // 1024)
    else:
        print("filesize [kB]:", container.size // 1024)
    print("bit_rate [kb/s]:", float(container.bit_rate) / 1024)
    print("container format name:", container.format.name)
    print("container format long name:", container.format.long_name)
    print("codec name:", cnt.codec.name)
    print("codec long name:", cnt.codec.long_name)
    print("codec tag:", cnt.codec_tag)
    for key, value in container.metadata.items():
        print(f"container metadata {key}:", value)

    # container.duration is expressed in av.time_base units; guard against
    # an unknown duration the same way stream.duration is guarded below.
    duration_sec = None
    if container.duration is not None:
        duration_sec = float(container.duration) / av.time_base
        print("container duration [sec]: "
              f"{duration_sec:.3f}")
    if stream.duration is not None:
        print("stream duration[sec]: "
              f"{float(stream.duration * stream.time_base): .3f}")
    print("frames:", stream.frames)
    if duration_sec is not None:
        print("guessed frames:",  # still inaccurate
              int(duration_sec * stream.base_rate))
    print(f"frame size [pix]: {cnt.width}x{cnt.height}")
    print("pix_fmt:", cnt.pix_fmt)
    print(f"base_rate[fps]: "
          f"{stream.base_rate}={float(stream.base_rate):.2f}")
    print(f"rate[fps]: "
          f"{cnt.rate}={float(cnt.rate):.2f}")
    print("stream timebase:", stream.time_base)
    print("dts/pts icrement:",
          int(1 / (stream.base_rate * stream.time_base)))


def _decode_clip(container, stream, start_time, end_time,
                 any_frame, backward):
    """Seek to start_time and return frames with start_time <= time <= end_time.

    The returned list is sorted by frame time.
    """
    # Seek offsets are given in stream.time_base units when a stream is passed.
    container.seek(
        offset=int(start_time / stream.time_base),
        any_frame=any_frame,
        backward=backward,
        stream=stream)

    frames = []
    # A single decode loop is used on purpose: the frame that first reaches
    # start_time must be kept, not consumed by a separate "skip" loop.
    for frame in container.decode(stream):
        # Frames without a timestamp cannot be placed in the clip.
        if frame.time is None:
            continue
        # The seek may land before start_time; discard the lead-in frames.
        if frame.time < start_time:
            continue
        if frame.time > end_time:
            break
        frames.append(frame)

    # sort (sometimes frame order is not correct)
    frames.sort(key=lambda f: f.time)
    return frames


def load_from_file(
    filename,
    any_frame=False,
    backward=True,
    start_time=3.0,
    end_time=3.5,
    n_frames=8
):
    """Print video metadata, then decode and report a sampled clip.

    Opens ``filename`` with PyAV, prints information about the container and
    its first video stream, seeks to ``start_time``, decodes the frames whose
    time lies in ``[start_time, end_time]`` (seconds), picks ``n_frames`` of
    them at evenly spaced indices and prints one summary line per pick.

    Args:
        filename: Path of the video file to open.
        any_frame: Forwarded to ``container.seek``.
        backward: Forwarded to ``container.seek``.
        start_time: Clip start in seconds.
        end_time: Clip end in seconds (inclusive).
        n_frames: Number of frames to sample from the clip; frames repeat
            when the clip holds fewer than ``n_frames`` frames.

    Raises:
        ValueError: If no decoded frame falls inside the requested interval.
    """
    with av.open(filename) as container:
        stream = container.streams.video[0]
        _print_stream_info(container, stream)

        #
        # decode frame by frame
        #
        frames = _decode_clip(
            container, stream, start_time, end_time, any_frame, backward)
        if not frames:
            raise ValueError(
                f"no frames decoded in [{start_time}, {end_time}] sec "
                f"from {filename}")

        # frame sampling: evenly spaced indices over the sorted clip
        picks = np.linspace(0, len(frames) - 1, n_frames).astype(int)
        frames = [frames[k] for k in picks]

        # check
        for i, frame in enumerate(frames):
            print(f"i:{i:3d}, time:{frame.time:.3f}, "
                  f"dts:{frame.dts:5d}, pts:{frame.pts:5d}, "
                  f"type:{frame.pict_type.name} ",
                  end="")
            if frame.key_frame:
                print("keyframe", end="")
            print()
if __name__ == '__main__':
    # Sample files from several video datasets, covering different
    # containers and codecs.
    filenames = [
        # long (untrimmed) videos
        "zlVkeKC6Ha8.mp4",  # AVA, mp4, h264/avc
        "v_nHE7u40plD0.mkv",  # ActivityNet, mkv, vp9
        "001YG.mp4",  # Charades, mp4, h264/avc
        "-4wsuPCjDBc_5_15.avi",  # MSVD, avi, h264
        "s07-d72-cam-002.avi",  # MPII Cooking 2, avi, msmpeg4v2/MP42
        # short (trimmed) videos
        "-3B32lodo2M_000059_000069.mp4",  # Kinetics, mp4, h264/avc
        "v_ApplyEyeMakeup_g01_c01.avi",  # UCF101, avi, mpeg4/XVID
        "April_09_brush_hair_u_nm_np1_ba_goo_1.avi",  # HMDB, avi, mpeg4/DX50
        "200000.webm",  # SSv2, webm, vp9
    ]
    for filename in filenames:
        print("==================")
        try:
            load_from_file(filename, any_frame=False, backward=True)
        except Exception as err:
            # Best effort: report the failure and move on to the next file.
            # Exception (not BaseException) is caught so that Ctrl-C and
            # SystemExit still stop the run.
            print(f"failed to process {filename}: {err!r}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment