Created
January 25, 2018 02:19
-
-
Save zmjjmz/8e3a7e5430f2e700a9e89bf2b4f6259b to your computer and use it in GitHub Desktop.
endtoend with keras
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
import urlparse | |
import os | |
import numpy | |
import boto3 | |
import tensorflow | |
from tensorflow.python.keras._impl import keras | |
from tensorflow.python.estimator.export.export_output import PredictOutput | |
from tensorflow.python.estimator.export.export import build_raw_serving_input_receiver_fn | |
from tensorflow.python.saved_model import signature_constants | |
from tensorflow.python import debug as tf_debug | |
import datautil | |
import ml_utils | |
# Experimental: still working out how best to structure this.
# End-to-end, embedding-only text model.
# Reference: https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/tensorflow_abalone_age_predictor_using_keras/abalone.py
def keras_avgpool_linear_pad_endtoend(inp_placeholder, word_map_emb_pair, pad_length,
                                      n_classes, random_seed, oov_thresh=0.9,
                                      embed_config=None, model_config=None):
    """Wire an embedding -> masked average pool -> dense -> softmax text model.

    The layers are applied functionally to ``inp_placeholder`` and the
    intermediate tensors are returned; wrapping them into a
    ``keras.models.Model`` is left to the caller (see ``keras_model_fn``).

    Args:
        inp_placeholder: input tensor holding the raw text; fed to
            ``ml_utils.TokenizeLookupLayer``.
        word_map_emb_pair: ``(word_ind_map, embedding_mat)`` in the order
            ``filter_embeddings`` returns them. Id 0 is used for padding and
            id 1 for out-of-vocabulary tokens, so the map/matrix presumably
            reserve those rows -- TODO confirm against filter_embeddings.
        pad_length: number of token ids per example after the lookup layer.
        n_classes: width of the dense output layer.
        random_seed: passed to ``numpy.random.seed`` (seeds numpy's global RNG).
        oov_thresh: threshold handed to ``ml_utils.OOVCodeLayer``.
        embed_config: extra keyword arguments for ``keras.layers.Embedding``
            (``None`` means none).
        model_config: extra keyword arguments for the dense output layer
            (``None`` means none).

    Returns:
        dict of tensors with keys ``softmax``, ``oov_code``, ``pooler``,
        ``emb``, ``lookedup`` and ``length``.
    """
    # None defaults instead of {} so no dict is shared between calls.
    embed_config = {} if embed_config is None else embed_config
    model_config = {} if model_config is None else model_config

    word_ind_map, embedding_mat = word_map_emb_pair
    numpy.random.seed(random_seed)

    pad_value = 0
    oov_value = 1
    lookedup = ml_utils.TokenizeLookupLayer(word_ind_map, pad_length,
                                            pad_value=pad_value, oov_value=oov_value,
                                            name='lookedup')(inp_placeholder)
    # Counting non-zero ids only gives the real length because pad_value is 0.
    lengths = ml_utils.CountNonZeroLayer(name='get_len')(lookedup)
    tensorflow.summary.histogram('lengths', lengths)
    oov_code = ml_utils.OOVCodeLayer(
        oov_value=oov_value,
        oov_thresh=oov_thresh,
        name='oov_code'
    )([lookedup, lengths])

    # Embedding(input_dim, output_dim) initialised from the pretrained matrix.
    emb = keras.layers.Embedding(*(embedding_mat.shape), weights=[embedding_mat],
                                 input_length=pad_length, name='embed',
                                 **embed_config)(lookedup)
    # Lengths are passed in, presumably so padding positions are excluded from the mean.
    pooler = ml_utils.MaskedGlobalAveragePooling1D(name='avg')([emb, lengths])
    logits = keras.layers.core.Dense(n_classes, input_shape=(embedding_mat.shape[1],),
                                     name='weights', **model_config)(pooler)
    softmax = keras.layers.core.Activation('softmax', name='softmax')(logits)
    return {'softmax': softmax,
            'oov_code': oov_code,
            'pooler': pooler,
            'emb': emb,
            'lookedup': lookedup,
            'length': lengths,
            }
# TODO(ZJ): definitely move this somewhere else | |
def download_from_s3(uri, profile='sagemaker', local_cache='/tmp/'):
    """Download one S3 object into a local cache directory.

    Args:
        uri: ``s3://bucket/key`` URI of the object to fetch.
        profile: boto3 profile name used to create the session.
        local_cache: directory to download into; created if it does not exist.

    Returns:
        Local path of the downloaded file: ``local_cache`` joined with the
        basename of the object key. Objects that share a basename overwrite
        each other in the cache.

    Raises:
        ValueError: if ``uri`` is empty/None or no bucket and object key can be
            parsed out of it.
    """
    if not uri:
        raise ValueError("download_from_s3 needs an s3://bucket/key URI, got %r" % (uri,))
    parsed = urlparse.urlparse(uri)
    target_bucket = parsed.netloc
    # urlparse keeps the leading '/' of the path, but S3 object keys must not have it.
    path = parsed.path.lstrip('/')
    filename = os.path.basename(path)
    if not target_bucket or not filename:
        raise ValueError("Could not parse a bucket and object key out of %r" % (uri,))

    # Python 2 compatible "mkdir -p": tolerate the directory already existing.
    try:
        os.makedirs(local_cache)
    except OSError:
        if not os.path.isdir(local_cache):
            raise
    new_filepath = os.path.join(local_cache, filename)

    sess = boto3.Session(profile_name=profile)
    bucket = sess.resource('s3').Bucket(target_bucket)
    bucket.download_file(path, new_filepath)
    return new_filepath
class model_fn_defaults:
    """Fallback hyperparameters for ``keras_model_fn``.

    Each attribute is used when the key of the same name is missing from the
    ``params`` dict.
    """
    # S3 URI of the pickled (word_ind_map, embedding_mat) pair; no usable default.
    embedding_uri = None
    # boto3 profile used for that download.
    s3_profile = None

    # Model shape.
    pad_length = 162
    n_classes = 2
    random_seed = None
    oov_thresh = 0.9

    # Keyword arguments for keras Model.compile.
    compile_spec = dict(
        optimizer="sgd",
        loss={"softmax": "categorical_crossentropy"},
        metrics=["accuracy"],
    )

    # Name of the string input.
    text_inp = 'text_processed'

    # Extra kwargs for the Embedding layer (pretrained vectors kept frozen)
    # and for the dense output layer.
    embed_config = dict(trainable=False)
    model_config = dict()
def keras_model_fn(params):
    """Build and compile the Keras text classifier.

    Args:
        params: hyperparameter dict. Any key missing from it falls back to the
            attribute of the same name on ``model_fn_defaults``. Keys read:
            embedding_uri, s3_profile, pad_length, n_classes, random_seed,
            oov_thresh, compile_spec, text_inp, embed_config, model_config.

    Returns:
        A compiled ``keras.models.Model`` from a string input layer named
        ``text_inp`` to the ``softmax`` output.

    Raises:
        ValueError: if no ``embedding_uri`` is configured.
    """
    def _param(key):
        # Every hyperparameter key matches an attribute on model_fn_defaults.
        return params.get(key, getattr(model_fn_defaults, key))

    embedding_uri = _param('embedding_uri')
    if embedding_uri is None:
        raise ValueError("keras_model_fn requires an 'embedding_uri' hyperparameter")

    # The input layer name presumably has to match the feature key the input
    # functions produce (data_col / serving tensor_name, which also default to
    # 'text_processed'). It used to be hard-coded; now the 'text_inp'
    # hyperparameter actually controls it.
    inp = keras.layers.Input(shape=(1,), name=_param('text_inp'), dtype='string')

    embedding_location = download_from_s3(embedding_uri, profile=_param('s3_profile'))
    # NOTE(review): load_cPkl presumably unpickles -- only point this at trusted data.
    word_map_emb_pair = datautil.load_cPkl(embedding_location)

    tensors = keras_avgpool_linear_pad_endtoend(
        inp, word_map_emb_pair, _param('pad_length'), _param('n_classes'),
        _param('random_seed'), oov_thresh=_param('oov_thresh'),
        embed_config=_param('embed_config'), model_config=_param('model_config'))

    model = keras.models.Model(inputs=[inp], outputs=[tensors['softmax']])
    model.compile(**_param('compile_spec'))
    return model
def _main_input_fn(path, group_col, group, data_col, label_col, data_type=None,
                   label_type=numpy.float32, batch_size=32, shuffle=True, num_epochs=None):
    """Build a numpy-backed Estimator input_fn for one group of a pickled dataframe.

    Args:
        path: local path of the pickled dataframe, loaded with datautil.load_cPkl.
        group_col: column whose value selects the split.
        group: value of ``group_col`` to keep (e.g. 'train' or 'val').
        data_col: text column; also used as the feature key, which presumably
            must match the model's input layer name.
        label_col: class-id column.
        data_type: dtype for the text array. ``None`` (default) means the native
            text type (``unicode`` on Python 2, ``str`` on Python 3), the same
            type the old ``numpy.unicode`` default referred to.
        label_type: dtype of the one-hot label array.
        batch_size, shuffle, num_epochs: forwarded to numpy_input_fn.

    Returns:
        The input function built by ``tensorflow.estimator.inputs.numpy_input_fn``.

    Raises:
        ValueError: if no rows have ``group_col == group``.
    """
    # TODO(ZJ): consider that the omc_pipeline text processing steps can be done in here!
    if data_type is None:
        # Resolved at call time: numpy.unicode was removed from newer numpy releases.
        try:
            data_type = unicode  # noqa: F821 -- Python 2 text type
        except NameError:
            data_type = str

    # NOTE(review): load_cPkl presumably unpickles -- only use trusted files.
    df_ = datautil.load_cPkl(path)
    df_group = df_[df_[group_col] == group]
    if len(df_group) == 0:
        raise ValueError("No rows with %s == %r in %s" % (group_col, group, path))

    # Assumes each cell is a single string, giving shape (n_rows, 1) to match an
    # Input(shape=(1,)) -- TODO confirm for multi-token cells.
    data_dict = {data_col: numpy.expand_dims(
        numpy.stack(df_group[data_col]).astype(data_type),
        axis=-1)}
    # Class count is taken from the whole dataframe rather than this group, so
    # every split is one-hot encoded to the same width.
    n_classes = int(max(df_[label_col])) + 1
    labels = tensorflow.keras.utils.to_categorical(
        df_group[label_col], num_classes=n_classes).astype(label_type)
    return tensorflow.estimator.inputs.numpy_input_fn(
        x=data_dict, y=labels,
        shuffle=shuffle,
        batch_size=batch_size,
        num_epochs=num_epochs
    )
class train_input_fn_defaults:
    """Fallback settings for ``get_train_input_fn`` when a key is absent from params."""
    # Dataframe file name relative to training_dir; must be supplied.
    filename = None

    # Batching / iteration.
    train_batch_size = 32
    num_epochs = None  # no epoch limit: training length is controlled by steps
    shuffle = True

    # Dataframe layout: group_col == train_grp selects the training rows.
    group_col = 'group'
    train_grp = 'train'
    data_col = 'text_processed'
    label_col = 'label'
def get_train_input_fn(training_dir, params):
    """Return the training input_fn for the dataframe stored in ``training_dir``.

    Keys missing from ``params`` fall back to ``train_input_fn_defaults``:
    filename, train_batch_size, train_grp, data_col, label_col, group_col,
    train_num_epochs, train_shuffle.

    Raises:
        NotImplementedError: if no filename is configured.
    """
    defaults = train_input_fn_defaults
    filename = params.get('filename', defaults.filename)
    if filename is None:
        raise NotImplementedError("At the moment you still have to provide a filename for where your dataframe is")

    return _main_input_fn(
        os.path.join(training_dir, filename),
        params.get('group_col', defaults.group_col),
        params.get('train_grp', defaults.train_grp),
        params.get('data_col', defaults.data_col),
        params.get('label_col', defaults.label_col),
        batch_size=params.get('train_batch_size', defaults.train_batch_size),
        num_epochs=params.get('train_num_epochs', defaults.num_epochs),
        shuffle=params.get('train_shuffle', defaults.shuffle),
    )
def train_input_fn(training_dir, params):
    """Training input hook, presumably the one called by the SageMaker TF container.

    Unlike ``get_train_input_fn``, this invokes the input function and returns
    its result (the tensors) rather than the function itself.
    """
    make_training_batches = get_train_input_fn(training_dir, params)
    return make_training_batches()
class eval_input_fn_defaults:
    """Fallback settings for ``get_eval_input_fn`` when a key is absent from params."""
    # Dataframe file name relative to training_dir; must be supplied.
    filename = None

    # Batching / iteration: one unshuffled pass over the evaluation rows.
    eval_batch_size = 128
    num_epochs = 1
    shuffle = False

    # Dataframe layout: group_col == eval_grp selects the evaluation rows.
    group_col = 'group'
    eval_grp = 'val'
    data_col = 'text_processed'
    label_col = 'label'
def get_eval_input_fn(training_dir, params):
    """Return the evaluation input_fn for the dataframe stored in ``training_dir``.

    Keys missing from ``params`` fall back to ``eval_input_fn_defaults``:
    filename, eval_batch_size, eval_grp, data_col, label_col, group_col,
    eval_num_epochs, eval_shuffle.

    Raises:
        NotImplementedError: if no filename is configured.
    """
    defaults = eval_input_fn_defaults
    filename = params.get('filename', defaults.filename)
    if filename is None:
        raise NotImplementedError("At the moment you still have to provide a filename for where your dataframe is")

    return _main_input_fn(
        os.path.join(training_dir, filename),
        params.get('group_col', defaults.group_col),
        params.get('eval_grp', defaults.eval_grp),
        params.get('data_col', defaults.data_col),
        params.get('label_col', defaults.label_col),
        batch_size=params.get('eval_batch_size', defaults.eval_batch_size),
        num_epochs=params.get('eval_num_epochs', defaults.num_epochs),
        shuffle=params.get('eval_shuffle', defaults.shuffle),
    )
def eval_input_fn(training_dir, params):
    """Evaluation input hook, presumably the one called by the SageMaker TF container.

    Unlike ``get_eval_input_fn``, this invokes the input function and returns
    its result (the tensors) rather than the function itself.
    """
    make_eval_batches = get_eval_input_fn(training_dir, params)
    return make_eval_batches()
class serving_input_fn_defaults:
    """Fallback settings for ``get_serving_input_fn``."""
    # Feature key of the raw string input in the serving signature; presumably
    # must match the model's input layer name.
    tensor_name = 'text_processed'
def get_serving_input_fn(params):
    """Return a serving input receiver fn that accepts a batch of raw strings.

    The feature key is ``params['tensor_name']``, falling back to
    ``serving_input_fn_defaults.tensor_name``. The string placeholder of shape
    ``[None, 1]`` built here describes the accepted input; it is presumably used
    only as a dtype/shape prototype by ``build_raw_serving_input_receiver_fn``
    -- TODO confirm for the TF version in use.
    """
    feature_key = params.get('tensor_name', serving_input_fn_defaults.tensor_name)
    prototype = tensorflow.placeholder(tensorflow.string, shape=[None, 1])
    features = {feature_key: prototype}
    return build_raw_serving_input_receiver_fn(features)
def serving_input_fn(params):
    """Serving input hook: invoke the receiver fn and return its result.

    Unlike ``get_serving_input_fn``, this returns the result of calling the
    receiver function rather than the function itself.
    """
    make_receiver = get_serving_input_fn(params)
    return make_receiver()
# TODO(ZJ): the omc_pipeline preprocessing could probably run inside an input_fn, but that depends on what payload InvokeEndpoint actually delivers, which still needs checking.
if __name__ == "__main__":
    # Local smoke test: build, train, evaluate, export and predict on a small
    # cached dataset. Pass --debug to attach the tfdbg CLI debugger hook to
    # train/evaluate/predict (it is interactive, so it is off by default).
    import sys

    print("Testing {0} locally".format(sys.argv[0]))
    use_debugger = '--debug' in sys.argv[1:]

    def _debug_hooks():
        # Fresh hook list per estimator call, or None to run without tfdbg.
        return [tf_debug.LocalCLIDebugHook()] if use_debugger else None

    df_name = 'small_dataset.pkl'
    emb_name = 'glove_filtered_small_split.pkl'
    data_dir = '/vol/data0/tmp/test_stuff_idunno/s3_cache/'
    target_dir = 'data/testing_dataset'
    bucket = 'sagemaker-omc-test'
    # S3 keys always use '/', regardless of the local OS path separator.
    s3_uri = 's3://' + '/'.join([bucket, target_dir])
    hyperparameters = {
        "embedding_uri": '/'.join([s3_uri, emb_name]),
        "filename": df_name,
        "s3_profile": "sagemaker",
        "eval_num_epochs": 1,
        "train_num_epochs": 1,
        "train_shuffle": False,
    }

    print("Making estimator")
    model = keras_model_fn(hyperparameters)
    keras_estimator = keras.estimator.model_to_estimator(keras_model=model)

    print("Training keras_estimator")
    keras_estimator.train(input_fn=get_train_input_fn(data_dir, hyperparameters),
                          steps=None, hooks=_debug_hooks())

    print("Evaluating keras_estimator")
    eval_dict = keras_estimator.evaluate(input_fn=get_eval_input_fn(data_dir, hyperparameters),
                                         steps=None, hooks=_debug_hooks())
    print(eval_dict)

    print("Exporting keras_estimator")
    print(keras_estimator.config)
    keras_estimator.export_savedmodel(
        os.path.join(data_dir, 'exported'),
        get_serving_input_fn(hyperparameters),
    )

    print("Predicting")
    test_inputs = [
        "dicks",
    ]
    # numpy.unicode is Python 2 only / removed from newer numpy; pick the native text type.
    try:
        text_type = unicode  # noqa: F821 -- Python 2 text type
    except NameError:
        text_type = str
    predicted = keras_estimator.predict(
        input_fn=tensorflow.estimator.inputs.numpy_input_fn(
            x={
                'text_processed': numpy.expand_dims(
                    numpy.stack(test_inputs).astype(text_type),
                    axis=-1)
            },
            num_epochs=1,
            shuffle=False
        ),
        hooks=_debug_hooks()
    )
    for inp in test_inputs:
        print("Input")
        print(inp)
        print("Output")
        # next() works on both Python 2 and 3 generators (unlike .next()).
        print(next(predicted))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment