Last active
May 2, 2019 21:36
-
-
Save pat-coady/567d30961616f8ec868421eefe6c4a1a to your computer and use it in GitHub Desktop.
TensorFlow QueueRunner Example 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# TensorFlow QueueRunner Example \\#1\n", | |
"\n", | |
"*Patrick Coady (pcoady@alum.mit.edu)*\n", | |
"\n", | |
"This example cascades queues to build a data pipeline. This example is a bit contrived to work 3 queues into one graph.\n", | |
"\n", | |
"This example notebook feeds the graph from a NumPy array. The next example notebook will feed the pipeline directly from files." | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Queue Pipeline\n", | |
"\n", | |
"![Queue Pipeline](https://learningai.io//assets/tf-queue-runner/queue_arch.png)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"import tensorflow as tf\n", | |
"import numpy as np" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"x:\n", | |
"[[ 0. 1. 2.]\n", | |
" [ 3. 4. 5.]\n", | |
" [ 6. 7. 8.]]\n", | |
" ...\n", | |
"y:\n", | |
"[[ -2.]\n", | |
" [-11.]\n", | |
" [-20.]]\n", | |
" ...\n", | |
"\n", | |
"Sum of all x and y elements equals the # of rows: 100.0\n" | |
] | |
} | |
], | |
"source": [ | |
"# Build toy data set\n", | |
"x_in = np.arange(3*100, dtype=np.float32).reshape((-1, 3))\n", | |
"y_in = np.reshape(-np.sum(x_in, axis=1) + 1, (-1, 1))\n", | |
"# y is negative of x-columns plus 1\n", | |
"print('x:')\n", | |
"print(x_in[:3,:])\n", | |
"print(' ...')\n", | |
"print('y:')\n", | |
"print(y_in[:3,:])\n", | |
"print(' ...')\n", | |
"sum_all = np.sum(x_in)+np.sum(y_in)\n", | |
"print('\\nSum of all x and y elements equals the # of rows: {}'.format(sum_all))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": { | |
"collapsed": false, | |
"scrolled": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"\n", | |
"Sum of all x and y elements equals the # of rows: 90.0\n" | |
] | |
} | |
], | |
"source": [ | |
"g = tf.Graph()\n", | |
"with g.as_default():\n", | |
" with tf.variable_scope('inputs', initializer=tf.zeros_initializer()):\n", | |
" # we'll load these variables once session is launched\n", | |
" x = tf.get_variable('x', shape=(100, 3), dtype=tf.float32)\n", | |
" y = tf.get_variable('y', shape=(100, 1), dtype=tf.float32)\n", | |
" \n", | |
" # Queue #1: Pull examples 1 row at a time\n", | |
" q1_x = tf.train.input_producer(x, num_epochs=1, shuffle=False).dequeue()\n", | |
" q1_y = tf.train.input_producer(y, num_epochs=1, shuffle=False).dequeue()\n", | |
" # Note: tf.train.input_producer automatically adds QueueRunner \n", | |
" # to graph and to QUEUE_RUNNERS collection\n", | |
" \n", | |
" # Queue #2: FIFO with capacity of 10\n", | |
" q2 = tf.FIFOQueue(capacity=10, dtypes=[tf.float32, tf.float32],\n", | |
" shapes=[(3,), (1,)])\n", | |
" # enqueue operation: fed by Queue #1\n", | |
" q2_enq = q2.enqueue(vals=[q1_x, q1_y])\n", | |
" # dequeue operation: pull 3 rows at a time\n", | |
" q2_deq_x, q2_deq_y = q2.dequeue_many(n=3)\n", | |
" # handle for close operation\n", | |
" q2_close = q2.close()\n", | |
" # QueueRunner for Queue #2: keeps the FIFO fed\n", | |
" q2_qrun = tf.train.QueueRunner(queue=q2, enqueue_ops=[q2_enq], \n", | |
" close_op=q2_close)\n", | |
" # Note: tf.QueueRunner does not automatically add itself to the\n", | |
" # QUEUE_RUNNERS collection, so we manually add it here:\n", | |
" tf.train.add_queue_runner(q2_qrun)\n", | |
"\n", | |
" # Output from Queue #2 is 3 rows [shape=(3,3) and shape=(3,1)]. Sum to 1 row:\n", | |
" sum_col_x = tf.reduce_sum(q2_deq_x, axis=0)\n", | |
" sum_col_y = tf.reduce_sum(q2_deq_y, axis=0)\n", | |
" # now: sum_col_x shape=(1,3) and sum_col_y shape=(1,1)\n", | |
" \n", | |
" # Queue #3: Build batches of 5 examples\n", | |
" batch_x, batch_y = tf.train.batch(tensors=[sum_col_x, sum_col_y],\n", | |
" batch_size=5)\n", | |
" # batch shapes are (5,3) and (5,1), respectively\n", | |
"\n", | |
" # sum each row, using a matmul for the fun of it\n", | |
" W = tf.Variable([[1], [1], [1]], dtype=tf.float32)\n", | |
" row_sums = tf.matmul(batch_x, W) + batch_y\n", | |
" \n", | |
" # sum the column\n", | |
" batch_sum = tf.reduce_sum(row_sums, axis=0)\n", | |
" \n", | |
" # add to running total\n", | |
" total = tf.Variable([0], dtype=tf.float32)\n", | |
" update_total = total.assign(total + batch_sum)\n", | |
" \n", | |
" # epochs variable in Queue #1 is a *local* variable, add\n", | |
" # local variables to the initializer\n", | |
" init = tf.group(tf.global_variables_initializer(),\n", | |
" tf.local_variables_initializer())\n", | |
" \n", | |
" with tf.Session() as sess:\n", | |
" init.run()\n", | |
" x.load(x_in)\n", | |
" y.load(y_in)\n", | |
" coord = tf.train.Coordinator()\n", | |
" threads = tf.train.start_queue_runners(sess=sess, coord=coord) \n", | |
" try:\n", | |
" while True:\n", | |
" tot = sess.run([update_total])\n", | |
" except tf.errors.OutOfRangeError as e:\n", | |
" coord.request_stop(e)\n", | |
" finally:\n", | |
" coord.request_stop()\n", | |
" coord.join(threads) \n", | |
" print('\\nSum of all x and y elements equals the # of rows: {}'.\n", | |
" format(tot[0][0]))" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### The sum is 90.0, but it should be 100.0. Why?\n", | |
"\n", | |
"Queue \\#2 dequeues 3 examples at a time. When a queue has no more examples available to enqueue, it is closed by its QueueRunner. Queue \\#2 can continue to be emptied (i.e. dequeue operations) after it is closed. But when less than 3 examples are available to dequeue, an OutOfRange exception is raised. This exception is caught by the QueueRunner for Queue \\#3, which will then close Queue \\#3. \n", | |
"\n", | |
"So, Queue \\#2 can only deliver 99 examples, not the full 100: we lost 1 input example.\n", | |
"\n", | |
"We sum the rows from Queue \\#2 (tf.reduce_sum): 3 examples are summed to 1. This gives us 33 summed examples (99 divided by 3) to feed Queue \\#3. Queue \\#3 builds batches of 5 summed examples. So Queue \\#3 can only build 6 full batches of 5 examples (30 of the 33 examples). So we lose 3 summed examples from Queue \\#2: equivalent to losing 9 more input examples from our original input.\n", | |
"\n", | |
"What if we don't want to lose training examples? If we are fine with having partial batches, we use the dequeue_up_to() method and train.batch() allow smaller final batch argument:\n", | |
"\n", | |
"Below is the modified code using these options. We now get the correct sum of 100.0." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"\n", | |
"Sum of all x and y elements equals the # of rows: 100.0\n" | |
] | |
} | |
], | |
"source": [ | |
"g = tf.Graph()\n", | |
"with g.as_default():\n", | |
" with tf.variable_scope('inputs', initializer=tf.zeros_initializer()):\n", | |
" x = tf.get_variable('x', shape=(100, 3), dtype=tf.float32)\n", | |
" y = tf.get_variable('y', shape=(100, 1), dtype=tf.float32)\n", | |
" \n", | |
" # Queue #1: Pull examples 1 row at a time\n", | |
" q1_x = tf.train.input_producer(x, num_epochs=1, shuffle=False).dequeue()\n", | |
" q1_y = tf.train.input_producer(y, num_epochs=1, shuffle=False).dequeue()\n", | |
" # Note: tf.train.input_producer automatically adds QueueRunners \n", | |
" # to graph and to QUEUE_RUNNERS collection\n", | |
" \n", | |
" # Queue #2: FIFO with capacity of 10\n", | |
" q2 = tf.FIFOQueue(capacity=10, dtypes=[tf.float32, tf.float32],\n", | |
" shapes=[(3,), (1,)])\n", | |
" # enqueue operation: fed by Queue #1\n", | |
" q2_enq = q2.enqueue(vals=[q1_x, q1_y])\n", | |
" # dequeue operation: pull 3 rows at a time\n", | |
" q2_deq_x, q2_deq_y = q2.dequeue_up_to(n=3) # ***Change #1***\n", | |
" # handle for close operation\n", | |
" q2_close = q2.close()\n", | |
"\n", | |
" # QueueRunner for Queue #2: keeps the FIFO fed\n", | |
" q2_qrun = tf.train.QueueRunner(queue=q2, enqueue_ops=[q2_enq], \n", | |
" close_op=q2_close)\n", | |
" # Note: tf.QueueRunner does not automatically add itself to the\n", | |
" # QUEUE_RUNNERS collection, so we manually add it here:\n", | |
" tf.train.add_queue_runner(q2_qrun)\n", | |
"\n", | |
" # Output from Queue #2 is 3 rows. Sum to 1 row:\n", | |
" sum_col_x = tf.reduce_sum(q2_deq_x, axis=0)\n", | |
" sum_col_y = tf.reduce_sum(q2_deq_y, axis=0)\n", | |
" \n", | |
" # Queue #3: Build batches of 5 examples\n", | |
" # ***Change #2***\n", | |
" batch_x, batch_y = tf.train.batch(tensors=[sum_col_x, sum_col_y],\n", | |
" batch_size=5,\n", | |
" allow_smaller_final_batch=True) \n", | |
"\n", | |
" # sum each row, using matmul for the fun of it\n", | |
" W = tf.Variable([[1], [1], [1]], dtype=tf.float32)\n", | |
" row_sums = tf.matmul(batch_x, W) + batch_y\n", | |
" \n", | |
" # sum the column\n", | |
" batch_sum = tf.reduce_sum(row_sums, axis=0)\n", | |
" \n", | |
" # add to running total\n", | |
" total = tf.Variable([0], dtype=tf.float32)\n", | |
" update_total = total.assign(total + batch_sum)\n", | |
" \n", | |
" # epochs variable in Queue #1 is a *local* variable\n", | |
" init = tf.group(tf.global_variables_initializer(),\n", | |
" tf.local_variables_initializer())\n", | |
" \n", | |
" with tf.Session() as sess:\n", | |
" init.run()\n", | |
" x.load(x_in)\n", | |
" y.load(y_in)\n", | |
" coord = tf.train.Coordinator()\n", | |
" threads = tf.train.start_queue_runners(sess=sess, coord=coord) \n", | |
" try:\n", | |
" while not coord.should_stop():\n", | |
" tot = sess.run([update_total])\n", | |
" except tf.errors.OutOfRangeError as e:\n", | |
" coord.request_stop(e)\n", | |
" finally:\n", | |
" coord.request_stop()\n", | |
" coord.join(threads) \n", | |
" print('\\nSum of all x and y elements equals the # of rows: {}'.\n", | |
" format(tot[0][0]))" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.5.2" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment