@Expector-Hutch
Created October 10, 2024 03:53
{
"cells": [
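{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Generating a looping video whose end joins seamlessly back to its start\n",
"\n",
"I recently got hold of a video that is essentially one segment repeated over and over. I wanted to extract a single cycle and turn it into a live wallpaper, but since I don't know how to use Premiere Pro, I had to make do with ffmpeg plus Pillow...\n",
"\n",
"The key is to find two frames that can be joined end to start, which requires the two frames to be extremely similar. To judge how similar two frames are, the images first have to be reduced to something comparable. There are two common approaches: extracting feature vectors, or computing a hash. Given the characteristics of my video, I went with the simpler option, hashing."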
]
},
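{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before starting, install the required packages as well as `ffmpeg` (the `ffmpeg-python` package is only a wrapper, so the `ffmpeg` executable itself must be available on your `PATH`)"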
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install ffmpeg-python Pillow numpy imagehash"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [],
"source": [
"import ffmpeg as ff\n",
"import numpy as np\n",
"from PIL import Image\n",
"from imagehash import phash, ImageHash\n",
"\n",
"from typing import Generator"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, split the video into frames and load each frame into an array"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [],
"source": [
"def get_video_resolution(input_video: str) -> tuple[int, int]:\n",
"    # Probe the file and read the size of its first video stream\n",
"    probe = ff.probe(input_video)\n",
"    video_info = next(stream for stream in probe['streams'] if stream['codec_type'] == 'video')\n",
"    width = int(video_info['width'])\n",
"    height = int(video_info['height'])\n",
"    return width, height"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [],
"source": [
"def extract_frames(input_video: str) -> Generator[np.ndarray, None, None]:\n",
"    # Yield the video one frame at a time as an image array\n",
"    width, height = get_video_resolution(input_video)\n",
"    frame_size = 3 * width * height  # bytes in one rgb24 frame\n",
"\n",
"    process = (\n",
"        ff\n",
"        .input(input_video)\n",
"        .output('pipe:', format='rawvideo', pix_fmt='rgb24')\n",
"        .run_async(pipe_stdout=True)\n",
"    )\n",
"\n",
"    try:\n",
"        while True:\n",
"            # Read exactly one frame's worth of bytes\n",
"            in_bytes = process.stdout.read(frame_size)\n",
"            if len(in_bytes) < frame_size:\n",
"                break  # end of stream, or a truncated final frame\n",
"\n",
"            # Convert the bytes into a numpy array\n",
"            frame = (\n",
"                np\n",
"                .frombuffer(in_bytes, np.uint8)\n",
"                .reshape([height, width, 3])\n",
"            )\n",
"\n",
"            yield frame\n",
"    finally:\n",
"        process.stdout.close()\n",
"        process.wait()"
]
},
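{
"cell_type": "markdown",
"metadata": {},
"source": [
"There are several image hashing algorithms, such as average hash (aHash), perceptual hash (pHash), and difference hash (dHash). Each has its strengths and weaknesses, but the choice makes little difference to the algorithm that follows, so I use pHash as the example here. All of these hashes are provided by `imagehash`. Since this post is mainly about generating a looping video, I won't dig into how the hashing algorithms work (~~definitely not because I don't know how~~)."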
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [],
"source": [
"def generate_hashes(input_video: str) -> Generator[int, None, None]:\n",
"    for frame in extract_frames(input_video):\n",
"        # Convert the numpy array into a PIL image\n",
"        pil_image: Image.Image = Image.fromarray(frame)\n",
"\n",
"        # Compute the pHash and convert its hex string into an integer\n",
"        phash_value = int(str(phash(pil_image)), 16)\n",
"\n",
"        yield phash_value"
]
},
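{
"cell_type": "markdown",
"metadata": {},
"source": [
"The closer two image hashes are, the more similar the images. The similarity of two hash values can be expressed by their Hamming distance: the number of bit positions in which two binary numbers differ, which can be computed by counting the $1$ bits in their XOR."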
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [],
"source": [
"def hamming_distance(hash1: int, hash2: int) -> int:\n",
"    return bin(hash1 ^ hash2).count('1')"
]
},
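{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, scan the hash of every frame to find the two frames with the smallest distance. If the start or end frame is fixed, a single pass is enough, with complexity $\\mathcal{O}(n)$. If no start frame is fixed, one option is to sort the hashes by Hamming weight (that is, the Hamming distance to $0$) and then scan the sorted list, with complexity $\\mathcal{O}(n\\log n)$. Note that this is only a heuristic: the difference in Hamming weight is a lower bound on the Hamming distance, so hashes that end up adjacent after sorting are not guaranteed to be close.\n",
"\n",
"The code below uses the first frame as the start frame"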
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [],
"source": [
"def get_similar_frame(input_video: str) -> int | None:\n",
"    video_hashes = generate_hashes(input_video)\n",
"    first_hash = next(video_hashes)\n",
"\n",
"    skip_frames = 60  # minimum number of frames between the two matching frames\n",
"    # Skip that many frames\n",
"    for _ in range(skip_frames):\n",
"        next(video_hashes)\n",
"\n",
"    frame_cnt = 1 + skip_frames  # counter: 1-based index of the current frame\n",
"    min_dis: float = float('inf')  # smallest distance so far, initialized to a huge value\n",
"    similar_frame: int | None = None  # 1-based index of the most similar frame\n",
"    for this_hash in video_hashes:\n",
"        frame_cnt += 1\n",
"        this_dis = hamming_distance(this_hash, first_hash)\n",
"        if this_dis < min_dis:\n",
"            min_dis = this_dis\n",
"            similar_frame = frame_cnt\n",
"\n",
"        # Stop early once an identical hash is found;\n",
"        # for some kinds of video this is a significant speedup\n",
"        if min_dis == 0:\n",
"            break\n",
"\n",
"    # None if no frames remain after the skipped ones\n",
"    return similar_frame"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, export the video"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [],
"source": [
"def export_first_n_frames(input_video: str, output_video: str, n: int) -> None:\n",
"    # Keep only the first n frames (indices 0 to n-1) and re-encode them as H.264\n",
"    (\n",
"        ff\n",
"        .input(input_video)\n",
"        .filter_('select', f'lt(n,{n})')\n",
"        .output(output_video, vframes=n, vcodec='libx264', pix_fmt='yuv420p')\n",
"        .overwrite_output()\n",
"        .run()\n",
"    )"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [],
"source": [
"input_video_path, output_video_path = \"./tycpj.mp4\", \"./tycpj_out.mp4\"\n",
"\n",
"export_first_n_frames(input_video_path, output_video_path, get_similar_frame(input_video_path) - 1)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
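
The notebook above only implements the case where the first frame is the fixed start frame. As a companion, here is a minimal sketch of the other case it mentions, where no start frame is fixed. It is not the author's code: instead of the weight-sorting idea it uses a plain exhaustive search over all frame pairs, which is $\mathcal{O}(n^2)$ but exact. The names `find_best_loop` and `min_gap` and the toy hash values are hypothetical, chosen only for illustration; in practice the list would be built from the integer hashes produced by `generate_hashes`.

```python
from typing import Optional


def hamming_distance(hash1: int, hash2: int) -> int:
    """Number of bit positions in which two integer hashes differ."""
    return bin(hash1 ^ hash2).count("1")


def find_best_loop(
    hashes: list[int], min_gap: int = 60
) -> Optional[tuple[int, int, int]]:
    """Return (start, end, distance) for the closest pair of frames.

    Indices are 0-based and the two frames are at least `min_gap` apart.
    Returns None when the list is too short to contain such a pair.
    Assumption: an exhaustive O(n^2) search is acceptable for the clip length.
    """
    best: Optional[tuple[int, int, int]] = None
    for start in range(len(hashes)):
        for end in range(start + min_gap, len(hashes)):
            dist = hamming_distance(hashes[start], hashes[end])
            # Strict comparison keeps the earliest pair among ties
            if best is None or dist < best[2]:
                best = (start, end, dist)
    return best


# Toy data (hypothetical): a 4-frame cycle repeated twice,
# with one bit of noise on the very first frame.
toy_hashes = [0b1011, 0b0001, 0b0110, 0b1100, 0b1010, 0b0001, 0b0110, 0b1100]
print(find_best_loop(toy_hashes, min_gap=3))
```

With a result `(start, end, distance)`, frames `start` through `end - 1` form one cycle, since frame `end` looks like frame `start` and would begin the next repetition.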