Skip to content

Instantly share code, notes, and snippets.

@nousr
Created August 23, 2023 17:51
Show Gist options
  • Save nousr/f45805bf72a8d03cec702bece61afdf7 to your computer and use it in GitHub Desktop.
Save nousr/f45805bf72a8d03cec702bece61afdf7 to your computer and use it in GitHub Desktop.
Comfy Deforum WIP
import websocket
import uuid
import json
import urllib.request
import urllib.parse
from random import randint
import click
from PIL import Image
import io
import os
from tqdm import trange
INPUT_FOLDER = "/home/nousr/github/comfy/input"
SERVER_ADDRESS = "127.0.0.1:8188"
CLIENT_ID = str(uuid.uuid4())
@click.group()
def cli():
pass
def update_image_init(image_path, prompt):
prompt["23"]["inputs"]["image"] = image_path
prompt["2"]["inputs"]["seed"] = randint(0, 1000000)
def extract_image(images_pkg):
"""
Extract output image from the returned images package
NOTE: Assumes only a single output image for now
"""
for _, image_datas in images_pkg.items():
for image_data in image_datas:
image = Image.open(io.BytesIO(image_data))
return image
def save_image(image, output_path, frame_idx=None):
"""
Save the output image to the output folder
"""
if isinstance(frame_idx, int):
output_path = os.path.join(output_path, f"{frame_idx:05d}.png")
image.save(output_path)
return image
@click.command()
@click.argument("workflow_file", type=click.Path(exists=True))
@click.option(
"--output-folder",
help="Folder to save the output images",
)
@click.option(
"--init-image", default=None, help="Image to initialize the workflow with"
)
@click.option("--num-frames", default=12 * 5, help="Number of frames to generate")
def generate_images(workflow_file, output_folder, init_image, num_frames):
# load prompt workflow
with open(workflow_file, "r") as f:
prompt_text = f.read()
prompt = json.loads(prompt_text)
if init_image is not None:
update_image_init(init_image, prompt)
os.makedirs(output_folder, exist_ok=True)
# connect to server
ws = websocket.WebSocket()
ws.connect(f"ws://{SERVER_ADDRESS}/ws?clientId={CLIENT_ID}")
for frame_idx in trange(num_frames):
images_pkg = get_images(ws, prompt)
image = extract_image(images_pkg)
image = save_image(image, os.path.join(output_folder, ), frame_idx)
save_image(image, os.path.join(INPUT_FOLDER, "last_frame.png"), frame_idx=None)
update_image_init("last_frame.png", prompt)
def queue_prompt(prompt):
data = {"prompt": prompt, "client_id": CLIENT_ID}
req = urllib.request.Request(
f"http://{SERVER_ADDRESS}/prompt", data=json.dumps(data).encode("utf-8")
)
return json.loads(urllib.request.urlopen(req).read())
def get_image(filename, subfolder, folder_type):
data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
url_values = urllib.parse.urlencode(data)
with urllib.request.urlopen(
f"http://{SERVER_ADDRESS}/view?{url_values}"
) as response:
return response.read()
def get_history(prompt_id):
with urllib.request.urlopen(
f"http://{SERVER_ADDRESS}/history/{prompt_id}"
) as response:
return json.loads(response.read())
def get_images(ws, prompt):
prompt_id = queue_prompt(prompt)["prompt_id"]
output_images = {}
while True:
out = ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message["type"] == "executing":
data = message["data"]
if data["node"] is None and data["prompt_id"] == prompt_id:
break # Execution is done
else:
continue # Previews are binary data
history = get_history(prompt_id)[prompt_id]
for node_id in history["outputs"]:
node_output = history["outputs"][node_id]
if "images" in node_output:
images_output = []
for image in node_output["images"]:
image_data = get_image(
image["filename"], image["subfolder"], image["type"]
)
images_output.append(image_data)
output_images[node_id] = images_output
return output_images
cli.add_command(generate_images)
if __name__ == "__main__":
cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment