Skip to content

Instantly share code, notes, and snippets.

@PizzaRules668
Last active May 31, 2022 14:17
Show Gist options
  • Save PizzaRules668/8aee0c03c1b1977b07168cb659ce19cb to your computer and use it in GitHub Desktop.
Save PizzaRules668/8aee0c03c1b1977b07168cb659ce19cb to your computer and use it in GitHub Desktop.
r/Place timelapse creator
import threading
import queue
import time
import os
import cv2
import numpy as np
import argparse
from tqdm import tqdm
# Allow the user to configure the timelapse from the command line.
# Example: python fast\ timelapse.py -x 20 -y 679 --width 52 --height 80 --scale 8 --fps 45 -t 3 -o tux.mp4
def _boundedInt(minimum):
    """Build an argparse ``type`` callable accepting integers >= *minimum*."""
    def parse(text):
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
        if value < minimum:
            raise argparse.ArgumentTypeError(f"must be >= {minimum}, got {value}")
        return value
    return parse


# Sizes, scale, fps and thread count must be at least 1: zero worker threads
# would leave the work queue unprocessed, and a zero size or scale yields
# empty frames.
_positiveInt = _boundedInt(1)
# Crop offsets are used as slice starts, where a negative value would count
# from the opposite edge of the image instead of failing.
_nonNegativeInt = _boundedInt(0)

parser = argparse.ArgumentParser()
parser.add_argument("-o", "--output", help="Output file name", dest="out", default="output.mp4")
parser.add_argument("-f", "--fps", help="Frames per second", dest="fps", default=30, type=_positiveInt)
parser.add_argument("-x", help="X position", default=0, type=_nonNegativeInt)
parser.add_argument("-y", help="Y position", default=0, type=_nonNegativeInt)
parser.add_argument("--width", help="Width", default=2000, type=_positiveInt)
parser.add_argument("--height", help="Height", default=2000, type=_positiveInt)
parser.add_argument("--scale", help="Scale", default=2, type=_positiveInt)
parser.add_argument("-t", "--threads", help="Number of threads", dest="threads", default=1, type=_positiveInt)
# Shutdown flag for the writer: createTimeLapse sets it to True once the
# frames queue has been drained, and addFrame stops looping when it sees it.
done = False
# Work queues shared between threads.
#   todo:   (text before the first "." of the file name, file name) tuples
#           waiting to be read and processed
#   frames: (file name, processed image) tuples waiting to be written
todo = queue.PriorityQueue()
frames = queue.PriorityQueue()
# Progress bars and counters for processed frames and for written frames.
# The bars start without a total; createTimeLapse assigns it once the number
# of input files is known.
todoDone = 0
todoBar = tqdm(desc="Frames Processed")
framesDone = 0
framesBar = tqdm(desc="Frames Finished")
def upscale(img, factor):
    """Enlarge an image by an integer factor using pixel repetition.

    Every source pixel becomes a ``factor`` x ``factor`` square in the result,
    so no interpolation or blurring takes place.

    Args:
        img: NumPy array whose first two axes are rows and columns. Any
            further axes (e.g. colour channels) are carried over unchanged.
        factor: Integer scale factor.

    Returns:
        ``img`` itself when ``factor`` is 1, otherwise a new array with both
        leading axes multiplied by ``factor`` and the same dtype as ``img``.
    """
    if factor == 1:
        return img
    # Repeat rows, then columns. Unlike a preallocated (h, w, 3) buffer this
    # also works for grayscale and 4-channel input.
    return np.repeat(np.repeat(img, factor, axis=0), factor, axis=1)
def frameProcessor(x, y, size, scale):
    """Worker loop: read, crop and upscale queued images.

    Takes ``(key, fileName)`` items from the global ``todo`` queue until it is
    empty. Each file is read from ``images/``, cropped to rows
    ``y:y+size[1]`` and columns ``x:x+size[0]``, enlarged with ``upscale`` and
    passed through ``cv2.cvtColor`` with ``COLOR_RGBA2RGB``. The result is put
    on the global ``frames`` queue as ``(fileName, image)``.

    A file that cannot be processed is reported on stdout and skipped. It is
    still counted and marked done so that ``todo.join()`` can return.

    Args:
        x: Left column of the crop rectangle.
        y: Top row of the crop rectangle.
        size: ``(width, height)`` of the crop rectangle.
        scale: Integer factor handed to ``upscale``.
    """
    global todoDone
    while True:
        # get_nowait() rather than empty() followed by get(): with several
        # workers another thread can take the last item between the two
        # calls, leaving this thread blocked in get() forever.
        try:
            item = todo.get_nowait()
        except queue.Empty:
            return
        path = "images/" + item[1]
        try:
            img = cv2.imread(path)
            if img is None:
                # imread reports an unreadable file by returning None.
                raise OSError("could not read image")
            img = cv2.cvtColor(
                upscale(img[y:y + size[1], x:x + size[0]], scale),
                cv2.COLOR_RGBA2RGB,
            )
            frames.put((item[1], img))
            # Drop the local reference so the frame is not kept alive by this
            # thread while it sleeps below.
            del img
            # Crude back-pressure: pause while the writer is far behind.
            if frames.qsize() > 100:
                time.sleep(2)
        except Exception as e:
            print(f"{path}\n{e}")
        # Count the item whether or not it succeeded.
        todoDone += 1
        todoBar.update()
        todo.task_done()
def addFrame(outFile, fps, size):
    """Writer loop: append processed frames to a video file.

    Opens an ``mp4v`` ``cv2.VideoWriter`` (also published as the module
    global ``out``) and writes every ``(fileName, image)`` item taken from
    the global ``frames`` queue until the global ``done`` flag is set. A frame
    that fails to write is reported on stdout and skipped.

    Args:
        outFile: Path of the video file to create.
        fps: Frame rate passed to the video writer.
        size: Frame size passed to the video writer; ``createTimeLapse``
            passes ``(width * scale, height * scale)``.
    """
    global out, framesDone
    out = cv2.VideoWriter(outFile, cv2.VideoWriter_fourcc(*"mp4v"), fps, size)
    try:
        while not done:
            # Use a timed get so `done` is re-checked regularly. A plain get()
            # blocks forever once the queue is empty, which hangs the caller
            # joining this thread (e.g. when there were no frames at all).
            try:
                frame = frames.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                out.write(frame[1])
            except Exception as e:
                print(f"images/{frame[0]}\n{e}")
            del frame
            # Count the frame whether or not the write succeeded.
            framesDone += 1
            framesBar.update()
            frames.task_done()
        print("Done")
    finally:
        # Always finalise the file, even if the loop exits with an error.
        out.release()
def createTimeLapse(x, y, width, height, scale, fps, threadsCount, out):
    """Render every file in ``images/`` into a cropped, upscaled video.

    Queues all file names (sorted) on ``todo``, starts ``threadsCount``
    ``frameProcessor`` workers plus one ``addFrame`` writer thread, waits for
    both queues to drain, then sets ``done`` so the writer stops.

    NOTE: the ``frames`` priority queue only orders the frames that are
    waiting in it at the same moment, so with more than one worker a frame
    that finishes late can be written after a later-named one.

    Args:
        x: Left column of the crop rectangle.
        y: Top row of the crop rectangle.
        width: Crop width in pixels (before scaling).
        height: Crop height in pixels (before scaling).
        scale: Integer upscale factor applied to each cropped frame.
        fps: Frame rate of the output video.
        threadsCount: Number of worker threads; must be at least 1.
        out: Path of the output video file.

    Raises:
        ValueError: If ``threadsCount`` is less than 1.
    """
    global done
    # Without any worker nothing would ever empty `todo`, and todo.join()
    # below would wait forever.
    if threadsCount < 1:
        raise ValueError(f"threadsCount must be at least 1, got {threadsCount}")
    done = False

    size = (width, height)
    scaleSize = (width * scale, height * scale)

    files = sorted(os.listdir("images"))
    if not files:
        # Nothing to render. Starting the writer with no frames would leave
        # it waiting on an empty queue.
        print("No images found in images/")
        return

    # Both progress bars run up to the number of input files.
    todoBar.total = len(files)
    framesBar.total = len(files)

    # The text before the first "." is the priority key of each file.
    for file in files:
        todo.put((file.split(".")[0], file))

    threads = [
        threading.Thread(target=frameProcessor, daemon=True, args=(x, y, size, scale))
        for _ in range(threadsCount)
    ]
    for t in threads:
        t.start()

    writer = threading.Thread(target=addFrame, daemon=True, args=(out, fps, scaleSize))
    writer.start()

    # Wait until every queued file has been processed (or skipped).
    todo.join()
    # Wait for the writer to take the remaining frames. The poll stays tight
    # because the writer checks `done` only between queue reads, so the flag
    # has to flip as soon as the queue is drained. sleep(0) hands the GIL to
    # the writer on every iteration instead of spinning on `pass`.
    while not frames.empty():
        time.sleep(0)
    done = True

    for t in threads:
        t.join()
    writer.join()
if __name__ == "__main__":
    # Script entry point: read the command-line options and render the
    # timelapse with them.
    options = parser.parse_args()
    createTimeLapse(
        x=options.x,
        y=options.y,
        width=options.width,
        height=options.height,
        scale=options.scale,
        fps=options.fps,
        threadsCount=options.threads,
        out=options.out,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment