# triplet BPR
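#
# Trains a triplet embedding network on MNIST with a BPR-style loss: a shared
# CNN maps images to L2-normalized embedding vectors, and the objective
# 1 - sigmoid(sim(anchor, positive) - sim(anchor, negative)) pushes
# anchor-positive cosine similarity above anchor-negative cosine similarity.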
import datetime
import random
from collections import defaultdict

import matplotlib.pyplot as plt
import numpy as np
from scipy import spatial
from scipy import interpolate

import keras
from keras import Input, Model
from keras import backend as K
from keras.datasets import mnist
from keras.layers import Conv2D, LeakyReLU, Dropout, BatchNormalization, GlobalAveragePooling2D, Dense, Lambda
from keras.optimizers import Adam
# num_cores = 4
#
# if GPU:
#     num_GPU = 1
#     num_CPU = 1
# if CPU:
#     num_CPU = 1
#     num_GPU = 0
#
# config = tf.ConfigProto(intra_op_parallelism_threads=num_cores,
#                         inter_op_parallelism_threads=num_cores,
#                         allow_soft_placement=True,
#                         device_count={'CPU': num_CPU,
#                                       'GPU': num_GPU})
#
# session = tf.Session(config=config)
# K.set_session(session)
class TripletNetwork:
    def __init__(self, img_shape, embedding_size=10):
        self.img_shape = img_shape
        self.df = 32  # base number of conv filters
        self.single_model = self.build_single_model(embedding_size)
        self.full_model = self.build_full_model(self.single_model)

    def build_single_model(self, embedding_size=10):
        def d_layer(layer_input, filters, f_size=3, normalization=True, dropout_rate=0., stride=2):
            """Discriminator-style conv block."""
            d = Conv2D(filters, kernel_size=f_size, strides=stride, padding='same')(layer_input)
            d = LeakyReLU(alpha=0.2)(d)
            if dropout_rate:
                d = Dropout(dropout_rate)(d)
            if normalization:
                d = BatchNormalization()(d)
            return d

        img = Input(shape=self.img_shape)
        d1 = d_layer(img, self.df, normalization=False)
        d2 = d_layer(d1, self.df * 2, dropout_rate=.2)
        d3 = d_layer(d2, self.df * 4, dropout_rate=.2)
        d4 = d_layer(d3, self.df * 8, dropout_rate=.2)
        x = GlobalAveragePooling2D()(d4)
        x = Dense(embedding_size, activation='tanh')(x)
        # L2-normalize per sample so the dot products below are cosine similarities.
        embedding = Lambda(lambda tensor: K.l2_normalize(tensor, axis=-1))(x)

        model = Model(img, embedding)
        model.compile(Adam(), loss="mae")
        model.summary()
        return model
    def build_full_model(self, single_model):
        posIn = Input(shape=self.img_shape, name="positive")
        negIn = Input(shape=self.img_shape, name="negative")
        anchorIn = Input(shape=self.img_shape, name="anchor")

        encoded_positive = single_model(posIn)
        encoded_negative = single_model(negIn)
        encoded_anchor = single_model(anchorIn)

        # Dot products of unit-norm embeddings, i.e. cosine similarities.
        DAP = Lambda(lambda tensors: K.sum(tensors[0] * tensors[1], axis=-1, keepdims=True),
                     name='DAP_loss')  # anchor-positive similarity
        DAN = Lambda(lambda tensors: K.sum(tensors[0] * tensors[1], axis=-1, keepdims=True),
                     name='DAN_loss')  # anchor-negative similarity
        # BPR-style triplet loss: drives sigmoid(sim_AP - sim_AN) toward 1,
        # i.e. the model output toward 0 (trained with MAE against zero targets).
        Triplet_loss = Lambda(lambda loss: 1 - K.sigmoid(loss[0] - loss[1]), name='BPR_Triplet_loss')

        DAP_loss = DAP([encoded_anchor, encoded_positive])
        DAN_loss = DAN([encoded_anchor, encoded_negative])
        Final_loss = Triplet_loss([DAP_loss, DAN_loss])

        model = Model(inputs=[posIn, negIn, anchorIn], outputs=Final_loss)
        model.compile(Adam(), loss="mae")
        model.summary()
        return model
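    # Note: classic BPR minimizes -log(sigmoid(sim_AP - sim_AN)); the
    # 1 - sigmoid(sim_AP - sim_AN) form above is a variant with the same
    # ordering objective, since both are monotone decreasing in sim_AP - sim_AN.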
    def chunks(self, l, n):
        """Yield successive n-sized chunks from l."""
        for i in range(0, len(l), n):
            yield np.array(l[i:i + n])
    def batch_generator(self, grouped_training, num_classes, batch_size):
        while True:
            batch = list(self.get_triplet_batch(grouped_training, num_classes, batch_size))
            # Dummy targets: the model's output is the loss itself, regressed toward 0.
            empty_out = np.zeros((len(batch), 1))
            # (batch, 3, H, W, C) -> (3, batch, H, W, C): one array per input head.
            batch = np.swapaxes(np.asarray(batch), 0, 1)
            batch_list = [head for head in batch]
            yield batch_list, empty_out

    def train_on_batch(self, grouped_training, num_classes, batch_size):
        batch_list, empty_out = next(self.batch_generator(grouped_training, num_classes, batch_size))
        return self.full_model.train_on_batch(batch_list, empty_out)
    def train_epoch(self, grouped_training, num_classes, batch_size, steps_per_epoch, current_epoch, callbacks):
        loss = self.full_model.fit_generator(
            generator=self.batch_generator(grouped_training, num_classes, batch_size),
            steps_per_epoch=steps_per_epoch,
            epochs=current_epoch + 1,
            verbose=2,
            initial_epoch=current_epoch,
            callbacks=callbacks
        )
        return loss

    def manipulate_image(self, img):
        # Light augmentation: rotate by a random multiple of 90 degrees.
        return np.rot90(img, k=random.randint(0, 3))
    def get_triplet_batch(self, grouped_training, num_classes, batch_size):
        shuffled_classes = list(range(num_classes))
        for i in range(batch_size):
            np.random.shuffle(shuffled_classes)
            pos_class = shuffled_classes[0]
            neg_class = shuffled_classes[1]
            pos = random.choice(grouped_training[pos_class])
            anc = random.choice(grouped_training[pos_class])
            # Resample until the anchor is a different example than the positive.
            while len(grouped_training[pos_class]) > 1 and pos is anc:
                anc = random.choice(grouped_training[pos_class])
            neg = random.choice(grouped_training[neg_class])
            yield [self.manipulate_image(pos), self.manipulate_image(neg), self.manipulate_image(anc)]
    def run_model_eval(self, test_set, test_sample_size=1000):
        shuffled_classes = list(test_set.keys())
        pos_anc_dist = 0
        pos_neg_dist = 0
        for i in range(test_sample_size):
            np.random.shuffle(shuffled_classes)
            pos_class = shuffled_classes[0]
            neg_class = shuffled_classes[1]
            pos = random.choice(test_set[pos_class])
            anc = random.choice(test_set[pos_class])
            while len(test_set[pos_class]) > 1 and pos is anc:
                anc = random.choice(test_set[pos_class])
            neg = random.choice(test_set[neg_class])
            fvs = self.single_model.predict_on_batch(
                np.array([self.manipulate_image(pos), self.manipulate_image(anc), self.manipulate_image(neg)]))
            pos_fv, anc_fv, neg_fv = fvs[0], fvs[1], fvs[2]
            pos_anc_dist += spatial.distance.cosine(pos_fv, anc_fv)
            pos_neg_dist += spatial.distance.cosine(pos_fv, neg_fv)
        print("pos_anc_dist = %05f" % (pos_anc_dist / test_sample_size))
        print("pos_neg_dist = %05f" % (pos_neg_dist / test_sample_size))
        print("neg/pos dist ratio = %05f" % (pos_neg_dist / pos_anc_dist))
        return (pos_anc_dist / test_sample_size, pos_neg_dist / test_sample_size, pos_neg_dist / pos_anc_dist)
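    # A well-trained embedding drives pos_anc_dist toward 0 while keeping
    # pos_neg_dist large, so the neg/pos distance ratio should grow well above 1.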
    def run_model_eval_auc(self, test_set, test_sample_size=50, mini_batch_size=32, show_plot=False):
        # Embed up to test_sample_size examples per class, in mini-batches.
        computed_vectors = [[] for _ in range(len(test_set))]
        for c_key in test_set.keys():
            shuffled_class = test_set[c_key]
            np.random.shuffle(shuffled_class)
            for i in range(0, test_sample_size, mini_batch_size):
                end = min(i + mini_batch_size, test_sample_size, len(shuffled_class))
                minibatch = shuffled_class[i:end]
                if not len(minibatch):
                    break
                fvs = self.single_model.predict_on_batch(np.array(minibatch))
                computed_vectors[c_key].extend(fvs)

        # Sweep a cosine-distance threshold (quadratic spacing, denser near 0,
        # up to 1.8) and collect ROC points.
        tprs = [0]
        fprs = [0]
        for threshold in np.linspace(.002, np.sqrt(1.8), 25) ** 2:
            fp_count = 0
            fn_count = 0
            tp_count = 0
            tn_count = 0
            for vector_list_idx in range(len(computed_vectors)):
                vector_list = computed_vectors[vector_list_idx]
                pos_fv = vector_list[0]
                # Same-class pairs should fall under the threshold (positives).
                for anc_fv in vector_list[1:]:
                    if spatial.distance.cosine(pos_fv, anc_fv) < threshold:
                        tp_count += 1
                    else:
                        fn_count += 1
                # Cross-class pairs should fall over the threshold (negatives).
                other_list_range = list(range(len(computed_vectors)))
                other_list_range.remove(vector_list_idx)
                for other_vector_list_idx in other_list_range:
                    for neg_fv in computed_vectors[other_vector_list_idx]:
                        if spatial.distance.cosine(pos_fv, neg_fv) < threshold:
                            fp_count += 1
                        else:
                            tn_count += 1
            tprs.append(tp_count / (tp_count + fn_count))
            fprs.append(fp_count / (tn_count + fp_count))
        tprs.append(1)
        fprs.append(1)

        # Sort by FPR and drop duplicate x-values so interp1d sees a function.
        fprs, tprs = (list(t) for t in zip(*sorted(zip(fprs, tprs))))
        new_fprs = []
        new_tprs = []
        for fpr, tpr in zip(fprs, tprs):
            if fpr not in new_fprs:
                new_fprs.append(fpr)
                new_tprs.append(tpr)
        fprs = new_fprs
        tprs = new_tprs

        # AUC via the trapezoid rule, on raw points and on a smoothed curve.
        f_tpr = interpolate.interp1d(fprs, tprs, kind="quadratic")
        x = np.linspace(0, 1, 100)
        int_tprs = f_tpr(x)
        auc = np.trapz(tprs, x=fprs)
        int_auc = np.trapz(int_tprs, x=x)
        print("AUC = %05f" % auc)
        print("INT_AUC = %05f" % int_auc)

        if show_plot:
            plt.plot(fprs, tprs, color='blue', marker=".")
            plt.plot(x, int_tprs, color='red')
            plt.ylim([0, 1])
            plt.xlim([0, 1])
            plt.show()
        return auc
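
# Hypothetical usage of the trained embedding head (assumes two preprocessed
# images img_a and img_b with shape img_shape, scaled to [0, 1]):
#
#   fvs = triplet.single_model.predict_on_batch(np.array([img_a, img_b]))
#   similarity = float(np.dot(fvs[0], fvs[1]))  # cosine similarity: embeddings are unit-norm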
batch_size = 128
num_classes = 10
channels = 1

# input image dimensions
img_rows, img_cols = 28, 28

# the data, split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], channels, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], channels, img_rows, img_cols)
    input_shape = (channels, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, channels)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, channels)
    input_shape = (img_rows, img_cols, channels)

x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Group examples by class label so triplets can be sampled per class.
grouped_training = defaultdict(list)
for image, label in zip(x_train, y_train):
    grouped_training[label].append(image)

grouped_test = defaultdict(list)
for image, label in zip(x_test, y_test):
    grouped_test[label].append(image)

triplet = TripletNetwork(input_shape)
class ModelEval(keras.callbacks.Callback):
    def __init__(self):
        super(ModelEval, self).__init__()

    def on_epoch_end(self, epoch, logs=None):
        p_a_dis, p_n_dis, n_p_ratio = triplet.run_model_eval(grouped_test)
        logs["p_a_dis"] = p_a_dis
        logs["p_n_dis"] = p_n_dis
        logs["n_p_ratio"] = n_p_ratio


class AUC(keras.callbacks.Callback):
    def __init__(self):
        super(AUC, self).__init__()

    def on_epoch_end(self, epoch, logs=None):
        logs["auc"] = triplet.run_model_eval_auc(grouped_test, show_plot=False)


dt = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
tb = keras.callbacks.TensorBoard(log_dir='./logs/%s' % dt)
# AUC must run before the checkpoint so logs["auc"] exists when it is monitored.
saver = keras.callbacks.ModelCheckpoint("models/%s" % dt, monitor="auc", verbose=1, mode="max", save_best_only=True)
callbacks = [ModelEval(), AUC(), saver, tb]

for epoch in range(0, 1000):
    triplet.train_epoch(grouped_training, num_classes, batch_size, int(x_train.shape[0] / batch_size), epoch, callbacks)
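
# Hypothetical reload of the best checkpoint (the Lambda layers may need the
# backend injected via custom_objects, depending on the Keras version):
#
#   restored = keras.models.load_model("models/<timestamp>", custom_objects={"K": K})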