Skip to content

Instantly share code, notes, and snippets.

@pat-coady
Last active May 2, 2019 21:36
Show Gist options
  • Save pat-coady/567d30961616f8ec868421eefe6c4a1a to your computer and use it in GitHub Desktop.
Save pat-coady/567d30961616f8ec868421eefe6c4a1a to your computer and use it in GitHub Desktop.
TensorFlow QueueRunner Example 1
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# TensorFlow QueueRunner Example \\#1\n",
"\n",
"*Patrick Coady (pcoady@alum.mit.edu)*\n",
"\n",
"This example cascades queues to build a data pipeline. This example is a bit contrived to work 3 queues into one graph.\n",
"\n",
"This example notebook feeds the graph from a NumPy array. The next example notebook will feed the pipeline directly from files."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Queue Pipeline\n",
"\n",
"![Queue Pipeline](https://learningai.io//assets/tf-queue-runner/queue_arch.png)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"x:\n",
"[[ 0. 1. 2.]\n",
" [ 3. 4. 5.]\n",
" [ 6. 7. 8.]]\n",
" ...\n",
"y:\n",
"[[ -2.]\n",
" [-11.]\n",
" [-20.]]\n",
" ...\n",
"\n",
"Sum of all x and y elements equals the # of rows: 100.0\n"
]
}
],
"source": [
"# Build toy data set\n",
"x_in = np.arange(3*100, dtype=np.float32).reshape((-1, 3))\n",
"y_in = np.reshape(-np.sum(x_in, axis=1) + 1, (-1, 1))\n",
"# y is negative of x-columns plus 1\n",
"print('x:')\n",
"print(x_in[:3,:])\n",
"print(' ...')\n",
"print('y:')\n",
"print(y_in[:3,:])\n",
"print(' ...')\n",
"sum_all = np.sum(x_in)+np.sum(y_in)\n",
"print('\\nSum of all x and y elements equals the # of rows: {}'.format(sum_all))"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Sum of all x and y elements equals the # of rows: 90.0\n"
]
}
],
"source": [
"g = tf.Graph()\n",
"with g.as_default():\n",
" with tf.variable_scope('inputs', initializer=tf.zeros_initializer()):\n",
" # we'll load these variables once session is launched\n",
" x = tf.get_variable('x', shape=(100, 3), dtype=tf.float32)\n",
" y = tf.get_variable('y', shape=(100, 1), dtype=tf.float32)\n",
" \n",
" # Queue #1: Pull examples 1 row at a time\n",
" q1_x = tf.train.input_producer(x, num_epochs=1, shuffle=False).dequeue()\n",
" q1_y = tf.train.input_producer(y, num_epochs=1, shuffle=False).dequeue()\n",
" # Note: tf.train.input_producer automatically adds QueueRunner \n",
" # to graph and to QUEUE_RUNNERS collection\n",
" \n",
" # Queue #2: FIFO with capacity of 10\n",
" q2 = tf.FIFOQueue(capacity=10, dtypes=[tf.float32, tf.float32],\n",
" shapes=[(3,), (1,)])\n",
" # enqueue operation: fed by Queue #1\n",
" q2_enq = q2.enqueue(vals=[q1_x, q1_y])\n",
" # dequeue operation: pull 3 rows at a time\n",
" q2_deq_x, q2_deq_y = q2.dequeue_many(n=3)\n",
" # handle for close operation\n",
" q2_close = q2.close()\n",
" # QueueRunner for Queue #2: keeps the FIFO fed\n",
" q2_qrun = tf.train.QueueRunner(queue=q2, enqueue_ops=[q2_enq], \n",
" close_op=q2_close)\n",
" # Note: tf.QueueRunner does not automatically add itself to the\n",
" # QUEUE_RUNNERS collection, so we manually add it here:\n",
" tf.train.add_queue_runner(q2_qrun)\n",
"\n",
" # Output from Queue #2 is 3 rows [shape=(3,3) and shape=(3,1)]. Sum to 1 row:\n",
" sum_col_x = tf.reduce_sum(q2_deq_x, axis=0)\n",
" sum_col_y = tf.reduce_sum(q2_deq_y, axis=0)\n",
" # now: sum_col_x shape=(1,3) and sum_col_y shape=(1,1)\n",
" \n",
" # Queue #3: Build batches of 5 examples\n",
" batch_x, batch_y = tf.train.batch(tensors=[sum_col_x, sum_col_y],\n",
" batch_size=5)\n",
" # batch shapes are (5,3) and (5,1), respectively\n",
"\n",
" # sum each row, using a matmul for the fun of it\n",
" W = tf.Variable([[1], [1], [1]], dtype=tf.float32)\n",
" row_sums = tf.matmul(batch_x, W) + batch_y\n",
" \n",
" # sum the column\n",
" batch_sum = tf.reduce_sum(row_sums, axis=0)\n",
" \n",
" # add to running total\n",
" total = tf.Variable([0], dtype=tf.float32)\n",
" update_total = total.assign(total + batch_sum)\n",
" \n",
" # epochs variable in Queue #1 is a *local* variable, add\n",
" # local variables to the initializer\n",
" init = tf.group(tf.global_variables_initializer(),\n",
" tf.local_variables_initializer())\n",
" \n",
" with tf.Session() as sess:\n",
" init.run()\n",
" x.load(x_in)\n",
" y.load(y_in)\n",
" coord = tf.train.Coordinator()\n",
" threads = tf.train.start_queue_runners(sess=sess, coord=coord) \n",
" try:\n",
" while True:\n",
" tot = sess.run([update_total])\n",
" except tf.errors.OutOfRangeError as e:\n",
" coord.request_stop(e)\n",
" finally:\n",
" coord.request_stop()\n",
" coord.join(threads) \n",
" print('\\nSum of all x and y elements equals the # of rows: {}'.\n",
" format(tot[0][0]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### The sum is 90.0, but it should be 100.0. Why?\n",
"\n",
"Queue \\#2 dequeues 3 examples at a time. When a queue has no more examples available to enqueue, it is closed by its QueueRunner. Queue \\#2 can continue to be emptied (i.e. dequeue operations) after it is closed. But when less than 3 examples are available to dequeue, an OutOfRange exception is raised. This exception is caught by the QueueRunner for Queue \\#3, which will then close Queue \\#3. \n",
"\n",
"So, Queue \\#2 can only deliver 99 examples, not the full 100: we lost 1 input example.\n",
"\n",
"We sum the rows from Queue \\#2 (tf.reduce_sum): 3 examples are summed to 1. This gives us 33 summed examples (99 divided by 3) to feed Queue \\#3. Queue \\#3 builds batches of 5 summed examples. So Queue \\#3 can only build 6 full batches of 5 examples (30 of the 33 examples). So we lose 3 summed examples from Queue \\#2: equivalent to losing 9 more input examples from our original input.\n",
"\n",
"What if we don't want to lose training examples? If we are fine with having partial batches, we use the dequeue_up_to() method and train.batch() allow smaller final batch argument:\n",
"\n",
"Below is the modified code using these options. We now get the correct sum of 100.0."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Sum of all x and y elements equals the # of rows: 100.0\n"
]
}
],
"source": [
"g = tf.Graph()\n",
"with g.as_default():\n",
" with tf.variable_scope('inputs', initializer=tf.zeros_initializer()):\n",
" x = tf.get_variable('x', shape=(100, 3), dtype=tf.float32)\n",
" y = tf.get_variable('y', shape=(100, 1), dtype=tf.float32)\n",
" \n",
" # Queue #1: Pull examples 1 row at a time\n",
" q1_x = tf.train.input_producer(x, num_epochs=1, shuffle=False).dequeue()\n",
" q1_y = tf.train.input_producer(y, num_epochs=1, shuffle=False).dequeue()\n",
" # Note: tf.train.input_producer automatically adds QueueRunners \n",
" # to graph and to QUEUE_RUNNERS collection\n",
" \n",
" # Queue #2: FIFO with capacity of 10\n",
" q2 = tf.FIFOQueue(capacity=10, dtypes=[tf.float32, tf.float32],\n",
" shapes=[(3,), (1,)])\n",
" # enqueue operation: fed by Queue #1\n",
" q2_enq = q2.enqueue(vals=[q1_x, q1_y])\n",
" # dequeue operation: pull 3 rows at a time\n",
" q2_deq_x, q2_deq_y = q2.dequeue_up_to(n=3) # ***Change #1***\n",
" # handle for close operation\n",
" q2_close = q2.close()\n",
"\n",
" # QueueRunner for Queue #2: keeps the FIFO fed\n",
" q2_qrun = tf.train.QueueRunner(queue=q2, enqueue_ops=[q2_enq], \n",
" close_op=q2_close)\n",
" # Note: tf.QueueRunner does not automatically add itself to the\n",
" # QUEUE_RUNNERS collection, so we manually add it here:\n",
" tf.train.add_queue_runner(q2_qrun)\n",
"\n",
" # Output from Queue #2 is 3 rows. Sum to 1 row:\n",
" sum_col_x = tf.reduce_sum(q2_deq_x, axis=0)\n",
" sum_col_y = tf.reduce_sum(q2_deq_y, axis=0)\n",
" \n",
" # Queue #3: Build batches of 5 examples\n",
" # ***Change #2***\n",
" batch_x, batch_y = tf.train.batch(tensors=[sum_col_x, sum_col_y],\n",
" batch_size=5,\n",
" allow_smaller_final_batch=True) \n",
"\n",
" # sum each row, using matmul for the fun of it\n",
" W = tf.Variable([[1], [1], [1]], dtype=tf.float32)\n",
" row_sums = tf.matmul(batch_x, W) + batch_y\n",
" \n",
" # sum the column\n",
" batch_sum = tf.reduce_sum(row_sums, axis=0)\n",
" \n",
" # add to running total\n",
" total = tf.Variable([0], dtype=tf.float32)\n",
" update_total = total.assign(total + batch_sum)\n",
" \n",
" # epochs variable in Queue #1 is a *local* variable\n",
" init = tf.group(tf.global_variables_initializer(),\n",
" tf.local_variables_initializer())\n",
" \n",
" with tf.Session() as sess:\n",
" init.run()\n",
" x.load(x_in)\n",
" y.load(y_in)\n",
" coord = tf.train.Coordinator()\n",
" threads = tf.train.start_queue_runners(sess=sess, coord=coord) \n",
" try:\n",
" while not coord.should_stop():\n",
" tot = sess.run([update_total])\n",
" except tf.errors.OutOfRangeError as e:\n",
" coord.request_stop(e)\n",
" finally:\n",
" coord.request_stop()\n",
" coord.join(threads) \n",
" print('\\nSum of all x and y elements equals the # of rows: {}'.\n",
" format(tot[0][0]))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment