Skip to content

Instantly share code, notes, and snippets.

@andrewm4894
Created October 3, 2017 21:38
Show Gist options
  • Save andrewm4894/ebd3ac3c87e2ab4af8a10740e85073bb to your computer and use it in GitHub Desktop.
Save andrewm4894/ebd3ac3c87e2ab4af8a10740e85073bb to your computer and use it in GitHub Desktop.
a version of model.py adapted to my problem but that also has ability to pass instance keys when predicting.
"""Define a Wide + Deep model for classification on structured data."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import multiprocessing
import tensorflow as tf
from tensorflow.python.lib.io import file_io
import json
# read data schema file
with file_io.FileIO('gs://pmc-ml/clickmodel/data_schema.json', 'r') as f:
data_schema = json.load(f)
# parse data schema
CSV_COLUMNS = []
CSV_COLUMN_DEFAULTS = []
# build column info lists
for i in range(1,(len(data_schema)+1)):
CSV_COLUMNS.append(data_schema[str(i)]['name'])
CSV_COLUMN_DEFAULTS.append([data_schema[str(i)]['default']])
# define label column
LABEL_COLUMN = 'target'
LABELS = ['neg', 'pos']
# set up inputs
INPUT_COLUMNS = [
tf.feature_column.numeric_column(
'key'),
tf.feature_column.categorical_column_with_vocabulary_list(
'post_day_pst', data_schema['4']['keys'].split(',') ),
tf.feature_column.categorical_column_with_vocabulary_list(
'post_hour_bin_pst', data_schema['5']['keys'].split(',') ),
tf.feature_column.numeric_column(
'post_word_count'),
tf.feature_column.numeric_column(
'user_sessions_per_day'),
tf.feature_column.numeric_column(
'user_pageviews_per_session'),
]
# define unused cols
UNUSED_COLUMNS = set(CSV_COLUMNS) - {col.name for col in INPUT_COLUMNS} - {LABEL_COLUMN}
# define key column
KEY = 'key'
def key_model_fn_gen(estimator):
def _model_fn(features, labels, mode):
key = features.pop(KEY)
params = estimator.params
model_fn_ops = estimator._model_fn(features=features, labels=labels, mode=mode, params=params)
model_fn_ops.predictions[KEY] = key
model_fn_ops.output_alternatives[None][1][KEY] = key # <== UPDATED
print(model_fn_ops.output_alternatives)
return model_fn_ops
return _model_fn
def build_estimator(config, embedding_size=8, hidden_units=None):
"""Build a wide and deep model for predicting target.
Args:
config: tf.contrib.learn.RunConfig defining the runtime environment for the
estimator (including model_dir).
embedding_size: int, the number of dimensions used to represent categorical
features when providing them as inputs to the DNN.
hidden_units: [int], the layer sizes of the DNN (input layer first)
learning_rate: float, the learning rate for the optimizer.
Returns:
A DNNCombinedLinearClassifier
"""
(key,
post_day_pst,
post_hour_bin_pst,
post_word_count,
user_sessions_per_day,
user_pageviews_per_session
) = INPUT_COLUMNS
"""Build an estimator."""
# Reused Transformations.
# Continuous columns can be converted to categorical via bucketization
post_word_count_buckets = tf.feature_column.bucketized_column(post_word_count, boundaries=[250,1000,5000])
# Wide columns
wide_columns = [
# Interactions between different categorical features can also
# be added as new virtual features.
tf.feature_column.crossed_column(
['post_day_pst', 'post_hour_bin_pst'], hash_bucket_size=int(50)),
post_word_count_buckets,
]
# Deep columns
deep_columns = [
post_word_count,
user_sessions_per_day,
user_pageviews_per_session,
]
return tf.contrib.learn.Estimator(
model_fn=key_model_fn_gen(
tf.contrib.learn.DNNLinearCombinedClassifier(
config=config,
linear_feature_columns=wide_columns,
dnn_feature_columns=deep_columns,
dnn_hidden_units=hidden_units or [100, 70, 50, 25],
fix_global_step_increment_bug=True)
),
config=config
)
def parse_label_column(label_string_tensor):
"""Parses a string tensor into the label tensor
Args:
label_string_tensor: Tensor of dtype string. Result of parsing the
CSV column specified by LABEL_COLUMN
Returns:
A Tensor of the same shape as label_string_tensor, should return
an int64 Tensor representing the label index for classification tasks,
and a float32 Tensor representing the value for a regression task.
"""
# Build a Hash Table inside the graph
table = tf.contrib.lookup.index_table_from_tensor(tf.constant(LABELS))
# Use the hash table to convert string labels to ints and one-hot encode
return table.lookup(label_string_tensor)
def csv_serving_input_fn():
"""Build the serving inputs."""
csv_row = tf.placeholder(
shape=[None],
dtype=tf.string
)
features = parse_csv(csv_row)
features.pop(LABEL_COLUMN)
return tf.contrib.learn.InputFnOps(features, None, {'csv_row': csv_row})
def example_serving_input_fn():
"""Build the serving inputs."""
example_bytestring = tf.placeholder(
shape=[None],
dtype=tf.string,
)
feature_scalars = tf.parse_example(
example_bytestring,
tf.feature_column.make_parse_example_spec(INPUT_COLUMNS)
)
features = {
key: tf.expand_dims(tensor, -1)
for key, tensor in feature_scalars.iteritems()
}
return tf.contrib.learn.InputFnOps(
features,
None, # labels
{'example_proto': example_bytestring}
)
def json_serving_input_fn():
"""Build the serving inputs."""
inputs = {}
inputs[KEY] = tf.placeholder(shape=[None], dtype=tf.string)
for feat in INPUT_COLUMNS:
inputs[feat.name] = tf.placeholder(shape=[None], dtype=feat.dtype)
features = {
key: tf.expand_dims(tensor, -1)
for key, tensor in inputs.items()
}
return tf.contrib.learn.InputFnOps(features, None, inputs)
SERVING_FUNCTIONS = {
'JSON': json_serving_input_fn,
'EXAMPLE': example_serving_input_fn,
'CSV': csv_serving_input_fn
}
def parse_csv(rows_string_tensor):
"""Takes the string input tensor and returns a dict of rank-2 tensors."""
# Takes a rank-1 tensor and converts it into rank-2 tensor
# Example if the data is ['csv,line,1', 'csv,line,2', ..] to
# [['csv,line,1'], ['csv,line,2']] which after parsing will result in a
# tuple of tensors: [['csv'], ['csv']], [['line'], ['line']], [[1], [2]]
row_columns = tf.expand_dims(rows_string_tensor, -1)
columns = tf.decode_csv(row_columns, record_defaults=CSV_COLUMN_DEFAULTS)
features = dict(zip(CSV_COLUMNS, columns))
# Remove unused columns
for col in UNUSED_COLUMNS:
features.pop(col)
return features
def generate_input_fn(filenames,
num_epochs=None,
shuffle=True,
skip_header_lines=0,
batch_size=200):
"""Generates an input function for training or evaluation.
This uses the input pipeline based approach using file name queue
to read data so that entire data is not loaded in memory.
Args:
filenames: [str] list of CSV files to read data from.
num_epochs: int how many times through to read the data.
If None will loop through data indefinitely
shuffle: bool, whether or not to randomize the order of data.
Controls randomization of both file order and line order within
files.
skip_header_lines: int set to non-zero in order to skip header lines
in CSV files.
batch_size: int First dimension size of the Tensors returned by
input_fn
Returns:
A function () -> (features, indices) where features is a dictionary of
Tensors, and indices is a single Tensor of label indices.
"""
# match on file patterns
input_file_names = tf.train.match_filenames_once(filenames)
filename_queue = tf.train.string_input_producer(
input_file_names, num_epochs=num_epochs, shuffle=shuffle)
reader = tf.TextLineReader(skip_header_lines=skip_header_lines)
_, rows = reader.read_up_to(filename_queue, num_records=batch_size)
# Parse the CSV File
features = parse_csv(rows)
# This operation builds up a buffer of parsed tensors, so that parsing
# input data doesn't block training
# If requested it will also shuffle
if shuffle:
features = tf.train.shuffle_batch(
features,
batch_size,
min_after_dequeue=2 * batch_size + 1,
capacity=batch_size * 10,
num_threads=multiprocessing.cpu_count(),
enqueue_many=True,
allow_smaller_final_batch=True
)
else:
features = tf.train.batch(
features,
batch_size,
capacity=batch_size * 10,
num_threads=multiprocessing.cpu_count(),
enqueue_many=True,
allow_smaller_final_batch=True
)
return features, parse_label_column(features.pop(LABEL_COLUMN))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment