@ChaiBapchya
Last active May 10, 2021 17:55
Sagemaker + Huggingface + TF + HVD
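Launch a Hugging Face TensorFlow training job on Amazon SageMaker: the estimator below runs train.py on two ml.p3dn.24xlarge instances, fine-tuning bert-large-uncased-whole-word-masking on the IMDb dataset with Horovod over MPI (the same script also supports SageMaker Distributed Data Parallel).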
import sagemaker
from sagemaker.huggingface import HuggingFace
# hyperparameters, which are passed into the training job
hyperparameters = {
    'epochs': 1,
    'train_batch_size': 2,
    'model_name': 'bert-large-uncased-whole-word-masking',
}
role = sagemaker.get_execution_role()
# configuration for running training on smdistributed Data Parallel
#distribution = {'smdistributed':{'dataparallel':{ 'enabled': True }}}
distribution = {'mpi': {'enabled': True, 'custom_mpi_options': '-verbose -x NCCL_DEBUG=INFO -x RDMAV_FORK_SAFE=1'}}
# instance configurations
instance_type='ml.p3dn.24xlarge'
instance_count=2
#volume_size=200
huggingface_estimator = HuggingFace(
    entry_point='train.py',
    source_dir='.',
    instance_type=instance_type,
    role=role,
    instance_count=instance_count,
    transformers_version='4.5.0',
    tensorflow_version='2.4.1',
    py_version='py37',
    distribution=distribution,
    hyperparameters=hyperparameters,
    base_job_name='hf-tf-bert-' + str(instance_count) + 'node-' + instance_type.replace('.', '-'),
    debugger_hook_config=False,  # currently needed
)
huggingface_estimator.fit()
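fit() is called without input channels because train.py downloads the IMDb dataset itself; if the data lived in S3 instead, the call would pass channels that SageMaker mounts into the training container, for example (hypothetical bucket paths):

huggingface_estimator.fit({'train': 's3://my-bucket/imdb/train', 'test': 's3://my-bucket/imdb/test'})

train.py — the training script used as entry_point above: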
import argparse
import logging
import os
import sys
import tensorflow as tf
from datasets import load_dataset
from tqdm import tqdm
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
from transformers.file_utils import is_sagemaker_dp_enabled
if os.environ.get("SDP_ENABLED") or is_sagemaker_dp_enabled():
    SDP_ENABLED = True
    os.environ["SAGEMAKER_INSTANCE_TYPE"] = "p3dn.24xlarge"
    import smdistributed.dataparallel.tensorflow as sdp
else:
    # Fall back to Horovod, aliased as `sdp` so the distributed code paths below still run
    SDP_ENABLED = True
    import horovod.tensorflow as sdp
def fit(model, loss, opt, train_dataset, epochs, train_batch_size, max_steps=None):
    pbar = tqdm(train_dataset)
    for i, batch in enumerate(pbar):
        with tf.GradientTape() as tape:
            inputs, targets = batch
            outputs = model(inputs)
            loss_value = loss(targets, outputs.logits)

        if SDP_ENABLED:
            # Average gradients across all workers before applying them
            tape = sdp.DistributedGradientTape(tape, sparse_as_dense=True)

        grads = tape.gradient(loss_value, model.trainable_variables)
        opt.apply_gradients(zip(grads, model.trainable_variables))

        pbar.set_description(f"Loss: {loss_value:.4f}")

        if SDP_ENABLED and i == 0:
            # After the first step, sync initial weights and optimizer state from rank 0
            sdp.broadcast_variables(model.variables, root_rank=0)
            sdp.broadcast_variables(opt.variables(), root_rank=0)

        if max_steps and i >= max_steps:
            break

    train_results = {"loss": loss_value.numpy()}
    return train_results
def get_datasets():
    # Load dataset
    train_dataset, test_dataset = load_dataset("imdb", split=["train", "test"])

    # Preprocess train dataset
    train_dataset = train_dataset.map(
        lambda e: tokenizer(e["text"], truncation=True, padding="max_length"), batched=True
    )
    train_dataset.set_format(type="tensorflow", columns=["input_ids", "attention_mask", "label"])

    train_features = {
        x: train_dataset[x].to_tensor(default_value=0, shape=[None, tokenizer.model_max_length])
        for x in ["input_ids", "attention_mask"]
    }
    tf_train_dataset = tf.data.Dataset.from_tensor_slices((train_features, train_dataset["label"]))

    # Preprocess test dataset
    test_dataset = test_dataset.map(
        lambda e: tokenizer(e["text"], truncation=True, padding="max_length"), batched=True
    )
    test_dataset.set_format(type="tensorflow", columns=["input_ids", "attention_mask", "label"])

    test_features = {
        x: test_dataset[x].to_tensor(default_value=0, shape=[None, tokenizer.model_max_length])
        for x in ["input_ids", "attention_mask"]
    }
    tf_test_dataset = tf.data.Dataset.from_tensor_slices((test_features, test_dataset["label"]))

    if SDP_ENABLED:
        tf_train_dataset = tf_train_dataset.shard(sdp.size(), sdp.rank())
        tf_test_dataset = tf_test_dataset.shard(sdp.size(), sdp.rank())

    tf_train_dataset = tf_train_dataset.batch(args.train_batch_size, drop_remainder=True)
    tf_test_dataset = tf_test_dataset.batch(args.eval_batch_size, drop_remainder=True)

    return tf_train_dataset, tf_test_dataset
if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # Hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--train_batch_size", type=int, default=16)
    parser.add_argument("--eval_batch_size", type=int, default=8)
    parser.add_argument("--model_name", type=str)
    parser.add_argument("--learning_rate", type=float, default=5e-5)
    parser.add_argument("--do_train", type=bool, default=True)
    parser.add_argument("--do_eval", type=bool, default=True)

    # Data, model, and output directories
    parser.add_argument("--output_data_dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
    parser.add_argument("--model_dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--n_gpus", type=str, default=os.environ["SM_NUM_GPUS"])

    args, _ = parser.parse_known_args()
    # Set up logging
    logger = logging.getLogger(__name__)
    logging.basicConfig(
        level=logging.getLevelName("INFO"),
        handlers=[logging.StreamHandler(sys.stdout)],
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    if SDP_ENABLED:
        sdp.init()

        gpus = tf.config.experimental.list_physical_devices("GPU")
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        if gpus:
            tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], "GPU")
    # Load model and tokenizer
    model = TFAutoModelForSequenceClassification.from_pretrained(args.model_name)
    tokenizer = AutoTokenizer.from_pretrained(args.model_name)

    # Get datasets
    tf_train_dataset, tf_test_dataset = get_datasets()

    # Define optimizer and loss
    optimizer = tf.keras.optimizers.Adam(learning_rate=args.learning_rate)
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]
    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
    # Training
    if args.do_train:
        # train_results = model.fit(tf_train_dataset, epochs=args.epochs, batch_size=args.train_batch_size)
        train_results = fit(
            model, loss, optimizer, tf_train_dataset, args.epochs, args.train_batch_size, max_steps=None
        )
        logger.info("*** Train ***")

        output_eval_file = os.path.join(args.output_data_dir, "train_results.txt")

        if not SDP_ENABLED or sdp.rank() == 0:
            with open(output_eval_file, "w") as writer:
                logger.info("***** Train results *****")
                logger.info(train_results)
                for key, value in train_results.items():
                    logger.info("  %s = %s", key, value)
                    writer.write("%s = %s\n" % (key, value))
    # Evaluation
    if args.do_eval and (not SDP_ENABLED or sdp.rank() == 0):
        result = model.evaluate(tf_test_dataset, batch_size=args.eval_batch_size, return_dict=True)
        logger.info("*** Evaluate ***")

        output_eval_file = os.path.join(args.output_data_dir, "eval_results.txt")

        with open(output_eval_file, "w") as writer:
            logger.info("***** Eval results *****")
            logger.info(result)
            for key, value in result.items():
                logger.info("  %s = %s", key, value)
                writer.write("%s = %s\n" % (key, value))
    # Save model and tokenizer (only from rank 0 when running distributed)
    if SDP_ENABLED:
        if sdp.rank() == 0:
            model.save_pretrained(args.model_dir)
            tokenizer.save_pretrained(args.model_dir)
    else:
        model.save_pretrained(args.model_dir)
        tokenizer.save_pretrained(args.model_dir)
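For a quick local sanity check outside SageMaker (not part of the original gist), the environment variables the script reads can be faked and the script run directly; a minimal sketch, assuming Horovod is installed locally and substituting a smaller model to keep the run cheap:

# Hypothetical local smoke test: fake the SageMaker environment variables
# train.py expects, then invoke it with a small model for one epoch.
import os
import subprocess

for var, path in [("SM_OUTPUT_DATA_DIR", "/tmp/sm_output"), ("SM_MODEL_DIR", "/tmp/sm_model")]:
    os.makedirs(path, exist_ok=True)
    os.environ.setdefault(var, path)
os.environ.setdefault("SM_NUM_GPUS", "1")

subprocess.run(
    ["python", "train.py", "--model_name", "distilbert-base-uncased", "--epochs", "1"],
    check=True,
)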