Created
July 22, 2022 14:10
-
-
Save seva100/71807a726d2d153d5b5a30773999ebd6 to your computer and use it in GitHub Desktop.
Video face alignment by landmarks based on face-alignment and MTCNN supporting multi-threading
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import skimage.io | |
import numpy | |
from argparse import ArgumentParser | |
from skimage import img_as_ubyte | |
from skimage.transform import resize | |
from tqdm import tqdm | |
import os | |
import shutil | |
from glob import glob | |
import shlex | |
import imageio | |
import numpy as np | |
import warnings | |
warnings.filterwarnings("ignore") | |
import subprocess | |
import pandas as pd | |
import imageio | |
import cv2 | |
from skimage import transform as trans | |
from joblib import Parallel, delayed | |
def ldms_transform(img, landmark, image_size):
    """Warp ``img`` so that its facial key points match a fixed template.

    The code of this function is originally taken from the Tencent/TFace
    repository.

    Args:
        img: image passed to ``cv2.warpAffine``.
        landmark: array of shape ``(68, 2)`` or ``(5, 2)`` holding landmark
            coordinates. A 68-point set is reduced to five points (both
            eyes, nose, both mouth corners) before the fit.
        image_size: ``(height, width)`` of the output image.

    Returns:
        The warped image with ``image_size[0]`` rows and ``image_size[1]``
        columns; uncovered pixels are filled with ``0.0``.
    """
    n_points = landmark.shape[0]
    assert n_points == 68 or n_points == 5
    assert landmark.shape[1] == 2

    if n_points == 5:
        five_pts = landmark
    else:
        five_pts = np.zeros((5, 2), dtype=np.float32)
        five_pts[0] = (landmark[36] + landmark[39]) / 2  # left eye
        five_pts[1] = (landmark[42] + landmark[45]) / 2  # right eye
        five_pts[2] = landmark[30]  # nose
        five_pts[3] = landmark[48]  # mouth_left
        five_pts[4] = landmark[54]  # mouth_right

    out_h, out_w = image_size[0], image_size[1]

    # Target positions of the five points, expressed on a 112-unit grid.
    template = np.array(
        [
            [30.2946, 51.6963],
            [65.5318, 51.5014],
            [48.0252, 71.7366],
            [33.5493, 92.3655],
            [62.7299, 92.2041],
        ],
        dtype=np.float32,
    )
    template[:, 0] += 8.0
    # Rescale the template from the 112-unit grid to the requested size.
    template[:, 0] *= out_w / 112.0
    template[:, 1] *= out_h / 112.0

    similarity = trans.SimilarityTransform()
    similarity.estimate(five_pts, template)
    # warpAffine takes the top two rows of the 3x3 homogeneous matrix.
    affine = similarity.params[0:2, :]
    return cv2.warpAffine(img, affine, (out_w, out_h), borderValue=0.0)
def extract_frames(inp, fps, out_dir):
    """Dump the frames of the video ``inp`` to JPEG files using ffmpeg.

    The frames go to ``<out_dir>/<video name without extension>_frames/``
    and are named by the ffmpeg pattern ``%04d.jpg``. The directory is
    created when it does not exist.

    Args:
        inp: path to the input video.
        fps: frame rate passed to the ffmpeg ``fps`` video filter.
        out_dir: directory in which the frames folder is created.
    """
    stem = os.path.splitext(os.path.basename(inp))[0]
    frames_dir = os.path.join(out_dir, stem + '_frames')
    os.makedirs(frames_dir, exist_ok=True)

    # An argument list (rather than a formatted string run through
    # shlex.split) keeps paths containing spaces or quotes intact.
    cmd = [
        'ffmpeg', '-y',
        '-i', inp,
        '-vf', f'fps={fps}',
        f'{frames_dir}/%04d.jpg',
        '-hide_banner',
        '-v', '0',
    ]
    subprocess.call(cmd)
def _to_rgb(img):
    """Return ``img`` with exactly three channels (grey is replicated, alpha dropped)."""
    if img.ndim == 2:
        img = np.repeat(img[..., np.newaxis], repeats=3, axis=2)
    if img.shape[2] == 4:
        img = img[..., :3]
    return img


def _pad_to_square(img):
    """Zero-pad the shorter side of ``img`` (H x W x C) so that it becomes square."""
    h, w = img.shape[:2]
    if h == w:
        return img
    diff = abs(h - w)
    before, after = diff // 2, diff - diff // 2
    if h > w:
        widths = ((0, 0), (before, after), (0, 0))
    else:
        widths = ((before, after), (0, 0), (0, 0))
    # np.pad can add any amount of padding; slicing a zeros_like(img) array
    # could never pad by more than the image's own width/height.
    return np.pad(img, widths, mode='constant')


def align_frames(inp, image_shape, out_dir, mode='face_alignment', mtcnn_detector=None,
                 device='cuda:0'):
    """Align every extracted frame of the video ``inp`` by its face landmarks.

    Frames are read from ``<out_dir>/<video stem>_frames/*.jpg``; each frame
    in which a face is found is warped with ``ldms_transform`` and written
    under the same file name to ``<out_dir>/<video stem>_frames_aligned/``.
    Frames that cannot be read, or in which no landmarks are found, are
    skipped.

    Args:
        inp: path of the source video (only its base name is used).
        image_shape: output shape forwarded to ``ldms_transform``.
        out_dir: directory holding the frame folders.
        mode: ``'face_alignment'`` or ``'mtcnn'``.
        mtcnn_detector: callable used in ``'mtcnn'`` mode; it is called with
            the image plus ``min_face_size``, ``thresholds`` and
            ``nms_thresholds`` and must return ``(boxes, landmarks)``.
        device: device string handed to ``face_alignment.FaceAlignment``
            (only used in ``'face_alignment'`` mode).

    Raises:
        ValueError: if ``mode`` is unknown, or ``mode == 'mtcnn'`` without a
            detector.
    """
    if mode not in ('face_alignment', 'mtcnn'):
        raise ValueError(
            f'Unknown mode {mode!r} (accepted: "mtcnn", "face_alignment")'
        )
    if mode == 'mtcnn' and mtcnn_detector is None:
        raise ValueError("mtcnn_detector is required when mode == 'mtcnn'")

    stem = os.path.splitext(os.path.basename(inp))[0]
    frames_dir = os.path.join(out_dir, stem + '_frames')
    aligned_frames_dir = os.path.join(out_dir, stem + '_frames_aligned')
    os.makedirs(aligned_frames_dir, exist_ok=True)

    in_fns = sorted(glob(os.path.join(frames_dir, '*.jpg')))

    fa = None
    if in_fns and mode == 'face_alignment':
        # Build the landmark network once per video instead of once per frame.
        fa = face_alignment.FaceAlignment(
            face_alignment.LandmarksType._2D,
            device=device,
            flip_input=False,
        )

    # NOTE(review): skipped frames leave gaps in the numbered file sequence
    # that merge_frames later reads through a "%04d.jpg" pattern -- confirm
    # that ffmpeg treats such gaps as intended.
    for fn in in_fns:
        try:
            img = imageio.imread(fn)
        except Exception:  # broken file
            print('Error when reading a file', fn)
            continue

        img = _pad_to_square(_to_rgb(img))

        try:
            if mode == 'face_alignment':
                landmarks = fa.get_landmarks(img)
            else:
                _, landmarks = mtcnn_detector(
                    img,
                    min_face_size=100,
                    thresholds=[0.9, 0.9, 0.9],
                    nms_thresholds=[0.9, 0.9, 0.9],
                )
        except ValueError:
            print('ValueError from the landmarks detector:')
            print('name:', fn)
            print('img shape:', img.shape)
            continue

        # The detector may report "no face" either as None or as an empty
        # collection.
        if landmarks is None or len(landmarks) == 0:
            continue

        # Only the first detected face is used.
        ldms_vec = landmarks[0]
        if mode == 'mtcnn':
            # Flat vector of 10 values -> (5, 2); order='F' means the first
            # five values fill column 0 and the last five column 1.
            ldms_vec = ldms_vec.reshape(5, 2, order='F')
        img_crop = ldms_transform(img, ldms_vec, image_shape)

        out_name = os.path.join(aligned_frames_dir, os.path.basename(fn))
        imageio.imwrite(out_name, img_crop)
def merge_frames(inp, fps, out_dir):
    """Encode the aligned frames of the video ``inp`` back into a video with ffmpeg.

    Frames are read from ``<out_dir>/<video stem>_frames_aligned/%04d.jpg``
    and the result is written to ``<out_dir>/<base name of inp>`` (H.264,
    ``yuv420p``). An existing output file is overwritten (``-y``).

    NOTE(review): when ``out_dir`` is the directory that contains ``inp``,
    the output path equals the input path and the source video is replaced.

    Args:
        inp: path of the source video (only its base name is used).
        fps: frame rate used both for reading the frames and for the output.
        out_dir: directory holding the aligned frames and receiving the video.
    """
    stem = os.path.splitext(os.path.basename(inp))[0]
    aligned_frames_dir = os.path.join(out_dir, stem + '_frames_aligned')
    out_name = os.path.join(out_dir, os.path.basename(inp))

    # An argument list (rather than a formatted string run through
    # shlex.split) keeps paths containing spaces or quotes intact.
    cmd = [
        'ffmpeg', '-y',
        '-framerate', str(fps),
        '-i', f'{aligned_frames_dir}/%04d.jpg',
        '-start_number', '0',
        '-c:v', 'libx264',
        '-r', str(fps),
        '-pix_fmt', 'yuv420p',
        out_name,
        '-hide_banner',
        '-v', '0',
    ]
    subprocess.call(cmd)
def remove_aux_folders(inp, out_dir):
    """Delete the temporary frame folders created for the video ``inp``.

    Removes ``<out_dir>/<video stem>_frames`` and
    ``<out_dir>/<video stem>_frames_aligned`` with all their contents.
    A folder that does not exist is skipped, so the other one is still
    removed and the function can safely be called more than once.

    Args:
        inp: path of the source video (only its base name is used).
        out_dir: directory holding the frame folders.
    """
    stem = os.path.splitext(os.path.basename(inp))[0]
    for suffix in ('_frames', '_frames_aligned'):
        folder = os.path.join(out_dir, stem + suffix)
        try:
            shutil.rmtree(folder)
        except FileNotFoundError:
            # Already gone: nothing to clean up for this folder.
            continue
def process_videos(in_list, fps, image_shape, out_dir, mode):
    """Run the full alignment pipeline on every video in ``in_list``.

    For each video: extract frames, align them, merge the aligned frames
    into a video and remove the temporary frame folders.

    Args:
        in_list: iterable of input video paths.
        fps: frame rate used for frame extraction and for the output video.
        image_shape: output frame shape forwarded to ``align_frames``.
        out_dir: directory for temporary folders and resulting videos.
        mode: ``'mtcnn'`` or ``'face_alignment'``.

    Raises:
        ValueError: if ``mode`` is not one of the accepted values.
    """
    if mode == 'mtcnn':
        # Imported here so that the networks are initialized in the process
        # that actually uses them.
        from src import detect_faces as mtcnn_detector
    elif mode == 'face_alignment':
        mtcnn_detector = None
    else:
        # Fail before any ffmpeg work instead of hitting an unbound
        # ``mtcnn_detector`` after the first extraction.
        raise ValueError(
            f'Unknown mode {mode!r} (accepted: "mtcnn", "face_alignment")'
        )

    for inp in tqdm(in_list):
        extract_frames(inp, fps, out_dir)
        align_frames(inp, image_shape, out_dir, mode, mtcnn_detector)
        merge_frames(inp, fps, out_dir)
        remove_aux_folders(inp, out_dir)
# Command-line entry point: split the *.mp4 files of --inp_dir into
# --n_jobs contiguous chunks and process each chunk in its own joblib job.
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("--inp_dir", required=True, help='input directory with .mp4 videos')
    parser.add_argument("--image_shape", default=(256, 256), type=lambda x: tuple(map(int, x.split(','))),
                        help="Image shape")
    parser.add_argument("--fps", dest="fps", type=int, help="fps", default=25)
    parser.add_argument("--out_dir", type=str, default='.')
    parser.add_argument("--mode", type=str, default='face_alignment',
                        choices=('mtcnn', 'face_alignment'),
                        help='mode (accepted: "mtcnn", "face_alignment")')
    parser.add_argument("--n_jobs", dest="n_jobs", type=int, default=1, help="number of parallel processes")
    # NOTE(review): --cpu is parsed but not forwarded to anything in this
    # script; the face_alignment device is chosen inside align_frames.
    parser.add_argument("--cpu", dest="cpu", action="store_true", help="cpu mode.")
    parser.add_argument("--mtcnn_repo_path", type=str,
                        help="path to the downloaded mtcnn_pytorch repo (only needed for mode == 'mtcnn')")
    args = parser.parse_args()

    if args.n_jobs < 1:
        # range(n_jobs) would be empty and no video would be processed.
        parser.error("--n_jobs must be a positive integer")

    if args.mode == 'mtcnn':
        if args.mtcnn_repo_path is None:
            parser.error("--mtcnn_repo_path is required when --mode is 'mtcnn'")
        import sys
        sys.path.append(args.mtcnn_repo_path)
    elif args.mode == 'face_alignment':
        # Must stay a module-level import: align_frames looks the name up
        # as a global.
        import face_alignment

    in_fns = sorted(glob(os.path.join(args.inp_dir, '*.mp4')))
    n_videos = len(in_fns)

    # Integer chunk boundaries: float arithmetic such as
    # int(1 / 49 * 49) == 0 can silently drop videos from the last chunk.
    Parallel(n_jobs=args.n_jobs)(
        delayed(process_videos)(
            in_fns[n_videos * i // args.n_jobs: n_videos * (i + 1) // args.n_jobs],
            fps=args.fps,
            image_shape=args.image_shape,
            out_dir=args.out_dir,
            mode=args.mode
        )
        for i in range(args.n_jobs)
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment