Last active
May 2, 2019 21:36
-
-
Save pat-coady/b3e322c2431b7550075138177b70c4f5 to your computer and use it in GitHub Desktop.
TensorFlow QueueRunner Example 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# TensorFlow QueueRunner Example \\#2\n", | |
"\n", | |
"*Patrick Coady (pcoady@alum.mit.edu)*\n", | |
"\n", | |
"This example starts with a list of file names. As in Example \\#1, this is a contrived example that cascades several queues." | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Queue Pipeline\n", | |
"\n", | |
"![Queue Pipeline](https://learningai.io//assets/tf-queue-runner/queue_arch_from_file.png)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"import os\n", | |
"\n", | |
"import numpy as np\n", | |
"import tensorflow as tf" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"# Build 10 toy data files ('/data' directory must exist)\n", | |
"x_in = np.arange(3*100, dtype=np.float32).reshape((-1, 3))\n", | |
"y_in = np.reshape(-np.sum(x_in, axis=1) + 1, (-1, 1))\n", | |
"for i in range(10):\n", | |
" with open('data/file{}.csv'.format(i), mode='w') as f:\n", | |
" for j in range(10):\n", | |
" idx = i+10*j\n", | |
" f.write('{},{},{},{}\\n'.format(x_in[j,0], x_in[j,1], \n", | |
" x_in[j,2], y_in[j,0]))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": { | |
"collapsed": false, | |
"scrolled": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"\n", | |
"Sum of all x and y elements equals the # of rows: 90.0\n" | |
] | |
} | |
], | |
"source": [ | |
"g = tf.Graph()\n", | |
"with g.as_default():\n", | |
"    # get list of file names in the data directory\n", | |
"    filenames = tf.train.match_filenames_once('data/*')\n", | |
"    # Queue #0: Queue of file names (strings); with num_epochs=1 each\n", | |
"    # name is produced once, after which readers get an OutOfRangeError\n", | |
"    filename_q = tf.train.string_input_producer(filenames, num_epochs=1)\n", | |
"\n", | |
"    # Queue #1: Line from .csv file, fed by queue of file names\n", | |
"    file_reader = tf.TextLineReader()\n", | |
"    key, value = file_reader.read(filename_q)\n", | |
"\n", | |
"    # decode csv file line-by-line (4 float columns: x1, x2, x3, y)\n", | |
"    x1, x2, x3, y = tf.decode_csv(value, [[0.0]]*4)\n", | |
"    q1_x = tf.stack([x1, x2, x3])\n", | |
"    q1_y = tf.reshape(y, (1,))\n", | |
"\n", | |
"    # Queue #2: FIFO with capacity of 10\n", | |
"    q2 = tf.FIFOQueue(capacity=10, dtypes=[tf.float32, tf.float32],\n", | |
"                      shapes=[(3,), (1,)])\n", | |
"    # enqueue operation: fed by Queue #1\n", | |
"    q2_enq = q2.enqueue(vals=[q1_x, q1_y])\n", | |
"    # dequeue operation: pull 3 rows at a time\n", | |
"    q2_deq_x, q2_deq_y = q2.dequeue_many(n=3)\n", | |
"    # handle for close operation\n", | |
"    q2_close = q2.close()\n", | |
"    # QueueRunner for Queue #2: keeps the FIFO fed\n", | |
"    q2_qrun = tf.train.QueueRunner(queue=q2, enqueue_ops=[q2_enq],\n", | |
"                                   close_op=q2_close)\n", | |
"    # Note: tf.train.QueueRunner does not automatically add itself to the\n", | |
"    # QUEUE_RUNNERS collection, so we manually add it here:\n", | |
"    tf.train.add_queue_runner(q2_qrun)\n", | |
"\n", | |
"    # Output from Queue #2 is 3 rows [shape=(3,3) and shape=(3,1)]. Sum to 1 row:\n", | |
"    sum_col_x = tf.reduce_sum(q2_deq_x, axis=0)\n", | |
"    sum_col_y = tf.reduce_sum(q2_deq_y, axis=0)\n", | |
"    # now: sum_col_x shape=(3,) and sum_col_y shape=(1,)\n", | |
"\n", | |
"    # Queue #3: Build batches of 5 examples\n", | |
"    batch_x, batch_y = tf.train.batch(tensors=[sum_col_x, sum_col_y],\n", | |
"                                      batch_size=5)\n", | |
"    # batch shapes are (5,3) and (5,1), respectively\n", | |
"\n", | |
"    # sum each row, using a matmul for the fun of it\n", | |
"    W = tf.Variable([[1], [1], [1]], dtype=tf.float32)\n", | |
"    row_sums = tf.matmul(batch_x, W) + batch_y\n", | |
"\n", | |
"    # sum the column\n", | |
"    batch_sum = tf.reduce_sum(row_sums, axis=0)\n", | |
"\n", | |
"    # add to running total\n", | |
"    total = tf.Variable([0], dtype=tf.float32)\n", | |
"    update_total = total.assign(total + batch_sum)\n", | |
"\n", | |
"    # the num_epochs counter behind Queue #0 is a *local* variable, so\n", | |
"    # local variables must be initialized along with the global ones\n", | |
"    init = tf.group(tf.global_variables_initializer(),\n", | |
"                    tf.local_variables_initializer())\n", | |
"\n", | |
"    with tf.Session() as sess:\n", | |
"        init.run()\n", | |
"        coord = tf.train.Coordinator()\n", | |
"        threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n", | |
"        # reported if no full batch is ever produced (matches total's init)\n", | |
"        tot = [[0.0]]\n", | |
"        try:\n", | |
"            while not coord.should_stop():\n", | |
"                tot = sess.run([update_total])\n", | |
"        except tf.errors.OutOfRangeError as e:\n", | |
"            # input exhausted: the normal way this loop ends\n", | |
"            coord.request_stop(e)\n", | |
"        finally:\n", | |
"            coord.request_stop()\n", | |
"            coord.join(threads)\n", | |
"        print('\\nSum of all x and y elements equals the # of rows: {}'.\n", | |
"              format(tot[0][0]))" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"collapsed": true | |
}, | |
"source": [ | |
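"**See Example \\#1 for an explanation of why the answer here is 90.0 and not 100.0 as you might expect. In short: Queue \\#2 dequeues rows 3 at a time, so the last of the 100 rows is never dequeued, and `tf.train.batch` only emits full batches of 5, so the final 3 summed items (9 rows) are dropped: 100 - 1 - 9 = 90. We can fix this in the same way we did in Example \\#1.**" | |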
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.5.2" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment