A script for automatically reframing landscape videos to portrait while keeping the subject centered
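The script works in two passes: "track" runs YOLOv8 detection plus DeepSort tracking over the video and pickles the per-frame subject positions, then "reframe" crops each frame to 9:16 around that subject and muxes the original audio back in. Judging from the imports, the dependencies below should cover it (the PyPI package names are my assumption; the gist doesn't pin them):

    pip install ultralytics deep-sort-realtime opencv-python imageio[ffmpeg] ffmpeg-python numpy

You'll also need an ffmpeg binary on your PATH for the audio extraction and muxing steps.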
#!/usr/bin/env python3
from collections import defaultdict

import cv2
import imageio
import pickle
import argparse
import ffmpeg
import tempfile
import numpy as np
from deep_sort_realtime.deepsort_tracker import DeepSort
from ultralytics import YOLO

GREEN = (0, 255, 0)
WHITE = (255, 255, 255)
def detect_people(frame, model):
    # Apply the YOLOv8 detector to the frame and keep only people (class_id == 0).
    # "mps" means it will use the hardware acceleration on macOS.
    # Change it to "cpu" if you're on Linux, or "cuda" if you have an Nvidia GPU.
    detections = model(frame, device="mps")[0]
    for data in detections.boxes.data.tolist():
        confidence = data[4]
        class_id = data[5]
        if confidence >= 0.5 and class_id == 0:
            xmin, ymin, xmax, ymax = (int(data[0]), int(data[1]),
                                      int(data[2]), int(data[3]))
            # DeepSort expects ([left, top, width, height], confidence, class) tuples
            yield [[xmin, ymin, xmax - xmin, ymax - ymin], confidence, class_id]
def bbox_center(bbox):
    # bbox is in (left, top, right, bottom) format, as returned by track.to_ltrb()
    return (int((bbox[0] + bbox[2]) // 2), int((bbox[1] + bbox[3]) // 2))
def filter_top_percent_tracks(track_durations, top_percent):
    # Calculate the number of tracks to keep (the top N%)
    num_tracks_to_keep = int(len(track_durations) * top_percent)
    # Sort tracks by duration and keep the top slice
    sorted_tracks = sorted(track_durations.items(),
                           key=lambda item: item[1], reverse=True)
    top_tracks = sorted_tracks[:num_tracks_to_keep]
    # Create a new dictionary with only the top tracks
    filtered_track_durations = {
        track_id: duration for track_id, duration in top_tracks}
    return filtered_track_durations
def find_subjects(frames, track_durations):
    subjects = []
    if len(track_durations) > 100:
        # The percentage of tracks to keep varies depending on how crowded the video is
        track_durations = filter_top_percent_tracks(track_durations, 0.2)
    for frame in frames:
        longest_duration = 0
        subject_center = None
        # Don't even try to re-center frames with more than 8 people
        if len(frame) <= 8:
            for track in frame:
                track_id = track['track_id']
                duration = track_durations.get(track_id, 0)
                if duration > longest_duration:
                    longest_duration = duration
                    subject_center = bbox_center(track['bbox'])
        subjects.append(subject_center)
    return subjects
def track(video_path, subjects_fn, preview):
    # Open the source file using OpenCV
    cap = cv2.VideoCapture(video_path)
    # Initialize the YOLOv8 detector.
    # It will automatically download the model weights on the first run.
    detector = YOLO("yolov8l.pt")
    # Also initialize the DeepSort tracker.
    # The embedder parameter specifies the model to use for feature extraction.
    # In our case we're going to use one of the pre-trained variants of a CLIP model.
    tracker = DeepSort(max_age=10, embedder='clip_ViT-B/32',
                       embedder_gpu=False)
    frame_count = 0
    detections = []
    tracks = []
    # For our subject detection logic we'll need to know the total duration of each track.
    # Because of this, we can't pick the subject online; we have to do a second pass.
    # On the first pass we just accumulate tracks and their durations in this dictionary.
    track_durations = defaultdict(int)
    tracks_per_frame = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1
        # Run detection on every other frame only, to speed up processing;
        # on skipped frames we reuse the most recent tracks
        if frame_count % 2 == 0:
            # Detect people
            detections = list(detect_people(frame, detector))
            tracks = tracker.update_tracks(detections, frame=frame)
        tracks_per_frame.append([])
        for track in tracks:
            # If the track is not confirmed, ignore it
            if not track.is_confirmed():
                continue
            # Update track durations and save some per-frame info for the second pass
            track_durations[track.track_id] = track.age
            tracks_per_frame[-1].append({
                'track_id': track.track_id,
                'bbox': track.to_ltrb(),
            })
            # Draw the bounding box and the track id on the frame
            # and display a preview window to track progress
            track_id = track.track_id
            ltrb = track.to_ltrb()
            xmin, ymin, xmax, ymax = (int(ltrb[0]), int(ltrb[1]),
                                      int(ltrb[2]), int(ltrb[3]))
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), GREEN, 2)
            cv2.rectangle(frame, (xmin, ymin - 20),
                          (xmin + 20, ymin), GREEN, -1)
            cv2.putText(frame, str(track_id), (xmin + 5, ymin - 8),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, WHITE, 2)
        # Display the frame
        if preview:
            cv2.imshow('Processed Frame', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    # After all tracking is done, run subject detection and dump the result
    # into an intermediary file
    subjects = find_subjects(tracks_per_frame, track_durations)
    with open(subjects_fn, 'wb') as f:
        pickle.dump(subjects, f)
    cap.release()
    cv2.destroyAllWindows()
def ease_camera_towards_subject(current_pos, target_pos, damping_factor):
    # Simple exponential easing: every frame the camera covers a fixed
    # fraction (damping_factor) of the remaining distance to the target.
    # Calculate the distance vector between the current position and the target
    distance_vector = np.array(target_pos) - np.array(current_pos)
    # Apply damping to the distance vector
    eased_vector = distance_vector * damping_factor
    # Update the current position
    new_pos = np.array(current_pos) + eased_vector
    return tuple(new_pos.astype(int))
def center_subject_in_frame(frame, new_size, subject_position, last_position, damping_factor):
    original_height, original_width = frame.shape[:2]
    new_width, new_height = new_size
    # Calculate the desired top-left corner for a centered subject
    subject_center_x, subject_center_y = subject_position
    desired_x = max(0, min(original_width - new_width,
                           subject_center_x - new_width // 2))
    desired_y = max(0, min(original_height - new_height,
                           subject_center_y - new_height // 2))
    # Apply easing towards the subject
    new_x, new_y = ease_camera_towards_subject(
        last_position, (desired_x, desired_y), damping_factor)
    # Ensure the new position is within bounds
    new_x = max(0, min(new_x, original_width - new_width))
    new_y = max(0, min(new_y, original_height - new_height))
    # Crop the frame to the new dimensions
    cropped_frame = frame[new_y:new_y + new_height, new_x:new_x + new_width]
    return cropped_frame, (new_x, new_y), (new_x, new_y, new_width, new_height)
def round_to_multiple(number, multiple):
    return round(number / multiple) * multiple
def reframe(video_path, subjects_fn, preview):
    # We could parametrize this one too, but I'm just using this script for my
    # vertical IG videos so, 9:16 it is :P
    target_aspect_ratio = (9, 16)
    cap = cv2.VideoCapture(video_path)
    # Get the original video dimensions
    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
    fps = cap.get(cv2.CAP_PROP_FPS)
    # Determine the base dimension (shortest side)
    base_dimension = min(width, height)
    # Calculate target dimensions maintaining the aspect ratio
    target_aspect_ratio_width, target_aspect_ratio_height = target_aspect_ratio
    if width < height:  # Input is already portrait
        new_width = int(base_dimension)
        new_height = int(
            base_dimension * target_aspect_ratio_height / target_aspect_ratio_width)
    else:  # Landscape input, crop to portrait
        new_height = int(base_dimension)
        new_width = int(base_dimension *
                        target_aspect_ratio_width / target_aspect_ratio_height)
    # Ensure the new dimensions do not exceed the original ones,
    # rounded to a multiple of 16 to keep the encoder happy
    new_width = int(min(round_to_multiple(new_width, 16), width))
    new_height = int(min(round_to_multiple(new_height, 16), height))
    frame_center = (int(width // 2), int(height // 2))
    # Create two temporary files to store the reframed video and the original audio
    with tempfile.NamedTemporaryFile(suffix='.mp3') as temp_audio, \
            tempfile.NamedTemporaryFile(suffix='.mp4') as temp_video:
        # I tried using OpenCV's VideoWriter but it segfaults on macOS, hence imageio
        writer = imageio.get_writer(
            temp_video.name, fps=fps, format='mp4', codec='libx264', quality=10)
        with open(subjects_fn, 'rb') as f:
            subjects = pickle.load(f)
        frame_count = 0
        last_crop_position = (0, 0)
        last_subject_position = (int(width // 2), int(height // 2))
        lost_subject_for = 0
        LOST_SUBJECT_THRESHOLD_SEC = 3
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # If no subject is found, just stick with the last position for a few
            # seconds, hoping it will reappear. If it doesn't, ease back to the center.
            if not subjects[frame_count]:
                subject = last_subject_position
                lost_subject_for += 1
            else:
                subject = subjects[frame_count]
                last_subject_position = subject
                lost_subject_for = 0
            # Drift back towards the center if the subject is lost for too long
            if lost_subject_for > LOST_SUBJECT_THRESHOLD_SEC * fps:
                subject = frame_center
            # The last parameter is the damping factor.
            # It determines how quickly the camera moves towards the subject.
            # I found 0.1 to be a good overall value.
            cropped_frame, last_crop_position, crop_bbox = center_subject_in_frame(
                frame, (new_width, new_height), subject, last_crop_position, 0.1
            )
            # Write the new frame to the output video (imageio expects RGB, OpenCV gives BGR)
            writer.append_data(cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB))
            # Also draw some markers on the original frame for the live preview
            cv2.rectangle(frame, (int(subject[0]) - 5, int(subject[1]) - 5),
                          (int(subject[0]) + 5, int(subject[1]) + 5), GREEN, 2)
            cv2.rectangle(frame, (crop_bbox[0], crop_bbox[1]),
                          (crop_bbox[0] + crop_bbox[2], crop_bbox[1] + crop_bbox[3]),
                          GREEN, 2)
            if lost_subject_for > 0:
                cv2.putText(frame, f"Lost subject for {lost_subject_for / fps:.1f} seconds",
                            (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, WHITE, 2)
            # Display the live preview
            if preview:
                cv2.imshow('Processed Frame', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            frame_count += 1
        cap.release()
        writer.close()
        # Extract the audio from the original video
        ffmpeg.input(video_path).output(temp_audio.name,
                                        q=0, map='a').run(overwrite_output=True)
        # Combine the new video with the original audio.
        input_video_stream = ffmpeg.input(temp_video.name)
        input_audio_stream = ffmpeg.input(temp_audio.name)
        # Specify your desired codec here. hevc_videotoolbox is the
        # hardware-accelerated codec on macOS.
        ffmpeg.output(input_video_stream, input_audio_stream,
                      f"{video_path.split('.')[0]}_reframed.mp4",
                      codec='aac', vcodec='libx264', pix_fmt='yuv420p',
                      vf='format=yuv420p', profile='main',
                      level='4.0').run(overwrite_output=True)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process a GoPro video.")
    parser.add_argument('video_path', type=str,
                        help='Path to the GoPro video file')
    parser.add_argument('command', type=str,
                        help='Action to perform', choices=['track', 'reframe'])
    parser.add_argument('--preview', dest='preview', action='store_true',
                        help='Display the processed video in a window', default=False)
    args = parser.parse_args()
    subjects_fn = f'{args.video_path.split(".")[0]}_subjects.pickle'
    if args.command == 'track':
        track(args.video_path, subjects_fn, args.preview)
    elif args.command == 'reframe':
        reframe(args.video_path, subjects_fn, args.preview)
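Example usage, assuming the script is saved as reframe.py (the filename is arbitrary):

    # First pass: detect and track people; writes my_clip_subjects.pickle
    python3 reframe.py my_clip.mp4 track --preview

    # Second pass: crop to 9:16 around the subject and mux the original audio back in
    python3 reframe.py my_clip.mp4 reframe

The result is written to my_clip_reframed.mp4 next to the input.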