Skip to content

Instantly share code, notes, and snippets.

@seva100
Created July 22, 2022 14:10
Show Gist options
  • Save seva100/71807a726d2d153d5b5a30773999ebd6 to your computer and use it in GitHub Desktop.
Save seva100/71807a726d2d153d5b5a30773999ebd6 to your computer and use it in GitHub Desktop.
Video face alignment by landmarks based on face-alignment and MTCNN supporting multi-threading
import skimage.io
import numpy
from argparse import ArgumentParser
from skimage import img_as_ubyte
from skimage.transform import resize
from tqdm import tqdm
import os
import shutil
from glob import glob
import shlex
import imageio
import numpy as np
import warnings
warnings.filterwarnings("ignore")
import subprocess
import pandas as pd
import imageio
import cv2
from skimage import transform as trans
from joblib import Parallel, delayed
def ldms_transform(img, landmark, image_size):
    """Warp a face image so that its landmarks match a fixed 5-point template.

    The code of this function is originally taken from the Tencent/TFace
    repository.

    Args:
        img: image array handed to ``cv2.warpAffine``.
        landmark: array of shape (68, 2) or (5, 2). A 68-point set is reduced
            to five points (left eye, right eye, nose, left and right mouth
            corners) before the transform is estimated.
        image_size: ``(height, width)`` of the output image. The template is
            specified relative to a size of 112 and is rescaled accordingly.

    Returns:
        The warped image of size ``image_size``; areas that fall outside the
        source image are filled with 0.
    """
    point_count = landmark.shape[0]
    assert point_count == 68 or point_count == 5
    assert landmark.shape[1] == 2

    if point_count == 68:
        five_points = np.zeros((5, 2), dtype=np.float32)
        five_points[0] = (landmark[36] + landmark[39]) / 2  # left eye
        five_points[1] = (landmark[42] + landmark[45]) / 2  # right eye
        five_points[2] = landmark[30]  # nose
        five_points[3] = landmark[48]  # mouth_left
        five_points[4] = landmark[54]  # mouth_right
    else:
        five_points = landmark

    # Reference positions of the five points, shifted by 8 along x and then
    # stretched from the 112-based layout to the requested output size.
    template = np.array(
        [
            [30.2946, 51.6963],
            [65.5318, 51.5014],
            [48.0252, 71.7366],
            [33.5493, 92.3655],
            [62.7299, 92.2041],
        ],
        dtype=np.float32,
    )
    template[:, 0] += 8.0
    out_h, out_w = image_size[0], image_size[1]
    template[:, 0] *= out_w / 112.0
    template[:, 1] *= out_h / 112.0

    similarity = trans.SimilarityTransform()
    similarity.estimate(five_points, template)
    affine_2x3 = similarity.params[:2, :]  # top two rows of the 3x3 matrix

    return cv2.warpAffine(img, affine_2x3, (out_w, out_h), borderValue=0.0)
def extract_frames(inp, fps, out_dir):
    """Dump the frames of a video into ``<out_dir>/<video name>_frames``.

    Runs ``ffmpeg`` with an ``fps`` filter and writes the frames as
    ``%04d.jpg`` files. The folder is created if it does not exist.

    Args:
        inp: path to the input video.
        fps: frame rate given to ffmpeg's ``fps`` filter.
        out_dir: directory in which the frames folder is created.
    """
    video_stem = os.path.splitext(os.path.basename(inp))[0]
    frames_dir = os.path.join(out_dir, video_stem + '_frames')
    os.makedirs(frames_dir, exist_ok=True)

    # The arguments are passed as a list instead of formatting a command
    # string and splitting it again: with the string form, a path containing
    # spaces, quotes or backslashes was broken into the wrong arguments.
    cmd = [
        'ffmpeg', '-hide_banner', '-v', '0', '-y',
        '-i', str(inp),
        '-vf', f'fps={fps}',
        os.path.join(frames_dir, '%04d.jpg'),
    ]
    returncode = subprocess.call(cmd)
    if returncode != 0:
        # ffmpeg is run silently (-v 0), so report the failure ourselves.
        print(f'ffmpeg exited with code {returncode} while extracting frames from', inp)
def _pad_to_square(img):
    """Zero-pad an (H, W, C) image along its shorter axis so that H == W."""
    h, w = img.shape[:2]
    if h == w:
        return img
    missing = abs(h - w)
    before = missing // 2
    after = missing - before  # the odd pixel, if any, goes to the far side
    if h > w:
        pad_width = ((0, 0), (before, after), (0, 0))
    else:
        pad_width = ((before, after), (0, 0), (0, 0))
    return np.pad(img, pad_width, mode='constant')


def align_frames(inp, image_shape, out_dir, mode='face_alignment', mtcnn_detector=None):
    """Align every extracted frame of a video by its facial landmarks.

    Reads the ``*.jpg`` files from ``<out_dir>/<video name>_frames``, detects
    landmarks on each, warps the frame with ``ldms_transform`` and writes the
    result under the same file name to
    ``<out_dir>/<video name>_frames_aligned``.

    Frames that cannot be read, on which the detector raises ``ValueError``,
    or on which no landmarks are found are skipped, so the aligned folder may
    have gaps in its numbering.

    Args:
        inp: path of the video; only its base name is used to locate folders.
        image_shape: output size passed on to ``ldms_transform``.
        out_dir: directory holding the frames folder and receiving the
            aligned folder.
        mode: ``'face_alignment'`` or ``'mtcnn'``.
        mtcnn_detector: callable used in ``'mtcnn'`` mode; it is called with
            the image plus threshold keywords and must return a pair whose
            second item is the landmarks.

    Raises:
        ValueError: if ``mode`` is not one of the accepted values.
    """
    if mode not in ('face_alignment', 'mtcnn'):
        raise ValueError(
            f'Unknown mode {mode!r} (accepted: "mtcnn", "face_alignment")'
        )

    video_stem = os.path.splitext(os.path.basename(inp))[0]
    frames_dir = os.path.join(out_dir, video_stem + '_frames')
    aligned_frames_dir = os.path.join(out_dir, video_stem + '_frames_aligned')
    os.makedirs(aligned_frames_dir, exist_ok=True)

    # The landmark model is created once, on the first frame that needs it,
    # instead of being rebuilt for every single frame.
    fa = None

    for fn in sorted(glob(os.path.join(frames_dir, '*.jpg'))):
        try:
            img = imageio.imread(fn)
        except Exception:  # broken file; KeyboardInterrupt still propagates
            print('Error when reading a file', fn)
            continue

        # Bring the frame to 3 channels: replicate grayscale, drop alpha.
        if img.ndim == 2:
            img = np.repeat(img[..., np.newaxis], repeats=3, axis=2)
        if img.shape[2] == 4:
            img = img[..., :3]

        img = _pad_to_square(img)

        try:
            if mode == 'face_alignment':
                if fa is None:
                    fa = face_alignment.FaceAlignment(
                        face_alignment.LandmarksType._2D,
                        device='cuda:0',
                        flip_input=False,
                    )
                landmarks = fa.get_landmarks(img)
            elif mode == 'mtcnn':
                _, landmarks = mtcnn_detector(
                    img,
                    min_face_size=100,
                    thresholds=[0.9, 0.9, 0.9],
                    nms_thresholds=[0.9, 0.9, 0.9],
                )
        except ValueError:
            print('ValueError from the landmarks detector:')
            print('name:', fn)
            print('img shape:', img.shape)
            continue

        # face_alignment returns None (not an empty list) when it finds no
        # face, so both cases have to be checked before indexing.
        if landmarks is None or len(landmarks) == 0:
            continue

        # Only the first detection is used.
        ldms_vec = landmarks[0]
        if mode == 'mtcnn':
            # Turn the flat 10-vector into five (x, y) rows; column-major
            # order means the first five values fill column 0.
            ldms_vec = ldms_vec.reshape(5, 2, order='F')

        img_crop = ldms_transform(img, ldms_vec, image_shape)
        imageio.imwrite(os.path.join(aligned_frames_dir, os.path.basename(fn)), img_crop)
def merge_frames(inp, fps, out_dir):
    """Encode the aligned frames of a video into ``<out_dir>/<video file name>``.

    Reads ``%04d.jpg`` files from ``<out_dir>/<video name>_frames_aligned``
    and encodes them with libx264 / yuv420p at ``fps`` frames per second.

    Args:
        inp: path of the original video; only its base name is used.
        fps: frame rate for both reading the frames and the output video.
        out_dir: directory holding the aligned folder and receiving the video.
    """
    video_name = os.path.basename(inp)
    aligned_frames_dir = os.path.join(
        out_dir,
        os.path.splitext(video_name)[0] + '_frames_aligned',
    )
    # When out_dir is the directory that contains inp, this is the input file
    # itself and "-y" overwrites it.
    out_name = os.path.join(out_dir, video_name)

    # NOTE(review): ffmpeg's image-sequence reader expects consecutive
    # numbers, while align_frames skips frames without a detected face; the
    # output presumably stops at the first gap -- confirm, and renumber the
    # aligned frames if complete videos are required.

    # The arguments are passed as a list so that paths with spaces or quotes
    # stay intact. "-start_number" is an input option, so it goes before "-i".
    cmd = [
        'ffmpeg', '-hide_banner', '-v', '0', '-y',
        '-framerate', str(fps),
        '-start_number', '0',
        '-i', os.path.join(aligned_frames_dir, '%04d.jpg'),
        '-c:v', 'libx264',
        '-r', str(fps),
        '-pix_fmt', 'yuv420p',
        out_name,
    ]
    returncode = subprocess.call(cmd)
    if returncode != 0:
        # ffmpeg is run silently (-v 0), so report the failure ourselves.
        print(f'ffmpeg exited with code {returncode} while writing', out_name)
def remove_aux_folders(inp, out_dir):
    """Delete the temporary frame folders that belong to a video.

    Removes ``<out_dir>/<video name>_frames`` first and then
    ``<out_dir>/<video name>_frames_aligned``, including their contents.

    Args:
        inp: path of the video; only its base name (without extension) is used.
        out_dir: directory that contains the two folders.

    Raises:
        Whatever ``shutil.rmtree`` raises, e.g. when a folder does not exist.
    """
    stem, _ext = os.path.splitext(os.path.split(inp)[1])
    for suffix in ('_frames', '_frames_aligned'):
        shutil.rmtree(os.path.join(out_dir, stem + suffix))
def process_videos(in_list, fps, image_shape, out_dir, mode):
    """Run the whole pipeline on each video of ``in_list``, one after another.

    For every video: extract its frames, align them, merge the aligned frames
    back into a video and delete the temporary frame folders.

    Args:
        in_list: iterable of video paths.
        fps: frame rate used for extraction and for the output video.
        image_shape: output frame size passed on to ``align_frames``.
        out_dir: directory for the temporary folders and the results.
        mode: ``'mtcnn'`` or ``'face_alignment'``.

    Raises:
        ValueError: if ``mode`` is not one of the accepted values.
    """
    if mode == 'mtcnn':
        # Imported here, inside the worker (original note: "initializing
        # networks here"); requires the mtcnn repo to be on sys.path.
        from src import detect_faces as mtcnn_detector
    elif mode == 'face_alignment':
        mtcnn_detector = None
    else:
        # Without this branch an unknown mode left mtcnn_detector unbound and
        # surfaced as an UnboundLocalError on the first video.
        raise ValueError(
            f'Unknown mode {mode!r} (accepted: "mtcnn", "face_alignment")'
        )

    for inp in tqdm(in_list):
        extract_frames(inp, fps, out_dir)
        align_frames(inp, image_shape, out_dir, mode, mtcnn_detector)
        merge_frames(inp, fps, out_dir)
        remove_aux_folders(inp, out_dir)
if __name__ == "__main__":
    # Command-line entry point: align every .mp4 in --inp_dir, spreading the
    # videos over --n_jobs joblib workers.
    parser = ArgumentParser()
    parser.add_argument("--inp_dir", required=True, help='input directory with .mp4 videos')
    parser.add_argument("--image_shape", default=(256, 256), type=lambda x: tuple(map(int, x.split(','))),
                        help="Image shape")
    parser.add_argument("--fps", dest="fps", type=int, help="fps", default=25)
    parser.add_argument("--out_dir", type=str, default='.')
    parser.add_argument("--mode", type=str, default='face_alignment', choices=('mtcnn', 'face_alignment'),
                        help='mode (accepted: "mtcnn", "face_alignment")')
    parser.add_argument("--n_jobs", dest="n_jobs", type=int, default=1, help="number of parallel processes")
    # NOTE(review): --cpu is parsed but not read anywhere in this script;
    # align_frames always asks face_alignment for device 'cuda:0'.
    parser.add_argument("--cpu", dest="cpu", action="store_true", help="cpu mode.")
    parser.add_argument("--mtcnn_repo_path", type=str,
                        help="path to the downloaded mtcnn_pytorch repo (only needed for mode == 'mtcnn')")
    args = parser.parse_args()

    if args.n_jobs < 1:
        # range(n_jobs) below would be empty and nothing would be processed.
        parser.error("--n_jobs must be a positive integer")
    if len(args.image_shape) != 2:
        parser.error("--image_shape must consist of two comma-separated integers, e.g. 256,256")

    if args.mode == 'mtcnn':
        if not args.mtcnn_repo_path:
            parser.error("--mtcnn_repo_path is required when --mode is 'mtcnn'")
        import sys
        sys.path.append(args.mtcnn_repo_path)
    elif args.mode == 'face_alignment':
        # Bound as a module-level name on purpose: align_frames looks up
        # `face_alignment` as a global.
        import face_alignment

    in_fns = sorted(glob(os.path.join(args.inp_dir, '*.mp4')))
    n_videos = len(in_fns)
    if not in_fns:
        print('No .mp4 files found in', args.inp_dir)

    # Contiguous chunks, one per job. The boundaries use integer arithmetic:
    # the float form int(n / jobs * i) can round down at the upper end
    # (e.g. int(1 / 49 * 49) == 0) and silently drop the last video.
    chunks = [
        in_fns[n_videos * i // args.n_jobs: n_videos * (i + 1) // args.n_jobs]
        for i in range(args.n_jobs)
    ]
    Parallel(n_jobs=args.n_jobs)(
        delayed(process_videos)(
            chunk,
            fps=args.fps,
            image_shape=args.image_shape,
            out_dir=args.out_dir,
            mode=args.mode
        )
        for chunk in chunks
        if chunk  # more jobs than videos: do not start workers for nothing
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment