We introduce low-level TensorFlow and work our way through the concepts and APIs needed to write distributed machine learning models. Given a TensorFlow model, we explain how to scale out its training and serve high-performance predictions using Cloud Machine Learning Engine.
Course Objectives:
- Create machine learning models in TensorFlow
- Use the TensorFlow libraries to solve numerical problems
- Troubleshoot and debug common TensorFlow code pitfalls
- Use tf.estimator to create, train, and evaluate an ML model
- Train, deploy, and productionalize ML models at scale with Cloud ML Engine
Labs and Demos: Lab: Training Data Analyst
- TensorFlow is an open-source, high-performance library for numerical computation
- A tensor is an N-dimensional array of data
- Operations are represented as a directed acyclic graph (DAG)
- Edges are arrays of data (tensors) and nodes are mathematical operations
- Graphs are portable between different devices
- TensorFlow Lite targets smaller, less powerful devices and models
- Models may be fine-tuned on those devices
- TF toolkit hierarchy
- Hardware -- CPU, GPU, TPU
- Core -- C++ and Python
- Components -- tf.layers, tf.losses, tf.metrics
- High-level API -- tf.estimator
- Lazy evaluation and eager execution
- Lazy evaluation gives the runtime the flexibility to optimize the graph before running it
- The DAG can be executed remotely, with its parts assigned to different devices
- The Session class represents the connection between the Python program we write and the C++ runtime
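import tensorflow as tf
a = tf.constant([5, 3, 8])
b = tf.constant([3, -1, 2])
c = tf.add(a, b)  # only adds a node to the graph; nothing is computed yet
with tf.Session() as sess:
    print(sess.run(c))  # executes the graph and prints the result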
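import tensorflow as tf
# Eager mode must be enabled at program startup, before any graph is built.
# Older TF 1.x releases: from tensorflow.contrib.eager.python import tfe; tfe.enable_eager_execution()
tf.enable_eager_execution()
a = tf.constant([5, 3, 8])
b = tf.constant([3, -1, 2])
c = tf.add(a, b)  # computed immediately
print(c)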
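The difference between building a graph and running it can be illustrated without TensorFlow at all. The sketch below is a toy model of lazy evaluation, not how TensorFlow is implemented: the names `Node`, `constant`, `run` and `OPS` are hypothetical. Writing `a + b` only records a node and its input edges; values are produced when `run()` walks the DAG.

```python
# Toy illustration of deferred (lazy) execution; NOT TensorFlow's implementation.
# Building expressions records nodes and edges of a DAG; run() computes values.
OPS = {
    "add": lambda x, y: [a + b for a, b in zip(x, y)],  # elementwise add
    "mul": lambda x, y: [a * b for a, b in zip(x, y)],  # elementwise multiply
}


class Node:
    def __init__(self, op, inputs=(), value=None):
        self.op = op          # "const" or a key of OPS
        self.inputs = inputs  # upstream nodes: the edges of the DAG
        self.value = value    # only set for constants

    def __add__(self, other):
        return Node("add", (self, other))  # records the op, computes nothing

    def __mul__(self, other):
        return Node("mul", (self, other))


def constant(value):
    return Node("const", value=list(value))


def run(node, cache=None):
    """Evaluate node, computing each upstream node at most once."""
    cache = {} if cache is None else cache
    if id(node) not in cache:
        if node.op == "const":
            cache[id(node)] = node.value
        else:
            args = [run(n, cache) for n in node.inputs]
            cache[id(node)] = OPS[node.op](*args)
    return cache[id(node)]


a = constant([5, 3, 8])
b = constant([3, -1, 2])
c = a + b          # graph building: no arithmetic happens here
d = c * a
print(run(c))      # [8, 2, 10]
print(run(d))      # [40, 6, 80]
```

Because nothing is computed until `run()`, a real runtime is free to inspect, optimize, or place the whole graph on devices before executing it, which is the flexibility the lazy-evaluation bullet refers to.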
- Write out the graph with tf.summary.FileWriter
- Name the tensors and the operations
- The graph can be visualized in TensorBoard
- TensorBoard can run in Cloud Shell
- Tensors, variables, constants
- Tensors can be sliced and reshaped
- A variable is initialized and then changed as the program runs
- A placeholder lets you feed in values at run time, such as values read from a text file
Lab: 03_tensorflow/a_tfstart.ipynb
import tensorflow as tf

def compute_area(sides):
    # slice the input to get the sides
    a = sides[:,0]  # 5.0, 2.3
    b = sides[:,1]  # 3.0, 4.1
    c = sides[:,2]  # 7.1, 4.8
    # Heron's formula
    s = (a + b + c) * 0.5  # (a + b) is a short-cut to tf.add(a, b)
    # (a * b) is a short-cut to tf.multiply(a, b), not tf.matmul(a, b)
    areasq = s * (s - a) * (s - b) * (s - c)
    return tf.sqrt(areasq)
with tf.Session() as sess:
    sides = tf.placeholder(tf.float32, shape=(None, 3))  # batchsize number of triangles, 3 sides
    area = compute_area(sides)
    result = sess.run(area, feed_dict = {
        sides: [
            [5.0, 3.0, 7.1],
            [2.3, 4.1, 4.8]
        ]
    })
    print(result)
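# Eager version: run in a fresh Python process, because eager execution
# must be enabled at startup, before any graph or session is created
tf.enable_eager_execution()
area = compute_area(tf.constant([
    [5.0, 3.0, 7.1],
    [2.3, 4.1, 4.8]
]))
print(area)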
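A plain-Python reference for Heron's formula is handy for checking what compute_area returns for the two triangles above. This is a minimal sketch; `heron_area` is a hypothetical helper, and TensorFlow's float32 results may differ from these float64 values in the last digits.

```python
import math


def heron_area(a, b, c):
    """Triangle area from its three side lengths (Heron's formula)."""
    s = (a + b + c) * 0.5  # semi-perimeter
    return math.sqrt(s * (s - a) * (s - b) * (s - c))


triangles = [(5.0, 3.0, 7.1), (2.3, 4.1, 4.8)]
areas = [heron_area(*sides) for sides in triangles]
print([round(x, 4) for x in areas])  # [6.2785, 4.7091]
```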
- Debugging TensorFlow programs
- Read the error messages to understand the problem
- Fix shape problems
- tf.expand_dims(t, 1) changes the shape by inserting a dimension of size 1 into the tensor's shape
- tf.slice(t, start, size) extracts part of a tensor
- tf.squeeze(x) is the inverse operation of expand_dims: it removes dimensions of size 1
- Fix data type problems with tf.cast(t, dtype)
- Debug full-blown programs with tf.logging, tf.Print(), tfdbg and TensorBoard
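The shape and dtype fixes above have close NumPy counterparts, which makes them easy to experiment with outside a graph. The sketch below uses NumPy as a stand-in on the assumption that it mirrors the TensorFlow semantics for these simple cases; the last line shows a common shape bug, where adding a (3,) tensor to a (3, 1) tensor silently broadcasts to (3, 3) instead of raising an error.

```python
import numpy as np

t = np.array([3, 5, 7])          # shape (3,)
col = np.expand_dims(t, 1)       # shape (3, 1), like tf.expand_dims(t, 1)
flat = np.squeeze(col)           # shape (3,), like tf.squeeze(col)
part = t[1:1 + 2]                # like tf.slice(t, [1], [2]) -> [5, 7]
as_float = t.astype(np.float32)  # like tf.cast(t, tf.float32)

# A classic shape bug: (3,) + (3, 1) broadcasts to (3, 3) without any error
oops = t + col

print(col.shape, flat.shape, part.tolist(), as_float.dtype, oops.shape)
# (3, 1) (3,) [5, 7] float32 (3, 3)
```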
Demo: 03_tensorflow/debug_demo.ipynb
In this module we will walk you through the Estimator API.
- A high-level API that wraps up a large amount of boilerplate code
- Quickly build models
- Provide checkpointing
- Can handle datasets that do not fit in memory
- Used to train, evaluate and monitor
- Supports distributed training
- Allows hyperparameter tuning
- Serve predictions in a production environment
Pre-made estimators built on tf.estimator.Estimator:
- LinearRegressor
- DNNRegressor
- DNNLinearCombinedRegressor
- LinearClassifier
- And more...
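import tensorflow as tf

featcols = [
    tf.feature_column.numeric_column("sq_footage"),
    tf.feature_column.categorical_column_with_vocabulary_list(
        "type", ["house", "apt"])
]
model = tf.estimator.LinearRegressor(featcols)
model.train(train_input_fn, steps=100)  # train_input_fn is defined elsewhere
predictions = model.predict(predict_input_fn)  # a generator of prediction dicts
# DNNs need dense inputs, so one-hot encode the categorical column
deep_featcols = [featcols[0], tf.feature_column.indicator_column(featcols[1])]
model2 = tf.estimator.DNNRegressor(
    hidden_units=[3, 2], feature_columns=deep_featcols,
    activation_fn=tf.nn.relu, dropout=0.2, optimizer="Adam"
)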
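A DNN needs dense inputs, so a categorical column such as "type" is typically wrapped in tf.feature_column.indicator_column, which one-hot encodes it against the vocabulary. The sketch below shows that encoding in plain Python; `indicator` and `dense_row` are hypothetical helpers, and the assumption that an out-of-vocabulary value maps to all zeros corresponds to a vocabulary-list column with no OOV buckets. The column order shown is illustrative only.

```python
def indicator(value, vocabulary):
    """One-hot encode value against vocabulary; unknown values -> all zeros."""
    return [1.0 if value == word else 0.0 for word in vocabulary]


def dense_row(sq_footage, prop_type, vocabulary=("house", "apt")):
    """Hypothetical helper: a dense input vector a DNN could consume."""
    return [float(sq_footage)] + indicator(prop_type, vocabulary)


print(dense_row(1500, "apt"))    # [1500.0, 0.0, 1.0]
print(dense_row(900, "condo"))   # [900.0, 0.0, 0.0]
```

A linear model can consume the sparse categorical column directly, which is why LinearRegressor accepts featcols as-is.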
- Train on in-memory datasets with tf.estimator.inputs.numpy_input_fn or tf.estimator.inputs.pandas_input_fn
- Train on large datasets with the Dataset API
- Datasets help create input functions (input_fn) for Estimators
- Large datasets are often sharded into multiple files
- The Dataset API supports many file formats -- TextLineDataset, TFRecordDataset, FixedLengthRecordDataset
import tensorflow as tf

def decode_line(row):
    cols = tf.decode_csv(row, record_defaults=[[0], ['house'], [0]])
    features = {'sq_footage': cols[0], 'type': cols[1]}
    label = cols[2]  # price
    return features, label

def input_fn():
    # Build the dataset inside input_fn so that its ops are created in the
    # graph the Estimator builds for training
    # Read one csv file:
    #   dataset = tf.data.TextLineDataset("train_1.csv").map(decode_line)
    # Read a set of sharded CSV files
    dataset = tf.data.Dataset.list_files("train.csv-*") \
        .flat_map(tf.data.TextLineDataset) \
        .map(decode_line)
    dataset = dataset.shuffle(1000).repeat(15).batch(128)
    features, label = dataset.make_one_shot_iterator().get_next()
    return features, label

model.train(input_fn)
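The shuffle(1000).repeat(15).batch(128) chain above can be mimicked with plain Python generators to see what each stage does. This is a conceptual sketch with hypothetical helper names, not tf.data's implementation: the shuffle keeps a fixed-size buffer and emits a random element from it, so with a buffer smaller than the dataset the shuffle is only partial, and the final batch may be smaller than the batch size.

```python
import random
from itertools import islice


def shuffle(items, buffer_size, rng):
    """Buffered shuffle: hold buffer_size items and emit a random one each step."""
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)  # still filling the buffer
            continue
        i = rng.randrange(buffer_size)
        yield buffer[i]
        buffer[i] = item         # refill the slot with the next input
    rng.shuffle(buffer)          # drain whatever is left
    yield from buffer


def repeat_shuffled(items, epochs, buffer_size, seed=None):
    """Roughly shuffle(...).repeat(epochs): a fresh shuffle every epoch."""
    rng = random.Random(seed)
    for _ in range(epochs):
        yield from shuffle(items, buffer_size, rng)


def batch(stream, batch_size):
    """Group a stream into lists; the final batch may be smaller."""
    it = iter(stream)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        yield chunk


rows = list(range(10))
batches = list(batch(repeat_shuffled(rows, epochs=2, buffer_size=4, seed=0), 3))
print([len(b) for b in batches])  # [3, 3, 3, 3, 3, 3, 2]
```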
In graph mode, the tf.* calls you write in Python do not process any data; they only build the graph, which is executed later (for example, when the Estimator trains).
tf.estimator.train_and_evaluate is the preferred method for training real-world models
- RunConfig tells the estimator where and how often to write checkpoints and TensorBoard logs
- TrainSpec tells the estimator how to get training data
- EvalSpec controls the evaluation and the checkpointing of the model, since the two happen at the same time
run_config = tf.estimator.RunConfig(model_dir=output_dir, ...)
estimator = tf.estimator.LinearRegressor(featcols, config=run_config)
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=1000)
export_latest = tf.estimator.LatestExporter(serving_input_receiver_fn=serving_input_fn)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn, exporters=export_latest)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
- Monitoring with TensorBoard
- Point TensorBoard to your output directory and the dashboards appear in your browser at localhost:6006
- Pre-made Estimators export relevant metrics, embeddings, histograms, etc. for TensorBoard
- Add your own summaries for TensorBoard with a single line, such as tf.summary.scalar
- TensorBoard has a suite of visualization tools to explore and explain models and results
- Serving-time and training-time inputs are often very different
- The serving input function transforms parsed JSON data into the features your model expects
- A serving input function can also decode raw inputs, such as JPEG bytes
def serving_input_fn():
    json = {'sq_footage': tf.placeholder(tf.int32, [None]),
            'prop_type': tf.placeholder(tf.string, [None])
    }
    # … transformations ...
    features = {'sq_footage': json['sq_footage'],
                'type': json['prop_type'],
    }
    return tf.estimator.export.ServingInputReceiver(features, json)
def serving_input_fn():
    json = {'jpeg_bytes': tf.placeholder(tf.string, [None])}
    def decode(jpeg):
        pixels = tf.image.decode_jpeg(jpeg, channels=3)
        return pixels
    pics = tf.map_fn(decode, json['jpeg_bytes'], dtype=tf.uint8)
    features = {'pics': pics}
    return tf.estimator.export.ServingInputReceiver(features, json)
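Conceptually, the first serving input function turns a batch of JSON instances into column-oriented features and renames prop_type to type. The sketch below illustrates that mapping in plain Python; it is an assumption-laden illustration, not how the serving system is implemented, and the request body and `to_features` helper are hypothetical (the {"instances": [...]} shape matches the prediction request used later in these notes).

```python
import json

# Hypothetical online-prediction request body
body = json.loads("""
{"instances": [{"sq_footage": 1500, "prop_type": "apt"},
               {"sq_footage": 2200, "prop_type": "house"}]}
""")


def to_features(instances):
    """Batch JSON instances column-wise and rename prop_type -> type."""
    return {
        "sq_footage": [inst["sq_footage"] for inst in instances],
        "type": [inst["prop_type"] for inst in instances],
    }


print(to_features(body["instances"]))
# {'sq_footage': [1500, 2200], 'type': ['apt', 'house']}
```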
Labs:
- As data size increases, batching and distribution become important
- Cloud Machine Learning Engine (CMLE) - repeatable, scalable, tuned
- Apply the necessary input transformations
- Hyperparameter tuning
- Autoscale prediction code
- Training your model with CMLE
- Create computation graph and training application with TF
- Package the trainer application
- Configure and start a Cloud ML Engine job
- Monitor training jobs with gcloud and TensorBoard
- Deploy a model to GCP and use it for predictions
> gcloud ml-engine local train \
--module-name=trainer.task \
--package-path=/somedir/taxifare/trainer \
-- \
--train_data_paths="/somedir/datasets/*train*" \
--eval_data_paths="/somedir/datasets/*valid*" \
--output_dir=/somedir/output \
--train_steps=100 --job-dir=/tmp
> gcloud ml-engine jobs submit training $JOBNAME \
--region=$REGION \
--module-name=trainer.task \
--job-dir=$OUTDIR --staging-bucket=gs://$BUCKET \
--scale-tier=BASIC \
... (rest of the arguments as before)
> MODEL_NAME="taxifare"
> MODEL_VERSION="v1"
> MODEL_LOCATION="gs://${BUCKET}/taxifare/smallinput/taxi_trained/export/Servo/.../"
> gcloud ml-engine models create ${MODEL_NAME} --regions $REGION
> gcloud ml-engine versions create ${MODEL_VERSION} --model ${MODEL_NAME} \
  --origin ${MODEL_LOCATION} --runtime-version 1.4
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
api = discovery.build('ml', 'v1', credentials=credentials)
request_data = [
    {'pickup_longitude': -73.885262,
     'pickup_latitude': 40.773008,
     'dropoff_longitude': -73.987232,
     'dropoff_latitude': 40.732403,
     'passenger_count': 2}]
parent = 'projects/%s/models/%s/versions/%s' % ('cloud-training-demos', 'taxifare', 'v1')
response = api.projects().predict(body={'instances': request_data}, name=parent).execute()
print(response)
Labs:
Create a neural network that is capable of finding the volume of a cylinder given the radius of its base (r) and its height (h). Assume that the radius and height of the cylinder are both in the range 0.5 to 2.0. Simulate the necessary training dataset.
Hint:
The input features will be r and h, and the label will be $V = \pi r^2 h$. Create random values for r and h and compute V. Your dataset will consist of r, h and V. Then use a DNN regressor. Make sure to generate enough data.
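Since r and h are both drawn from the range 0.5 to 2.0 and V increases with each of them, the range of the label follows directly from the formula. Knowing it helps judge whether a given RMSE is small or large:

```latex
V_{\min} = \pi (0.5)^2 (0.5) = \frac{\pi}{8} \approx 0.39,
\qquad
V_{\max} = \pi (2.0)^2 (2.0) = 8\pi \approx 25.13
```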
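import math
import numpy as np
import tensorflow as tf

N = 2000  # training examples
T = 200   # validation examples
E = 100   # test examples
r = 0.5 + np.random.random_sample(N+T+E) * 1.5  # radius in [0.5, 2.0)
h = 0.5 + np.random.random_sample(N+T+E) * 1.5  # height in [0.5, 2.0)
V = math.pi * r * r * h  # label: volume of the cylinder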
def cylinder_train_fn(r, h, V, num_epochs):
    return tf.estimator.inputs.numpy_input_fn(
        x = {"radius": r, "height": h},
        y = V,
        batch_size = 64,
        num_epochs = num_epochs,
        shuffle = True
    )

def cylinder_valid_fn(r, h, V):
    return tf.estimator.inputs.numpy_input_fn(
        x = {"radius": r, "height": h},
        y = V,
        shuffle = False  # one pass over the data (num_epochs defaults to 1)
    )

def cylinder_test_fn(r, h):
    return tf.estimator.inputs.numpy_input_fn(
        x = {"radius": r, "height": h},
        shuffle = False  # keep predictions in the same order as the inputs
    )
feat_columns_cyl = [
    tf.feature_column.numeric_column("radius"),
    tf.feature_column.numeric_column("height")
]
model_cyl = tf.estimator.DNNRegressor(
    feature_columns=feat_columns_cyl,
    hidden_units=[36, 12, 4]
)
model_cyl.train(cylinder_train_fn(r[:N], h[:N], V[:N], 20))

def print_rmse(model, fn):
    metrics = model.evaluate(input_fn = fn)
    print('RMSE on dataset = {}'.format(np.sqrt(metrics['average_loss'])))

print_rmse(model_cyl, cylinder_valid_fn(r[N:N+T], h[N:N+T], V[N:N+T]))
for pr in model_cyl.predict(cylinder_test_fn(r[N+T:], h[N+T:])):
    print(pr)
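print_rmse takes the square root of the evaluator's average_loss, which for a regressor is the mean squared error. The same number can be computed directly from predictions with NumPy, as in the minimal sketch below; `rmse` is a hypothetical helper and the sample numbers are illustrative only, not lab results. For DNNRegressor, each dict yielded by predict() holds its estimate under the 'predictions' key, so `[p['predictions'][0] for p in ...]` could be compared against V[N+T:], provided the test input function does not shuffle.

```python
import numpy as np


def rmse(predicted, actual):
    """Root-mean-squared error: square root of the mean squared error."""
    predicted = np.asarray(predicted, dtype=float)
    actual = np.asarray(actual, dtype=float)
    return float(np.sqrt(np.mean((predicted - actual) ** 2)))


# Illustrative numbers only
print(rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0]))  # sqrt(4 / 3), about 1.1547
```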