# DeepSpeed #5059
import deepspeed.comm as dist
import deepspeed
from deepspeed.runtime.zero.config import DeepSpeedZeroConfig
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from torch.utils.data import DataLoader
import numpy as np
import pytest

# DistributedTest is DeepSpeed's pytest harness (tests/unit/common.py); the import
# path below assumes this test lives in the DeepSpeed unit-test tree.
from unit.common import DistributedTest

class TestZeroPPConvergence(DistributedTest):
    world_size = 16
    def load_and_prepare_data(self, model_name):
        """Load the model, tokenizer and dataset, and prepare the data loader."""
        from datasets import load_dataset

        # Load model and tokenizer
        model = AutoModelForCausalLM.from_pretrained(model_name).to(torch.bfloat16)
        model.gradient_checkpointing_enable()
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        tokenizer.pad_token = tokenizer.eos_token

        # Load and tokenize dataset
        dataset = load_dataset("wikitext", 'wikitext-103-raw-v1', split='train[:1%]').filter(lambda x: x['text'])

        def tokenize_function(examples):
            # Tokenize and ensure 'labels' are the same as 'input_ids'
            tokenized_output = tokenizer(examples["text"],
                                         padding="longest",
                                         truncation=True,
                                         return_tensors='pt',
                                         max_length=256)
            tokenized_output["labels"] = tokenized_output["input_ids"].clone()
            return tokenized_output

        tokenized_dataset = dataset.map(tokenize_function, batched=True)
        tokenized_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])

        # Create data loader
        data_loader = DataLoader(tokenized_dataset, batch_size=64, shuffle=False)
        return model, data_loader
    def get_loss(self, model, data_loader, config_dict, step=500):
        """Train the model and return the mean loss over the last 100 steps."""
        # Initialize DeepSpeed
        model, _, _, _ = deepspeed.initialize(model=model,
                                              model_parameters=model.parameters(),
                                              config=config_dict,
                                              dist_init_required=True)
        dist.barrier()
        model.train()

        # Training loop
        losses = []
        for n, batch in enumerate(data_loader):
            if n >= step:
                break
            batch = {k: v.to(model.device) for k, v in batch.items()}
            outputs = model(**batch)
            loss = outputs.loss
            # if torch.distributed.get_rank() == 0:
            #     print(f"loss: {loss}")
            model.backward(loss)
            model.step()
            losses.append(loss.item())
        return np.nanmean(losses[-100:])
    def get_config_dict(self, use_quantized_weights=False, use_hpz=False):
        """Generate the configuration dictionary for DeepSpeed."""
        config = {
            "train_micro_batch_size_per_gpu": 1,
            "zero_optimization": {
                "stage": 3,
                "stage3_max_reuse_distance": 0,
                "contiguous_gradients": True,
                "overlap_comm": True,
            },
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 1e-5
                }
            },
            "bf16": {
                "enabled": True
            }
        }
        if use_quantized_weights:
            config["zero_optimization"]["zero_quantized_weights"] = True
        if use_hpz:
            config["zero_optimization"]["zero_hpz_partition_size"] = self.world_size // 2
        return config
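
    # The gist does not show how `model_name` is supplied to the test. The parametrize
    # decorator below is a minimal sketch that makes the test collectable by pytest;
    # the model id is a placeholder assumption, not part of the original.
    @pytest.mark.parametrize("model_name", ["facebook/opt-125m"])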
    def test(self, model_name):
        torch.manual_seed(0)

        # ZeRO++ run: hierarchical partitioning (hpZ) enabled, quantized weights disabled
        model, data_loader = self.load_and_prepare_data(model_name)
        zeropp_loss = self.get_loss(model, data_loader, self.get_config_dict(use_quantized_weights=False, use_hpz=True))

        # Plain ZeRO-3 baseline
        model, data_loader = self.load_and_prepare_data(model_name)
        baseline_loss = self.get_loss(model, data_loader, self.get_config_dict())

        # Output and assert
        print(f"zeropp_loss={zeropp_loss}, baseline_loss={baseline_loss}")
        assert zeropp_loss < baseline_loss * 1.1, f"zeropp_loss={zeropp_loss}, baseline_loss={baseline_loss}"
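
# A minimal usage sketch, not part of the original gist: DistributedTest-based tests
# are collected and launched by pytest, which spawns `world_size` ranks internally.
# The file path below is an assumption for illustration only, e.g.:
#
#   pytest -x -k "TestZeroPPConvergence" tests/unit/runtime/zero/test_zeropp.py
#
# with the test parametrized by a Hugging Face model id such as "facebook/opt-125m".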