Face2Face
# Freeze a trained model checkpoint into a standalone frozen_model.pb for inference.
import os, argparse
import tensorflow as tf
from tensorflow.python.framework import graph_util

dir = os.path.dirname(os.path.realpath(__file__))


def freeze_graph(model_folder):
    # Retrieve the checkpoint's full path
    checkpoint = tf.train.get_checkpoint_state(model_folder)
    input_checkpoint = checkpoint.model_checkpoint_path

    # Specify the full filename of the frozen graph
    absolute_model_folder = '/'.join(input_checkpoint.split('/')[:-1])
    output_graph = absolute_model_folder + '/frozen_model.pb'

    # Before exporting the graph, we need to specify the output node.
    # This is how TF decides which part of the graph it has to keep and which part it can discard.
    # NOTE: this variable can name multiple comma-separated output nodes.
    output_node_names = 'generate_output/output'

    # Clear devices to allow TensorFlow to control on which device it will load operations
    clear_devices = True

    # Import the meta graph and retrieve a Saver
    saver = tf.train.import_meta_graph(input_checkpoint + '.meta', clear_devices=clear_devices)

    # Retrieve the protobuf graph definition
    graph = tf.get_default_graph()
    input_graph_def = graph.as_graph_def()

    # Start a session and restore the graph weights
    with tf.Session() as sess:
        saver.restore(sess, input_checkpoint)

        # Use a built-in TF helper to export variables to constants
        output_graph_def = graph_util.convert_variables_to_constants(
            sess,  # The session is used to retrieve the weights
            input_graph_def,  # The graph_def is used to retrieve the nodes
            output_node_names.split(',')  # The output node names are used to select the useful nodes
        )

        # Finally, serialize and dump the output graph to the filesystem
        with tf.gfile.GFile(output_graph, 'wb') as f:
            f.write(output_graph_def.SerializeToString())
        print('%d ops in the final graph.' % len(output_graph_def.node))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-folder', type=str, help='Model folder to export')
    args = parser.parse_args()

    freeze_graph(args.model_folder)
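
As a sanity check, the exported graph can be loaded back and inspected for the expected output node. This is a minimal sketch, assuming the defaults above; the model folder path is illustrative:

import tensorflow as tf

graph_def = tf.GraphDef()
with tf.gfile.GFile('model_folder/frozen_model.pb', 'rb') as f:  # illustrative path
    graph_def.ParseFromString(f.read())
with tf.Graph().as_default() as graph:
    tf.import_graph_def(graph_def, name='')
# should print True if the freeze kept the generator output node
print(any(op.name == 'generate_output/output' for op in graph.get_operations()))
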
# Extract paired training data from a video: original frames alongside the
# matching facial-landmark sketches that pix2pix will learn to translate.
import os
import cv2
import dlib
import time
import argparse
import numpy as np
from imutils import video

DOWNSAMPLE_RATIO = 4


def reshape_for_polyline(array):
    """Reshape the landmark points so that they work with cv2.polylines."""
    return np.array(array, np.int32).reshape((-1, 1, 2))


def main():
    os.makedirs('original', exist_ok=True)
    os.makedirs('landmarks', exist_ok=True)

    cap = cv2.VideoCapture(args.filename)
    fps = video.FPS().start()

    count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:  # end of the video file
            break

        frame_resize = cv2.resize(frame, None, fx=1 / DOWNSAMPLE_RATIO, fy=1 / DOWNSAMPLE_RATIO)
        gray = cv2.cvtColor(frame_resize, cv2.COLOR_BGR2GRAY)
        faces = detector(gray, 1)
        black_image = np.zeros(frame.shape, np.uint8)

        t = time.time()

        # Proceed only if exactly one face was detected
        if len(faces) == 1:
            for face in faces:
                detected_landmarks = predictor(gray, face).parts()
                landmarks = [[p.x * DOWNSAMPLE_RATIO, p.y * DOWNSAMPLE_RATIO] for p in detected_landmarks]

                # Slice indices follow the standard 68-point (iBUG) annotation
                # scheme used by dlib's shape predictor
                jaw = reshape_for_polyline(landmarks[0:17])
                left_eyebrow = reshape_for_polyline(landmarks[22:27])
                right_eyebrow = reshape_for_polyline(landmarks[17:22])
                nose_bridge = reshape_for_polyline(landmarks[27:31])
                lower_nose = reshape_for_polyline(landmarks[30:35])
                left_eye = reshape_for_polyline(landmarks[42:48])
                right_eye = reshape_for_polyline(landmarks[36:42])
                outer_lip = reshape_for_polyline(landmarks[48:60])
                inner_lip = reshape_for_polyline(landmarks[60:68])

                color = (255, 255, 255)
                thickness = 3

                # Open polylines for the jaw, eyebrows and nose bridge;
                # closed ones for the eyes, nose base and lips
                cv2.polylines(black_image, [jaw], False, color, thickness)
                cv2.polylines(black_image, [left_eyebrow], False, color, thickness)
                cv2.polylines(black_image, [right_eyebrow], False, color, thickness)
                cv2.polylines(black_image, [nose_bridge], False, color, thickness)
                cv2.polylines(black_image, [lower_nose], True, color, thickness)
                cv2.polylines(black_image, [left_eye], True, color, thickness)
                cv2.polylines(black_image, [right_eye], True, color, thickness)
                cv2.polylines(black_image, [outer_lip], True, color, thickness)
                cv2.polylines(black_image, [inner_lip], True, color, thickness)

            # Save the frame and its landmark sketch as a training pair
            count += 1
            print(count)
            cv2.imwrite('original/{}.png'.format(count), frame)
            cv2.imwrite('landmarks/{}.png'.format(count), black_image)
            fps.update()

            print('[INFO] elapsed time: {:.2f}'.format(time.time() - t))

            if count == args.number:  # stop once the requested number of pairs is saved
                break
            elif cv2.waitKey(1) & 0xFF == ord('q'):
                break
        else:
            print('No face detected')

    fps.stop()
    print('[INFO] elapsed time (total): {:.2f}'.format(fps.elapsed()))
    print('[INFO] approx. FPS: {:.2f}'.format(fps.fps()))

    cap.release()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--file', dest='filename', type=str, help='Name of the video file.')
    parser.add_argument('--num', dest='number', type=int, help='Number of training pairs to create.')
    parser.add_argument('--landmark-model', dest='face_landmark_shape_file', type=str, help='Face landmark model file.')
    args = parser.parse_args()

    # Create the face detector and landmark predictor
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(args.face_landmark_shape_file)

    main()
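
Assuming the script above is saved as generate_train_data.py (the filename is a guess; it is not given here), a typical run might look like:

python generate_train_data.py --file input_video.mp4 --num 400 --landmark-model shape_predictor_68_face_landmarks.dat

where input_video.mp4 is a placeholder and shape_predictor_68_face_landmarks.dat is dlib's standard pre-trained 68-point landmark model.
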
# Rebuild the pix2pix generator graph for inference and re-export it as a
# reduced model containing only the generator weights.
import argparse
import tensorflow as tf

CROP_SIZE = 256  # scale_size = CROP_SIZE
ngf = 64  # number of generator filters in the first conv layer
ndf = 64  # number of discriminator filters; unused in this reduced graph


def preprocess(image):
    with tf.name_scope('preprocess'):
        # [0, 1] => [-1, 1]
        return image * 2 - 1


def deprocess(image):
    with tf.name_scope('deprocess'):
        # [-1, 1] => [0, 1]
        return (image + 1) / 2


def conv(batch_input, out_channels, stride):
    with tf.variable_scope('conv'):
        in_channels = batch_input.get_shape()[3]
        filter = tf.get_variable('filter', [4, 4, in_channels, out_channels], dtype=tf.float32,
                                 initializer=tf.random_normal_initializer(0, 0.02))
        # [batch, in_height, in_width, in_channels], [filter_width, filter_height, in_channels, out_channels]
        #     => [batch, out_height, out_width, out_channels]
        padded_input = tf.pad(batch_input, [[0, 0], [1, 1], [1, 1], [0, 0]], mode='CONSTANT')
        conv = tf.nn.conv2d(padded_input, filter, [1, stride, stride, 1], padding='VALID')
        return conv
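
# Added note: a quick output-size check for conv() above. The input is padded
# by 1 pixel on each side, so a 4x4 kernel with VALID padding yields
#     out = floor((H + 2 - 4) / stride) + 1
# e.g. stride 2 on a 256x256 input gives floor(254 / 2) + 1 = 128, which is
# the halving that the encoder shape comments below rely on.
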
def lrelu(x, a):
    with tf.name_scope('lrelu'):
        # adding these together creates the leak part and linear part
        # then cancels them out by subtracting/adding an absolute value term
        # leak: a*x/2 - a*abs(x)/2
        # linear: x/2 + abs(x)/2

        # this block looks like it has 2 inputs on the graph unless we do this
        x = tf.identity(x)
        return (0.5 * (1 + a)) * x + (0.5 * (1 - a)) * tf.abs(x)
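
# Added note: a quick check that the expression above really is leaky ReLU
# with leak a (i.e. max(x, a*x) for 0 < a < 1):
#     x >= 0:  (0.5*(1+a))*x + (0.5*(1-a))*x = x      (linear part)
#     x <  0:  (0.5*(1+a))*x - (0.5*(1-a))*x = a*x    (leak part, abs(x) = -x)
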
def batchnorm(input):
    with tf.variable_scope('batchnorm'):
        # this block looks like it has 3 inputs on the graph unless we do this
        input = tf.identity(input)

        channels = input.get_shape()[3]
        offset = tf.get_variable('offset', [channels], dtype=tf.float32, initializer=tf.zeros_initializer())
        scale = tf.get_variable('scale', [channels], dtype=tf.float32,
                                initializer=tf.random_normal_initializer(1.0, 0.02))
        mean, variance = tf.nn.moments(input, axes=[0, 1, 2], keep_dims=False)
        variance_epsilon = 1e-5
        normalized = tf.nn.batch_normalization(input, mean, variance, offset, scale, variance_epsilon=variance_epsilon)
        return normalized
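
# Added note: tf.nn.moments computes mean and variance from the current batch
# rather than from stored moving averages. At inference with batch size 1
# (as in this file) that makes the layer behave like instance normalization,
# which is why the exported generator needs no separate test-time statistics.
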
def deconv(batch_input, out_channels):
    with tf.variable_scope('deconv'):
        batch, in_height, in_width, in_channels = [int(d) for d in batch_input.get_shape()]
        filter = tf.get_variable('filter', [4, 4, out_channels, in_channels], dtype=tf.float32,
                                 initializer=tf.random_normal_initializer(0, 0.02))
        # [batch, in_height, in_width, in_channels], [filter_width, filter_height, out_channels, in_channels]
        #     => [batch, out_height, out_width, out_channels]
        conv = tf.nn.conv2d_transpose(batch_input, filter, [batch, in_height * 2, in_width * 2, out_channels],
                                      [1, 2, 2, 1], padding='SAME')
        return conv
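
# Added note: tf.nn.conv2d_transpose with stride 2 and SAME padding maps
# [batch, H, W, in_channels] to [batch, 2*H, 2*W, out_channels], exactly
# mirroring the stride-2 conv in the encoder, so each decoder stage doubles
# the spatial resolution.
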
def process_image(x):
    with tf.name_scope('load_images'):
        raw_input = tf.image.convert_image_dtype(x, dtype=tf.float32)
        raw_input.set_shape([None, None, 3])

        # break apart the image pair and move to range [-1, 1]
        width = tf.shape(raw_input)[1]  # [height, width, channels]
        a_images = preprocess(raw_input[:, :width // 2, :])
        b_images = preprocess(raw_input[:, width // 2:, :])

    inputs, targets = [a_images, b_images]

    # synchronize seed for image operations so that we do the same operations to both
    # input and output images
    def transform(image):
        r = image
        # area produces a nice downscaling, but does nearest neighbor for upscaling
        # assume we're going to be doing downscaling here
        r = tf.image.resize_images(r, [CROP_SIZE, CROP_SIZE], method=tf.image.ResizeMethod.AREA)
        return r

    with tf.name_scope('input_images'):
        input_images = tf.expand_dims(transform(inputs), 0)

    with tf.name_scope('target_images'):
        target_images = tf.expand_dims(transform(targets), 0)

    return input_images, target_images
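
# Added note: at inference only the left half of the 256x512 input pair
# (test_inputs) is actually pushed through the generator; the right half
# (test_targets) is used solely to read off the number of output channels
# in create_model() below.
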
# Tensor('batch:1', shape=(1, 256, 256, 3), dtype=float32) -> batch size 1
def create_generator(generator_inputs, generator_outputs_channels):
    layers = []

    # encoder_1: [batch, 256, 256, in_channels] => [batch, 128, 128, ngf]
    with tf.variable_scope('encoder_1'):
        output = conv(generator_inputs, ngf, stride=2)
        layers.append(output)

    layer_specs = [
        ngf * 2,  # encoder_2: [batch, 128, 128, ngf] => [batch, 64, 64, ngf * 2]
        ngf * 4,  # encoder_3: [batch, 64, 64, ngf * 2] => [batch, 32, 32, ngf * 4]
        ngf * 8,  # encoder_4: [batch, 32, 32, ngf * 4] => [batch, 16, 16, ngf * 8]
        ngf * 8,  # encoder_5: [batch, 16, 16, ngf * 8] => [batch, 8, 8, ngf * 8]
        ngf * 8,  # encoder_6: [batch, 8, 8, ngf * 8] => [batch, 4, 4, ngf * 8]
        ngf * 8,  # encoder_7: [batch, 4, 4, ngf * 8] => [batch, 2, 2, ngf * 8]
        ngf * 8,  # encoder_8: [batch, 2, 2, ngf * 8] => [batch, 1, 1, ngf * 8]
    ]

    for out_channels in layer_specs:
        with tf.variable_scope('encoder_%d' % (len(layers) + 1)):
            rectified = lrelu(layers[-1], 0.2)
            # [batch, in_height, in_width, in_channels] => [batch, in_height/2, in_width/2, out_channels]
            convolved = conv(rectified, out_channels, stride=2)
            output = batchnorm(convolved)
            layers.append(output)

    layer_specs = [
        (ngf * 8, 0.5),  # decoder_8: [batch, 1, 1, ngf * 8] => [batch, 2, 2, ngf * 8 * 2]
        (ngf * 8, 0.5),  # decoder_7: [batch, 2, 2, ngf * 8 * 2] => [batch, 4, 4, ngf * 8 * 2]
        (ngf * 8, 0.5),  # decoder_6: [batch, 4, 4, ngf * 8 * 2] => [batch, 8, 8, ngf * 8 * 2]
        (ngf * 8, 0.0),  # decoder_5: [batch, 8, 8, ngf * 8 * 2] => [batch, 16, 16, ngf * 8 * 2]
        (ngf * 4, 0.0),  # decoder_4: [batch, 16, 16, ngf * 8 * 2] => [batch, 32, 32, ngf * 4 * 2]
        (ngf * 2, 0.0),  # decoder_3: [batch, 32, 32, ngf * 4 * 2] => [batch, 64, 64, ngf * 2 * 2]
        (ngf, 0.0),      # decoder_2: [batch, 64, 64, ngf * 2 * 2] => [batch, 128, 128, ngf * 2]
    ]

    num_encoder_layers = len(layers)
    for decoder_layer, (out_channels, dropout) in enumerate(layer_specs):
        skip_layer = num_encoder_layers - decoder_layer - 1
        with tf.variable_scope('decoder_%d' % (skip_layer + 1)):
            if decoder_layer == 0:
                # first decoder layer doesn't have skip connections
                # since it is directly connected to the skip_layer
                input = layers[-1]
            else:
                input = tf.concat([layers[-1], layers[skip_layer]], axis=3)

            rectified = tf.nn.relu(input)
            # [batch, in_height, in_width, in_channels] => [batch, in_height*2, in_width*2, out_channels]
            output = deconv(rectified, out_channels)
            output = batchnorm(output)

            if dropout > 0.0:
                output = tf.nn.dropout(output, keep_prob=1 - dropout)

            layers.append(output)

    # decoder_1: [batch, 128, 128, ngf * 2] => [batch, 256, 256, generator_outputs_channels]
    with tf.variable_scope('decoder_1'):
        input = tf.concat([layers[-1], layers[0]], axis=3)
        rectified = tf.nn.relu(input)
        output = deconv(rectified, generator_outputs_channels)
        output = tf.tanh(output)
        layers.append(output)

    return layers[-1]
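
# Added note: the "* 2" in the decoder shape comments comes from the U-Net
# skip connections: every decoder stage (except the innermost) concatenates
# the previous decoder output with the mirrored encoder layer along the
# channel axis, doubling the channel count fed into the deconv.
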
def create_model(inputs, targets):
    with tf.variable_scope('generator') as scope:
        out_channels = int(targets.get_shape()[-1])
        outputs = create_generator(inputs, out_channels)
    return outputs


def convert(image):
    return tf.image.convert_image_dtype(image, dtype=tf.uint8, saturate=True, name='output')  # output tensor


def generate_output(x):
    with tf.name_scope('generate_output'):
        test_inputs, test_targets = process_image(x)

        # inputs and targets are [batch_size, height, width, channels]
        model = create_model(test_inputs, test_targets)

        # deprocess the generator output and reverse any processing on the
        # images so they can be written to disk or displayed to the user
        outputs = deprocess(model)
        converted_outputs = convert(outputs)
    return converted_outputs


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-input', dest='input_folder', type=str, help='Model folder to import.')
    parser.add_argument('--model-output', dest='output_folder', type=str, help='Model (reduced) folder to export.')
    args = parser.parse_args()

    x = tf.placeholder(tf.uint8, shape=(256, 512, 3), name='image_tensor')  # input tensor
    y = generate_output(x)

    with tf.Session() as sess:
        # Restore the generator weights from the original (full) model
        saver = tf.train.Saver()
        checkpoint = tf.train.latest_checkpoint(args.input_folder)
        saver.restore(sess, checkpoint)

        # Export the reduced model used for prediction
        saver.save(sess, '{}/reduced_model'.format(args.output_folder))
        print('Model is exported to {}/reduced_model'.format(args.output_folder))
# Run the frozen face2face model live: detect facial landmarks with dlib,
# sketch them, and feed the sketch through the pix2pix generator.
import argparse
import cv2
import dlib
import numpy as np
import tensorflow as tf
from imutils import video

CROP_SIZE = 256
DOWNSAMPLE_RATIO = 4


def reshape_for_polyline(array):
    """Reshape the landmark points so that they work with cv2.polylines."""
    return np.array(array, np.int32).reshape((-1, 1, 2))


def resize(image):
    """Center-crop to a square and resize the image for pix2pix."""
    height, width, _ = image.shape
    if height != width:
        # crop to a centered square before resizing
        size = min(height, width)
        oh = (height - size) // 2
        ow = (width - size) // 2
        image = image[oh:(oh + size), ow:(ow + size)]
    return cv2.resize(image, (CROP_SIZE, CROP_SIZE))


def load_graph(frozen_graph_filename):
    """Load a (frozen) TensorFlow model into memory."""
    graph = tf.Graph()
    with graph.as_default():
        od_graph_def = tf.GraphDef()
        with tf.gfile.GFile(frozen_graph_filename, 'rb') as fid:
            serialized_graph = fid.read()
            od_graph_def.ParseFromString(serialized_graph)
            tf.import_graph_def(od_graph_def, name='')
    return graph


def main():
    # TensorFlow
    graph = load_graph(args.frozen_model_file)
    image_tensor = graph.get_tensor_by_name('image_tensor:0')
    output_tensor = graph.get_tensor_by_name('generate_output/output:0')
    sess = tf.Session(graph=graph)

    # OpenCV
    cap = cv2.VideoCapture(args.video_source)
    fps = video.FPS().start()

    while True:
        ret, frame = cap.read()
        if not ret:  # no camera frame available
            break

        # downsample the frame and detect the face on the smaller image
        frame_resize = cv2.resize(frame, None, fx=1 / DOWNSAMPLE_RATIO, fy=1 / DOWNSAMPLE_RATIO)
        gray = cv2.cvtColor(frame_resize, cv2.COLOR_BGR2GRAY)
        faces = detector(gray, 1)
        black_image = np.zeros(frame.shape, np.uint8)

        for face in faces:
            detected_landmarks = predictor(gray, face).parts()
            landmarks = [[p.x * DOWNSAMPLE_RATIO, p.y * DOWNSAMPLE_RATIO] for p in detected_landmarks]

            jaw = reshape_for_polyline(landmarks[0:17])
            left_eyebrow = reshape_for_polyline(landmarks[22:27])
            right_eyebrow = reshape_for_polyline(landmarks[17:22])
            nose_bridge = reshape_for_polyline(landmarks[27:31])
            lower_nose = reshape_for_polyline(landmarks[30:35])
            left_eye = reshape_for_polyline(landmarks[42:48])
            right_eye = reshape_for_polyline(landmarks[36:42])
            outer_lip = reshape_for_polyline(landmarks[48:60])
            inner_lip = reshape_for_polyline(landmarks[60:68])

            color = (255, 255, 255)
            thickness = 3

            cv2.polylines(black_image, [jaw], False, color, thickness)
            cv2.polylines(black_image, [left_eyebrow], False, color, thickness)
            cv2.polylines(black_image, [right_eyebrow], False, color, thickness)
            cv2.polylines(black_image, [nose_bridge], False, color, thickness)
            cv2.polylines(black_image, [lower_nose], True, color, thickness)
            cv2.polylines(black_image, [left_eye], True, color, thickness)
            cv2.polylines(black_image, [right_eye], True, color, thickness)
            cv2.polylines(black_image, [outer_lip], True, color, thickness)
            cv2.polylines(black_image, [inner_lip], True, color, thickness)

        # generate the prediction: landmark sketch on the left, photo on the right
        combined_image = np.concatenate([resize(black_image), resize(frame_resize)], axis=1)
        image_rgb = cv2.cvtColor(combined_image, cv2.COLOR_BGR2RGB)  # OpenCV uses BGR instead of RGB
        generated_image = sess.run(output_tensor, feed_dict={image_tensor: image_rgb})
        image_bgr = cv2.cvtColor(np.squeeze(generated_image), cv2.COLOR_RGB2BGR)
        image_normal = np.concatenate([resize(frame_resize), image_bgr], axis=1)
        image_landmark = np.concatenate([resize(black_image), image_bgr], axis=1)

        if args.display_landmark == 0:
            cv2.imshow('frame', image_normal)
        else:
            cv2.imshow('frame', image_landmark)

        fps.update()
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    fps.stop()
    print('[INFO] elapsed time (total): {:.2f}'.format(fps.elapsed()))
    print('[INFO] approx. FPS: {:.2f}'.format(fps.fps()))

    sess.close()
    cap.release()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-src', '--source', dest='video_source', type=int,
                        default=0, help='Device index of the camera.')
    parser.add_argument('--show', dest='display_landmark', type=int, default=0, choices=[0, 1],
                        help='0 shows the normal input and 1 the facial landmark.')
    parser.add_argument('--landmark-model', dest='face_landmark_shape_file', type=str,
                        help='Face landmark model file.')
    parser.add_argument('--tf-model', dest='frozen_model_file', type=str,
                        help='Frozen TensorFlow model file.')
    args = parser.parse_args()

    # Create the face detector and landmark predictor
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(args.face_landmark_shape_file)

    main()
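
Assuming the webcam script above is saved as run_webcam.py (a guessed filename), a typical run might be:

python run_webcam.py --source 0 --show 0 --landmark-model shape_predictor_68_face_landmarks.dat --tf-model frozen_model.pb

where frozen_model.pb is the graph produced by the freeze script at the top of this page.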