Created
August 23, 2023 17:51
-
-
Save nousr/f45805bf72a8d03cec702bece61afdf7 to your computer and use it in GitHub Desktop.
Comfy Deforum WIP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import websocket | |
import uuid | |
import json | |
import urllib.request | |
import urllib.parse | |
from random import randint | |
import click | |
from PIL import Image | |
import io | |
import os | |
from tqdm import trange | |
INPUT_FOLDER = "/home/nousr/github/comfy/input" | |
SERVER_ADDRESS = "127.0.0.1:8188" | |
CLIENT_ID = str(uuid.uuid4()) | |
@click.group()
def cli():
    """Drive a ComfyUI server to render frame sequences from a workflow file."""
    # click uses the docstring above as the group's --help description;
    # the group itself does no work, sub-commands are registered separately.
def update_image_init(image_path, prompt):
    """
    Point the workflow at a new init image and re-roll its seed.

    Mutates ``prompt`` in place: node "23" gets ``image_path`` as its
    ``image`` input and node "2" gets a new random ``seed`` in
    ``[0, 1000000]``. The node ids are hard-coded, so they presumably match
    the image-loader and sampler nodes of the author's workflow file --
    verify against the workflow JSON in use.
    """
    image_inputs = prompt["23"]["inputs"]
    image_inputs["image"] = image_path

    fresh_seed = randint(0, 1000000)
    prompt["2"]["inputs"]["seed"] = fresh_seed
def extract_image(images_pkg):
    """
    Extract the output image from the returned images package.

    ``images_pkg`` maps a node id to a list of encoded image byte strings
    (the shape produced by ``get_images``). Only the first image found is
    decoded and returned; any further images are ignored.

    NOTE: Assumes only a single output image for now.

    Returns the decoded PIL image, or ``None`` when the package contains no
    image data at all.
    """
    # The node ids are irrelevant here, so walk the values only and stop at
    # the very first payload.
    first_payload = next(
        (
            image_data
            for image_datas in images_pkg.values()
            for image_data in image_datas
        ),
        None,
    )
    if first_payload is None:
        return None
    return Image.open(io.BytesIO(first_payload))
def save_image(image, output_path, frame_idx=None):
    """
    Write ``image`` to disk and hand it back to the caller.

    When ``frame_idx`` is an int, ``output_path`` is treated as a folder and
    the file is named after the zero-padded (5 digit) frame index with a
    ``.png`` extension. Otherwise ``output_path`` is used verbatim as the
    destination file path.
    """
    destination = output_path
    if isinstance(frame_idx, int):
        frame_name = f"{frame_idx:05d}.png"
        destination = os.path.join(output_path, frame_name)

    image.save(destination)
    return image
@click.command()
@click.argument("workflow_file", type=click.Path(exists=True))
@click.option(
    "--output-folder",
    required=True,
    help="Folder to save the output images",
)
@click.option(
    "--init-image", default=None, help="Image to initialize the workflow with"
)
@click.option("--num-frames", default=12 * 5, help="Number of frames to generate")
def generate_images(workflow_file, output_folder, init_image, num_frames):
    """Render NUM_FRAMES frames, feeding each output back in as the next init image.

    WORKFLOW_FILE is a JSON workflow in the format accepted by the server's
    /prompt endpoint.
    """
    # load prompt workflow
    with open(workflow_file, "r", encoding="utf-8") as f:
        prompt = json.load(f)

    if init_image is not None:
        update_image_init(init_image, prompt)

    os.makedirs(output_folder, exist_ok=True)

    # connect to server
    ws = websocket.WebSocket()
    ws.connect(f"ws://{SERVER_ADDRESS}/ws?clientId={CLIENT_ID}")
    try:
        for frame_idx in trange(num_frames):
            images_pkg = get_images(ws, prompt)
            image = extract_image(images_pkg)
            if image is None:
                # Fail with a readable message instead of an AttributeError
                # on ``None.save`` further down.
                raise click.ClickException(
                    f"Workflow produced no output image for frame {frame_idx}"
                )

            # Numbered copy for the final sequence.
            save_image(image, output_folder, frame_idx)

            # Overwrite the feedback frame in the server's input folder and
            # point the workflow at it (by bare file name) for the next pass.
            save_image(image, os.path.join(INPUT_FOLDER, "last_frame.png"))
            update_image_init("last_frame.png", prompt)
    finally:
        # Always release the websocket, even when a frame fails.
        ws.close()
def queue_prompt(prompt):
    """
    Submit a workflow to the server's ``/prompt`` endpoint.

    The workflow is sent as UTF-8 encoded JSON together with this process's
    ``CLIENT_ID``. Returns the server's JSON reply decoded into Python
    objects; ``get_images`` reads its ``"prompt_id"`` entry.

    Raises whatever ``urllib.request.urlopen`` raises on connection or HTTP
    errors.
    """
    payload = json.dumps({"prompt": prompt, "client_id": CLIENT_ID}).encode("utf-8")
    # Supplying ``data`` makes urllib issue a POST request.
    req = urllib.request.Request(f"http://{SERVER_ADDRESS}/prompt", data=payload)
    # Close the HTTP response deterministically instead of leaking it.
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read())
def get_image(filename, subfolder, folder_type):
    """
    Download one image from the server's ``/view`` endpoint.

    The three arguments are URL-encoded into the query string as
    ``filename``, ``subfolder`` and ``type``. Returns the raw response body
    as bytes.
    """
    query = urllib.parse.urlencode(
        {"filename": filename, "subfolder": subfolder, "type": folder_type}
    )
    view_url = f"http://{SERVER_ADDRESS}/view?{query}"
    with urllib.request.urlopen(view_url) as response:
        return response.read()
def get_history(prompt_id):
    """
    Fetch the server's ``/history/<prompt_id>`` record.

    Returns the decoded JSON reply; callers index it by ``prompt_id`` to get
    at that prompt's entry.
    """
    history_url = f"http://{SERVER_ADDRESS}/history/{prompt_id}"
    with urllib.request.urlopen(history_url) as response:
        raw_reply = response.read()
    return json.loads(raw_reply)
def get_images(ws, prompt):
    """
    Queue ``prompt``, wait for it to finish, and download its output images.

    Blocks on ``ws.recv()`` until an ``"executing"`` message arrives whose
    ``node`` is ``None`` and whose ``prompt_id`` matches the prompt queued
    here. The history record for that prompt is then fetched and every image
    listed under its outputs is downloaded.

    Returns a dict mapping node id to a list of raw image bytes; nodes
    without an ``"images"`` entry are left out.
    """
    prompt_id = queue_prompt(prompt)["prompt_id"]

    while True:
        frame = ws.recv()
        if not isinstance(frame, str):
            # Previews are binary data
            continue
        message = json.loads(frame)
        if message["type"] != "executing":
            continue
        status = message["data"]
        if status["node"] is None and status["prompt_id"] == prompt_id:
            # Execution is done
            break

    node_outputs = get_history(prompt_id)[prompt_id]["outputs"]
    output_images = {}
    for node_id in node_outputs:
        node_output = node_outputs[node_id]
        if "images" not in node_output:
            continue
        output_images[node_id] = [
            get_image(entry["filename"], entry["subfolder"], entry["type"])
            for entry in node_output["images"]
        ]
    return output_images
# Expose the frame generator as a sub-command of the CLI group.
cli.add_command(generate_images)


if __name__ == "__main__":
    # Only dispatch the CLI when run as a script, not when imported.
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment