Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Encode proxies for DaVinci Resolve using FFMpeg
# Windows: create a shortcut to a .bat file in 'shell:sendto' that calls this script with the selected files passed as arguments.
# Use with the 'save_proxy_clip_list.py' gist to quickly pull the clips used in a timeline into FFmpeg.
# Use the 'link_proxies.py' gist to relink proxies correctly.
# A bug in the current Resolve release wrongly links clips to only one or two proxies.
# This assumes FFmpeg is on PATH.
import os, sys, shutil
import subprocess
import argparse
import pathlib
import winsound
from winsound import Beep
from datetime import datetime
# Command-line interface.
ap = argparse.ArgumentParser(
    description='Watchfolder or manually queued Resolve proxy encodes',
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
ap.add_argument('path', nargs='+',
                help='Path of a file or a folder of files.')
# Both options are on/off switches. Without action="store_true" argparse
# expects a value after the flag (e.g. "-y 1") and rejects a bare "-y".
ap.add_argument("-d", "--dryrun", action="store_true",
                help="use to skip encoding for debug purposes")
ap.add_argument("-y", "--yes", action="store_true",
                help="assume yes at any prompts")
args = ap.parse_args()

# Globals:
###################################################
# Vars
# Extensions (compared case-insensitively) that get queued for encoding.
acceptable_exts = ['.mp4', '.mov', '.mxf']
encodable = []
skipped = []
cwd = os.getcwd()
# Root folder under which proxies and the log file are written.
proxy_path_root = "S:\\ProxyMedia"

###################################################
# Logging path
# Fail early: both the log and every proxy are written below this folder.
if not os.path.exists(proxy_path_root):
    raise Exception(f"{proxy_path_root} does not exist. Please check write path.")
log_path = os.path.join(proxy_path_root, "ProxyEncoder.log")
def confirm(message):
    """Ask a yes/no question on the console until a usable answer is given.

    Args:
        message: Prompt text shown to the user.

    Returns:
        True if the reply starts with "y", False if it starts with "n"
        (case-insensitive, surrounding whitespace ignored). Any other reply
        prints a hint and asks again.
    """
    # Loop instead of recursing: the recursive retry used to drop its result,
    # so the function returned None after one invalid answer.
    while True:
        answer = input(message + "\n")
        print("\n")
        reply = answer.strip().lower()
        # Match on the first letter; a substring test would read "no way"
        # as a yes because it contains a "y".
        if reply.startswith("y"):
            return True
        if reply.startswith("n"):
            return False
        print(f"Invalid response, '{answer}'. Please answer 'yes' or 'no'")
def print_and_log(message, log_only=False, add_time=True):
    """Append a message to the log file and optionally echo it to the console.

    Args:
        message: Text (or any object with a string form) to record.
        log_only: When True, write to the log file without printing.
        add_time: When True, prefix the log line with the current timestamp.
            Pass False for separators and headers that should stay bare.
    """
    if not log_only:
        print(message)
    # Honour add_time; the timestamp used to be written unconditionally.
    prefix = f"[{datetime.now()}] " if add_time else ""
    with open(log_path, "a+") as changelog:
        changelog.write(f"{prefix}{message} \n")
def _proxy_output_file(path):
    """Return the extension-less proxy path for *path*, creating its folder."""
    p = pathlib.Path(path)
    # Drop the first path component (drive/root) and mirror the remaining
    # folder structure underneath the proxy root.
    output_dir = os.path.join(proxy_path_root, os.path.dirname(p.relative_to(*p.parts[:1])))
    os.makedirs(output_dir, exist_ok=True)
    return os.path.join(output_dir, os.path.splitext(os.path.basename(path))[0])


def _classify_file(path, found_skipped, found_encodable):
    """Queue *path* if its extension is acceptable, otherwise record it as skipped."""
    if os.path.splitext(path)[1].casefold() in acceptable_exts:
        print_and_log(f"Queued {path}")
        found_encodable.append({"source_media": path, "output_file": _proxy_output_file(path)})
    else:
        print_and_log(f"Skipping {path}")
        found_skipped.append(path)


def get_vids(filepaths):
    """Sort the given paths into encodable videos and skipped files.

    Files are queued when their extension is in ``acceptable_exts``;
    directories are walked recursively and every file inside is classified
    the same way.

    Args:
        filepaths: Iterable of file or directory paths.

    Returns:
        A ``(skipped, encodable)`` tuple. ``skipped`` is a list of paths;
        ``encodable`` is a list of dicts with the keys ``"source_media"``
        (input path) and ``"output_file"`` (proxy path without extension).
        Both lists are created fresh on every call.
    """
    found_skipped = []
    found_encodable = []
    for path in filepaths:
        if not path:
            # Blank entries (e.g. empty lines of a clip list) carry no path.
            continue
        print_and_log(f"Checking path: '{path}'", log_only=True)
        # Best effort per path: a problem with one entry is logged and the
        # remaining entries are still processed.
        try:
            if os.path.isfile(path):
                _classify_file(path, found_skipped, found_encodable)
            elif os.path.isdir(path):
                print(f"Scanning directory: {path}")
                # os.walk yields (dirpath, dirnames, filenames) triples.
                for root, _dirs, files in os.walk(path, topdown=True):
                    for name in files:
                        _classify_file(os.path.join(root, name), found_skipped, found_encodable)
            else:
                print_and_log(f"Path not found, skipping: '{path}'")
                found_skipped.append(path)
        except Exception as e:
            print_and_log(e)
    return found_skipped, found_encodable
def parse_list(file):
    """Read a clip list text file into a list of paths.

    Args:
        file: Path of a text file containing one media path per line.

    Returns:
        The lines with surrounding whitespace removed. Blank lines are
        dropped because they do not name a path.
    """
    # The context manager closes the handle, which was previously leaked.
    with open(file, 'r') as f:
        stripped = (line.strip() for line in f)
        return [line for line in stripped if line]
def encode(src_path, output_path):
    """Encode one source clip to a 1280x720 DNxHR SQ proxy with FFmpeg.

    Args:
        src_path: Path of the source media file.
        output_path: Destination path without extension; ".mxf" is appended.

    Returns:
        True when FFmpeg exits successfully, False when it could not be
        started or exited with a non-zero status. Failures are logged.
    """
    print("\n")
    filename = os.path.basename(src_path)
    print_and_log(f"Encoding '{filename}'")
    # Alternative ProRes settings, kept for reference:
    # -pix_fmt yuv422p10le -c:v prores_ks -profile:v 3 -vendor ap10 -vf scale=1280:720,fps=50/1
    #
    # Pass the arguments as a list without a shell, so file names with
    # quotes, ampersands or other shell metacharacters are handed to FFmpeg
    # verbatim.
    command = [
        "ffmpeg.exe", "-y",
        "-i", str(src_path),
        "-c:v", "dnxhd", "-profile:v", "dnxhr_sq",
        "-vf", "scale=1280:720,fps=50/1,format=yuv422p",
        "-c:a", "pcm_s16le", "-ar", "48000",
        "-v", "warning", "-stats", "-hide_banner",
        f"{output_path}.mxf",
    ]
    try:
        # check=True turns a non-zero FFmpeg exit status into an exception;
        # without it a failed encode was reported as a success.
        subprocess.run(command, check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        print_and_log("\n\n----------------------------", add_time=False)
        print_and_log(e)
        print_and_log("----------------------------\n\n", add_time=False)
        print_and_log(f"Failed encoding: {src_path}")
        winsound.Beep(375, 150)  # Fail beep
        return False
    winsound.Beep(600, 200)  # Success beep
    return True
if __name__ == "__main__":
    # Mark the start of this run in the log.
    new_encode = f"################# {datetime.now().strftime('%A, %d, %B, %y, %I:%M %p')} #################"
    print_and_log(new_encode, log_only=True, add_time=False)

    # Expand any clip-list text files into the paths they contain and keep
    # all other arguments as they are. Building a new list avoids removing
    # items from args.path while iterating over it, which skipped entries.
    candidate_paths = []
    for path in args.path:
        if os.path.splitext(path)[1].casefold() == ".txt":
            print(f"Parsing list from file '{path}'\n")
            candidate_paths.extend(parse_list(path))
        else:
            candidate_paths.append(path)

    # A single scan covers both sources, so nothing is queued twice and the
    # result exists even when no clip list was passed.
    skipped, encodable = get_vids(candidate_paths)

    if not encodable:
        print_and_log("No encodable files found.")
        sys.exit(1)

    # Confirm encode
    if not args.yes:
        if not confirm("Encode the above files?"):
            print("Aborting encode.")
            sys.exit(1)

    # Encode queued files
    for video in encodable:
        if args.dryrun:
            continue
        if encode(video['source_media'], video['output_file']):
            print_and_log(f"Successfully encoded: {video}")

    print(f"Done encoding. Check log file: '{log_path}'")

    # Finished jingle
    for i in range(1, 10):
        winsound.Beep(i * 100, 200)
# Save proxy clip list
# Currently no way exists to create proxies of clips used in active timeline.
# Can't right click or create smart bin to filter by usage and timeline.
# This creates a text file of clips from active timeline that encode_resolve_proxies can read.
# May change it to csv/json/yml someday.
# Don't forget to update clip_list_path
import os, sys
import traceback
import tkinter
import tkinter.messagebox
from tkinter import filedialog
from python_get_resolve import GetResolve
from datetime import datetime
# Folder used as the initial directory of the "save clip list" dialog;
# save_clip_list() creates it if it does not exist yet.
clip_list_path = "S:\\ProxyMedia\\ClipLists"
def save_clip_list(clips):
    """Ask where to save the clip list and write one clip per line.

    The save dialog opens in ``clip_list_path`` and suggests a file name
    built from the module-level ``project`` and ``timeline`` names.

    Args:
        clips: Iterable of clip paths; each is written on its own line.

    Raises:
        SystemExit: With status 0 when the user cancels the dialog.
    """
    os.makedirs(clip_list_path, exist_ok=True)

    # A hidden root window is needed to host the dialog; destroy it once the
    # dialog is closed so it is not left behind.
    root = tkinter.Tk()
    root.withdraw()
    try:
        f = filedialog.asksaveasfile(
            initialdir=clip_list_path,
            initialfile=f"{project.GetName()}_{timeline.GetName()}_cliplist.txt",
            title="Generate clip list",
            filetypes=(("txt file", "*.txt"), ("all files", "*.*")),
        )
    finally:
        root.destroy()

    if f is None:
        # Dialog cancelled. sys.exit is used because the bare exit() helper
        # is only provided by the site module.
        sys.exit(0)

    # The context manager closes the file even if a write fails.
    with f:
        f.writelines(f"{clip}\n" for clip in clips)
def get_media_paths():
    """Collect the source file paths of the clips on all video tracks.

    Reads the module-level ``timeline``. Only timeline items whose name
    contains one of the accepted video extensions are considered.

    Returns:
        A list of unique, non-empty file paths in the order first seen.
    """
    acceptable_exts = (".mov", ".mp4", ".mxf", ".avi")
    media_paths = []

    track_len = timeline.GetTrackCount("video")
    print(f"Video track count: {track_len}")

    # Track indices are 1-based: 1 <= index <= GetTrackCount(). Iterating
    # range(track_len) asked for track 0 and never reached the last track.
    for track_index in range(1, track_len + 1):
        items = timeline.GetItemListInTrack("video", track_index)
        if not items:
            continue

        for item in items:
            name = item.GetName()
            if not any(ext in name.lower() for ext in acceptable_exts):
                continue

            try:
                path = item.GetMediaPoolItem().GetClipProperty("File Path")
            except Exception:
                # e.g. GetMediaPoolItem() returned None for this item.
                print(f"Skipping {name}, no linked media pool item.")
                continue

            # Skip empty paths and clips that were already collected.
            if path and path not in media_paths:
                media_paths.append(path)

    return media_paths
if __name__ == "__main__":
    try:
        # These module-level names are read by get_media_paths() and
        # save_clip_list(); assignments at module level are already global.
        resolve = GetResolve()
        # NOTE(review): presumably GetResolve() returns None when Resolve is
        # not reachable; confirm against python_get_resolve.
        if resolve is None:
            raise RuntimeError("Could not connect to DaVinci Resolve. Is it running?")

        project = resolve.GetProjectManager().GetCurrentProject()
        if project is None:
            raise RuntimeError("No project is currently open in Resolve.")

        timeline = project.GetCurrentTimeline()
        if timeline is None:
            raise RuntimeError("No timeline is currently open in the project.")

        clips = get_media_paths()
        save_clip_list(clips)

    except Exception as e:
        # Show the failure in a dialog as well as on the console, since the
        # console output may not be visible to the user.
        tb = traceback.format_exc()
        print(tb)

        root = tkinter.Tk()
        root.withdraw()
        tkinter.messagebox.showerror("ERROR", tb)
        root.destroy()

        print("ERROR - " + str(e))
@in03
Copy link
Author

in03 commented May 21, 2021

Hey there! I've since fleshed this out to a full github repository. The requirements are a little different though. I've used Celery and a server running a RabbitMQ broker to distribute jobs instead of the more naïve file-based approach above.
https://github.com/in03/Resolve-Proxy-Encoder

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment