Created
October 3, 2017 21:38
-
-
Save andrewm4894/ebd3ac3c87e2ab4af8a10740e85073bb to your computer and use it in GitHub Desktop.
A version of model.py adapted to my problem that also has the ability to pass instance keys through when predicting.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Define a Wide + Deep model for classification on structured data.""" | |
from __future__ import absolute_import | |
from __future__ import division | |
from __future__ import print_function | |
import multiprocessing | |
import tensorflow as tf | |
from tensorflow.python.lib.io import file_io | |
import json | |
# Load the data schema from Cloud Storage.
# NOTE: this runs at import time, so importing this module needs read access
# to the schema file.
with file_io.FileIO('gs://pmc-ml/clickmodel/data_schema.json', 'r') as f:
    data_schema = json.load(f)

# Parallel lists describing the CSV layout: CSV_COLUMNS[n] is the name of the
# n-th column and CSV_COLUMN_DEFAULTS[n] is its single-element default, which
# is the shape expected for `record_defaults` when decoding CSV rows.
CSV_COLUMNS = []
CSV_COLUMN_DEFAULTS = []

# The schema is keyed by 1-based column position stored as a string
# ('1', '2', ...), so walk the positions in order instead of the dict itself.
for i in range(1, len(data_schema) + 1):
    column_spec = data_schema[str(i)]
    CSV_COLUMNS.append(column_spec['name'])
    CSV_COLUMN_DEFAULTS.append([column_spec['default']])
# Name of the CSV column that holds the label, and the label vocabulary.
# A label's position in LABELS is the integer index it is mapped to.
LABEL_COLUMN = 'target'
LABELS = ['neg', 'pos']

# Name of the instance-key column that is passed through to the predictions.
KEY = 'key'

# Feature columns, in the order build_estimator unpacks them.
# Vocabularies of the categorical features are stored in the schema as
# comma-separated strings.
# NOTE(review): schema positions '4' and '5' are hard-coded; presumably they
# are the post_day_pst and post_hour_bin_pst entries -- confirm against the
# schema file.
INPUT_COLUMNS = [
    tf.feature_column.numeric_column(KEY),
    tf.feature_column.categorical_column_with_vocabulary_list(
        'post_day_pst', data_schema['4']['keys'].split(',')),
    tf.feature_column.categorical_column_with_vocabulary_list(
        'post_hour_bin_pst', data_schema['5']['keys'].split(',')),
    tf.feature_column.numeric_column('post_word_count'),
    tf.feature_column.numeric_column('user_sessions_per_day'),
    tf.feature_column.numeric_column('user_pageviews_per_session'),
]

# CSV columns that are neither model inputs nor the label.
UNUSED_COLUMNS = set(CSV_COLUMNS).difference(
    {col.name for col in INPUT_COLUMNS}, {LABEL_COLUMN})
def key_model_fn_gen(estimator):
    """Wrap an estimator's model function so the instance key reaches the output.

    Args:
      estimator: an estimator exposing `params` and a `_model_fn` that accepts
        `features`, `labels`, `mode` and `params`.

    Returns:
      A function `(features, labels, mode)` that removes the KEY feature,
      delegates to the wrapped model function and attaches the key tensor to
      the resulting predictions and output alternatives.
    """

    def _model_fn(features, labels, mode):
        # Take the key out of the features before delegating, so the wrapped
        # model function never receives it. This mutates `features` in place.
        instance_key = features.pop(KEY)
        ops = estimator._model_fn(
            features=features,
            labels=labels,
            mode=mode,
            params=estimator.params,
        )
        # Echo the key alongside the regular predictions ...
        ops.predictions[KEY] = instance_key
        # ... and in the output alternative stored under the `None` name;
        # index 1 of that entry is presumably its dict of output tensors.
        ops.output_alternatives[None][1][KEY] = instance_key
        print(ops.output_alternatives)
        return ops

    return _model_fn
def build_estimator(config, embedding_size=8, hidden_units=None):
    """Build a wide and deep model for predicting the target.

    Args:
      config: tf.contrib.learn.RunConfig defining the runtime environment for
        the estimator (including model_dir).
      embedding_size: int, currently unused because no embedding columns are
        built; kept so existing callers can keep passing it.
      hidden_units: [int], the layer sizes of the DNN (input layer first).
        Falls back to [100, 70, 50, 25] when None or empty.

    Returns:
      A tf.contrib.learn.Estimator whose model function is the
      DNNLinearCombinedClassifier model function wrapped by key_model_fn_gen,
      so the instance key is passed through to the predictions.
    """
    # Unpacking (rather than indexing) keeps the implicit check that
    # INPUT_COLUMNS holds exactly these six columns in this order.
    (_,  # key: removed by the key_model_fn_gen wrapper, not a model input
     _,  # post_day_pst: referenced by name in the crossed column below
     _,  # post_hour_bin_pst: referenced by name in the crossed column below
     post_word_count,
     user_sessions_per_day,
     user_pageviews_per_session) = INPUT_COLUMNS

    # Continuous columns can be converted to categorical via bucketization.
    post_word_count_buckets = tf.feature_column.bucketized_column(
        post_word_count, boundaries=[250, 1000, 5000])

    # Wide (linear) columns.
    wide_columns = [
        # Interactions between different categorical features can also be
        # added as new virtual features.
        tf.feature_column.crossed_column(
            ['post_day_pst', 'post_hour_bin_pst'], hash_bucket_size=50),
        post_word_count_buckets,
    ]

    # Deep (DNN) columns.
    deep_columns = [
        post_word_count,
        user_sessions_per_day,
        user_pageviews_per_session,
    ]

    classifier = tf.contrib.learn.DNNLinearCombinedClassifier(
        config=config,
        linear_feature_columns=wide_columns,
        dnn_feature_columns=deep_columns,
        dnn_hidden_units=hidden_units or [100, 70, 50, 25],
        fix_global_step_increment_bug=True)

    return tf.contrib.learn.Estimator(
        model_fn=key_model_fn_gen(classifier),
        config=config
    )
def parse_label_column(label_string_tensor):
    """Convert a string label tensor into a tensor of label indices.

    Args:
      label_string_tensor: Tensor of dtype string. Result of parsing the
        CSV column specified by LABEL_COLUMN.

    Returns:
      The result of looking every label string up in LABELS, i.e. a Tensor
      holding the position of each label within LABELS. No one-hot encoding
      is applied.
    """
    # Build an in-graph lookup table mapping each entry of LABELS to its index.
    label_index_table = tf.contrib.lookup.index_table_from_tensor(
        tf.constant(LABELS))
    return label_index_table.lookup(label_string_tensor)
def csv_serving_input_fn():
    """Build the serving inputs for requests supplied as raw CSV lines.

    Returns:
      A tf.contrib.learn.InputFnOps whose features are the parsed CSV columns
      without the label, with no labels, and whose only input is the
      'csv_row' string placeholder.
    """
    csv_row = tf.placeholder(shape=[None], dtype=tf.string)

    parsed_columns = parse_csv(csv_row)
    # parse_csv keeps the label column; it is not a feature at serving time.
    del parsed_columns[LABEL_COLUMN]

    return tf.contrib.learn.InputFnOps(
        parsed_columns,
        None,  # labels
        {'csv_row': csv_row}
    )
def example_serving_input_fn():
    """Build the serving inputs for requests supplied as serialized tf.Example.

    Returns:
      A tf.contrib.learn.InputFnOps whose features are the parsed example
      tensors with one extra trailing dimension, with no labels, and whose
      only input is the 'example_proto' string placeholder.
    """
    example_bytestring = tf.placeholder(
        shape=[None],
        dtype=tf.string,
    )
    feature_scalars = tf.parse_example(
        example_bytestring,
        tf.feature_column.make_parse_example_spec(INPUT_COLUMNS)
    )
    # `.items()` works on both Python 2 and Python 3; the Python-2-only
    # `.iteritems()` raises AttributeError on Python 3.
    features = {
        key: tf.expand_dims(tensor, -1)
        for key, tensor in feature_scalars.items()
    }
    return tf.contrib.learn.InputFnOps(
        features,
        None,  # labels
        {'example_proto': example_bytestring}
    )
def json_serving_input_fn():
    """Build the serving inputs for requests supplied as JSON instances.

    Returns:
      A tf.contrib.learn.InputFnOps whose inputs are one rank-1 placeholder
      per input name, whose features are those placeholders with one extra
      trailing dimension, and which has no labels.
    """
    inputs = {KEY: tf.placeholder(shape=[None], dtype=tf.string)}
    # NOTE(review): INPUT_COLUMNS also contains a column built with the name
    # 'key', so the loop below replaces the string placeholder above with one
    # of that column's dtype -- confirm which dtype is intended for the key.
    for feat in INPUT_COLUMNS:
        inputs[feat.name] = tf.placeholder(shape=[None], dtype=feat.dtype)

    # The model consumes rank-2 tensors, so add a trailing dimension to each.
    features = {}
    for name, placeholder in inputs.items():
        features[name] = tf.expand_dims(placeholder, -1)

    return tf.contrib.learn.InputFnOps(features, None, inputs)
# Maps an export format name to the function that builds its serving inputs.
SERVING_FUNCTIONS = dict(
    JSON=json_serving_input_fn,
    EXAMPLE=example_serving_input_fn,
    CSV=csv_serving_input_fn,
)
def parse_csv(rows_string_tensor):
    """Parse a rank-1 tensor of CSV lines into a dict of rank-2 column tensors.

    Args:
      rows_string_tensor: rank-1 string Tensor, one CSV line per element.

    Returns:
      A dict mapping column name to the decoded column tensor, for every
      column in CSV_COLUMNS that is not listed in UNUSED_COLUMNS. The label
      column is kept.
    """
    # Add a trailing dimension first, e.g. ['csv,line,1', 'csv,line,2'] becomes
    # [['csv,line,1'], ['csv,line,2']], so that decoding yields one rank-2
    # tensor per column: [['csv'], ['csv']], [['line'], ['line']], [[1], [2]].
    rows_as_column = tf.expand_dims(rows_string_tensor, -1)
    decoded_columns = tf.decode_csv(
        rows_as_column, record_defaults=CSV_COLUMN_DEFAULTS)

    # Keep only the columns the model or the label parsing actually uses.
    return {
        name: tensor
        for name, tensor in zip(CSV_COLUMNS, decoded_columns)
        if name not in UNUSED_COLUMNS
    }
def generate_input_fn(filenames,
                      num_epochs=None,
                      shuffle=True,
                      skip_header_lines=0,
                      batch_size=200):
    """Build the input tensors for training or evaluation.

    This uses the input pipeline based approach with a file name queue, so
    the entire data set is not loaded in memory.

    Args:
      filenames: [str] list of CSV files (or file patterns) to read data from.
      num_epochs: int how many times through to read the data.
        If None will loop through data indefinitely.
      shuffle: bool, whether or not to randomize the order of data.
        Controls randomization of both file order and line order within
        files.
      skip_header_lines: int set to non-zero in order to skip header lines
        in CSV files.
      batch_size: int First dimension size of the Tensors returned.

    Returns:
      A tuple (features, indices) where features is a dictionary of batched
      Tensors without the label column, and indices is a single Tensor of
      label indices. The tuple itself is returned, not a function producing
      it.
    """
    # Expand file patterns and queue up the matching files.
    matched_files = tf.train.match_filenames_once(filenames)
    file_queue = tf.train.string_input_producer(
        matched_files, num_epochs=num_epochs, shuffle=shuffle)

    # Read up to one batch worth of lines at a time and parse them as CSV.
    line_reader = tf.TextLineReader(skip_header_lines=skip_header_lines)
    _, rows = line_reader.read_up_to(file_queue, num_records=batch_size)
    parsed = parse_csv(rows)

    # Batching builds up a buffer of parsed tensors, so that parsing input
    # data doesn't block training. Both batching modes share these options.
    batching_options = dict(
        capacity=batch_size * 10,
        num_threads=multiprocessing.cpu_count(),
        enqueue_many=True,
        allow_smaller_final_batch=True,
    )
    if shuffle:
        batch = tf.train.shuffle_batch(
            parsed,
            batch_size,
            min_after_dequeue=2 * batch_size + 1,
            **batching_options
        )
    else:
        batch = tf.train.batch(parsed, batch_size, **batching_options)

    # Split the label off the batch and convert it to label indices.
    label_indices = parse_label_column(batch.pop(LABEL_COLUMN))
    return batch, label_indices
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment