Distributed TensorFlow Pipeline using Google Cloud Machine Learning Engine



Wide & Deep models combine a linear model's memorization of key features with a DNN's generalization: tf.contrib.learn.DNNLinearCombinedClassifier

Problem: Predicting Income with the Census Income Dataset - given census data about a person, such as age, gender, education and occupation (the features), we try to predict whether or not the person earns more than 50,000 dollars a year (the target label)

~50,000 records

Our GCP ML-Engine Console: $GCP_PROJECT



  • Construct and train a Wide & Deep TensorFlow deep learning model using the high-level tf.contrib.learn.Estimator API.
  • Specify a pipeline for staged evaluation: from single-worker training to distributed training without any code changes
  • Leverage Google Cloud Machine Learning Engine - run training jobs & export model binaries for prediction

Download the data

The Census Income Data Set from the UC Irvine Machine Learning Repository, hosted on Google Cloud Storage:

  • Training file is adult.data.csv
  • Evaluation file is adult.test.csv

Run Exports

set up environment

export CENSUS_DATA=census_data
export TRAIN_FILE=adult.data.csv
export EVAL_FILE=adult.test.csv

export TRAIN_GCS_FILE=gs://cloudml-public/census/data/$TRAIN_FILE
export EVAL_GCS_FILE=gs://cloudml-public/census/data/$EVAL_FILE


Virtual environment

A virtual environment lets you run the code without changing the global Python packages on your system. Install Miniconda, then:

  • Create conda environment conda create --name single-tf python=2.7
  • Activate env source activate single-tf

Install dependencies - pip install the sample's requirements (TensorFlow 1.0 at the time of writing) inside the activated environment

code analysis

learn_runner creates an Experiment which executes the model code (Estimator and input functions) → generate_experiment_fn returns experiment_fn, which returns an Experiment built from:

  • model.build_estimator - returns a DNNLinearCombinedClassifier(model_dir, wide_columns, deep_columns, hidden_units)
  • model.generate_input_fn (used twice: train & eval) - returns an input_fn -> (features: dict of Tensors, indices: Tensor of label indices)
  • model.serving_input_fn - returns InputFnOps(features, None, feature_placeholders)


  • task.py parses arguments with argparse.ArgumentParser - add them with add_argument and extract a dict with parse_args (see the sketch after this list)
    • train-files - GCS or local path to training data
    • num-epochs
    • train-batch-size - default=40
    • eval-batch-size - default=40
    • train-steps - this or num-epochs required
    • eval-files - GCS or local path to test data
    • embedding-size - #embedding dimensions for categorical columns. default=8
    • first-layer-size - #nodes in 1st layer of DNN. default=100
    • num-layers - default=4
    • scale-factor - How quickly layer size should decay. default=0.7
    • job-dir - GCS location to write checkpoints and export models
    • verbose-logging
    • eval-delay-secs - Experiment arg: How long to wait before running first evaluation. default=1
    • min-eval-frequency - Experiment arg: Minimum number of training steps between evaluations. default=10
  • tensorflow.contrib.learn.python.learn.learn_runner runs the Experiment via run(experiment_fn, output_dir, schedule) - it uses tf.contrib.learn.RunConfig to parse the TF_CONFIG environment variable set by Cloud ML Engine
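
A minimal sketch of how task.py can wire this together (abbreviated argument list; generate_experiment_fn is sketched in the next section):

    # Sketch, not the sample's verbatim code: parse flags, then hand an
    # experiment_fn to learn_runner, which reads TF_CONFIG via RunConfig.
    import argparse

    from tensorflow.contrib.learn.python.learn import learn_runner

    parser = argparse.ArgumentParser()
    parser.add_argument('--train-files', required=True)
    parser.add_argument('--eval-files', required=True)
    parser.add_argument('--job-dir', required=True)
    parser.add_argument('--train-steps', type=int)
    args = parser.parse_args()

    # learn_runner creates and runs the Experiment, writing to args.job_dir.
    learn_runner.run(
        generate_experiment_fn(
            train_files=args.train_files,
            eval_files=args.eval_files,
            train_steps=args.train_steps),
        args.job_dir)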


  • generate_experiment_fn creates an experiment function given hyperparameters - returns a function (output_dir) -> Experiment, used by learn_runner to create an Experiment (a sketch follows this list)
  • the Experiment executes:
    • the input functions from model.generate_input_fn to gather train & eval inputs
    • the Estimator built by model.build_estimator - constructs the model topology
    • export strategies built from model.serving_input_fn - control the prediction graph structure
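
A hedged sketch of such an experiment function; make_export_strategy is one way to attach serving_input_fn as an export strategy (keyword names may differ slightly across contrib versions):

    import model  # the sample's model.py
    import tensorflow as tf
    from tensorflow.contrib.learn.python.learn.utils import saved_model_export_utils


    def generate_experiment_fn(train_files, eval_files, train_steps, **experiment_args):
        """Returns a function (output_dir) -> Experiment, consumed by learn_runner."""
        def experiment_fn(output_dir):
            return tf.contrib.learn.Experiment(
                model.build_estimator(output_dir),
                train_input_fn=model.generate_input_fn(train_files),
                eval_input_fn=model.generate_input_fn(eval_files),
                train_steps=train_steps,
                export_strategies=[
                    saved_model_export_utils.make_export_strategy(
                        model.serving_input_fn)],
                **experiment_args)  # e.g. eval_delay_secs, min_eval_frequency
        return experiment_fn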


  • model.build_estimator args (a sketch follows this list):
    • model_dir - used by the Classifier for checkpoints summaries and exports.
    • embedding_size - #dimensions used to represent categorical features when input to the DNN.
    • hidden_units - DNN topology
  • leverage tensorflow.contrib.layers to ingest input data
    • layers.sparse_column_with_keys - For categorical columns with known values, specify keys: lists of values
    • layers.sparse_column_with_hash_bucket - For categorical columns with many values, specify hash_bucket_size
    • layers.real_valued_column - continuous base columns - DEEP columns
    • layers.bucketized_column - Continuous columns can be converted to categorical via bucketization boundaries list
    • layers.crossed_column - WIDE columns - Interactions between different categorical features
    • layers.embedding_column - DEEP columns - specify dimension=embedding_size
  • returns a DNNLinearCombinedClassifier - ctor params:
    • model_dir
    • linear_feature_columns=wide_columns
    • dnn_feature_columns=deep_columns
    • dnn_hidden_units
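
A condensed, illustrative sketch of build_estimator using the column types above (only a few census columns are shown; names, vocabularies and bucket boundaries are examples, not the sample's full schema):

    import tensorflow as tf
    from tensorflow.contrib import layers


    def build_estimator(model_dir, embedding_size=8, hidden_units=None):
        # Categorical column with a known, small set of values.
        gender = layers.sparse_column_with_keys('gender', keys=['female', 'male'])
        # Categorical column with many values: hash into buckets.
        occupation = layers.sparse_column_with_hash_bucket('occupation', hash_bucket_size=100)
        # Continuous base column (DEEP).
        age = layers.real_valued_column('age')
        # Continuous -> categorical via bucketization boundaries.
        age_buckets = layers.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])

        wide_columns = [
            gender, occupation, age_buckets,
            # WIDE: interactions between categorical features.
            layers.crossed_column([age_buckets, occupation], hash_bucket_size=int(1e4)),
        ]
        deep_columns = [
            age,
            # DEEP: dense embeddings of sparse columns.
            layers.embedding_column(gender, dimension=embedding_size),
            layers.embedding_column(occupation, dimension=embedding_size),
        ]

        return tf.contrib.learn.DNNLinearCombinedClassifier(
            model_dir=model_dir,
            linear_feature_columns=wide_columns,
            dnn_feature_columns=deep_columns,
            dnn_hidden_units=hidden_units or [100, 70, 50, 25])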


  • model.serving_input_fn builds the input subgraph for prediction (sketch below) - returns a tf.contrib.learn.input_fn_utils.InputFnOps, a named tuple consisting of:
    • features - dict of features to be passed to the Estimator
    • labels - None for predictions
    • inputs - dict of tf.placeholder for model input fields
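
A minimal sketch, with illustrative placeholder names:

    import tensorflow as tf
    from tensorflow.contrib.learn.python.learn.utils import input_fn_utils


    def serving_input_fn():
        # One placeholder per model input field (names are illustrative).
        feature_placeholders = {
            'age': tf.placeholder(tf.float32, [None]),
            'occupation': tf.placeholder(tf.string, [None]),
        }
        # Feature columns expect rank-2 tensors, so add a feature dimension.
        features = {k: tf.expand_dims(v, -1) for k, v in feature_placeholders.items()}
        return input_fn_utils.InputFnOps(
            features,
            None,                   # labels: None for prediction
            feature_placeholders)   # inputs fed at serving time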


  • model.generate_input_fn generates an input function for training or evaluation (sketch below)
  • constructs a filename queue using tf.train.string_input_producer and uses tf.TextLineReader.read_up_to to read input rows in batches of batch_size
  • tf.train.shuffle_batch - maintains a buffer for shuffling inputs between batches
  • Returns: A function () -> (features, indices)
    • features - a dict of Tensors
    • indices - a Tensor of label indices
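
A simplified sketch under stated assumptions: the CSV schema is truncated to three columns, and the label-to-index conversion uses a plain equality test rather than the sample's exact lookup logic:

    import tensorflow as tf

    # Illustrative subset of the census CSV schema.
    CSV_COLUMNS = ['age', 'occupation', 'income_bracket']
    CSV_DEFAULTS = [[0.0], [''], ['']]


    def generate_input_fn(filenames, batch_size=40):
        def input_fn():
            filename_queue = tf.train.string_input_producer(filenames)
            reader = tf.TextLineReader()
            # Read up to batch_size CSV rows at a time.
            _, rows = reader.read_up_to(filename_queue, num_records=batch_size)
            # Maintain a buffer to shuffle inputs between batches.
            rows = tf.train.shuffle_batch(
                [rows], batch_size, capacity=batch_size * 10,
                min_after_dequeue=batch_size * 2, enqueue_many=True)[0]
            columns = tf.decode_csv(rows, record_defaults=CSV_DEFAULTS)
            features = dict(zip(CSV_COLUMNS, columns))
            label_string = features.pop('income_bracket')
            # Label indices: 1 for ' >50K', 0 otherwise.
            indices = tf.to_int32(tf.equal(label_string, ' >50K'))
            return features, indices
        return input_fn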

Single Node Training

The same code runs locally and on Cloud ML Engine.

Using local machine python

export TRAIN_STEPS=1000
export OUTPUT_DIR=census_output
rm -rf $OUTPUT_DIR
python trainer/task.py --train-files $CENSUS_DATA/$TRAIN_FILE \
                       --eval-files $CENSUS_DATA/$EVAL_FILE \
                       --job-dir $OUTPUT_DIR \
                       --train-steps $TRAIN_STEPS

Using gcloud local

Mock running it on the cloud:

export TRAIN_STEPS=1000
export OUTPUT_DIR=census_output
rm -rf $OUTPUT_DIR
gcloud ml-engine local train --package-path trainer \
                           --module-name trainer.task \
                           -- \
                           --train-files $CENSUS_DATA/$TRAIN_FILE \
                           --eval-files $CENSUS_DATA/$EVAL_FILE \
                           --job-dir $OUTPUT_DIR \
                           --train-steps $TRAIN_STEPS

Setup GC ML-Engine + Bucket

export ML_BUCKET=gs://josh-machine-learning
gsutil mb $ML_BUCKET

gcloud ml-engine init-project

export SVCACCT=cloud-ml-service@${GCP_PROJECT}
gsutil acl ch -u $SVCACCT:WRITE $ML_BUCKET

Using Cloud ML Engine

Note: --job-dir comes before the -- separator when training on the cloud, so Cloud ML Engine can assign a distinct output directory to each trial run during hyperparameter tuning.

export GCS_JOB_DIR=gs://<my-bucket>/path/to/my/jobs/job3
export JOB_NAME=census
export TRAIN_STEPS=1000
gcloud ml-engine jobs submit training $JOB_NAME \
                                    --runtime-version 1.0 \
                                    --job-dir $GCS_JOB_DIR \
                                    --module-name trainer.task \
                                    --package-path trainer/ \
                                    --region us-central1 \
                                    -- \
                                    --train-files $TRAIN_GCS_FILE \
                                    --eval-files $EVAL_GCS_FILE \
                                    --train-steps $TRAIN_STEPS


Inspect the details of the graph with TensorBoard:

tensorboard --logdir=$GCS_JOB_DIR
  • Accuracy and Output - accuracy should come out close to 80%.

Distributed Node Training

Distributed training uses the Distributed TensorFlow TF_CONFIG environment variable, which is generated by gcloud and parsed to create a ClusterSpec. Specify a ScaleTier to select a predefined cluster tier.
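
A minimal sketch of how a trainer (or tf.contrib.learn.RunConfig internally) consumes TF_CONFIG; the JSON shape in the comment is an assumed example of what gcloud generates:

    import json
    import os

    import tensorflow as tf

    # Assumed example:
    # {"cluster": {"master": ["host0:2222"], "worker": ["host1:2222"], "ps": ["host2:2222"]},
    #  "task": {"type": "worker", "index": 0}}
    tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))
    cluster = tf_config.get('cluster')
    task = tf_config.get('task', {})

    if cluster:
        # ClusterSpec describes every job/task in the training cluster.
        cluster_spec = tf.train.ClusterSpec(cluster)
        print('Running as %s %d' % (task.get('type'), task.get('index', 0)))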

Using gcloud local

Run the distributed training code locally

export TRAIN_STEPS=1000
export PS_SERVER_COUNT=2   # example values for the local mock cluster
export WORKER_COUNT=3
export OUTPUT_DIR=census_output
rm -rf $OUTPUT_DIR
gcloud ml-engine local train --package-path trainer \
                           --module-name trainer.task \
                           --parameter-server-count $PS_SERVER_COUNT \
                           --worker-count $WORKER_COUNT \
                           --distributed \
                           -- \
                           --train-files $CENSUS_DATA/$TRAIN_FILE \
                           --eval-files $CENSUS_DATA/$EVAL_FILE \
                           --train-steps $TRAIN_STEPS \
                           --job-dir $OUTPUT_DIR

Using Cloud ML Engine

Run the distributed training job

export GCS_JOB_DIR=gs://<my-bucket>/path/to/my/models/run3
export JOB_NAME=census
export TRAIN_STEPS=1000
export SCALE_TIER=STANDARD_1   # example predefined tier: several workers + parameter servers
gcloud ml-engine jobs submit training $JOB_NAME \
                                    --scale-tier $SCALE_TIER \
                                    --runtime-version 1.0 \
                                    --job-dir $GCS_JOB_DIR \
                                    --module-name trainer.task \
                                    --package-path trainer/ \
                                    --region us-central1 \
                                    -- \
                                    --train-files $TRAIN_GCS_FILE \
                                    --eval-files $EVAL_GCS_FILE \
                                    --train-steps $TRAIN_STEPS

Hyperparameter Tuning

Hyperparameter tuning searches for the most effective hyperparameters for your model.

Running Hyperparameter Job

specify a hyperparameter tuning yaml file (hptuning_config.yaml):

    trainingInput:
      hyperparameters:
        goal: MAXIMIZE
        hyperparameterMetricTag: accuracy
        maxTrials: 4
        maxParallelTrials: 2
        params:
          - parameterName: first-layer-size
            type: INTEGER
            minValue: 50
            maxValue: 500
            scaleType: UNIT_LINEAR_SCALE
          - parameterName: num-layers
            type: INTEGER
            minValue: 1
            maxValue: 15
            scaleType: UNIT_LINEAR_SCALE
          - parameterName: scale-factor
            type: DOUBLE
            minValue: 0.1
            maxValue: 1.0
            scaleType: UNIT_REVERSE_LOG_SCALE

Add the --config argument to the submit command:

export HPTUNING_CONFIG=hptuning_config.yaml
export JOB_NAME=census
export TRAIN_STEPS=1000
gcloud ml-engine jobs submit training $JOB_NAME \
                                    --scale-tier $SCALE_TIER \
                                    --runtime-version 1.0 \
                                    --config $HPTUNING_CONFIG \
                                    --job-dir $GCS_JOB_DIR \
                                    --module-name trainer.task \
                                    --package-path trainer/ \
                                    --region us-central1 \
                                    -- \
                                    --train-files $TRAIN_GCS_FILE \
                                    --eval-files $EVAL_GCS_FILE \
                                    --train-steps $TRAIN_STEPS

Run TensorBoard to see the results of different runs and compare accuracy / AUROC numbers:

tensorboard --logdir=$GCS_JOB_DIR

Run Predictions

Deploy a Prediction Service

Once the training job has finished, use the exported model to create a prediction service. First create a model:

gcloud ml-engine models create census --regions us-central1

Then find the GCS path of the exported trained model binaries:

gsutil ls -r $GCS_JOB_DIR/export

You should see a directory named $GCS_JOB_DIR/export/Servo/<timestamp>. Use it to create a model version:

export MODEL_BINARIES=$GCS_JOB_DIR/export/Servo/<timestamp>
gcloud ml-engine versions create v1 --model census --origin $MODEL_BINARIES --runtime-version 1.0

Run Online Predictions

You can now send prediction requests to the API:

gcloud ml-engine predict --model census --version v1 --json-instances ../test.json

You should see a response with the predicted labels of the examples:

{"probabilities": [0.9962924122810364, 0.003707568161189556], "logits": [-5.593664646148682], "classes": 0, "logistic": [0.003707568161189556]}

How to interpret the results?

  • probabilities - the probabilities of < $50K vs >= $50K
  • classes - the predicted class (0, i.e. < $50K)
  • logits - ln(p/(1-p)) = ln(0.00371/(1-0.00371)) = -5.593
  • logistic - 1/(1+exp(-logit)) = 1/(1+exp(5.593)) = 0.0037
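
A quick sanity check of that arithmetic:

    import math

    p = 0.003707568161189556           # probability of the >= $50K class
    logit = math.log(p / (1 - p))      # ln(p/(1-p))
    logistic = 1 / (1 + math.exp(-logit))

    print(logit)     # ~ -5.5937, matches "logits"
    print(logistic)  # ~ 0.0037076, matches "logistic"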

Run Batch Prediction

For large amounts of data with no latency requirements on receiving prediction results, submit a prediction job to the API. This requires the data be stored in GCS.

export JOB_NAME=census_prediction
gcloud ml-engine jobs submit prediction $JOB_NAME \
    --model census \
    --version v1 \
    --data-format TEXT \
    --region us-central1 \
    --input-paths gs://cloudml-public/testdata/prediction/census.json \
    --output-path $GCS_JOB_DIR/predictions

Check status of prediction job:

gcloud ml-engine jobs describe $JOB_NAME

After the job state is SUCCEEDED, check the results in --output-path.
