Distributed TensorFlow Pipeline using Google Cloud Machine Learning Engine

Wide and Deep models combine the key-feature memorization of a linear model with the generalization of a DNN: tf.contrib.learn.DNNLinearCombinedClassifier

Problem: predicting income with the Census Income Dataset. Given census data about a person, such as age, gender, education and occupation (the features), we try to predict whether or not the person earns more than 50,000 dollars a year (the target label).

50000 records

Our GCP ML-Engine Console:$GCP_PROJECT



  • Construct and train a Wide & Deep TensorFlow deep learning model using the high-level tf.contrib.learn.Estimator API.
  • Specify a pipeline for staged evaluation: from single-worker training to distributed training without any code changes
  • Leverage Google Cloud Machine Learning Engine - run training jobs & export model binaries for prediction

Download the data

The Census Income Data Set is provided by the UC Irvine Machine Learning Repository. The files are hosted on Google Cloud Storage:

  • Training file is
  • Evaluation file is adult.test.csv

Run Exports

set up environment

export CENSUS_DATA=census_data
export EVAL_FILE=adult.test.csv

export TRAIN_GCS_FILE=gs://cloudml-public/census/data/$TRAIN_FILE
export EVAL_GCS_FILE=gs://cloudml-public/census/data/$EVAL_FILE


Virtual environment

A virtual environment allows running the sample without changing the global Python packages on your system. Install Miniconda, then:

  • Create a conda environment: conda create --name single-tf python=2.7
  • Activate the environment: source activate single-tf

Install dependencies

code analysis

learn_runner creates an Experiment, which executes the model code (Estimator and input functions). generate_experiment_fn returns an experiment_fn, which in turn returns an Experiment built from:

  • model.build_estimator - returns a DNNLinearCombinedClassifier (model_dir, wide_columns, deep_columns, hidden_units)
  • model.generate_input_fn (called twice) - returns an input_fn -> (features: Dict[Tensors], indices: Tensor[label indices])
  • model.serving_input_fn - returns InputFnOps(features, None, feature_placeholders)


trainer.task

  • Parse arguments with argparse.ArgumentParser: add them with add_argument and read them back with parse_args (a Namespace, which vars() turns into a dict)
    • train-files - GCS or local path to training data
    • num-epochs
    • train-batch-size - default=40
    • eval-batch-size - default=40
    • train-steps - either this or num-epochs is required
    • eval-files - GCS or local path to test data
    • embedding-size - #embedding dimensions for categorical columns. default=8
    • first-layer-size - #nodes in 1st layer of DNN. default=100
    • num-layers - default=4
    • scale-factor - How quickly layer size should decay. default=0.7
    • job-dir - GCS location to write checkpoints and export models
    • verbose-logging
    • eval-delay-secs - Experiment arg: How long to wait before running first evaluation. default=1
    • min-eval-frequency - Experiment arg: Minimum number of training steps between evaluations. default=10
  • tensorflow.contrib.learn.python.learn.learn_runner runs the Experiment via run(experiment_fn, output_dir, schedule). It uses tf.contrib.learn.RunConfig to parse the TF_CONFIG environment variable, which is generated by gcloud or by the Cloud ML Engine service
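
The argument list above could be declared roughly as follows. This is a minimal sketch, not the sample's actual code: the helper names build_parser and parse_hparams, the argument types, and the choice of which flags are marked required are assumptions. Only the flag names and the stated defaults come from the list.

```python
import argparse


def build_parser():
    """Hypothetical helper declaring the flags listed above."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-files', required=True,
                        help='GCS or local path to training data')
    parser.add_argument('--eval-files', required=True,
                        help='GCS or local path to test data')
    parser.add_argument('--job-dir', required=True,
                        help='Location to write checkpoints and export models')
    parser.add_argument('--num-epochs', type=int)
    parser.add_argument('--train-steps', type=int)
    parser.add_argument('--train-batch-size', type=int, default=40)
    parser.add_argument('--eval-batch-size', type=int, default=40)
    parser.add_argument('--embedding-size', type=int, default=8)
    parser.add_argument('--first-layer-size', type=int, default=100)
    parser.add_argument('--num-layers', type=int, default=4)
    parser.add_argument('--scale-factor', type=float, default=0.7)
    parser.add_argument('--verbose-logging', action='store_true')
    parser.add_argument('--eval-delay-secs', type=int, default=1)
    parser.add_argument('--min-eval-frequency', type=int, default=10)
    return parser


def parse_hparams(argv):
    """Parse argv and return a plain dict of hyperparameters."""
    parser = build_parser()
    args = parser.parse_args(argv)
    # The notes say train-steps or num-epochs must be given.
    if args.train_steps is None and args.num_epochs is None:
        parser.error('one of --train-steps or --num-epochs is required')
    # argparse turns --train-batch-size into the key train_batch_size.
    return vars(args)


hparams = parse_hparams([
    '--train-files', 'census_data/train.csv',   # illustrative paths only
    '--eval-files', 'census_data/eval.csv',
    '--job-dir', 'census_output',
    '--train-steps', '1000',
])
```

Returning a dict makes it easy to forward the values as keyword arguments to whatever builds the Experiment.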


generate_experiment_fn

  • Creates an experiment function from the given hyperparameters. Returns: a function (output_dir) -> Experiment, which learn_runner uses to create the Experiment
  • The Experiment is built from:
    • model.generate_input_fn - input functions that gather the train and test inputs
    • an Estimator returned by model.build_estimator, which constructs the model topology
    • model.serving_input_fn - used by the export strategies that control the prediction graph structure
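
The "function that returns a function (output_dir) -> Experiment" pattern can be illustrated without TensorFlow. In this sketch FakeExperiment is a hypothetical stand-in for the real Experiment class, and the keyword arguments are illustrative. It only shows how the hyperparameters are captured in a closure until the runner supplies output_dir.

```python
from collections import namedtuple

# Hypothetical stand-in for the real Experiment class, so the sketch runs
# without TensorFlow installed.
FakeExperiment = namedtuple(
    'FakeExperiment', ['output_dir', 'train_steps', 'eval_delay_secs'])


def generate_experiment_fn(**experiment_args):
    """Capture hyperparameters now; build the experiment later."""
    def _experiment_fn(output_dir):
        # The runner calls this with the output directory it was given.
        return FakeExperiment(output_dir=output_dir, **experiment_args)
    return _experiment_fn


experiment_fn = generate_experiment_fn(train_steps=1000, eval_delay_secs=1)
experiment = experiment_fn('census_output')
```

Deferring construction this way lets the runner decide the output directory, which matters when each hyperparameter tuning trial writes to its own location.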


model.build_estimator

  • Args:
    • model_dir - used by the Classifier for checkpoints summaries and exports.
    • embedding_size - #dimensions used to represent categorical features when input to the DNN.
    • hidden_units - DNN topology
  • leverage tensorflow.contrib.layers to ingest input data
    • layers.sparse_column_with_keys - For categorical columns with known values, specify keys: lists of values
    • layers.sparse_column_with_hash_bucket - For categorical columns with many values, specify hash_bucket_size
    • layers.real_valued_column - DEEP columns - continuous base columns
    • layers.bucketized_column - Continuous columns can be converted to categorical via bucketization boundaries list
    • layers.crossed_column - WIDE columns - Interactions between different categorical features
    • layers.embedding_column - DEEP columns - specify dimension=embedding_size
  • Returns a DNNLinearCombinedClassifier - ctor params:
    • model_dir
    • linear_feature_columns=wide_columns
    • dnn_feature_columns=deep_columns
    • dnn_hidden_units=hidden_units
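
The hidden_units topology can be derived from the first-layer-size, scale-factor and num-layers arguments. The notes do not give the exact formula, so the geometric decay, the rounding, and the floor of 2 nodes per layer below are assumptions. Treat this as one plausible sketch rather than the sample's implementation.

```python
def decaying_hidden_units(first_layer_size=100, scale_factor=0.7, num_layers=4):
    """Layer i gets first_layer_size * scale_factor**i nodes (assumed rule).

    The result is rounded to the nearest integer and floored at 2 nodes,
    so very deep or fast-decaying networks never get an empty layer.
    """
    return [
        max(2, int(round(first_layer_size * scale_factor ** i)))
        for i in range(num_layers)
    ]


default_units = decaying_hidden_units()          # uses the defaults above
narrow_units = decaying_hidden_units(50, 0.1, 3)
```

With the default arguments this gives four layers that shrink from 100 nodes, which is the kind of list that would be passed as dnn_hidden_units.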


model.serving_input_fn

  • Builds the input subgraph for prediction. Returns a tf.contrib.learn.input_fn_utils.InputFnOps, a named tuple consisting of:
    • features - dict of features to be passed to the Estimator
    • labels - None for predictions
    • inputs - dict of tf.placeholder for model input fields


model.generate_input_fn

  • Generates an input function for training or evaluation.
  • Constructs a filename queue using tf.train.string_input_producer and uses tf.TextLineReader.read_up_to to read up to batch_size input rows at a time
  • tf.train.shuffle_batch - maintains a buffer for shuffling inputs between batches
  • Returns: A function () -> (features, indices)
    • features - a dict of Tensors
    • indices - a Tensor of label indices
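
The idea of a shuffle buffer between reading and batching can be shown in plain Python. This is an analogue for intuition only, not how tf.train.shuffle_batch is implemented. The function name shuffled_batches and its parameters are hypothetical.

```python
import random


def shuffled_batches(rows, batch_size, buffer_size, seed=0):
    """Yield batches drawn at random from a bounded buffer of rows."""
    rng = random.Random(seed)
    buffer, batch = [], []
    for row in rows:
        buffer.append(row)
        if len(buffer) >= buffer_size:
            # Swap a random element to the end and pop it: O(1) removal.
            idx = rng.randrange(len(buffer))
            buffer[idx], buffer[-1] = buffer[-1], buffer[idx]
            batch.append(buffer.pop())
            if len(batch) == batch_size:
                yield batch
                batch = []
    # Input exhausted: drain what is left in the buffer in random order.
    rng.shuffle(buffer)
    for row in buffer:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final, possibly smaller, batch


batches = list(shuffled_batches(range(10), batch_size=4, buffer_size=5))
```

A larger buffer mixes rows from further apart in the file, at the cost of more memory.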

Single Node Training

The same code runs locally and on Cloud ML Engine.

Using local machine python

export TRAIN_STEPS=1000
export OUTPUT_DIR=census_output
rm -rf $OUTPUT_DIR
python trainer/ --train-files $CENSUS_DATA/$TRAIN_FILE \
                       --eval-files $CENSUS_DATA/$EVAL_FILE \
                       --job-dir $OUTPUT_DIR \
                       --train-steps $TRAIN_STEPS

Using gcloud local

Mock a cloud run on the local machine:

export TRAIN_STEPS=1000
export OUTPUT_DIR=census_output
rm -rf $OUTPUT_DIR
gcloud ml-engine local train --package-path trainer \
                           --module-name trainer.task \
                           -- \
                           --train-files $CENSUS_DATA/$TRAIN_FILE \
                           --eval-files $CENSUS_DATA/$EVAL_FILE \
                           --job-dir $OUTPUT_DIR \
                           --train-steps $TRAIN_STEPS

Setup GC ML-Engine + Bucket

export ML_BUCKET=gs://josh-machine-learning
gsutil mb $ML_BUCKET

gcloud ml-engine init-project

export SVCACCT=cloud-ml-service@${GCP_PROJECT}
gsutil acl ch -u $SVCACCT:WRITE $ML_BUCKET

Using Cloud ML Engine

When training on the cloud, --job-dir comes before the -- separator, so that different trial runs can be used during hyperparameter tuning.

export GCS_JOB_DIR=gs://<my-bucket>/path/to/my/jobs/job3
export JOB_NAME=census
export TRAIN_STEPS=1000
gcloud ml-engine jobs submit training $JOB_NAME \
                                    --runtime-version 1.0 \
                                    --job-dir $GCS_JOB_DIR \
                                    --module-name trainer.task \
                                    --package-path trainer/ \
                                    --region us-central1 \
                                    -- \
                                    --train-files $TRAIN_GCS_FILE \
                                    --eval-files $EVAL_GCS_FILE \
                                    --train-steps $TRAIN_STEPS


Run TensorBoard to inspect the details of the graph:

tensorboard --logdir=$GCS_JOB_DIR
  • Accuracy and output - expect an accuracy close to 80%.

Distributed Node Training

Distributed training relies on the Distributed TensorFlow TF_CONFIG environment variable, which is generated by gcloud and parsed to create a ClusterSpec. Specify a ScaleTier to choose one of the predefined cluster tiers.
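
TF_CONFIG is a JSON string with a "cluster" section (job name -> list of host:port addresses) and a "task" section (the type and index of the current process). The sketch below reads it with the standard library only. The helper name parse_tf_config and the localhost addresses are hypothetical, and the real parsing is done by RunConfig.

```python
import json
import os


def parse_tf_config(environ=None):
    """Return (cluster dict, task type, task index) from TF_CONFIG."""
    environ = os.environ if environ is None else environ
    config = json.loads(environ.get('TF_CONFIG') or '{}')
    cluster = config.get('cluster', {})
    task = config.get('task', {})
    return cluster, task.get('type'), task.get('index')


# Illustrative value; addresses are made up for the example.
example_env = {
    'TF_CONFIG': json.dumps({
        'cluster': {
            'master': ['localhost:27182'],
            'ps': ['localhost:27183'],
            'worker': ['localhost:27184', 'localhost:27185'],
        },
        'task': {'type': 'worker', 'index': 1},
    })
}

cluster, task_type, task_index = parse_tf_config(example_env)
this_address = cluster[task_type][task_index]
```

When TF_CONFIG is unset, as in a plain single-node run, the helper returns an empty cluster and no task, so the same code path works in both modes.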

Using gcloud local

Run the distributed training code locally

export TRAIN_STEPS=500
export OUTPUT_DIR=census_output
rm -rf $OUTPUT_DIR
gcloud ml-engine local train --package-path trainer \
                           --module-name trainer.task \
                           --parameter-server-count $PS_SERVER_COUNT \
                           --worker-count $WORKER_COUNT \
                           --distributed \
                           -- \
                           --train-files $CENSUS_DATA/$TRAIN_FILE \
                           --eval-files $CENSUS_DATA/$EVAL_FILE \
                           --train-steps $TRAIN_STEPS \
                           --job-dir $OUTPUT_DIR

Using Cloud ML Engine

Run the distributed training job

export GCS_JOB_DIR=gs://<my-bucket>/path/to/my/models/run3
export JOB_NAME=census
export TRAIN_STEPS=1000
gcloud ml-engine jobs submit training $JOB_NAME \
                                    --scale-tier $SCALE_TIER \
                                    --runtime-version 1.0 \
                                    --job-dir $GCS_JOB_DIR \
                                    --module-name trainer.task \
                                    --package-path trainer/ \
                                    --region us-central1 \
                                    -- \
                                    --train-files $TRAIN_GCS_FILE \
                                    --eval-files $EVAL_GCS_FILE \
                                    --train-steps $TRAIN_STEPS

Hyperparameter Tuning

Cloud ML Engine can search for the optimal hyperparameters.

Running Hyperparameter Job

Specify the hyperparameters to tune in a YAML file:

trainingInput:
  hyperparameters:
    goal: MAXIMIZE
    hyperparameterMetricTag: accuracy
    maxTrials: 4
    maxParallelTrials: 2
    params:
      - parameterName: first-layer-size
        type: INTEGER
        minValue: 50
        maxValue: 500
        scaleType: UNIT_LINEAR_SCALE
      - parameterName: num-layers
        type: INTEGER
        minValue: 1
        maxValue: 15
        scaleType: UNIT_LINEAR_SCALE
      - parameterName: scale-factor
        type: DOUBLE
        minValue: 0.1
        maxValue: 1.0
        scaleType: UNIT_REVERSE_LOG_SCALE

Then add the --config argument to the training command:

export HPTUNING_CONFIG=hptuning_config.yaml
export JOB_NAME=census
export TRAIN_STEPS=1000
gcloud ml-engine jobs submit training $JOB_NAME \
                                    --scale-tier $SCALE_TIER \
                                    --runtime-version 1.0 \
                                    --config $HPTUNING_CONFIG \
                                    --job-dir $GCS_JOB_DIR \
                                    --module-name trainer.task \
                                    --package-path trainer/ \
                                    --region us-central1 \
                                    -- \
                                    --train-files $TRAIN_GCS_FILE \
                                    --eval-files $EVAL_GCS_FILE \
                                    --train-steps $TRAIN_STEPS

Run the TensorBoard command to see the results of the different runs and compare their accuracy / auroc numbers:

tensorboard --logdir=$GCS_JOB_DIR

Run Predictions

Deploy a Prediction Service

Once the training job has finished, use the exported model to create a prediction server. First create a model:

gcloud ml-engine models create census --regions us-central1

Next, look up the GCS path of the exported trained model binaries:

gsutil ls -r $GCS_JOB_DIR/export

You should see a directory named $GCS_JOB_DIR/export/Servo/<timestamp>. Use it as the origin of a new model version:

export MODEL_BINARIES=$GCS_JOB_DIR/export/Servo/<timestamp>
gcloud ml-engine versions create v1 --model census --origin $MODEL_BINARIES --runtime-version 1.0

Run Online Predictions

You can now send prediction requests to the API:

gcloud ml-engine predict --model census --version v1 --json-instances ../test.json

You should see a response with the predicted labels of the examples:

{"probabilities": [0.9962924122810364, 0.003707568161189556], "logits": [-5.593664646148682], "classes": 0, "logistic": [0.003707568161189556]}

How to interpret the results:

  • probabilities - the probabilities of <=$50K vs >$50K
  • classes - the predicted class (0, i.e. <=$50K)
  • logits - ln(p/(1-p)) = ln(0.00371/(1-0.00371)) = -5.593
  • logistic - 1/(1+exp(-logit)) = 1/(1+exp(5.593)) = 0.0037
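
The relationship between the fields can be checked with a few lines of arithmetic. This sketch uses only the numbers from the response above. The variable names are illustrative.

```python
import math

# Values copied from the prediction response above.
probabilities = [0.9962924122810364, 0.003707568161189556]
reported_logit = -5.593664646148682

p = probabilities[1]  # probability of the positive class

# logit is the log-odds of the positive class.
logit = math.log(p / (1.0 - p))

# logistic (sigmoid) maps the logit back to the probability.
logistic = 1.0 / (1.0 + math.exp(-logit))

# The predicted class is the index of the larger probability.
predicted_class = probabilities.index(max(probabilities))
```

The recomputed logit agrees with the reported one to within single-precision rounding, and the predicted class is 0 because the first probability dominates.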

Run Batch Prediction

For large amounts of data, when there are no latency requirements on receiving the prediction results, submit a prediction job to the API. This requires the data to be stored in GCS.

export JOB_NAME=census_prediction
gcloud ml-engine jobs submit prediction $JOB_NAME \
    --model census \
    --version v1 \
    --data-format TEXT \
    --region us-central1 \
    --input-paths gs://cloudml-public/testdata/prediction/census.json \
    --output-path $GCS_JOB_DIR/predictions

Check status of prediction job:

gcloud ml-engine jobs describe $JOB_NAME

Once the job state is SUCCEEDED, check the results in the --output-path location.

