DeepSpeed #5059
import deepspeed.comm as dist
import deepspeed
from deepspeed.runtime.zero.config import DeepSpeedZeroConfig
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from torch.utils.data import DataLoader
import numpy as np
import pytest

# DistributedTest is the multi-process test harness from DeepSpeed's unit test
# suite (tests/unit/common.py); this file is assumed to live alongside those tests.
from unit.common import DistributedTest


class TestZeroPPConvergence(DistributedTest):
    world_size = 16
    def load_and_prepare_data(self, model_name):
        """Load model, tokenizer and dataset, and prepare the data loader."""
        from datasets import load_dataset

        # Load model and tokenizer
        model = AutoModelForCausalLM.from_pretrained(model_name).to(torch.bfloat16)
        model.gradient_checkpointing_enable()
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        tokenizer.pad_token = tokenizer.eos_token

        # Load and tokenize a 1% slice of WikiText-103, dropping empty lines
        dataset = load_dataset("wikitext", 'wikitext-103-raw-v1', split='train[:1%]').filter(lambda x: x['text'])

        def tokenize_function(examples):
            # Tokenize and ensure 'labels' are the same as 'input_ids'
            tokenized_output = tokenizer(examples["text"],
                                         padding="longest",
                                         truncation=True,
                                         return_tensors='pt',
                                         max_length=256)
            tokenized_output["labels"] = tokenized_output["input_ids"].clone()
            return tokenized_output

        tokenized_dataset = dataset.map(tokenize_function, batched=True)
        tokenized_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])

        # Create data loader
        data_loader = DataLoader(tokenized_dataset, batch_size=64, shuffle=False)
        return model, data_loader
    def get_loss(self, model, data_loader, config_dict, step=500):
        """Train the model and return the mean loss over the last 100 steps."""
        # Initialize DeepSpeed
        model, _, _, _ = deepspeed.initialize(model=model,
                                              model_parameters=model.parameters(),
                                              config=config_dict,
                                              dist_init_required=True)
        dist.barrier()
        model.train()

        # Training loop
        losses = []
        for n, batch in enumerate(data_loader):
            if n >= step:
                break
            batch = {k: v.to(model.device) for k, v in batch.items()}
            outputs = model(**batch)
            loss = outputs.loss
            # if torch.distributed.get_rank() == 0:
            #     print(f"loss: {loss}")
            model.backward(loss)
            model.step()
            losses.append(loss.item())
        return np.nanmean(losses[-100:])
    def get_config_dict(self, use_quantized_weights=False, use_hpz=False):
        """Generate the DeepSpeed configuration dictionary (ZeRO stage 3, bf16)."""
        config = {
            "train_micro_batch_size_per_gpu": 1,
            "zero_optimization": {
                "stage": 3,
                "stage3_max_reuse_distance": 0,
                "contiguous_gradients": True,
                "overlap_comm": True,
            },
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 1e-5
                }
            },
            "bf16": {
                "enabled": True
            }
        }
        # ZeRO++ knobs: quantized weight communication (qwZ) and hierarchical
        # partitioning (hpZ) with a secondary partition group of half the world size.
        if use_quantized_weights:
            config["zero_optimization"]["zero_quantized_weights"] = True
        if use_hpz:
            config["zero_optimization"]["zero_hpz_partition_size"] = self.world_size // 2
        return config
    def test(self, model_name):
        # NOTE: model_name is expected to be supplied via a pytest fixture or
        # @pytest.mark.parametrize; it is not hard-coded in this gist.
        torch.manual_seed(0)
        model, data_loader = self.load_and_prepare_data(model_name)
        zeropp_loss = self.get_loss(model, data_loader, self.get_config_dict(use_quantized_weights=False, use_hpz=True))
        model, data_loader = self.load_and_prepare_data(model_name)
        baseline_loss = self.get_loss(model, data_loader, self.get_config_dict())

        # Output and assert: the hpZ run should converge to within 10% of the ZeRO-3 baseline
        print(f"zeropp_loss={zeropp_loss}, baseline_loss={baseline_loss}")
        assert zeropp_loss < baseline_loss * 1.1, f"zeropp_loss={zeropp_loss}, baseline_loss={baseline_loss}"
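
For reference, a minimal sketch (not part of the original gist) of the effective "zero_optimization" sections the two runs compare; with world_size = 16, the ZeRO++ hpZ run only adds a secondary partition group on top of the shared ZeRO-3 settings:

# Illustrative only: effective "zero_optimization" sections produced by
# get_config_dict() above, assuming world_size == 16.
baseline_zero = {
    "stage": 3,
    "stage3_max_reuse_distance": 0,
    "contiguous_gradients": True,
    "overlap_comm": True,
}
# ZeRO++ hpZ run: same settings plus a secondary partition group of 16 // 2 = 8 ranks.
zeropp_zero = {**baseline_zero, "zero_hpz_partition_size": 8}

Both runs otherwise share the Adam optimizer (lr 1e-5), bf16, and a micro batch size of 1 per GPU.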