# endtoend with keras
from __future__ import division
import urlparse
import os
import numpy
import boto3
import tensorflow
from tensorflow.python.keras._impl import keras
from tensorflow.python.estimator.export.export_output import PredictOutput
from tensorflow.python.estimator.export.export import build_raw_serving_input_receiver_fn
from tensorflow.python.saved_model import signature_constants
from tensorflow.python import debug as tf_debug
import datautil
import ml_utils
# Still working out how best to do this:
# END-TO-END embedding-only text model.
# reference: https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/tensorflow_abalone_age_predictor_using_keras/abalone.py
def keras_avgpool_linear_pad_endtoend(inp_placeholder, word_map_emb_pair, pad_length,
                                      n_classes, random_seed, oov_thresh=0.9,
                                      embed_config={}, model_config={}):
    # Note: normally this wouldn't make sense, but the reference examples don't
    # encapsulate the layers in a Model class; they're just defined and then (I
    # believe) put into an EstimatorSpec. That makes some sense if you're only
    # using keras for the layer convenience, lasagne-style.
    # Expect these in the order filter_embeddings returns them.
    word_ind_map, embedding_mat = word_map_emb_pair
    numpy.random.seed(random_seed)
    #inp = keras.layers.Input(shape=(1,), name='text', dtype='string')
    # assume word_ind_map and embedding_mat have already been adjusted so that
    # index 0 is the pad token and index 1 is the OOV token
    pad_value = 0
    oov_value = 1
    lookedup = ml_utils.TokenizeLookupLayer(word_ind_map, pad_length,
                                            pad_value=pad_value, oov_value=oov_value,
                                            name='lookedup')(inp_placeholder)
    # only makes sense if pad_value is 0
    lengths = ml_utils.CountNonZeroLayer(name='get_len')(lookedup)
    tensorflow.summary.histogram('lengths', lengths)
    oov_code = ml_utils.OOVCodeLayer(
        oov_value=oov_value,
        oov_thresh=oov_thresh,
        name='oov_code'
    )([lookedup, lengths])
    emb = keras.layers.Embedding(*(embedding_mat.shape), weights=[embedding_mat],
                                 input_length=pad_length, name='embed',
                                 **embed_config)(lookedup)
    pooler = ml_utils.MaskedGlobalAveragePooling1D(name='avg')([emb, lengths])
    model = keras.layers.core.Dense(n_classes, input_shape=(embedding_mat.shape[1],),
                                    name='weights', **model_config)(pooler)
    softmax = keras.layers.core.Activation('softmax', name='softmax')(model)
    return {'softmax': softmax,
            'oov_code': oov_code,
            'pooler': pooler,
            'emb': emb,
            'lookedup': lookedup,
            'length': lengths,
            }
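# For reference, a rough sketch of what the custom ml_utils layers above are
# assumed to do (ml_utils isn't included in this gist, so this describes my
# assumptions, not its actual implementation):
#   TokenizeLookupLayer: split each input string into tokens, map tokens to
#       integer ids via word_ind_map (OOV tokens -> oov_value), and
#       pad/truncate each sequence to pad_length with pad_value.
#   CountNonZeroLayer: recover the true sequence length as the count of
#       non-zero ids (hence the "only makes sense if pad_value is 0" caveat).
#   OOVCodeLayer: emit a per-example flag for inputs whose OOV fraction
#       exceeds oov_thresh.
#   MaskedGlobalAveragePooling1D: average the embeddings over the true
#       length only, ignoring padded positions.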
# TODO(ZJ): definitely move this somewhere else
def download_from_s3(uri, profile='sagemaker', local_cache='/tmp/'):
    # TODO(ZJ): handle errors properly
    parsed = urlparse.urlparse(uri)
    target_bucket = parsed.netloc
    path = parsed.path.lstrip('/')  # S3 keys must not have a leading slash
    # create the session
    sess = boto3.Session(profile_name=profile)
    s3_acc = sess.resource('s3')
    bucket = s3_acc.Bucket(target_bucket)
    # download to the local cache (default /tmp; assume that's ok)
    new_filepath = os.path.join(local_cache, os.path.basename(path))
    bucket.download_file(path, new_filepath)
    return new_filepath
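# Illustrative usage (bucket and key are made up):
#   download_from_s3('s3://my-bucket/embeddings/glove.pkl')
#   # -> '/tmp/glove.pkl', fetched with the 'sagemaker' boto3 profile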
class model_fn_defaults:
    embedding_uri = None
    pad_length = 162
    n_classes = 2
    random_seed = None
    oov_thresh = 0.9
    #train_spec = {"optimizer":"sgd", "lr":0.01}
    compile_spec = {
        "optimizer": "sgd",
        "loss": {"softmax": "categorical_crossentropy"},
        "metrics": ["accuracy"]
    }
    text_inp = 'text_processed'
    s3_profile = None
    embed_config = {"trainable": False}
    model_config = {}
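# Anything not supplied in the params dict falls back to model_fn_defaults;
# only embedding_uri has no usable default. Illustrative call (URI made up):
#   model = keras_model_fn({'embedding_uri': 's3://my-bucket/emb.pkl',
#                           'n_classes': 3})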
def keras_model_fn(params):
    # TODO(ZJ): this could probably be done better
    embedding_uri = params.get('embedding_uri', model_fn_defaults.embedding_uri)
    s3_profile = params.get('s3_profile', model_fn_defaults.s3_profile)
    pad_length = params.get('pad_length', model_fn_defaults.pad_length)
    n_classes = params.get('n_classes', model_fn_defaults.n_classes)
    random_seed = params.get('random_seed', model_fn_defaults.random_seed)
    oov_thresh = params.get('oov_thresh', model_fn_defaults.oov_thresh)
    compile_spec = params.get('compile_spec', model_fn_defaults.compile_spec)
    text_inp = params.get('text_inp', model_fn_defaults.text_inp)
    embed_config = params.get('embed_config', model_fn_defaults.embed_config)
    model_config = params.get('model_config', model_fn_defaults.model_config)
    # Ensure all keras work happens on the default graph, otherwise we'll get
    # a disconnected graph.
    # XXX this is obviously not general. I don't know what a pipeline that
    # would need to generalize this looks like, so I'm not going to worry
    # about it.
    inp = keras.layers.Input(shape=(1,), name=text_inp, dtype='string')
    # download the embeddings from the specified URI
    embedding_location = download_from_s3(embedding_uri, profile=s3_profile)
    word_map_emb_pair = datautil.load_cPkl(embedding_location)
    tensors = keras_avgpool_linear_pad_endtoend(
        inp, word_map_emb_pair, pad_length, n_classes, random_seed,
        oov_thresh=oov_thresh, embed_config=embed_config, model_config=model_config)
    model = keras.models.Model(inputs=[inp], outputs=[tensors['softmax']])
    model.compile(**compile_spec)
    return model
def _main_input_fn(path, group_col, group, data_col, label_col, data_type=numpy.unicode,
                   label_type=numpy.float32, batch_size=32, shuffle=True, num_epochs=None):
    # This one is a bit more general; it actually does the work around here.
    # TODO(ZJ): consider that the omc_pipeline text processing steps could be done in here
    # load the file
    df_ = datautil.load_cPkl(path)
    df_group = df_[df_[group_col] == group]
    # assume data_col is the same name as the input tensor
    data_dict = {data_col: numpy.expand_dims(
        numpy.stack(df_group[data_col]).astype(data_type),
        axis=-1)}
    # determine the number of classes automatically, assuming label_col holds
    # integer class ids
    n_classes = max(df_[label_col]) + 1
    labels = tensorflow.keras.utils.to_categorical(
        df_group[label_col], num_classes=n_classes).astype(label_type)
    return tensorflow.estimator.inputs.numpy_input_fn(
        x=data_dict, y=labels,
        shuffle=shuffle,
        batch_size=batch_size,
        num_epochs=num_epochs
    )
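# The pickled file is expected to be a pandas DataFrame along these lines
# (values illustrative; label_col holds integer class ids that
# to_categorical one-hots):
#   group    text_processed    label
#   train    u'some text'      0
#   val      u'other text'     1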
class train_input_fn_defaults:
    filename = None
    train_batch_size = 32
    train_grp = 'train'
    data_col = 'text_processed'
    label_col = 'label'
    group_col = 'group'
    num_epochs = None  # train by number of steps by default
    shuffle = True
def get_train_input_fn(training_dir, params):
    # TODO(ZJ): deduplicate this with get_eval_input_fn
    filename = params.get('filename', train_input_fn_defaults.filename)
    batch_size = params.get('train_batch_size', train_input_fn_defaults.train_batch_size)
    train_grp = params.get('train_grp', train_input_fn_defaults.train_grp)
    data_col = params.get('data_col', train_input_fn_defaults.data_col)
    label_col = params.get('label_col', train_input_fn_defaults.label_col)
    group_col = params.get('group_col', train_input_fn_defaults.group_col)
    num_epochs = params.get('train_num_epochs', train_input_fn_defaults.num_epochs)
    shuffle = params.get('train_shuffle', train_input_fn_defaults.shuffle)
    if filename is None:
        raise NotImplementedError(
            "At the moment you still have to provide a filename for where your dataframe is")
    fullpath = os.path.join(training_dir, filename)
    return _main_input_fn(fullpath, group_col, train_grp, data_col, label_col,
                          batch_size=batch_size, num_epochs=num_epochs, shuffle=shuffle)
def train_input_fn(training_dir, params):
    # SageMaker wants the (features, labels) result, not the input_fn itself
    return get_train_input_fn(training_dir, params)()
class eval_input_fn_defaults:
    filename = None
    eval_batch_size = 128
    eval_grp = 'val'
    data_col = 'text_processed'
    label_col = 'label'
    group_col = 'group'
    num_epochs = 1  # eval the whole dataset, but silly to do it more than once
    shuffle = False
def get_eval_input_fn(training_dir, params):
    # TODO(ZJ): deduplicate this with get_train_input_fn
    filename = params.get('filename', eval_input_fn_defaults.filename)
    batch_size = params.get('eval_batch_size', eval_input_fn_defaults.eval_batch_size)
    eval_grp = params.get('eval_grp', eval_input_fn_defaults.eval_grp)
    data_col = params.get('data_col', eval_input_fn_defaults.data_col)
    label_col = params.get('label_col', eval_input_fn_defaults.label_col)
    group_col = params.get('group_col', eval_input_fn_defaults.group_col)
    num_epochs = params.get('eval_num_epochs', eval_input_fn_defaults.num_epochs)
    shuffle = params.get('eval_shuffle', eval_input_fn_defaults.shuffle)
    # ok, same deal as the train version
    if filename is None:
        raise NotImplementedError(
            "At the moment you still have to provide a filename for where your dataframe is")
    fullpath = os.path.join(training_dir, filename)
    return _main_input_fn(fullpath, group_col, eval_grp, data_col, label_col,
                          batch_size=batch_size, num_epochs=num_epochs, shuffle=shuffle)
def eval_input_fn(training_dir, params):
    # same deal: SageMaker wants the result, not the input_fn
    return get_eval_input_fn(training_dir, params)()
class serving_input_fn_defaults:
    tensor_name = 'text_processed'
def get_serving_input_fn(params):
    tensor_name = params.get('tensor_name', serving_input_fn_defaults.tensor_name)
    # Not sure what the rest of the hyperparameters are for here. Annoyingly I
    # can't just hand this the model and say "get what you need" as I did in
    # the previous setup, so the placeholder is declared by hand.
    tensor = tensorflow.placeholder(tensorflow.string, shape=[None, 1])
    return build_raw_serving_input_receiver_fn({
        tensor_name: tensor
    })
def serving_input_fn(params):
    # same deal: SageMaker wants the receiver, not the fn
    return get_serving_input_fn(params)()
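# Once exported, the SavedModel can be sanity-checked with the stock TF CLI,
# e.g. (path illustrative):
#   saved_model_cli show --dir <export_dir>/<timestamp> --all
# which should report a string input named 'text_processed' of shape (-1, 1).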
# TODO (ZJ): I *think* that the omc_pipeline could be run in an input_fn. However I'm really not sure what the InvokeEndpoint data looks like
if __name__ == "__main__":
    import sys
    print("Testing {0} locally".format(sys.argv[0]))
    df_name = 'small_dataset.pkl'
    emb_name = 'glove_filtered_small_split.pkl'
    data_dir = '/vol/data0/tmp/test_stuff_idunno/s3_cache/'
    target_dir = 'data/testing_dataset'
    bucket = 'sagemaker-omc-test'
    s3_uri = 's3://' + os.path.join(bucket, target_dir)
    hyperparameters = {
        "embedding_uri": os.path.join(s3_uri, emb_name),
        "filename": df_name,
        "s3_profile": "sagemaker",
        "eval_num_epochs": 1,
        "train_num_epochs": 1,
        "train_shuffle": False,
    }
    print("Making estimator")
    model = keras_model_fn(hyperparameters)
    keras_estimator = keras.estimator.model_to_estimator(keras_model=model)
    print("Training keras_estimator")
    _train_input_fn = get_train_input_fn(data_dir, hyperparameters)
    keras_estimator.train(input_fn=_train_input_fn, steps=None,)
    #hooks=[tf_debug.LocalCLIDebugHook()])
    print("Evaluating keras_estimator")
    eval_dict = keras_estimator.evaluate(
        input_fn=get_eval_input_fn(data_dir, hyperparameters), steps=None,)
    #hooks=[tf_debug.LocalCLIDebugHook()])
    print(eval_dict)
    print("Exporting keras_estimator")
    print(keras_estimator._config)
    keras_estimator.export_savedmodel(
        os.path.join(data_dir, 'exported'),
        get_serving_input_fn(hyperparameters),
    )
    print("Predicting")
    test_inputs = [
        "dicks",
    ]
    predicted = keras_estimator.predict(
        input_fn=tensorflow.estimator.inputs.numpy_input_fn(
            x={
                'text_processed': numpy.expand_dims(
                    numpy.stack(test_inputs).astype(numpy.unicode),
                    axis=-1)
            },
            num_epochs=1,
            shuffle=False
        ),
        hooks=[tf_debug.LocalCLIDebugHook()]
    )
    for inp in test_inputs:
        print("Input")
        print(inp)
        print("Output")
        print(predicted.next())