# imageutils.py -- image loading helpers (imported by the other scripts in this gist as `imageutils`)
import os
from tqdm import tqdm
import cv2
import numpy as np
from pprint import pprint
from os.path import join
import glob
from sklearn.utils import shuffle
from keras.utils import to_categorical
SEED = 20
img_shape = (50, 50)
random_state = np.random.RandomState(SEED)
choice = random_state.choice
APPLE_DIRECTORY = "./apple/"
BALL_DIRECTORY = "./ball/"
BANANA_DIRECTORY = "./banana/"
LABELS_DICTIONARY = {
    'apple': 0,
    'ball': 1,
    'banana': 2,
}
def get_image_count(directory, index=0):
    if index >= len(os.listdir(directory)):
        raise ValueError("Index must be less than the number of subdirectories in the top-level directory")
    if index == -1:
        # Sum the image counts across all subdirectories
        total_count = 0
        for subdir in os.listdir(directory):
            total_count += len(os.listdir(join(directory, subdir)))
        return total_count
    # Count of images in the subdirectory selected by `index`
    return len(os.listdir(join(directory, os.listdir(directory)[index])))
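# Illustrative usage (directory names follow the constants above):
#   get_image_count('./apple', index=-1)  # total images across every subdirectory of ./apple
#   get_image_count('./apple', index=0)   # images in the first subdirectory only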
def get_image_from_directory_by_index(directory, index, index1, index2, shape=(224, 224)):
    images = []
    image_count = index2 - index1
    if image_count > get_image_count(directory, index=index):
        image_count = get_image_count(directory, index=index)
    subdirs = os.listdir(directory)
    if index >= len(subdirs):
        raise ValueError("Index must be less than the number of subdirectories in the top-level directory")
    imagepaths = glob.glob(join(directory, subdirs[index], '*'))[index1:index2]
    for imgpath in imagepaths:
        # Read as BGR, convert to RGB, resize and normalize to [0, 1]
        img = cv2.imread(imgpath)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, shape, interpolation=cv2.INTER_CUBIC)
        img = img / 255.0
        images.append(img)
    return np.array(images)
def generate_label(label, count):
    return np.ones(count) * LABELS_DICTIONARY[label]
def get_image_from_directory(directory, image_count, shape=(224, 224)):
    images = []
    current_img_count = 0
    # Total number of images available under `directory`
    total_image_count = get_image_count(directory, -1)
    # The requested count can't be greater than what is available
    if image_count > total_image_count:
        return False, total_image_count
    for subdir in os.listdir(directory):
        imagepaths = glob.glob(join(directory, subdir, '*'))
        for imagepath in imagepaths:
            # Read as BGR, convert to RGB, resize and normalize to [0, 1]
            img = cv2.imread(imagepath)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, shape, interpolation=cv2.INTER_CUBIC)
            img = img / 255.0
            images.append(img)
            current_img_count += 1
            if current_img_count == image_count:
                return np.array(images)
# tg3.py -- builds triplet (query, positive, negative) index batches and batch generators
import numpy as np
from sklearn.utils import shuffle
from numpy.random import choice
from imageutils import get_image_from_directory
# Column layout of each generated training row:
# [query image idx, positive image idx, negative image idx, query class, positive class, negative class]
Q_IMG_INDEX = 0
P_IMG_INDEX = 1
N_IMG_INDEX = 2
Q_INDEX = 3
P_INDEX = 4
N_INDEX = 5
NUM_SAMPLES = 1000
BATCH_SIZE = 64
# Shuffled per-class index pools (one index per loaded image)
apple_indices = shuffle(list(range(NUM_SAMPLES)))
ball_indices = shuffle(list(range(NUM_SAMPLES)))
banana_indices = shuffle(list(range(NUM_SAMPLES)))
cap_indices = shuffle(list(range(NUM_SAMPLES)))
camera_indices = shuffle(list(range(NUM_SAMPLES)))
scissors_indices = shuffle(list(range(NUM_SAMPLES)))
tomatoes_indices = shuffle(list(range(NUM_SAMPLES)))
cell_phone_indices = shuffle(list(range(NUM_SAMPLES)))
print("LOADING APPLE IMAGES")
apple = get_image_from_directory('./apple', NUM_SAMPLES, shape=(50, 50))
print("LOADING BALL IMAGES")
ball = get_image_from_directory('./ball', NUM_SAMPLES, shape=(50, 50))
print("LOADING BANANA IMAGES")
banana = get_image_from_directory('./banana', NUM_SAMPLES, shape=(50, 50))
print("LOADING CAP IMAGES")
cap = get_image_from_directory('./cap', NUM_SAMPLES, shape=(50, 50))
print("LOADING CAMERA IMAGES")
camera = get_image_from_directory('./camera', NUM_SAMPLES, shape=(50, 50))
print("LOADING SCISSORS IMAGES")
scissors = get_image_from_directory('./scissors', NUM_SAMPLES, shape=(50, 50))
print("LOADING TOMATO IMAGES")
tomatoes = get_image_from_directory('./tomato', NUM_SAMPLES, shape=(50, 50))
print("LOADING CELL PHONE IMAGES")
cell_phone = get_image_from_directory('./cell_phone', NUM_SAMPLES, shape=(50, 50))
classes = ['apple', 'ball', 'banana', 'cap', 'tomato', 'scissors', 'cell_phone', 'camera']
data_dict_indices = {
    'apple': apple_indices,
    'ball': ball_indices,
    'banana': banana_indices,
    'cap': cap_indices,
    'camera': camera_indices,
    'scissors': scissors_indices,
    'tomato': tomatoes_indices,
    'cell_phone': cell_phone_indices,
}
data_dict = {
    'apple': apple,
    'ball': ball,
    'banana': banana,
    'cap': cap,
    'tomato': tomatoes,
    'scissors': scissors,
    'camera': camera,
    'cell_phone': cell_phone,
}
CLASS_INDEX = {k: v for v, k in enumerate(data_dict_indices.keys())}
INDEX2CLASS = {v: k for k, v in CLASS_INDEX.items()}
def onehot(labels, num_classes=8):
    return np.eye(num_classes)[np.asarray(labels, dtype=np.int32)]
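# Quick check of the one-hot encoding (illustrative):
#   onehot([0, 2], num_classes=4)
#   -> array([[1., 0., 0., 0.],
#             [0., 0., 1., 0.]])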
# Make sure indices are shuffled before being fed into this function
def generate_one_sample(data_class):
    classes = list(data_class.keys())
    num_classes = list(range(len(classes)))
    # Pick the query/positive class, then a different class as the negative
    query_positive_class = choice(num_classes, 1)[0]
    negative_class = choice(list(set(num_classes) - {int(query_positive_class)}), 1)[0]
    query_positive_class_label = classes[query_positive_class]
    negative_class_label = classes[negative_class]
    # Two image indices from the positive class (query + positive), one from the negative class
    query_positive_indices = choice(data_class[query_positive_class_label], 2).tolist()
    negative_index = choice(data_class[negative_class_label], 1).tolist()[0]
    return (
        (query_positive_indices[0], query_positive_indices[1], negative_index),
        (query_positive_class_label, query_positive_class_label, negative_class_label),
    )
def generate_triplet_batch_numpy(data_dict_indices, batch_size=32):
    train_labels = []
    for i in range(batch_size):
        train, labels = generate_one_sample(data_dict_indices)
        train = list(train)
        label = [CLASS_INDEX[label] for label in labels]
        train_labels.append(train + label)
    return np.array(train_labels)
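# Illustrative usage: generate_triplet_batch_numpy(data_dict_indices, batch_size=8)
# returns an (8, 6) array whose columns follow the Q_IMG_INDEX ... N_INDEX layout above.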
def triplet(data_dict, train_index):
    q_placeholder = np.zeros((len(train_index), 50, 50, 3))
    p_placeholder = np.zeros((len(train_index), 50, 50, 3))
    n_placeholder = np.zeros((len(train_index), 50, 50, 3))
    # For every class, gather the query / positive / negative images for the rows
    # whose corresponding class column matches that class
    for c in CLASS_INDEX.keys():
        q_loc = np.where(train_index[:, Q_INDEX] == CLASS_INDEX[c])[0]
        q_placeholder[q_loc] = data_dict[c][train_index[:, Q_IMG_INDEX][q_loc]]
        p_loc = np.where(train_index[:, P_INDEX] == CLASS_INDEX[c])[0]
        p_placeholder[p_loc] = data_dict[c][train_index[:, P_IMG_INDEX][p_loc]]
        n_loc = np.where(train_index[:, N_INDEX] == CLASS_INDEX[c])[0]
        n_placeholder[n_loc] = data_dict[c][train_index[:, N_IMG_INDEX][n_loc]]
    return (q_placeholder, p_placeholder, n_placeholder)
def triplet_generator(data_dict, train_indices, batch_size=64, _set='TRAIN'):
    query, positive, negative = triplet(data_dict, train_indices)
    query_index = train_indices[:, Q_INDEX]
    positive_index = train_indices[:, P_INDEX]
    negative_index = train_indices[:, N_INDEX]
    n_samples = len(query)
    while True:
        for i in range(0, n_samples, batch_size):
            upper_limit = min(i + batch_size, n_samples)
            # print("Batch {} : {}".format(_set, i // batch_size))
            yield (
                query[i:upper_limit],
                positive[i:upper_limit],
                negative[i:upper_limit],
                onehot(query_index[i:upper_limit]),
                onehot(positive_index[i:upper_limit]),
                onehot(negative_index[i:upper_limit]),
            )
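# Illustrative usage: each value yielded by triplet_generator is a 6-tuple
# (query images, positive images, negative images, query one-hot labels,
#  positive one-hot labels, negative one-hot labels), each of length <= batch_size.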
# train_indices = np.load('train_indices.npy')
# train_gen = triplet_generator(data_dict, train_indices)
# test_indices = np.load('test_indices.npy')
# test_gen = triplet_generator(data_dict, test_indices, _set='TEST')
# train_indices = np.load('train_2500.npy')
# train_gen = triplet_generator(data_dict, train_indices)
# test_indices = np.load('test_2500.npy')
# test_gen = triplet_generator(data_dict, test_indices, _set='TEST')
train_indices = np.asarray( np.load('train_8_class_6k_samples.npy'), dtype=np.int32)
train_gen = triplet_generator(data_dict, train_indices)
test_indices = np.asarray(np.load('test_8_class_4k_samples.npy'),dtype=np.int32)
test_gen = triplet_generator(data_dict, test_indices, _set='TEST')
# Training / feature-extraction script: a VGG16-style network trained with a joint
# classification + triplet loss (TensorFlow 1.x)
import tensorflow as tf
import numpy as np
from tqdm import tqdm
from datetime import datetime
import matplotlib.pyplot as plt
from imageutils import get_image_from_directory
PREDICTION = True
if PREDICTION:
    NUM_SAMPLES = 1000
    data_dict = {
        'apple': 'apple',
        'ball': 'ball',
        'banana': 'banana',
        'cap': 'cap',
        'tomato': 'tomatoes',
        'scissors': 'scissors',
        'camera': 'camera',
        'cell_phone': 'cell_phone',
    }
    CLASS_INDEX = {k: v for v, k in enumerate(data_dict.keys())}
    INDEX2CLASS = {v: k for k, v in CLASS_INDEX.items()}
    print("LOADING APPLE IMAGES")
    apple = get_image_from_directory('./apple', NUM_SAMPLES, shape=(50, 50))
    print("LOADING BALL IMAGES")
    ball = get_image_from_directory('./ball', NUM_SAMPLES, shape=(50, 50))
    print("LOADING BANANA IMAGES")
    banana = get_image_from_directory('./banana', NUM_SAMPLES, shape=(50, 50))
    print("LOADING CAP IMAGES")
    cap = get_image_from_directory('./cap', NUM_SAMPLES, shape=(50, 50))
    print("LOADING CAMERA IMAGES")
    camera = get_image_from_directory('./camera', NUM_SAMPLES, shape=(50, 50))
    print("LOADING SCISSORS IMAGES")
    scissors = get_image_from_directory('./scissors', NUM_SAMPLES, shape=(50, 50))
    print("LOADING TOMATO IMAGES")
    tomatoes = get_image_from_directory('./tomato', NUM_SAMPLES, shape=(50, 50))
    print("LOADING CELL PHONE IMAGES")
    cell_phone = get_image_from_directory('./cell_phone', NUM_SAMPLES, shape=(50, 50))
    # Stack all classes into one array for feature extraction, with matching label rows
    to_extract_feature = np.vstack((
        apple, ball, banana, cap, camera, scissors, tomatoes, cell_phone
    ))
    to_extract_feature_labels = np.vstack((
        np.ones(NUM_SAMPLES) * CLASS_INDEX['apple'],
        np.ones(NUM_SAMPLES) * CLASS_INDEX['ball'],
        np.ones(NUM_SAMPLES) * CLASS_INDEX['banana'],
        np.ones(NUM_SAMPLES) * CLASS_INDEX['cap'],
        np.ones(NUM_SAMPLES) * CLASS_INDEX['camera'],
        np.ones(NUM_SAMPLES) * CLASS_INDEX['scissors'],
        np.ones(NUM_SAMPLES) * CLASS_INDEX['tomato'],
        np.ones(NUM_SAMPLES) * CLASS_INDEX['cell_phone'],
    ))
else:
    from tg3 import train_gen, test_gen, BATCH_SIZE
HEIGHT = 50
WIDTH = 50
CHANNEL = 3
CLASSES = 8
IMAGE_INPUT = tf.placeholder(tf.float32, [None, HEIGHT, WIDTH, CHANNEL])
X1 = tf.placeholder(tf.float32, [None, HEIGHT, WIDTH, CHANNEL])
y1 = tf.placeholder(tf.float32, [None, CLASSES])
X2 = tf.placeholder(tf.float32, [None, HEIGHT, WIDTH, CHANNEL])
y2 = tf.placeholder(tf.float32, [None, CLASSES])
X3 = tf.placeholder(tf.float32, [None, HEIGHT, WIDTH, CHANNEL])
y3 = tf.placeholder(tf.float32, [None, CLASSES])
# Conv 1_1
kernel11_shape = [3, 3, 3, 64]
kernel11_name = 'kernel11'
conv11_shape = [1, 1, 1, 1]
biases11_shape = [64]
biases11_name = 'biases11'
conv11_padding = 'SAME'
# Conv 1_2
kernel12_shape = [3, 3, 64, 64]
kernel12_name = 'kernel12'
conv12_shape = [1, 1, 1, 1]
conv12_padding = 'SAME'
biases12_shape = [64]
biases12_name = 'biases12'
# Pool 1
pool1_ksize = [1, 2, 2, 1]
pool1_strides = [1, 2, 2, 1]
pool1_padding = 'SAME'
pool1_name = 'pool1'
# Conv 2_1
kernel21_name = 'kernel21'
kernel21_shape = [3, 3, 64, 128]
conv21_shape = [1, 1, 1, 1]
conv21_padding = 'SAME'
biases21_shape = [128]
biases21_name = 'biases21'
# Conv 2_2
kernel22_name = 'kernel22'
kernel22_shape = [3, 3, 128, 128]
conv22_shape = [1, 1, 1, 1]
conv22_padding = 'SAME'
biases22_shape = [128]
biases22_name = 'biases22'
# pool 2
pool2_ksize = [1, 2, 2, 1]
pool2_strides = [1, 2, 2, 1]
pool2_padding = 'SAME'
pool2_name = 'pool2'
# Conv 3_1
kernel31_name = 'kernel31'
kernel31_shape = [3, 3, 128, 256]
conv31_shape = [1, 1, 1, 1]
conv31_padding = 'SAME'
biases31_shape = [256]
biases31_name = 'biases31'
# Conv 3_2
kernel32_name = 'kernel32'
kernel32_shape = [3, 3, 256, 256]
conv32_shape = [1, 1, 1, 1]
conv32_padding = 'SAME'
biases32_shape = [256]
biases32_name = 'biases32'
# Conv 3_3
kernel33_name = 'kernel33'
kernel33_shape = [3, 3, 256, 256]
conv33_shape = [1, 1, 1, 1]
conv33_padding = 'SAME'
biases33_shape = [256]
biases33_name = 'biases33'
# Pool 3
pool3_ksize = [1, 2, 2, 1]
pool3_strides = [1, 2, 2, 1]
pool3_padding = 'SAME'
pool3_name = 'pool3'
# Conv 4_1
kernel41_name = 'kernel41'
kernel41_shape = [3, 3, 256, 512]
conv41_shape = [1, 1, 1, 1]
conv41_padding = 'SAME'
biases41_shape = [512]
biases41_name = 'biases41'
# Conv 4_2
kernel42_name = 'kernel42'
kernel42_shape = [3, 3, 512, 512]
conv42_shape = [1, 1, 1, 1]
conv42_padding = 'SAME'
biases42_shape = [512]
biases42_name = 'biases42'
# Conv 4_3
kernel43_name = 'kernel43'
kernel43_shape = [3, 3, 512, 512]
conv43_shape = [1, 1, 1, 1]
conv43_padding = 'SAME'
biases43_shape = [512]
biases43_name = 'biases43'
# Pool 4
pool4_ksize = [1, 2, 2, 1]
pool4_strides = [1, 2, 2, 1]
pool4_padding = 'SAME'
pool4_name = 'pool4'
# Conv 5_1
kernel51_name = 'kernel51'
kernel51_shape = [3, 3, 512, 512]
conv51_shape = [1, 1, 1, 1]
conv51_padding = 'SAME'
biases51_shape = [512]
biases51_name = 'biases51'
# Conv 5_2
kernel52_name = 'kernel52'
kernel52_shape = [3, 3, 512, 512]
conv52_shape = [1, 1, 1, 1]
conv52_padding = 'SAME'
biases52_shape = [512]
biases52_name = 'biases52'
# Conv 5_3
kernel53_name = 'kernel53'
kernel53_shape = [3, 3, 512, 512]
conv53_shape = [1, 1, 1, 1]
conv53_padding = 'SAME'
biases53_shape = [512]
biases53_name = 'biases53'
# Pool 5
pool5_ksize = [1, 2, 2, 1]
pool5_strides = [1, 2, 2, 1]
pool5_padding = 'SAME'
pool5_name = 'pool5'
# fully connected 1
fc1w_name = 'fc1'
fc1w_shape = [2048, 4096]
fc1b_name = 'fb1'
fc1b_shape = [4096]
# Fully connected 2
fc2w_name = 'fc2'
fc2b_name = 'fb2'
fc2w_shape = [4096, 4096]
fc2b_shape = [4096]
# fully connected 3
fc3w_name = 'fc3'
fc3b_name = 'fb3'
fc3w_shape = [4096, CLASSES]
fc3b_shape = [CLASSES]
triplet_variables = None
classification_variables = None
with tf.variable_scope('vgg') as scope:
    conv_init = tf.truncated_normal_initializer(stddev=1e-1)
    zeros_init = tf.zeros_initializer()
    glorot_init = tf.glorot_normal_initializer()
    ones_init = tf.ones_initializer()
    kernel11 = tf.get_variable(name=kernel11_name, shape=kernel11_shape, dtype=tf.float32, initializer=conv_init)
    biases11 = tf.get_variable(name=biases11_name, shape=biases11_shape, dtype=tf.float32, initializer=zeros_init)
    kernel12 = tf.get_variable(name=kernel12_name, shape=kernel12_shape, dtype=tf.float32, initializer=conv_init)
    biases12 = tf.get_variable(name=biases12_name, shape=biases12_shape, dtype=tf.float32, initializer=zeros_init)
    kernel21 = tf.get_variable(name=kernel21_name, shape=kernel21_shape, dtype=tf.float32, initializer=conv_init)
    biases21 = tf.get_variable(name=biases21_name, shape=biases21_shape, dtype=tf.float32, initializer=zeros_init)
    kernel22 = tf.get_variable(name=kernel22_name, shape=kernel22_shape, dtype=tf.float32, initializer=conv_init)
    biases22 = tf.get_variable(name=biases22_name, shape=biases22_shape, dtype=tf.float32, initializer=zeros_init)
    kernel31 = tf.get_variable(name=kernel31_name, shape=kernel31_shape, dtype=tf.float32, initializer=conv_init)
    biases31 = tf.get_variable(name=biases31_name, shape=biases31_shape, dtype=tf.float32, initializer=zeros_init)
    kernel32 = tf.get_variable(name=kernel32_name, shape=kernel32_shape, dtype=tf.float32, initializer=conv_init)
    biases32 = tf.get_variable(name=biases32_name, shape=biases32_shape, dtype=tf.float32, initializer=zeros_init)
    kernel33 = tf.get_variable(name=kernel33_name, shape=kernel33_shape, dtype=tf.float32, initializer=conv_init)
    biases33 = tf.get_variable(name=biases33_name, shape=biases33_shape, dtype=tf.float32, initializer=zeros_init)
    kernel41 = tf.get_variable(name=kernel41_name, shape=kernel41_shape, dtype=tf.float32, initializer=conv_init)
    biases41 = tf.get_variable(name=biases41_name, shape=biases41_shape, dtype=tf.float32, initializer=zeros_init)
    kernel42 = tf.get_variable(name=kernel42_name, shape=kernel42_shape, dtype=tf.float32, initializer=conv_init)
    biases42 = tf.get_variable(name=biases42_name, shape=biases42_shape, dtype=tf.float32, initializer=zeros_init)
    kernel43 = tf.get_variable(name=kernel43_name, shape=kernel43_shape, dtype=tf.float32, initializer=conv_init)
    biases43 = tf.get_variable(name=biases43_name, shape=biases43_shape, dtype=tf.float32, initializer=zeros_init)
    kernel51 = tf.get_variable(name=kernel51_name, shape=kernel51_shape, dtype=tf.float32, initializer=conv_init)
    biases51 = tf.get_variable(name=biases51_name, shape=biases51_shape, dtype=tf.float32, initializer=zeros_init)
    kernel52 = tf.get_variable(name=kernel52_name, shape=kernel52_shape, dtype=tf.float32, initializer=conv_init)
    biases52 = tf.get_variable(name=biases52_name, shape=biases52_shape, dtype=tf.float32, initializer=zeros_init)
    kernel53 = tf.get_variable(name=kernel53_name, shape=kernel53_shape, dtype=tf.float32, initializer=conv_init)
    biases53 = tf.get_variable(name=biases53_name, shape=biases53_shape, dtype=tf.float32, initializer=zeros_init)
    fc1w = tf.get_variable(name=fc1w_name, shape=fc1w_shape, dtype=tf.float32, initializer=glorot_init)
    fc1b = tf.get_variable(name=fc1b_name, shape=fc1b_shape, dtype=tf.float32, initializer=ones_init)
    fc2w = tf.get_variable(name=fc2w_name, shape=fc2w_shape, dtype=tf.float32, initializer=glorot_init)
    fc2b = tf.get_variable(name=fc2b_name, shape=fc2b_shape, dtype=tf.float32, initializer=ones_init)
    fc3w = tf.get_variable(name=fc3w_name, shape=fc3w_shape, dtype=tf.float32, initializer=glorot_init)
    fc3b = tf.get_variable(name=fc3b_name, shape=fc3b_shape, dtype=tf.float32, initializer=ones_init)
    triplet_variables = [
        kernel11, biases11, kernel12, biases12,
        kernel21, biases21, kernel22, biases22,
        kernel31, biases31, kernel32, biases32, kernel33, biases33,
        kernel41, biases41, kernel42, biases42, kernel43, biases43,
        kernel51, biases51, kernel52, biases52, kernel53, biases53,
        fc1w, fc1b, fc2w, fc2b,
    ]
    classification_variables = triplet_variables + [fc3w, fc3b]
def extract_feature(image):
    # Conv 1_1
    kernel11 = tf.get_variable(kernel11_name)
    conv11 = tf.nn.conv2d(image, kernel11, conv11_shape, conv11_padding)
    biases11 = tf.get_variable(biases11_name)
    out11 = tf.nn.bias_add(conv11, biases11)
    activation11 = tf.nn.relu(out11)
    # Conv 1_2
    kernel12 = tf.get_variable(kernel12_name)
    conv12 = tf.nn.conv2d(activation11, kernel12, conv12_shape, conv12_padding)
    biases12 = tf.get_variable(biases12_name)
    out12 = tf.nn.bias_add(conv12, biases12)
    activation12 = tf.nn.relu(out12)
    pool1 = tf.nn.max_pool(activation12, pool1_ksize, pool1_strides, pool1_padding, name=pool1_name)
    # Conv 2_1
    kernel21 = tf.get_variable(kernel21_name)
    conv21 = tf.nn.conv2d(pool1, kernel21, conv21_shape, conv21_padding)
    biases21 = tf.get_variable(biases21_name)
    out21 = tf.nn.bias_add(conv21, biases21)
    activation21 = tf.nn.relu(out21)
    # Conv 2_2
    kernel22 = tf.get_variable(kernel22_name)
    conv22 = tf.nn.conv2d(activation21, kernel22, conv22_shape, conv22_padding)
    biases22 = tf.get_variable(biases22_name)
    out22 = tf.nn.bias_add(conv22, biases22)
    activation22 = tf.nn.relu(out22)
    pool2 = tf.nn.max_pool(activation22, pool2_ksize, pool2_strides, pool2_padding, name=pool2_name)
    # Conv 3_1
    kernel31 = tf.get_variable(kernel31_name)
    conv31 = tf.nn.conv2d(pool2, kernel31, conv31_shape, conv31_padding)
    biases31 = tf.get_variable(biases31_name)
    out31 = tf.nn.bias_add(conv31, biases31)
    activation31 = tf.nn.relu(out31)
    # Conv 3_2
    kernel32 = tf.get_variable(kernel32_name)
    conv32 = tf.nn.conv2d(activation31, kernel32, conv32_shape, conv32_padding)
    biases32 = tf.get_variable(biases32_name)
    out32 = tf.nn.bias_add(conv32, biases32)
    activation32 = tf.nn.relu(out32)
    # Conv 3_3
    kernel33 = tf.get_variable(kernel33_name)
    conv33 = tf.nn.conv2d(activation32, kernel33, conv33_shape, conv33_padding)
    biases33 = tf.get_variable(biases33_name)
    out33 = tf.nn.bias_add(conv33, biases33)
    activation33 = tf.nn.relu(out33)
    pool3 = tf.nn.max_pool(activation33, pool3_ksize, pool3_strides, pool3_padding, name=pool3_name)
    # Conv 4_1
    kernel41 = tf.get_variable(kernel41_name)
    conv41 = tf.nn.conv2d(pool3, kernel41, conv41_shape, conv41_padding)
    biases41 = tf.get_variable(biases41_name)
    out41 = tf.nn.bias_add(conv41, biases41)
    activation41 = tf.nn.relu(out41)
    # Conv 4_2
    kernel42 = tf.get_variable(kernel42_name)
    conv42 = tf.nn.conv2d(activation41, kernel42, conv42_shape, conv42_padding)
    biases42 = tf.get_variable(biases42_name)
    out42 = tf.nn.bias_add(conv42, biases42)
    activation42 = tf.nn.relu(out42)
    # Conv 4_3
    kernel43 = tf.get_variable(kernel43_name)
    conv43 = tf.nn.conv2d(activation42, kernel43, conv43_shape, conv43_padding)
    biases43 = tf.get_variable(biases43_name)
    out43 = tf.nn.bias_add(conv43, biases43)
    activation43 = tf.nn.relu(out43)
    pool4 = tf.nn.max_pool(activation43, pool4_ksize, pool4_strides, pool4_padding, name=pool4_name)
    # Conv 5_1
    kernel51 = tf.get_variable(kernel51_name)
    conv51 = tf.nn.conv2d(pool4, kernel51, conv51_shape, conv51_padding)
    biases51 = tf.get_variable(biases51_name)
    out51 = tf.nn.bias_add(conv51, biases51)
    activation51 = tf.nn.relu(out51)
    # Conv 5_2
    kernel52 = tf.get_variable(kernel52_name)
    conv52 = tf.nn.conv2d(activation51, kernel52, conv52_shape, conv52_padding)
    biases52 = tf.get_variable(biases52_name)
    out52 = tf.nn.bias_add(conv52, biases52)
    activation52 = tf.nn.relu(out52)
    # Conv 5_3
    kernel53 = tf.get_variable(kernel53_name)
    conv53 = tf.nn.conv2d(activation52, kernel53, conv53_shape, conv53_padding)
    biases53 = tf.get_variable(biases53_name)
    out53 = tf.nn.bias_add(conv53, biases53)
    activation53 = tf.nn.relu(out53)
    pool5 = tf.nn.max_pool(activation53, pool5_ksize, pool5_strides, pool5_padding, name=pool5_name)
    # FC1
    # print(pool5)
    print(np.prod(pool5.get_shape()[1:]))
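    # Shape check (assuming 50x50x3 inputs as defined by HEIGHT/WIDTH above):
    # five 2x2 stride-2 max-pools with SAME padding give 50 -> 25 -> 13 -> 7 -> 4 -> 2,
    # so pool5 flattens to 2 * 2 * 512 = 2048 features, matching fc1w_shape[0].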
    fc1w = tf.get_variable(fc1w_name)
    fc1b = tf.get_variable(fc1b_name)
    pool5_flat = tf.reshape(pool5, [-1, fc1w_shape[0]])
    fc1l_out = tf.nn.bias_add(tf.matmul(pool5_flat, fc1w), fc1b)
    fc1l_activation = tf.nn.relu(fc1l_out)
    # FC2
    fc2w = tf.get_variable(fc2w_name)
    fc2b = tf.get_variable(fc2b_name)
    fc2l_out = tf.nn.bias_add(tf.matmul(fc1l_activation, fc2w), fc2b)
    fc2l_activation = tf.nn.relu(fc2l_out)
    # FC3
    fc3w = tf.get_variable(fc3w_name)
    fc3b = tf.get_variable(fc3b_name)
    fc3l_out = tf.nn.bias_add(tf.matmul(fc2l_activation, fc3w), fc3b)
    # fc3l_activation = tf.nn.softmax(fc3l_out)
    # Return the FC2 embedding (used for the triplet loss / saved features) and the FC3 logits
    return fc2l_activation, fc3l_out
def triplet_loss(anchor, positive, negative, alpha):
    """Calculate the triplet loss according to the FaceNet paper.
    Args:
        anchor: the embeddings for the anchor images.
        positive: the embeddings for the positive images.
        negative: the embeddings for the negative images.
        alpha: margin between the positive and negative distances.
    Returns:
        the triplet loss according to the FaceNet paper as a float tensor.
    """
    with tf.variable_scope('triplet_loss'):
        pos_dist = tf.reduce_sum(tf.square(tf.subtract(anchor, positive)), 1)
        neg_dist = tf.reduce_sum(tf.square(tf.subtract(anchor, negative)), 1)
        basic_loss = tf.add(tf.subtract(pos_dist, neg_dist), alpha)
        loss = tf.reduce_mean(tf.maximum(basic_loss, 0.0), 0)
    return loss
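# In formula form (a sketch of the same computation): for embeddings a_i, p_i, n_i,
#   L = mean_i( max( ||a_i - p_i||^2 - ||a_i - n_i||^2 + alpha, 0 ) )
# where alpha is the margin; 0.1 is used when the loss is wired into the graph below.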
# Load weights
def load_weights(sess):
    with tf.variable_scope('vgg', reuse=True):
        print(len(classification_variables))
        weights = np.load('vgg16_weights.npz')
        weight_keys = sorted(weights.keys())
        for i, k in enumerate(weight_keys[:-6]):
            print("Loading {} to {}".format(i, k))
            sess.run(classification_variables[i].assign(weights[k]))
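# Note on load_weights: this assumes the layout of the standard vgg16_weights.npz release,
# whose sorted keys put the convolutional kernels and biases first; the last 6 entries
# (the fully connected weights/biases) are skipped because the FC shapes here differ
# for 50x50 inputs and 8 classes, so those layers keep their random initialization.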
with tf.variable_scope('vgg', reuse=tf.AUTO_REUSE):
    # Feature extraction operation
    image_features = extract_feature(IMAGE_INPUT)
    query_features, query_output = extract_feature(X1)
    positive_features, positive_output = extract_feature(X2)
    negative_features, negative_output = extract_feature(X3)
    _triplet_loss = triplet_loss(query_features, positive_features, negative_features, 0.1)
    classification_loss1 = tf.losses.softmax_cross_entropy(onehot_labels=y1, logits=query_output)
    classification_loss2 = tf.losses.softmax_cross_entropy(onehot_labels=y2, logits=positive_output)
    classification_loss3 = tf.losses.softmax_cross_entropy(onehot_labels=y3, logits=negative_output)
    beta = 0.01
    regularization_strength = 0.2
    l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in tf.trainable_variables() if 'biases' not in v.name]) * regularization_strength
    all_loss = tf.reduce_sum([classification_loss1, classification_loss2, classification_loss3, beta * _triplet_loss, l2_loss], name='total_loss')
    train_classification = tf.train.RMSPropOptimizer(0.00001).minimize(all_loss)  # , var_list=classification_variables)
    correct_prediction = tf.cast(tf.squeeze([
        tf.equal(tf.argmax(extract_feature(X1)[1], axis=1), tf.argmax(y1, axis=1)),
        tf.equal(tf.argmax(extract_feature(X2)[1], axis=1), tf.argmax(y2, axis=1)),
        tf.equal(tf.argmax(extract_feature(X3)[1], axis=1), tf.argmax(y3, axis=1)),
    ]), tf.float32)
    accuracy = tf.reduce_mean(correct_prediction)
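# The training objective above combines the three softmax cross-entropy terms, the triplet
# loss scaled by beta, and L2 weight decay on all non-bias variables; accuracy averages the
# classification correctness over the query, positive, and negative branches of a batch.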
# Saver
saver = tf.train.Saver()
def train(max_epochs=10, train_sample_count=6000):
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        # Load pre-trained weights
        load_weights(sess)
        steps_per_epoch = train_sample_count // BATCH_SIZE + 1
        # Tracking loss and accuracy
        train_accuracy = []
        train_triplet_losses = []
        test_accuracy = []
        test_triplet_losses = []
        with tqdm(total=max_epochs) as pbar_global:
            for k in range(max_epochs):
                with tqdm(total=steps_per_epoch) as pbar:
                    for i in range(steps_per_epoch):
                        query, positive, negative, label1, label2, label3 = next(train_gen)
                        _, a, tl = sess.run(
                            [train_classification, accuracy, _triplet_loss],
                            feed_dict={X1: query, y1: label1,
                                       X2: positive, y2: label2,
                                       X3: negative, y3: label3})
                        pbar.update(1)
                        pbar.set_description("Train Acc: {:.2f} - Triplet: {:.2f}".format(a, tl))
                        train_accuracy.append(a)
                        train_triplet_losses.append(tl)
                # Evaluate on one test batch at the end of each epoch
                tquery, tpositive, tnegative, tlabel1, tlabel2, tlabel3 = next(test_gen)
                ta, ttl = sess.run([accuracy, _triplet_loss],
                                   feed_dict={X1: tquery, y1: tlabel1, X2: tpositive, y2: tlabel2, X3: tnegative, y3: tlabel3})
                test_accuracy.append(ta)
                test_triplet_losses.append(ttl)
                pbar_global.update(1)
                pbar_global.set_description("Test Acc: {:.2f} - Triplet: {:.2f}".format(ta, ttl))
                np.save('./logs/train_accuracy_{}.npy'.format(k), np.array(train_accuracy))
                np.save('./logs/train_triplet_loss_{}.npy'.format(k), np.array(train_triplet_losses))
                np.save('./logs/test_accuracy_{}.npy'.format(k), np.array(test_accuracy))
                np.save('./logs/test_triplet_loss_{}.npy'.format(k), np.array(test_triplet_losses))
                saver.save(sess, './models/model_{}.ckpt'.format(k))
# Second session: evaluate a saved checkpoint on one test batch
def test():
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver.restore(sess, './models/model_0.ckpt')
        tquery, tpositive, tnegative, tlabel1, tlabel2, tlabel3 = next(test_gen)
        ta, ttl = sess.run([accuracy, _triplet_loss],
                           feed_dict={X1: tquery, y1: tlabel1, X2: tpositive, y2: tlabel2, X3: tnegative, y3: tlabel3})
        print("Accuracy {} - Loss {}".format(ta, ttl))
def extract_features_from_images(images, labels, filename='features', batch_size=1000):
    with tf.Session() as sess:
        saver.restore(sess, './models/model_9.ckpt')
        n_samples = len(images)
        for i in tqdm(range(0, n_samples, batch_size)):
            upper_limit = min(i + batch_size, n_samples)
            # image_features is (fc2 embedding, fc3 logits); save the embedding plus matching labels
            features = sess.run(image_features, feed_dict={IMAGE_INPUT: images[i:upper_limit]})
            np.save('./features/' + filename + '_{}.npy'.format(i), np.array(features[0]))
            np.save('./features/' + filename + '_labels_{}.npy'.format(i), labels[i:upper_limit])
if __name__ == '__main__':
    # train()
    # test()
    extract_features_from_images(to_extract_feature, to_extract_feature_labels)