# Source: GitHub gist baardev/3a430a46604d01b2b0ba1bf2b2afbb7b
# Author: @baardev
# Created: February 1, 2024 19:20
#!/bin/env python
from urllib import request
import os, sys, getopt
from glob import glob
import tempfile as tmp
import shutil
from pathlib import Path
import more_itertools
from colorama import init, Fore
import subprocess
import websocket #! NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import uuid
import json
import urllib.request
import urllib.parse
from pprint import pprint
import calendar
import time
#! initialise colorama (the Fore.* color codes are used for console output)
init()
#! host:port used to build the "http://{}/prompt" and "http://{}/history/{}" URLs
server_address = "127.0.0.1:8188"
#! random id generated once per run; sent as "client_id" with every queued prompt
client_id = str(uuid.uuid4())
def split_path(pstr):
    """
    Break a path string into its components.

    Returns a dict with the keys:
      dirname  - directory part; the current working directory when the
                 path has no directory component (or it is ".")
      basename - final path component
      ext      - text after the LAST "." of basename ("" when there is no ".")
      nameonly - basename without the extension (inner dots are preserved)
      fullpath - dirname and basename joined with "/"
    """
    dirname = os.path.dirname(pstr)
    if dirname in ("", "."):
        dirname = os.getcwd()
    basename = os.path.basename(pstr)
    #! split on the last dot only, so "a.b.png" -> ("a.b", "png") instead of "ab"
    nameonly, dot, ext = basename.rpartition(".")
    if not dot:
        #! no dot at all: the whole basename is the name and there is no extension
        nameonly, ext = basename, ""
    return {
        "dirname": dirname,
        "basename": basename,
        "ext": ext,
        "nameonly": nameonly,
        "fullpath": f"{dirname}/{basename}",
    }
def tryit(kwargs, arg, default):
    """
    Return kwargs[arg], or `default` when the lookup fails.

    Only lookup failures are treated as "missing"; anything else
    (e.g. KeyboardInterrupt) is no longer swallowed as it was by the
    previous bare `except`.
    """
    try:
        return kwargs[arg]
    except (KeyError, IndexError, TypeError):
        return default
def prunlive(cmd, **kwargs):
    """
    Run a command and echo its stdout live, colored red.

    cmd is a single string that is split on whitespace (no shell is
    involved). Inside each resulting token "~" is turned into a space
    and double quotes are removed, so "~" can be used to smuggle a
    space into an argument.

    Keyword arguments:
      debug  - when truthy, print the command (yellow) before running it
      dryrun - when equal to "print", only print the command and return

    Returns the process exit code, or None for a dry run.
    """
    debug = tryit(kwargs, "debug", False)
    dryrun = tryit(kwargs, "dryrun", False)
    if dryrun == "print":
        print(Fore.YELLOW + cmd + Fore.RESET)
        return None
    scmd = [token.replace("~", " ").replace('"', "") for token in cmd.split()]
    if debug:
        print(Fore.YELLOW + cmd + Fore.RESET)
    #! the context manager closes the pipe and waits for the child on exit,
    #! so no handle or zombie process is left behind
    with subprocess.Popen(scmd, stdout=subprocess.PIPE) as process:
        for line in process.stdout:
            print(Fore.RED, end="")
            #! errors="replace": a stray non-UTF-8 byte must not abort the run
            sys.stdout.write(line.decode("utf-8", errors="replace"))
            print(Fore.RESET, end="")
    return process.returncode
def get_timestamp():
    """
    Return the current time as integer seconds since the epoch (UTC).
    """
    return calendar.timegm(time.gmtime())
#! https://github.com/comfyanonymous/ComfyUI/blob/master/script_examples/websockets_api_example.py
def get_history(prompt_id):
    """
    Fetch and decode the JSON document served at /history/<prompt_id>.
    """
    url = "http://{}/history/{}".format(server_address, prompt_id)
    with urllib.request.urlopen(url) as reply:
        body = reply.read()
    return json.loads(body)
def get_queue_id(workflow=None):
    """
    Queue a prompt and return the 'prompt_id' from the server's reply.

    workflow - prompt dict to submit. When omitted, the module-level
               `prompt` variable is used; that variable is only assigned
               inside the main script loop, so it must exist by the time
               this is called.
    """
    if workflow is None:
        workflow = prompt
    #! previously the id was stored in a local and never returned
    return queue_prompt(workflow)['prompt_id']
def queue_prompt(prompt):
    """
    POST a prompt to the server's /prompt endpoint.

    The request body is the JSON object {"prompt": prompt, "client_id": client_id}.
    Returns the decoded JSON reply (callers read its 'prompt_id' key).
    """
    payload = json.dumps({"prompt": prompt, "client_id": client_id}).encode('utf-8')
    req = urllib.request.Request("http://{}/prompt".format(server_address), data=payload)
    #! close the HTTP response explicitly instead of leaving it to the GC
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read())
#! https://github.com/comfyanonymous/ComfyUI/blob/master/script_examples/basic_api_example.py
def showhelp():
    """
    Print the command-line usage summary and exit the program.
    """
    usage = """
    -h, --help            show help
    -v, --video           path of the source video
    -d, --debug           enable debug output
    -s, --stage           'prep'|'anim'|'merge'
    -a, --atmpl           file name of the anim template
    -x, --experimental    'fswap'
"""
    print("help")
    #! the usage text used to be built but never printed
    print(usage)
    sys.exit()
# v ────────────────────────────────────────────────────────────────────────────────────────────────────────────
# [ get args
argv = sys.argv[1:]
try:
    opts, args = getopt.getopt(
        argv,
        "hv:ds:a:x:",
        [
            'help',
            'video=',
            'debug',
            'stage=',
            'atmpl=',
            'experimental=',
        ],
    )
except getopt.GetoptError as e:
    #! stop here: continuing would hit a NameError on the undefined `opts`
    print(str(e))
    showhelp()

#! defaults; the command-line options below override them
mtype = False
# pfile = False # "/home/jw/src/sdc/settings/COMFY/monsters_API.json"
stage = False
debug = False
video = "/home/jw/Videos/0_ORIGINAL.mp4"
anim_template_name = "anim_pre_workflow_02_API.json"
experimental = False

for opt, arg in opts:
    if opt in ("-h", "--help"):
        showhelp()
    elif opt in ("-v", "--video"):
        video = arg
    elif opt in ("-d", "--debug"):
        debug = True
    elif opt in ("-s", "--stage"):
        stage = arg
    elif opt in ("-a", "--atmpl"):
        anim_template_name = arg
    elif opt in ("-x", "--experimental"):
        experimental = arg

#! the stage is mandatory
if not stage:
    print("-s, --stage missing")
    showhelp()

groupsize = 26  # anim
interpx = 6  # merge
# groupsize = 16 # anim
# interpx = 4 # merge
extract_count = 0  # 0 = all
overlap = 0  # deprecated
fps = 8

#! all temp files/dirs created through the tempfile module land under this dir
os.environ["TMPDIR"] = "/fstmp"
settings_dir = "/home/jw/src/sdc/settings/COMFY"
output_dir = "/home/jw/src/ComfyUI/output"
prep_template_name = "p_ZZ_API.json"
mp4_output_wc = "p_??_*.mp4"
def maketmpname(str, **kwargs):
    """
    Create a temp path whose last component is prefixed with a string.

    The result has the form <tempdir>/<str>_tmpXXXXXXXX.

    Keyword arguments:
      create - when truthy, the path is created as a directory

    With create set, the directory is made atomically by tempfile.mkdtemp
    (readable/writable only by the current user), so there is no window
    between picking the name and creating it.
    """
    create = kwargs.get('create', False)
    prefix = f"{str}_tmp"
    if create:
        return tmp.mkdtemp(prefix=prefix)
    #! NOTE: name-only mode is inherently racy (mktemp is deprecated for that
    #! reason); the caller must cope with the path appearing in the meantime
    return tmp.mktemp(prefix=prefix)
def get_sorted_files(spec):
    """
    Expand the glob pattern `spec` and return the matches in sorted order.
    """
    return sorted(glob(spec))
def get_video_frames(video, **kwargs):
    """
    Dump the frames of `video` as PNGs into a fresh temp dir (via ffmpeg,
    at the module-level `fps` rate) and return the sorted list of frame files.

    Keyword arguments:
      count - number of leading frames to return; 0 returns all of them
      debug - forwarded to prunlive; also reports the created directory
    """
    limit = tryit(kwargs, 'count', 0)
    verbose = tryit(kwargs, 'debug', False)
    tmpdir = maketmpname("EXT", create=True)
    if verbose:
        print(f"made dir: {tmpdir}")
    prunlive(
        f"ffmpeg -y -loglevel warning -i {video} -r {fps}/1 {tmpdir}/%04d.png",
        debug=verbose,
    )
    frames = get_sorted_files(f"{tmpdir}/*png")
    return frames if limit == 0 else frames[:limit]
def cleandir(dir):
    """
    Remove everything matched by the glob pattern `dir`:
    files are unlinked, directories are removed recursively.
    The list of matches is printed first.
    """
    entries = get_sorted_files(dir)
    print(entries)
    for entry in entries:
        if os.path.isfile(entry):
            os.unlink(entry)
        elif os.path.isdir(entry):
            shutil.rmtree(entry)
def createInterps(last, first, **kwargs):
    """
    Create interpolated frames between two existing frames.

    `last` and `first` are copied to 1.png and 2.png of a fresh input dir,
    the external interpolation script is run on it, and the sorted list of
    PNGs found in the output dir is returned.

    Keyword arguments:
      interpx - passed to the interpolation script as third argument (default 8)
      debug   - when truthy, logs each step and pastes a marker image onto
                every interpolated frame so they can be told apart
    """
    interpx = tryit(kwargs, 'interpx', 8)
    debug = tryit(kwargs, 'debug', False)

    #! make target dirs
    indir = maketmpname("IN", create=True)
    if debug:
        print(f"made indir: {indir}")
    outdir = maketmpname("OUT", create=True)
    if debug:
        print(f"made outdir: {outdir}")

    #! copy first and last images to targets
    if debug:
        print(f"copying [{last}] => [{indir}/1.png]")
    shutil.copy(last, indir + "/1.png")
    if debug:
        print(f"copying [{first}] => [{indir}/2.png]")
    shutil.copy(first, indir + "/2.png")

    #! call interp script (runs in conda env 'rife')
    cmd = f"/home/jw/src/rife/simple_interp.sh {indir} {outdir} {interpx}"
    prunlive(cmd, debug=debug)
    files = get_sorted_files(f"{outdir}/*.png")

    if debug:
        #! for debugging, mark interp images with icon
        from PIL import Image
        #! context managers make sure every image file handle gets closed
        with Image.open('/home/jw/share/dot512.png') as marker:
            for file in files:
                with Image.open(file) as frame:
                    marked = frame.copy()
                marked.paste(marker, (10, 10))
                marked.save(file, quality=95)
    return files
def extract_video(video):
    """
    Extract the frames of a generated video clip into a temp folder whose
    name is prefixed with the clip's file name (without extension).
    Returns the sorted list of extracted PNG files.
    """
    parts = split_path(video)
    exdir = maketmpname(parts["nameonly"], create=True)
    cmd = f"ffmpeg -y -loglevel panic -i {video} -r {fps}/1 {exdir}/%04d.png"
    #! keyword was misspelled "debuf", so the flag never reached prunlive
    prunlive(cmd, debug=True)
    return get_sorted_files(f"{exdir}/*.png")
def wait_until_finished(prompt_id, poll_interval=5):
    """
    Block until get_history(prompt_id) returns a non-empty result.

    A "." is printed after every poll. `poll_interval` is the number of
    seconds slept between polls. Always returns True.
    """
    while True:
        history = get_history(prompt_id)
        print(".", end="", flush=True)
        if history:
            #! result is there: leave right away instead of sleeping once more
            break
        time.sleep(poll_interval)
    print("\n")
    return True
def save_prompt(prompt, str, n):
    """
    Dump `prompt` as indented JSON to __<str>_<n>.json in the ComfyUI output dir.
    """
    serialized = json.dumps(prompt, indent=4)
    Path(f"/home/jw/src/ComfyUI/output/__{str}_{n}.json").write_text(serialized)
def update_template(prompt_text, i):
    """
    Point the template at group `i`: every HED_00 / POSE_00 / SEG_00
    placeholder gets the zero-padded two-digit group number, then the
    text is parsed as JSON and the resulting object is returned.
    """
    substitutions = (
        ("HED_00", f"HED_{i:02d}"),
        ("POSE_00", f"POSE_{i:02d}"),
        ("SEG_00", f"SEG_{i:02d}"),
    )
    for placeholder, value in substitutions:
        prompt_text = prompt_text.replace(placeholder, value)
    return json.loads(prompt_text)
if __name__ == "__main__":
    timestamp = get_timestamp()

    #! start from an empty scratch area, then extract the source frames
    cleandir(f"{os.environ['TMPDIR']}/*")
    files = get_video_frames(video, count=extract_count)
    tdirs = []

    #! now split into groups
    fgroups = list(more_itertools.chunked(files, groupsize))
    print(f"Created [{len(fgroups)}] groups of [{groupsize}] from [{len(files)}] frames")

    if stage == "anim":
        cleandir(f"{output_dir}/p_*")

    #! NOTE(review): the upper bound leaves out the final group (which is the
    #! one that may hold fewer than `groupsize` frames) - confirm that this
    #! is intended and not an off-by-one
    for i in range(0, len(fgroups) - 1):
        print(Fore.MAGENTA + f"██████████████████████████████████████████████ {i}/{len(fgroups)-1} " + Fore.RESET)

        if stage == "prep":
            print(Fore.YELLOW + f"██████████████████████████████████████████████ PREPPING " + Fore.RESET)
            #! load and run prep template prompt. This creates the ControlNet input images in 'output_dir'
            prompt_text = Path(f"{settings_dir}/{prep_template_name}").read_text()
            prompt = update_template(prompt_text, i)
            prompt_id = queue_prompt(prompt)['prompt_id']
            #! manually move output dirs to input

        if stage == "anim":
            print(Fore.YELLOW + f"██████████████████████████████████████████████ ANIMATING " + Fore.RESET)
            #! create folder by group; 00, 01, 02, ...
            tmpdir = f"{os.environ['TMPDIR']}/{i:02d}"
            print(f"Target: {tmpdir}")
            os.mkdir(tmpdir)
            tdirs.append(tmpdir)

            #! copy and rename/renumber subset of files to folder
            for j, frame in enumerate(fgroups[i]):
                shutil.copy(frame, f"{tmpdir}/{(j+1):03d}.png")

            #! submit the prompt
            prompt_text = Path(f"{settings_dir}/{anim_template_name}").read_text()  #! load template
            prompt = update_template(prompt_text, i)

            #! adjust accordingly
            prompt['9']['inputs']['batch_size'] = groupsize
            prompt['55']['inputs']['image_load_cap'] = groupsize
            prompt['119']['inputs']['frame_rate'] = fps
            prompt['133']['inputs']['image_load_cap'] = groupsize
            prompt['137']['inputs']['image_load_cap'] = groupsize

            #! this is where we assign the new seed image to the last image of the previous group
            if experimental == "fswap" and i > 0:
                newseed = f"{tdirs[i-1]}/{groupsize:03d}.png"
                prompt['30']['inputs']['image'] = newseed
                if debug:
                    print(f"Setting seed image to [{newseed}]")

            save_prompt(prompt, stage, i)
            prompt_id = queue_prompt(prompt)['prompt_id']

            #! WAIT FOR THE QUEUE TO COMPLETE
            #! (kept inside the anim stage: `prompt_id` does not exist in the merge stage)
            wait_until_finished(prompt_id)

    if stage == "merge":
        print(Fore.YELLOW + f"██████████████████████████████████████████████ MERGING " + Fore.RESET)
        allfiles = []  # storage of all frame groups, in final playback order
        files = get_sorted_files(f"{output_dir}/{mp4_output_wc}")

        #! one list of frames per clip; clips that yielded no frames are
        #! skipped because they have no first/last frame to interpolate from
        exdirimgs = [frames for frames in map(extract_video, files) if frames]

        for i in range(len(exdirimgs) - 1):
            allfiles.append(exdirimgs[i])
            last = exdirimgs[i][-1]
            first = exdirimgs[i + 1][0]
            print(f"Interpolating: {last} <=> {first} {interpx}x")
            newfiles_ary = createInterps(last, first, interpx=interpx, debug=debug)
            #! add the new files to the end of ary
            allfiles.append(newfiles_ary)

        #! the loop above only adds clip i, never the final clip: add it here,
        #! otherwise the last clip is missing from the final video
        if exdirimgs:
            allfiles.append(exdirimgs[-1])

        if not allfiles:
            print(f"No frames found for [{output_dir}/{mp4_output_wc}], nothing to merge")
            sys.exit(1)

        #! now rename in sequence
        #! 6 digits: ffmpeg's glob input orders names alphabetically, so the
        #! zero padding must be wide enough for every frame number
        tmpdir = maketmpname("FINAL", create=True)
        k = 0
        for group in allfiles:
            for filename in group:
                shutil.move(filename, f"{tmpdir}/{k:06d}.png")
                k += 1

        cmd = f"ffmpeg -y -loglevel warning -hide_banner -hwaccel auto -y -framerate {fps} -pattern_type glob -i {tmpdir}/*.png -r {fps} -vcodec libx264 -preset medium -crf 23 -vf minterpolate=mi_mode=blend,fifo -pix_fmt yuv420p -movflags +faststart {output_dir}/final.mp4"
        prunlive(cmd, debug=debug)
        print("FINAL VIDEO: " + Fore.CYAN + f"{output_dir}/final.mp4" + Fore.RESET)
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment