Skip to content

Instantly share code, notes, and snippets.

@le-dawg
Created February 27, 2020 16:30
Show Gist options
  • Save le-dawg/7549af80516379f6c58bc8d2381486a7 to your computer and use it in GitHub Desktop.
Save le-dawg/7549af80516379f6c58bc8d2381486a7 to your computer and use it in GitHub Desktop.
Time Series input function for tensorflow estimator
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"3.6.8 (default, Aug 20 2019, 17:12:48) \n",
"[GCC 8.3.0]\n"
]
}
],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"from sklearn.metrics import mean_squared_error\n",
"from pandas import read_csv\n",
"from keras.utils import to_categorical\n",
"import zipfile\n",
"import pdb\n",
"import os\n",
"import tensorflow as tf\n",
"print(tf.__version__)\n",
"from matplotlib import pyplot as plt\n",
"%matplotlib inline\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"train\n",
"(7352, 128, 9) (7352, 1)\n",
"test\n",
"(2947, 128, 9) (2947, 1)\n",
"(7352, 128, 9) (7352, 6) (2947, 128, 9) (2947, 6)\n"
]
}
],
"source": [
"# load a single file as a numpy array\n",
"def load_file(filepath):\n",
"    \"\"\"Read a header-less, whitespace-delimited text file.\n",
"\n",
"    filepath: path of the file to read.\n",
"    Returns the parsed table as the DataFrame's `.values` array.\n",
"    \"\"\"\n",
"    # sep=r'\\s+' is the documented equivalent of the deprecated delim_whitespace=True\n",
"    dataframe = pd.read_csv(filepath, header=None, sep=r'\\s+')\n",
"    return dataframe.values\n",
"\n",
"# load a list of files into a 3D array of [samples, timesteps, features]\n",
"def load_group(filenames, prefix='body_'):\n",
"    \"\"\"Load every file in `filenames` and stack the results depth-wise.\n",
"\n",
"    filenames: iterable of file names; each one is appended to `prefix` and\n",
"        read with load_file.\n",
"    prefix: string prepended to every file name.\n",
"    Returns np.dstack of the loaded arrays, one slice along the third axis per file.\n",
"    \"\"\"\n",
"    per_file = [load_file(prefix + name) for name in filenames]\n",
"    # dstack places each file's array along the third axis\n",
"    return np.dstack(per_file)\n",
"\n",
"# load a dataset group, such as train or test\n",
"def load_dataset_group(group, prefix=''):\n",
"    \"\"\"Load the nine inertial-signal files and the labels of one dataset group.\n",
"\n",
"    group: group name such as 'train' or 'test'; used in directory and file names.\n",
"    prefix: dataset root including its trailing separator. The signal files\n",
"        (<prefix><group>/InertialSignals/) and the label file\n",
"        (<prefix><group>/y_<group>.txt) are both resolved against it.\n",
"    Returns (X, y): the stacked signals from load_group and the label array\n",
"    from load_file.\n",
"    \"\"\"\n",
"    # Resolve the signal directory against `prefix` too, so signals and labels\n",
"    # always come from the same dataset root.\n",
"    filepath = prefix + group + '/InertialSignals/'\n",
"    # total acceleration, body acceleration, body gyroscope -- x, y, z for each\n",
"    filenames = [\n",
"        sensor + '_' + axis + '_' + group + '.txt'\n",
"        for sensor in ('total_acc', 'body_acc', 'body_gyro')\n",
"        for axis in ('x', 'y', 'z')\n",
"    ]\n",
"    # load input data\n",
"    X = load_group(filenames, filepath)\n",
"    # load class output\n",
"    y = load_file(prefix + group + '/y_' + group + '.txt')\n",
"    return X, y\n",
"\n",
"# load the dataset, returns train and test X and y elements\n",
"def load_dataset(prefix=''):\n",
"    \"\"\"Load the train and test groups and one-hot encode their labels.\n",
"\n",
"    prefix: string prepended to the dataset root '/storage/UCIHAR/UCIHARDATASET/'.\n",
"    Returns (trainX, trainy, testX, testy); the label arrays are shifted to\n",
"    zero-based class ids and one-hot encoded with the same number of columns.\n",
"    Prints the array shapes before and after encoding.\n",
"    \"\"\"\n",
"    root = prefix + '/storage/UCIHAR/UCIHARDATASET/'\n",
"    # load all train\n",
"    trainX, trainy = load_dataset_group('train', root)\n",
"    print('train')\n",
"    print(trainX.shape, trainy.shape)\n",
"    # load all test\n",
"    testX, testy = load_dataset_group('test', root)\n",
"    print('test')\n",
"    print(testX.shape, testy.shape)\n",
"    # zero-offset class values\n",
"    trainy = trainy - 1\n",
"    testy = testy - 1\n",
"    # one hot encode y; a shared width keeps train and test label shapes equal\n",
"    # even when one split does not contain the highest class id\n",
"    num_classes = int(max(trainy.max(), testy.max())) + 1\n",
"    trainy = to_categorical(trainy, num_classes=num_classes)\n",
"    testy = to_categorical(testy, num_classes=num_classes)\n",
"    print(trainX.shape, trainy.shape, testX.shape, testy.shape)\n",
"    return trainX, trainy, testX, testy\n",
"\n",
"trnX, trnY, tstX, tstY = load_dataset()\n",
"# train_x, train_y = tsparser(\"/storage/UEAdata/Heartbeat_TRAIN.ts\", replace_missing_vals_with='NaN')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# Number of target classes, the seed handed to RunConfig, and the directory\n",
"# under which each experiment gets its model_dir.\n",
"NUM_CLASSES = 6\n",
"RANDOM_SEED = 1337\n",
"LOG_DIR = '/storage/UCIHAR/AWlog'\n",
"\n",
"# A `Head` instance defines the loss function and metrics for `Estimators`.\n",
"head = tf.estimator.MultiClassHead(NUM_CLASSES)\n",
"\n",
"# One numeric feature column per inertial signal: total_acc_x ... body_gyro_z.\n",
"# NOTE(review): every column is declared with shape [128, 9], while the loader\n",
"# stacks the nine signals into a single (128, 9) array per sample -- confirm\n",
"# the intended per-column shape.\n",
"feature_columns = [\n",
"    tf.feature_column.numeric_column(sensor + '_' + axis, shape=[128, 9])\n",
"    for sensor in ('total_acc', 'body_acc', 'body_gyro')\n",
"    for axis in ('x', 'y', 'z')\n",
"]\n",
"\n",
"def make_config(experiment_name):\n",
"    \"\"\"Build the Estimator RunConfig for one named experiment.\n",
"\n",
"    experiment_name: sub-directory of LOG_DIR used as the model directory.\n",
"    Returns a tf.estimator.RunConfig that saves checkpoints and summaries every\n",
"    100 steps and uses RANDOM_SEED as the TensorFlow random seed.\n",
"    \"\"\"\n",
"    experiment_dir = os.path.join(LOG_DIR, experiment_name)\n",
"    return tf.estimator.RunConfig(\n",
"        model_dir=experiment_dir,\n",
"        tf_random_seed=RANDOM_SEED,\n",
"        save_summary_steps=100,\n",
"        save_checkpoints_steps=100)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# # %load_ext tensorboard\n",
"# Slice the numpy arrays directly: from_tensor_slices accepts ndarrays, so there\n",
"# is no need to first convert every sample into nested Python lists.\n",
"# The explicit float32 cast keeps the element dtype (tf.float32, tf.float32)\n",
"# that the list-based construction produced.\n",
"trndataset = tf.data.Dataset.from_tensor_slices(\n",
"    (trnX.astype(np.float32), trnY.astype(np.float32)))\n",
"tstdataset = tf.data.Dataset.from_tensor_slices(\n",
"    (tstX.astype(np.float32), tstY.astype(np.float32)))\n",
"\n",
"# def input_fn(partition, training, batch_size):\n",
"# \"\"\"Generate an input_fn for the Estimator.\"\"\"\n",
"# def _input_fn():\n",
"# if partition == \"train\":\n",
"# dst = trndataset\n",
"# # dst = tf.data.Dataset.from_generator(\n",
"# # generator(x_train, y_train), (tf.float32, tf.int32), ((28, 28), ()))\n",
"# elif partition == \"predict\":\n",
"# dst = tstdataset\n",
"# # dst = tf.data.Dataset.from_generator(\n",
"# # generator(x_test[:10], y_test[:10]), (tf.float32, tf.int32), ((28,28), ()))\n",
"# else:\n",
"# dst = tstdataset\n",
"\n",
"# # We call repeat after shuffling, rather than before, to prevent separate\n",
"# # epochs from blending together.\n",
"# if training:\n",
"# dst = dst.shuffle(10 * batch_size, seed=RANDOM_SEED).repeat()\n",
"\n",
"# # dst = dst.map(preprocess_image).batch(batch_size)\n",
"# # iterator = dst.make_one_shot_iterator()\n",
"# # features, labels = iterator.get_next()\n",
"# # return features, labels\n",
"# return dst\n",
"# return _input_fn\n",
"\n",
"def _input_fn(partition):\n",
"    \"\"\"Return the prebuilt tf.data dataset for `partition`.\n",
"\n",
"    partition: \"train\" selects trndataset; any other value (including\n",
"        \"predict\") selects tstdataset.\n",
"    Returns the module-level dataset object as is -- no shuffle, repeat or\n",
"    batch is applied here.\n",
"    \"\"\"\n",
"    # NOTE(review): the elements are single (features, label) pairs; confirm\n",
"    # whether the consumer expects batched input.\n",
"    return trndataset if partition == \"train\" else tstdataset\n",
"\n",
"# dataset = tf.data.Dataset.from_tensor_slices(trnXl)\n",
"# for element in dataset: \n",
"# i = 1\n",
"# print(element)\n",
"# i = i+1\n",
"# if i==2:\n",
"# break"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# # !pip install tensorflow_datasets\n",
"# import tensorflow_datasets as tfds\n",
"\n",
"# def in2put_fn():\n",
"# split = tfds.Split.TRAIN\n",
"# dataset = tfds.load('iris', split=split, as_supervised=True)\n",
"# dataset = dataset.map(lambda features, labels: ({'dense_input':features}, labels))\n",
"# dataset = dataset.batch(32).repeat()\n",
"# return dataset\n",
"\n",
"# for features_batch, labels_batch in in2put_fn().take(1):\n",
"# print(features_batch)\n",
"# print(labels_batch)\n",
"\n",
"\n",
"# for features_batch, labels_batch in _input_fn(partition='train').take(1):\n",
"# print(features_batch)\n",
"# print(labels_batch)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Using config: {'_model_dir': '/storage/UCIHAR/AWlog/linear', '_tf_random_seed': 1337, '_save_summary_steps': 100, '_save_checkpoints_steps': 100, '_save_checkpoints_secs': None, '_session_config': allow_soft_placement: true\n",
"graph_options {\n",
" rewrite_options {\n",
" meta_optimizer_iterations: ONE\n",
" }\n",
"}\n",
", '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fec723b9128>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}\n"
]
},
{
"ename": "TypeError",
"evalue": "`input_fn` must be callable, given: <DatasetV1Adapter shapes: ((128, 9), (6,)), types: (tf.float32, tf.float32)>",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-15-879c17bdb34a>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 15\u001b[0m train_spec=tf.estimator.TrainSpec(\n\u001b[1;32m 16\u001b[0m \u001b[0minput_fn\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0m_input_fn\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"train\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 17\u001b[0;31m max_steps=TRAIN_STEPS),\n\u001b[0m\u001b[1;32m 18\u001b[0m eval_spec=tf.estimator.EvalSpec(\n\u001b[1;32m 19\u001b[0m \u001b[0minput_fn\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0m_input_fn\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"test\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/local/lib/python3.6/dist-packages/tensorflow_estimator/python/estimator/training.py\u001b[0m in \u001b[0;36m__new__\u001b[0;34m(cls, input_fn, max_steps, hooks)\u001b[0m\n\u001b[1;32m 156\u001b[0m \"\"\"\n\u001b[1;32m 157\u001b[0m \u001b[0;31m# Validate input_fn.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 158\u001b[0;31m \u001b[0m_validate_input_fn\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0minput_fn\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 159\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 160\u001b[0m \u001b[0;31m# Validate max_steps.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/local/lib/python3.6/dist-packages/tensorflow_estimator/python/estimator/training.py\u001b[0m in \u001b[0;36m_validate_input_fn\u001b[0;34m(input_fn)\u001b[0m\n\u001b[1;32m 51\u001b[0m \u001b[0;34m\"\"\"Validates the `input_fn`.\"\"\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 52\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mcallable\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0minput_fn\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 53\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mTypeError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'`input_fn` must be callable, given: {}'\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0minput_fn\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 54\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 55\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mTypeError\u001b[0m: `input_fn` must be callable, given: <DatasetV1Adapter shapes: ((128, 9), (6,)), types: (tf.float32, tf.float32)>"
]
}
],
"source": [
"#@test {\"skip\": true}\n",
"#@title Parameters\n",
"LEARNING_RATE = 0.001 #@param {type:\"number\"}\n",
"TRAIN_STEPS = 50 #@param {type:\"integer\"}\n",
"BATCH_SIZE = 64 #@param {type:\"integer\"}\n",
"\n",
"# Linear baseline over the declared feature columns.\n",
"estimator = tf.estimator.LinearClassifier(\n",
"    feature_columns=feature_columns,\n",
"    n_classes=NUM_CLASSES,\n",
"    optimizer=tf.train.RMSPropOptimizer(learning_rate=LEARNING_RATE),\n",
"    config=make_config(\"linear\"))\n",
"\n",
"# TrainSpec/EvalSpec validate that input_fn is *callable*; handing them the\n",
"# Dataset returned by _input_fn(...) raises\n",
"# TypeError: `input_fn` must be callable. Wrap the call in a zero-argument\n",
"# lambda so the Estimator invokes it itself.\n",
"# NOTE(review): the datasets are created at module level, outside whatever graph\n",
"# the Estimator sets up when it calls input_fn -- confirm this works on the\n",
"# TensorFlow version in use.\n",
"tf.estimator.train_and_evaluate(\n",
"    estimator,\n",
"    train_spec=tf.estimator.TrainSpec(\n",
"        input_fn=lambda: _input_fn(\"train\"),\n",
"        max_steps=TRAIN_STEPS),\n",
"    eval_spec=tf.estimator.EvalSpec(\n",
"        input_fn=lambda: _input_fn(\"test\"),\n",
"        steps=None,\n",
"#         start_delay_secs=1,\n",
"        throttle_secs=1,\n",
"    ))\n",
"\n",
"# results = estimator.evaluate(\n",
"# _input_fn(\"test\"),\n",
"# steps=None)\n",
"# input_fn(\"test\", training=False, batch_size=BATCH_SIZE),\n",
"# steps=None)\n",
"# print(\"Accuracy:\", results[\"accuracy\"])\n",
"# print(\"Loss:\", results[\"average_loss\"])"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"#Visualize Classes\n",
"# Names of the nine inertial signals, as a plain Python list (no TensorFlow\n",
"# op is needed for a list of display labels).\n",
"featlabels = ['total_acc_x',\n",
"              'total_acc_y',\n",
"              'total_acc_z',\n",
"              'body_acc_x',\n",
"              'body_acc_y',\n",
"              'body_acc_z',\n",
"              'body_gyro_x',\n",
"              'body_gyro_y',\n",
"              'body_gyro_z']\n",
"# Display names of the six activities.\n",
"# NOTE(review): presumably ordered by zero-based class id -- confirm against the dataset.\n",
"actlabels = ['Walking',\n",
"             'Upstairs',\n",
"             'Downstairs',\n",
"             'Sitting',\n",
"             'Standing',\n",
"             'Laying']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create the Example: one float-list feature per signal key.\n",
"# NOTE(review): `data` is not defined in any cell visible in this notebook;\n",
"# it is assumed to map each key below to a sequence of floats -- confirm.\n",
"example = tf.train.Example(features=tf.train.Features(feature={\n",
"    key: tf.train.Feature(float_list=tf.train.FloatList(value=data[key]))\n",
"    for key in ('totalAccX', 'totalAccY', 'totalAccZ',\n",
"                'BodyAccX', 'BodyAccY', 'BodyAccZ')\n",
"}))\n",
"\n",
"print(example)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"# Write TFrecord file\n",
"# tf.io.TFRecordWriter is the current name of the writer; the tf.python_io\n",
"# alias only exists in TensorFlow 1.x.\n",
"with tf.io.TFRecordWriter('randcase.tfrecord') as writer:\n",
"    # the file receives a single record: the serialized Example\n",
"    writer.write(example.SerializeToString())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Reading it in"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING:tensorflow:From <ipython-input-26-2a04b93ab95d>:5: TFRecordReader.__init__ (from tensorflow.python.ops.io_ops) is deprecated and will be removed in a future version.\n",
"Instructions for updating:\n",
"Queue-based input pipelines have been replaced by `tf.data`. Use `tf.data.TFRecordDataset`.\n",
"WARNING:tensorflow:From <ipython-input-26-2a04b93ab95d>:6: string_input_producer (from tensorflow.python.training.input) is deprecated and will be removed in a future version.\n",
"Instructions for updating:\n",
"Queue-based input pipelines have been replaced by `tf.data`. Use `tf.data.Dataset.from_tensor_slices(string_tensor).shuffle(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If `shuffle=False`, omit the `.shuffle(...)`.\n",
"WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/input.py:277: input_producer (from tensorflow.python.training.input) is deprecated and will be removed in a future version.\n",
"Instructions for updating:\n",
"Queue-based input pipelines have been replaced by `tf.data`. Use `tf.data.Dataset.from_tensor_slices(input_tensor).shuffle(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If `shuffle=False`, omit the `.shuffle(...)`.\n",
"WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/input.py:189: limit_epochs (from tensorflow.python.training.input) is deprecated and will be removed in a future version.\n",
"Instructions for updating:\n",
"Queue-based input pipelines have been replaced by `tf.data`. Use `tf.data.Dataset.from_tensors(tensor).repeat(num_epochs)`.\n",
"WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/input.py:198: QueueRunner.__init__ (from tensorflow.python.training.queue_runner_impl) is deprecated and will be removed in a future version.\n",
"Instructions for updating:\n",
"To construct input pipelines, use the `tf.data` module.\n",
"WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/input.py:198: add_queue_runner (from tensorflow.python.training.queue_runner_impl) is deprecated and will be removed in a future version.\n",
"Instructions for updating:\n",
"To construct input pipelines, use the `tf.data` module.\n"
]
}
],
"source": [
"# Read and print data:\n",
"sess = tf.InteractiveSession()\n",
"\n",
"# Read the TFRecord file through tf.data instead of the deprecated queue-based\n",
"# TFRecordReader / string_input_producer pair, which also needs queue runners\n",
"# to be started before any read can complete.\n",
"# repeat() keeps cycling over the file, like string_input_producer did by default.\n",
"record_dataset = tf.data.TFRecordDataset(['randcase.tfrecord']).repeat()\n",
"\n",
"# Tensor that yields the next serialized Example each time it is evaluated.\n",
"serialized_example = tf.compat.v1.data.make_one_shot_iterator(record_dataset).get_next()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment