@batzner
Created November 20, 2018 15:13
'''Trains a simple convnet on the MNIST dataset with adversarial training.

During the adversarial train step, the dropout masks sampled in the legitimate
forward pass can be reused for the adversarial forward passes by setting
REUSE_DROPOUT to True.

For more info, see https://stackoverflow.com/questions/53395329/should-dropout-masks-be-reused-during-adversarial-training

Requirements:
- tensorflow==1.12.0
- cleverhans bleeding edge: pip install git+https://github.com/tensorflow/cleverhans.git#egg=cleverhans
- pandas==0.22.0
- tqdm==4.28.1
'''
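# Usage note: run this file directly and toggle the ADVERSARIAL_TRAINING and
# REUSE_DROPOUT constants below to compare runs with and without adversarial
# training / reused dropout masks.
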
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.datasets import mnist
from tensorflow.python.keras.models import Sequential, Model
from tensorflow.python.keras.layers import Dense, Flatten, Dropout
from tensorflow.python.keras.layers import Conv2D
from tensorflow.python.keras.utils import to_categorical
from tensorflow.python.ops.losses.losses_impl import softmax_cross_entropy
from tensorflow.train import get_or_create_global_step
from cleverhans.attacks_tfe import FastGradientMethod
from cleverhans.model import Model as CleverhansBaseModel
from tqdm import tqdm
tf.enable_eager_execution()
np.random.seed(0)
tf.set_random_seed(0)
ADVERSARIAL_TRAINING = True
ADVERSARIAL_ALPHA = 0.5
FGSM_PARAMS = {'eps': 0.3, 'clip_min': 0., 'clip_max': 1.}
REUSE_DROPOUT = True
BATCH_SIZE = 128
EPOCHS = 20
N_CLASSES = 10
def get_model(input_shape):
    # Except for the Dropout layer, this model is the same as the one used in
    # cleverhans/cleverhans_tutorials/mnist_tutorial_keras_tf.py
    model = Sequential()
    model.add(Conv2D(64, kernel_size=(8, 8), strides=(2, 2), padding='same',
                     activation='relu',
                     input_shape=input_shape))
    model.add(Conv2D(128, kernel_size=(6, 6), strides=(2, 2), padding='valid',
                     activation='relu'))
    model.add(Conv2D(128, kernel_size=(5, 5), strides=(1, 1), padding='valid',
                     activation='relu'))
    model.add(Flatten())
    model.add(DropoutReuseMask(0.5))
    model.add(Dense(N_CLASSES, name='logits'))
    return model

def main():
    dataset_train, dataset_test = get_datasets()
    input_shape = dataset_train.output_shapes[0][1:]
    model = get_model(input_shape)
    ch_model = CleverhansModel(model)
    fgsm = FastGradientMethod(ch_model)

    # Train loop
    optimizer = tf.train.AdamOptimizer()
    progress_train = pd.DataFrame()
    progress_test = pd.DataFrame()
    for i_epoch in range(EPOCHS):
        # Training
        for x_batch, y_batch in tqdm(dataset_train):
            if ADVERSARIAL_TRAINING:
                loss, acc, loss_adv, acc_adv = train_step_adv(x_batch, y_batch,
                                                              model, optimizer,
                                                              fgsm)
                step_progress = {
                    'train_loss': loss,
                    'train_loss_adv': loss_adv,
                    'train_acc': acc,
                    'train_acc_adv': acc_adv
                }
            else:
                loss, acc = train_step(x_batch, y_batch, model, optimizer)
                step_progress = {
                    'train_loss': loss,
                    'train_acc': acc
                }
            step_progress['epoch'] = i_epoch
            step_progress['step'] = get_or_create_global_step().numpy()
            progress_train = progress_train.append(step_progress,
                                                   ignore_index=True)
        print('Epoch done')
        print(progress_train.loc[progress_train['epoch'] == i_epoch].mean())

        # Evaluation
        for x_batch, y_batch in tqdm(dataset_test):
            loss, acc, loss_adv, acc_adv = test_step(x_batch, y_batch, model,
                                                     fgsm)
            step_progress = {
                'epoch': i_epoch,
                'test_loss': loss,
                'test_loss_adv': loss_adv,
                'test_acc': acc,
                'test_acc_adv': acc_adv
            }
            progress_test = progress_test.append(step_progress,
                                                 ignore_index=True)
        print('Evaluation done')
        print(progress_test.loc[progress_test['epoch'] == i_epoch].mean())
    print(progress_test.groupby('epoch')['test_acc', 'test_acc_adv'].mean())

def train_step(x_batch, y_batch, model, optimizer):
    K.set_learning_phase(1)

    # Compute the gradient for the legitimate batch only
    with tf.GradientTape() as tape:
        logits = model(x_batch, training=True)
        loss = softmax_cross_entropy(y_batch, logits)
    grads = tape.gradient(loss, model.variables)
    optimizer.apply_gradients(zip(grads, model.variables),
                              global_step=get_or_create_global_step())

    # Compute the accuracy
    acc = np.mean(np.equal(np.argmax(y_batch, axis=-1),
                           np.argmax(logits, axis=-1)))
    return loss.numpy(), acc

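# Adversarial variant of train_step. The learning phase is set globally, so
# dropout also stays active inside fgsm.generate, since CleverhansModel.fprop
# forwards to the model without forcing a training flag.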
def train_step_adv(x_batch, y_batch, model, optimizer, fgsm):
    K.set_learning_phase(1)

    # Compute the gradient for the legitimate batch
    with tf.GradientTape() as tape:
        logits = model(x_batch, training=True)
        loss = softmax_cross_entropy(y_batch, logits)
        loss *= ADVERSARIAL_ALPHA
    grads_legit = tape.gradient(loss, model.variables)
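    # When REUSE_DROPOUT is True, the dropout masks sampled in the legitimate
    # forward pass above are reused both inside fgsm.generate (which runs its
    # own forward pass to compute input gradients) and in the adversarial
    # forward pass below.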
    # Adversarial training
    DropoutReuseMask.set_reuse_on_model(model, REUSE_DROPOUT)
    x_adv = fgsm.generate(x_batch, **FGSM_PARAMS)
    with tf.GradientTape() as tape:
        logits_adv = model(x_adv, training=True)
        loss_adv = softmax_cross_entropy(y_batch, logits_adv)
        loss_adv *= (1 - ADVERSARIAL_ALPHA)
    grads_adv = tape.gradient(loss_adv, model.variables)
    DropoutReuseMask.set_reuse_on_model(model, False)

    # Train
    grads = sum_gradients(grads_legit, grads_adv)
    optimizer.apply_gradients(zip(grads, model.variables),
                              global_step=get_or_create_global_step())

    # Compute the accuracies
    acc = np.mean(np.equal(np.argmax(y_batch, axis=-1),
                           np.argmax(logits, axis=-1)))
    acc_adv = np.mean(np.equal(np.argmax(y_batch, axis=-1),
                               np.argmax(logits_adv, axis=-1)))
    return loss.numpy(), acc, loss_adv.numpy(), acc_adv

def test_step(x_batch, y_batch, model, fgsm):
    K.set_learning_phase(0)

    # Feed the input
    logits = model(x_batch, training=False)
    loss = softmax_cross_entropy(y_batch, logits)

    # Adversarial test
    x_adv = fgsm.generate(x_batch, **FGSM_PARAMS)
    logits_adv = model(x_adv, training=False)
    loss_adv = softmax_cross_entropy(y_batch, logits_adv)

    # Compute the accuracies
    acc = np.mean(np.equal(np.argmax(y_batch, axis=-1),
                           np.argmax(logits, axis=-1)))
    acc_adv = np.mean(np.equal(np.argmax(y_batch, axis=-1),
                               np.argmax(logits_adv, axis=-1)))
    return loss.numpy(), acc, loss_adv.numpy(), acc_adv

def get_datasets():
    # Fetch and format the MNIST data
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # Convert class vectors to binary class matrices
    y_train = to_categorical(y_train, N_CLASSES)
    y_test = to_categorical(y_test, N_CLASSES)

    dataset_train = tf.data.Dataset.from_tensor_slices(
        (tf.cast(x_train[..., tf.newaxis] / 255, tf.float32),
         tf.cast(y_train, tf.float32)))
    dataset_train = dataset_train.shuffle(1000).batch(BATCH_SIZE)

    dataset_test = tf.data.Dataset.from_tensor_slices(
        (tf.cast(x_test[..., tf.newaxis] / 255, tf.float32),
         tf.cast(y_test, tf.float32)))
    dataset_test = dataset_test.shuffle(1000).batch(BATCH_SIZE)
    return dataset_train, dataset_test

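# sum_gradients adds the gradients from the legitimate and adversarial passes
# element-wise. tf.GradientTape.gradient returns None for variables a loss does
# not depend on, so a None entry falls back to the gradient of the other pass.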
def sum_gradients(grads_a, grads_b):
    grads = []
    for g_a, g_b in zip(grads_a, grads_b):
        if g_a is not None:
            if g_b is not None:
                grads.append(g_a + g_b)
            else:
                grads.append(g_a)
        else:
            grads.append(g_b)
    return grads

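# Dropout layer that can replay its last mask. The mask is obtained by running
# the regular Dropout layer on a tensor of ones, so it already carries the
# inverted-dropout scaling of 1 / (1 - rate); multiplying the inputs by the
# cached mask applies the same dropout pattern to every forward pass made while
# reuse is True.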
class DropoutReuseMask(Dropout):
    def __init__(self, *args, **kwargs):
        self.mask = None
        self.reuse = False
        super(DropoutReuseMask, self).__init__(*args, **kwargs)

    def call(self, inputs, *args, **kwargs):
        if not self.reuse:
            # Create a new mask
            mask_input = tf.ones_like(inputs)
            self.mask = super(DropoutReuseMask, self).call(mask_input,
                                                           *args,
                                                           **kwargs)
        return inputs * self.mask

    @staticmethod
    def set_reuse_on_model(model: Model, reuse):
        for layer in model.layers:
            if isinstance(layer, DropoutReuseMask):
                layer.reuse = reuse

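# Thin wrapper so the Keras model can be attacked with cleverhans: the attacks
# expect a cleverhans.model.Model subclass whose fprop returns a dict mapping
# layer names (here only O_LOGITS) to tensors.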
class CleverhansModel(CleverhansBaseModel):
    def __init__(self, model):
        self.model = model
        super(CleverhansModel, self).__init__(nb_classes=N_CLASSES)

    def fprop(self, x, **kwargs):
        return {
            CleverhansBaseModel.O_LOGITS: self.model(x, **kwargs)
        }

if __name__ == '__main__':
    main()