@myurasov
Last active April 3, 2021 03:12
Wasserstein ACGAN in Keras
{
"cells": [
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "import os\n\nimport matplotlib.pyplot as plt\n%matplotlib inline\n%config InlineBackend.figure_format = 'retina'\n\nimport keras.backend as K\nfrom keras.datasets import mnist\nfrom keras.layers import *\nfrom keras.models import *\nfrom keras.optimizers import *\nfrom keras.initializers import *\nfrom keras.callbacks import *\nfrom keras.utils.generic_utils import Progbar",
"execution_count": 1,
"outputs": [
{
"output_type": "stream",
"name": "stderr",
"text": "Using TensorFlow backend.\n"
}
]
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "RND = 777\n\nRUN = 'F'\nOUT_DIR = 'out/' + RUN\nTENSORBOARD_DIR = '/tensorboard/wgans/' + RUN\n\n# GPU # \nGPU = \"1\"\n\n# latent vector size\nZ_SIZE = 100\n\n# number of iterations D is trained for per each G iteration\nD_ITERS = 5\n\nBATCH_SIZE = 100\nITERATIONS = 25000",
"execution_count": 2,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "np.random.seed(RND)",
"execution_count": 3,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "if not os.path.isdir(OUT_DIR): os.makedirs(OUT_DIR)",
"execution_count": 4,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# use specific GPU\nos.environ[\"CUDA_DEVICE_ORDER\"] = \"PCI_BUS_ID\"\nos.environ[\"CUDA_VISIBLE_DEVICES\"] = GPU",
"execution_count": 5,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "K.set_image_dim_ordering('tf')",
"execution_count": 6,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# basically return mean(y_pred),\n# but with ability to inverse it for minimization (when y_true == -1)\ndef wasserstein(y_true, y_pred):\n return K.mean(y_true * y_pred)",
"execution_count": 7,
"outputs": []
},
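{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# Added illustration (not part of the original gist): a quick numeric sanity check of the\n# wasserstein() sign convention defined above. With y_true == -1 the loss equals -mean(y_pred),\n# so minimizing it raises the critic output; with y_true == +1 it lowers it.\n_pred = K.variable([[0.5], [1.5], [-1.0]])\nprint(K.eval(wasserstein(-K.ones_like(_pred), _pred)))  # approximately -0.333\nprint(K.eval(wasserstein(K.ones_like(_pred), _pred)))   # approximately +0.333",
"execution_count": null,
"outputs": []
},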
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "def create_D():\n\n # weights are initlaized from normal distribution with below params\n weight_init = RandomNormal(mean=0., stddev=0.02)\n\n input_image = Input(shape=(28, 28, 1), name='input_image')\n\n x = Conv2D(\n 32, (3, 3),\n padding='same',\n name='conv_1',\n kernel_initializer=weight_init)(input_image)\n x = LeakyReLU()(x)\n x = MaxPool2D(pool_size=2)(x)\n x = Dropout(0.3)(x)\n\n x = Conv2D(\n 64, (3, 3),\n padding='same',\n name='conv_2',\n kernel_initializer=weight_init)(x)\n x = MaxPool2D(pool_size=1)(x)\n x = LeakyReLU()(x)\n x = Dropout(0.3)(x)\n\n x = Conv2D(\n 128, (3, 3),\n padding='same',\n name='conv_3',\n kernel_initializer=weight_init)(x)\n x = MaxPool2D(pool_size=2)(x)\n x = LeakyReLU()(x)\n x = Dropout(0.3)(x)\n\n x = Conv2D(\n 256, (3, 3),\n padding='same',\n name='coonv_4',\n kernel_initializer=weight_init)(x)\n x = MaxPool2D(pool_size=1)(x)\n x = LeakyReLU()(x)\n x = Dropout(0.3)(x)\n\n features = Flatten()(x)\n\n output_is_fake = Dense(\n 1, activation='linear', name='output_is_fake')(features)\n\n output_class = Dense(\n 10, activation='softmax', name='output_class')(features)\n\n return Model(\n inputs=[input_image], outputs=[output_is_fake, output_class], name='D')",
"execution_count": 8,
"outputs": []
},
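{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# Added check (not part of the original gist): build a throwaway critic just to inspect the\n# architecture. The two heads share the convolutional features: a linear 'is fake' score for\n# the Wasserstein loss and a 10-way softmax for the auxiliary class loss.\ncreate_D().summary()",
"execution_count": null,
"outputs": []
},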
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "def create_G(Z_SIZE=Z_SIZE):\n DICT_LEN = 10\n EMBEDDING_LEN = Z_SIZE\n\n # weights are initlaized from normal distribution with below params\n weight_init = RandomNormal(mean=0., stddev=0.02)\n\n # class#\n input_class = Input(shape=(1, ), dtype='int32', name='input_class')\n # encode class# to the same size as Z to use hadamard multiplication later on\n e = Embedding(\n DICT_LEN, EMBEDDING_LEN,\n embeddings_initializer='glorot_uniform')(input_class)\n embedded_class = Flatten(name='embedded_class')(e)\n\n # latent var\n input_z = Input(shape=(Z_SIZE, ), name='input_z')\n\n # hadamard product\n h = multiply([input_z, embedded_class], name='h')\n\n # cnn part\n x = Dense(1024)(h)\n x = LeakyReLU()(x)\n\n x = Dense(128 * 7 * 7)(x)\n x = LeakyReLU()(x)\n x = Reshape((7, 7, 128))(x)\n\n x = UpSampling2D(size=(2, 2))(x)\n x = Conv2D(256, (5, 5), padding='same', kernel_initializer=weight_init)(x)\n x = LeakyReLU()(x)\n\n x = UpSampling2D(size=(2, 2))(x)\n x = Conv2D(128, (5, 5), padding='same', kernel_initializer=weight_init)(x)\n x = LeakyReLU()(x)\n\n x = Conv2D(\n 1, (2, 2),\n padding='same',\n activation='tanh',\n name='output_generated_image',\n kernel_initializer=weight_init)(x)\n\n return Model(inputs=[input_z, input_class], outputs=x, name='G')",
"execution_count": 9,
"outputs": []
},
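{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# Added check (not part of the original gist): the generator takes a Z_SIZE latent vector plus an\n# integer class label, embeds the label to the same length, multiplies it element-wise into z and\n# upsamples to a 28x28x1 image in [-1, 1]. Inspect the resulting shapes with a throwaway instance.\ncreate_G().summary()",
"execution_count": null,
"outputs": []
},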
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "D = create_D()\n\n# # remember D dropout rates\n# for l in D.layers:\n# if l.name.startswith('dropout'):\n# l._rate = l.rate\n\nD.compile(\n optimizer=RMSprop(lr=0.00005),\n loss=[wasserstein, 'sparse_categorical_crossentropy'])",
"execution_count": 10,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "input_z = Input(shape=(Z_SIZE, ), name='input_z_')\ninput_class = Input(shape=(1, ),name='input_class_', dtype='int32')",
"execution_count": 11,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "G = create_G()\n\n# create combined D(G) model\noutput_is_fake, output_class = D(G(inputs=[input_z, input_class]))\nDG = Model(inputs=[input_z, input_class], outputs=[output_is_fake, output_class])\nDG.get_layer('D').trainable = False # freeze D in generator training faze\n\nDG.compile(\n optimizer=RMSprop(lr=0.00005),\n loss=[wasserstein, 'sparse_categorical_crossentropy']\n)",
"execution_count": 12,
"outputs": []
},
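{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# Added check (not part of the original gist): marking the 'D' layer of the combined model as\n# non-trainable means DG.train_on_batch updates only G's weights, while gradients still flow\n# through D's outputs. The training loop below re-enables D.trainable for the critic updates.\nprint('DG trainable weights:', len(DG.trainable_weights))  # G's weights only\nprint('D trainable weights: ', len(D.trainable_weights))   # 0 until D.trainable is re-enabled",
"execution_count": null,
"outputs": []
},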
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# load mnist data\n(X_train, y_train), (X_test, y_test) = mnist.load_data()\n\n# use all available 70k samples\nX_train = np.concatenate((X_train, X_test))\ny_train = np.concatenate((y_train, y_test))\n\n# convert to -1..1 range, reshape to (sample_i, 28, 28, 1)\nX_train = (X_train.astype(np.float32) - 127.5) / 127.5\nX_train = np.expand_dims(X_train, axis=3)",
"execution_count": 13,
"outputs": []
},
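{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# Added check (not part of the original gist): the combined train+test set should be 70000 samples\n# of shape (28, 28, 1), scaled to the [-1, 1] range matched by the generator's tanh output.\nprint(X_train.shape, y_train.shape, X_train.min(), X_train.max())",
"execution_count": null,
"outputs": []
},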
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# save 10x10 sample of generated images\nsamples_zz = np.random.normal(0., 1., (100, Z_SIZE))\ndef generate_samples(n=0, save=True):\n\n generated_classes = np.array(list(range(0, 10)) * 10)\n generated_images = G.predict([samples_zz, generated_classes.reshape(-1, 1)])\n\n rr = []\n for c in range(10):\n rr.append(\n np.concatenate(generated_images[c * 10:(1 + c) * 10]).reshape(\n 280, 28))\n img = np.hstack(rr)\n\n if save:\n plt.imsave(OUT_DIR + '/samples_%07d.png' % n, img, cmap=plt.cm.gray)\n\n return img\n\n# write tensorboard summaries\nsw = tf.summary.FileWriter(TENSORBOARD_DIR)\ndef update_tb_summary(step, sample_images=True, save_image_files=True):\n\n s = tf.Summary()\n\n # losses as is\n for names, vals in zip((('D_real_is_fake', 'D_real_class'),\n ('D_fake_is_fake', 'D_fake_class'), ('DG_is_fake',\n 'DG_class')),\n (D_true_losses, D_fake_losses, DG_losses)):\n\n v = s.value.add()\n v.simple_value = vals[-1][1]\n v.tag = names[0]\n\n v = s.value.add()\n v.simple_value = vals[-1][2]\n v.tag = names[1]\n\n # D loss: -1*D_true_is_fake - D_fake_is_fake\n v = s.value.add()\n v.simple_value = -D_true_losses[-1][1] - D_fake_losses[-1][1]\n v.tag = 'D loss (-1*D_real_is_fake - D_fake_is_fake)'\n\n # generated image\n if sample_images:\n img = generate_samples(step, save=save_image_files)\n s.MergeFromString(tf.Session().run(\n tf.summary.image('samples_%07d' % step,\n img.reshape([1, *img.shape, 1]))))\n\n sw.add_summary(s, step)\n sw.flush()",
"execution_count": 14,
"outputs": []
},
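{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# Added preview (not part of the original gist): render the 10x10 sample grid inline instead of\n# writing it to disk. Before any training this shows only untrained-generator noise.\nplt.figure(figsize=(5, 5))\nplt.imshow(generate_samples(save=False), cmap=plt.cm.gray)\nplt.axis('off')\nplt.show()",
"execution_count": null,
"outputs": []
},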
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# fake = 1\n# real = -1\n\nprogress_bar = Progbar(target=ITERATIONS)\n\nDG_losses = []\nD_true_losses = []\nD_fake_losses = []\n\nfor it in range(ITERATIONS):\n\n if len(D_true_losses) > 0:\n progress_bar.update(\n it,\n values=[\n ('D_real_is_fake', np.mean(D_true_losses[-5:], axis=0)[1]),\n ('D_real_class', np.mean(D_true_losses[-5:], axis=0)[2]),\n ('D_fake_is_fake', np.mean(D_fake_losses[-5:], axis=0)[1]),\n ('D_fake_class', np.mean(D_fake_losses[-5:], axis=0)[2]),\n ('D(G)_is_fake', np.mean(DG_losses[-5:],axis=0)[1]),\n ('D(G)_class', np.mean(DG_losses[-5:],axis=0)[2])\n ]\n )\n \n else:\n progress_bar.update(it)\n\n # 1: train D on real+generated images\n\n if (it % 1000) < 25 or it % 500 == 0: # 25 times in 1000, every 500th\n d_iters = 100\n else:\n d_iters = D_ITERS\n\n for d_it in range(d_iters):\n\n # unfreeze D\n D.trainable = True\n for l in D.layers: l.trainable = True\n \n # # restore D dropout rates\n # for l in D.layers:\n # if l.name.startswith('dropout'):\n # l.rate = l._rate\n\n # clip D weights\n\n for l in D.layers:\n weights = l.get_weights()\n weights = [np.clip(w, -0.01, 0.01) for w in weights]\n l.set_weights(weights)\n\n # 1.1: maximize D output on reals === minimize -1*(D(real))\n\n # draw random samples from real images\n index = np.random.choice(len(X_train), BATCH_SIZE, replace=False)\n real_images = X_train[index]\n real_images_classes = y_train[index]\n\n D_loss = D.train_on_batch(real_images, [-np.ones(BATCH_SIZE), real_images_classes])\n D_true_losses.append(D_loss)\n\n # 1.2: minimize D output on fakes \n\n zz = np.random.normal(0., 1., (BATCH_SIZE, Z_SIZE))\n generated_classes = np.random.randint(0, 10, BATCH_SIZE)\n generated_images = G.predict([zz, generated_classes.reshape(-1, 1)])\n\n D_loss = D.train_on_batch(generated_images, [np.ones(BATCH_SIZE), generated_classes])\n D_fake_losses.append(D_loss)\n\n # 2: train D(G) (D is frozen)\n # minimize D output while supplying it with fakes, telling it that they are reals (-1)\n\n # freeze D\n D.trainable = False\n for l in D.layers: l.trainable = False\n \n # # disable D dropout layers\n # for l in D.layers:\n # if l.name.startswith('dropout'):\n # l.rate = 0.\n\n zz = np.random.normal(0., 1., (BATCH_SIZE, Z_SIZE)) \n generated_classes = np.random.randint(0, 10, BATCH_SIZE)\n\n DG_loss = DG.train_on_batch(\n [zz, generated_classes.reshape((-1, 1))],\n [-np.ones(BATCH_SIZE), generated_classes])\n\n DG_losses.append(DG_loss)\n\n if it % 10 == 0:\n update_tb_summary(it, sample_images=(it % 10 == 0), save_image_files=True)",
"execution_count": null,
"outputs": []
}
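,
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "# Added convenience (not part of the original gist): persist the trained weights so samples can be\n# regenerated later without retraining; the file names are arbitrary choices, not from the gist.\nG.save_weights(OUT_DIR + '/G.h5')\nD.save_weights(OUT_DIR + '/D.h5')",
"execution_count": null,
"outputs": []
}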
],
"metadata": {
"gist": {
"id": "6ecf449b32eb263e7d9a7f6e9aed5dc2",
"data": {
"description": "Wasserstein ACGAN in Keras",
"public": true
}
},
"language_info": {
"pygments_lexer": "ipython3",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"nbconvert_exporter": "python",
"version": "3.5.2",
"name": "python"
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3",
"language": "python"
},
"_draft": {
"nbviewer_url": "https://gist.github.com/6ecf449b32eb263e7d9a7f6e9aed5dc2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@HitLuca

HitLuca commented Jun 7, 2018

I assume your weight initialization is chosen so that the critic is not almost completely clipped at the first iteration. Why, then, are the weights of the generator initialized in the same way? In my opinion, leaving the default initialization would be better for it.
