{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Multi-worker training with Estimator\n",
"- https://www.tensorflow.org/tutorials/distribute/multi_worker_with_estimator"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# !pip3 install tensorflow_datasets"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow_datasets as tfds\n",
"import tensorflow as tf\n",
"tfds.disable_progress_bar()\n",
"\n",
"import os, json"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Input function"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"BUFFER_SIZE = 10000\n",
"BATCH_SIZE = 64\n",
"\n",
"def input_fn(mode, input_context=None):\n",
" datasets, info = tfds.load(\n",
" name='mnist',\n",
" with_info=True,\n",
" as_supervised=True)\n",
" mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else\n",
" datasets['test'])\n",
" \n",
" def scale(image, label):\n",
" image = tf.cast(image, tf.float32)\n",
" image /= 255\n",
" return image, label\n",
" \n",
" if input_context:\n",
" mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,\n",
" input_context.input_pipeline_id)\n",
" \n",
" return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)"
]
},
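{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check of the input pipeline (an illustrative addition, not part of the original tutorial): called without an `input_context`, the dataset is unsharded, so one batch should be `(64, 28, 28, 1)` images rescaled into `[0, 1]`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch, assuming TF 2.x eager execution (the default here).\n",
"sample_ds = input_fn(tf.estimator.ModeKeys.EVAL)\n",
"for images, labels in sample_ds.take(1):\n",
"    print(images.shape, labels.shape)   # expect (64, 28, 28, 1) (64,)\n",
"    print(float(tf.reduce_max(images)))  # pixels rescaled into [0, 1]"
]
},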
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Multi-worker configuration"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"os.environ['TF_CONFIG'] = json.dumps({\n",
" 'cluster': {\n",
" 'worker': ['localhost:12345', 'localhost:23456']\n",
" },\n",
" 'task': {'type': 'worker', 'index': 0}\n",
"})"
]
},
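{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook acts as worker 0, the chief. The second process in the cluster runs identical code with only the task index changed; the sketch below shows that process's configuration (do not set it in this one)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch of TF_CONFIG for the *second* worker process (do not run in this notebook):\n",
"# os.environ['TF_CONFIG'] = json.dumps({\n",
"#     'cluster': {\n",
"#         'worker': ['localhost:12345', 'localhost:23456']\n",
"#     },\n",
"#     'task': {'type': 'worker', 'index': 1}\n",
"# })"
]
},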
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define the model"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"LEARNING_RATE = 1e-4\n",
"layers = tf.keras.layers\n",
"\n",
"def model_fn(features, labels, mode):\n",
" model = tf.keras.Sequential([\n",
" layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),\n",
" layers.MaxPooling2D(),\n",
" layers.Flatten(),\n",
" layers.Dense(64, activation='relu'),\n",
" layers.Dense(10),\n",
" ])\n",
" logits = model(features, training=False)\n",
" \n",
" if mode == tf.estimator.ModeKeys.PREDICT:\n",
" predictions = {'logits': logits}\n",
" return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)\n",
" \n",
" optimizer = tf.compat.v1.train.GradientDescentOptimizer(learning_rate=LEARNING_RATE)\n",
" loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)\n",
" loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)\n",
" if mode == tf.estimator.ModeKeys.EVAL:\n",
" return tf.estimator.EstimatorSpec(mode, loss=loss)\n",
" \n",
" return tf.estimator.EstimatorSpec(\n",
" mode=mode,\n",
" loss=loss,\n",
" train_op=optimizer.minimize(loss, tf.compat.v1.train.get_or_create_global_step()))"
]
},
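{
"cell_type": "markdown",
"metadata": {},
"source": [
"Why `Reduction.NONE` plus a manual sum: the check below (an added illustration, not from the tutorial) shows the loss object returning one value per example, which `model_fn` then averages explicitly over `BATCH_SIZE`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative only: per-example losses under Reduction.NONE.\n",
"demo_labels = tf.constant([1, 3, 7])\n",
"demo_logits = tf.random.normal([3, 10])\n",
"per_example = tf.keras.losses.SparseCategoricalCrossentropy(\n",
"    from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(demo_labels, demo_logits)\n",
"print(per_example.shape)  # (3,): one loss per example, reduced manually in model_fn"
]
},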
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## MultiWorkerMirroredStrategy"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Enabled multi-worker collective ops with available devices: ['/job:worker/replica:0/task:0/device:CPU:0', '/job:worker/replica:0/task:0/device:XLA_CPU:0']\n",
"INFO:tensorflow:Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, num_workers = 2, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.AUTO\n"
]
}
],
"source": [
"strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()"
]
},
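{
"cell_type": "markdown",
"metadata": {},
"source": [
"The strategy reads the cluster spec from `TF_CONFIG` at construction time, as the log above shows, so `TF_CONFIG` must be set first. Communication defaults to `CollectiveCommunication.AUTO`; a variant pinning it explicitly is sketched below (optional, not used in this run)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional variant (assumption, not used above): pin the collective\n",
"# communication implementation instead of CollectiveCommunication.AUTO.\n",
"# strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(\n",
"#     tf.distribute.experimental.CollectiveCommunication.RING)"
]
},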
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train and evaluate the model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:TF_CONFIG environment variable: {'cluster': {'worker': ['localhost:12345', 'localhost:23456']}, 'task': {'type': 'worker', 'index': 0}}\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Initializing RunConfig with distribution strategies.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:RunConfig initialized for Distribute Coordinator with INDEPENDENT_WORKER mode\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true\n",
"graph_options {\n",
" rewrite_options {\n",
" meta_optimizer_iterations: ONE\n",
" }\n",
"}\n",
", '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7fe134567e80>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fe13ddf85f8>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': 'independent_worker'}\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Running `train_and_evaluate` with Distribute Coordinator.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Running Distribute Coordinator with mode = 'independent_worker', cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, environment = None, rpc_layer = 'grpc'\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, num_workers = 2, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.AUTO\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Updated config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true\n",
"graph_options {\n",
" rewrite_options {\n",
" meta_optimizer_iterations: ONE\n",
" }\n",
"}\n",
", '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7fe13457d588>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fe13457b400>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': 'grpc://localhost:12345', '_evaluation_master': 'grpc://localhost:12345', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 2, '_distribute_coordinator_mode': 'independent_worker'}\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Calling model_fn.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Collective batch_all_reduce: 6 all-reduces, num_workers = 2\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Done calling model_fn.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Collective batch_all_reduce: 1 all-reduces, num_workers = 2\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING:tensorflow:Collective ops may deadlock with `save_checkpoints_secs` please use `save_checkpoint_steps` instead. Clearing `save_checkpoint_secs` and setting `save_checkpoint_steps` to 1000 now.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Create CheckpointSaverHook.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:all_hooks [<tensorflow_estimator.python.estimator.util.DistributedIteratorInitializerHook object at 0x7fe13457bb00>, <tensorflow.python.training.basic_session_run_hooks.NanTensorHook object at 0x7fe1340ddf60>, <tensorflow.python.training.basic_session_run_hooks.LoggingTensorHook object at 0x7fe13457ba90>, <tensorflow.python.training.basic_session_run_hooks.StepCounterHook object at 0x7fe13417a400>, <tensorflow.python.training.basic_session_run_hooks.SummarySaverHook object at 0x7fe134160940>, <tensorflow.python.training.basic_session_run_hooks.CheckpointSaverHook object at 0x7fe1341609e8>]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Creating chief session creator with config: device_filters: \"/job:worker/task:0\"\n",
"allow_soft_placement: true\n",
"graph_options {\n",
" rewrite_options {\n",
" meta_optimizer_iterations: ONE\n",
" scoped_allocator_optimization: ON\n",
" scoped_allocator_opts {\n",
" enable_op: \"CollectiveReduce\"\n",
" }\n",
" }\n",
"}\n",
"experimental {\n",
" collective_group_leader: \"/job:worker/replica:0/task:0\"\n",
"}\n",
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Graph was finalized.\n"
]
}
],
"source": [
"config = tf.estimator.RunConfig(train_distribute=strategy)\n",
"\n",
"classifier = tf.estimator.Estimator(model_fn=model_fn, model_dir='/tmp/multiworker', config=config)\n",
"tf.estimator.train_and_evaluate(\n",
" classifier,\n",
" train_spec=tf.estimator.TrainSpec(input_fn=input_fn),\n",
" eval_spec=tf.estimator.EvalSpec(input_fn=input_fn),\n",
")"
]
},
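{
"cell_type": "markdown",
"metadata": {},
"source": [
"For real 2-worker training, this same code must run concurrently in a second process whose `TF_CONFIG` uses `'index': 1` (see the sketch after the multi-worker configuration cell); `train_and_evaluate` blocks until the collective ops rendezvous with that worker. A hypothetical launch from a second terminal, with `worker.py` an assumed name for this notebook exported as a script:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical launch of worker 1 ('worker.py' is an assumed script name):\n",
"# !TF_CONFIG='{\"cluster\": {\"worker\": [\"localhost:12345\", \"localhost:23456\"]}, \"task\": {\"type\": \"worker\", \"index\": 1}}' python worker.py"
]
},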
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.1"
}
},
"nbformat": 4,
"nbformat_minor": 4
}