Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
TensorFlow QueueRunner Example 2
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# TensorFlow QueueRunner Example \\#2\n",
"\n",
"*Patrick Coady (pcoady@alum.mit.edu)*\n",
"\n",
"This example starts with a list of file names. Again, this is a contrived example that cascades several queues. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Queue Pipeline\n",
"\n",
"![Queue Pipeline](https://learningai.io//assets/tf-queue-runner/queue_arch_from_file.png)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Build 10 toy data files ('/data' directory must exist)\n",
"x_in = np.arange(3*100, dtype=np.float32).reshape((-1, 3))\n",
"y_in = np.reshape(-np.sum(x_in, axis=1) + 1, (-1, 1))\n",
"for i in range(10):\n",
" with open('data/file{}.csv'.format(i), mode='w') as f:\n",
" for j in range(10):\n",
" idx = i+10*j\n",
" f.write('{},{},{},{}\\n'.format(x_in[j,0], x_in[j,1], \n",
" x_in[j,2], y_in[j,0]))"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Sum of all x and y elements equals the # of rows: 90.0\n"
]
}
],
"source": [
"g = tf.Graph()\n",
"with g.as_default():\n",
" # get list of file names in the data directory\n",
" filenames = tf.train.match_filenames_once('data/*')\n",
" # Queue #0: Queue of file names (strings)\n",
" filename_q = tf.train.string_input_producer(filenames, num_epochs=1)\n",
" \n",
" # Queue #1: Line from .csv file, fed by queue of file names\n",
" file_reader = tf.TextLineReader()\n",
" key, value = file_reader.read(filename_q)\n",
" \n",
" # decode csv file line-by-line\n",
" x1, x2, x3, y = tf.decode_csv(value, [[0.0]]*4)\n",
" q1_x = tf.stack([x1, x2, x3])\n",
" q1_y = tf.reshape(y, (1,))\n",
"\n",
" # Queue #2: FIFO with capacity of 10\n",
" q2 = tf.FIFOQueue(capacity=10, dtypes=[tf.float32, tf.float32],\n",
" shapes=[(3,), (1,)])\n",
" # enqueue operation: fed by Queue #1\n",
" q2_enq = q2.enqueue(vals=[q1_x, q1_y])\n",
" # dequeue operation: pull 3 rows at a time\n",
" q2_deq_x, q2_deq_y = q2.dequeue_many(n=3)\n",
" # handle for close operation\n",
" q2_close = q2.close()\n",
" # QueueRunner for Queue #2: keeps the FIFO fed\n",
" q2_qrun = tf.train.QueueRunner(queue=q2, enqueue_ops=[q2_enq], \n",
" close_op=q2_close)\n",
" # Note: tf.QueueRunner does not automatically add itself to the\n",
" # QUEUE_RUNNERS collection, so we manually add it here:\n",
" tf.train.add_queue_runner(q2_qrun)\n",
"\n",
" # Output from Queue #2 is 3 rows [shape=(3,3) and shape=(3,1)]. Sum to 1 row:\n",
" sum_col_x = tf.reduce_sum(q2_deq_x, axis=0)\n",
" sum_col_y = tf.reduce_sum(q2_deq_y, axis=0)\n",
" # now: sum_col_x shape=(1,3) and sum_col_y shape=(1,1)\n",
" \n",
" # Queue #3: Build batches of 5 examples\n",
" batch_x, batch_y = tf.train.batch(tensors=[sum_col_x, sum_col_y],\n",
" batch_size=5)\n",
" # batch shapes are (5,3) and (5,1), respectively\n",
"\n",
" # sum each row, using a matmul for the fun of it\n",
" W = tf.Variable([[1], [1], [1]], dtype=tf.float32)\n",
" row_sums = tf.matmul(batch_x, W) + batch_y\n",
" \n",
" # sum the column\n",
" batch_sum = tf.reduce_sum(row_sums, axis=0)\n",
" \n",
" # add to running total\n",
" total = tf.Variable([0], dtype=tf.float32)\n",
" update_total = total.assign(total + batch_sum)\n",
" \n",
" # epochs variable in Queue #1 is a *local* variable, add\n",
" # local variables to the initializer\n",
" init = tf.group(tf.global_variables_initializer(),\n",
" tf.local_variables_initializer())\n",
" \n",
" with tf.Session() as sess:\n",
" init.run()\n",
" coord = tf.train.Coordinator()\n",
" threads = tf.train.start_queue_runners(sess=sess, coord=coord) \n",
" try:\n",
" while not coord.should_stop():\n",
" tot = sess.run([update_total])\n",
" except tf.errors.OutOfRangeError as e:\n",
" coord.request_stop(e)\n",
" finally:\n",
" coord.request_stop()\n",
" coord.join(threads) \n",
" print('\\nSum of all x and y elements equals the # of rows: {}'.\n",
" format(tot[0][0]))"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"**See Example \\#1 for an explanation of why the answer here is 90.0 and not 100.0 as you might expect. We can fix this in the same way we did in example \\#1.**"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment