Examine a machine for running deep learning models
"""
docker pull determinedai/genai-eval:latest &&\
echo "Running checks..."; \
curl -s https://gist.githubusercontent.com/hamidzr/9e327b152885785797cb3dc3c1d4cdfa/raw/test-gas-env.py |\
docker run -i --rm --gpus all --entrypoint /bin/bash determinedai/genai-eval:latest -c "cat > /tmp/test-env.py && python3 /tmp/test-env.py"
# or save this script to a file named test-env.py and run:
cat test-env.py |\
docker run -i --rm --gpus all --entrypoint /bin/bash determinedai/genai-eval:latest -c "cat > /tmp/test-env.py && python3 /tmp/test-env.py"
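# alternatively (a hedged option, assuming torch and psutil are already installed on a
# CUDA-capable host), the script can be run directly without Docker:
# python3 test-env.py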
"""
import os
import sys
import psutil
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset, DistributedSampler
# Group 1: single-process checks of CUDA, cuDNN, NCCL, per-GPU properties, and host hardware.
def test_group_1():
    assert torch.cuda.is_available(), "CUDA is not available"
    assert torch.distributed.is_available(), "torch.distributed is not available"

    def bytes_to_gb(num_bytes):
        return num_bytes / (1024**3)

    print(f"CUDA version: {torch.version.cuda}")
    print(f"Torch version: {torch.__version__}")
    print(f"NCCL version: {torch.cuda.nccl.version()}")

    assert torch.backends.cudnn.is_available(), "cuDNN is not available"
    print(f"cuDNN version: {torch.backends.cudnn.version()}")

    num_gpus = torch.cuda.device_count()
    print(f"GPUs: {num_gpus}")

    min_compute_capability = 6.0
    for gpu in range(num_gpus):
        print(f"\nGPU {gpu}:")
        print(f" Model: {torch.cuda.get_device_name(gpu)}")
        capability = torch.cuda.get_device_capability(gpu)
        print(f" Capability: {capability}")
        assert (
            capability[0] + capability[1] * 0.1 >= min_compute_capability
        ), f"GPU {gpu} has insufficient compute capability"
        gpu_props = torch.cuda.get_device_properties(gpu)
        print(f" Total Memory (GB): {bytes_to_gb(gpu_props.total_memory):.2f}")
        print(f" Memory Allocated (GB): {bytes_to_gb(torch.cuda.memory_allocated(gpu)):.2f}")
        print(f" Memory Cached (GB): {bytes_to_gb(torch.cuda.memory_reserved(gpu)):.2f}")

    if num_gpus > 1:
        print("Testing inter-GPU communication...")
        x = torch.tensor([1.0], device="cuda:0")
        y = x.to("cuda:1")
        assert y.device.index == 1, "Inter-GPU communication failed"
        print("Inter-GPU communication is operational")

    initial_memory = torch.cuda.memory_allocated()
    torch.cuda.empty_cache()
    assert torch.cuda.memory_allocated() == initial_memory, "Potential memory leak detected"
    print("No unexpected GPU memory allocation")

    required_packages = ["torch"]
    missing_packages = [pkg for pkg in required_packages if pkg not in sys.modules]
    assert not missing_packages, f"Missing required packages: {missing_packages}"
    print("All required Python packages are installed")

    print("\nSystem Hardware Information:")
    print(f"CPU Count: {psutil.cpu_count(logical=False)}")
    print(f"Total RAM (GB): {bytes_to_gb(psutil.virtual_memory().total):.2f}")
# Group 2 helpers: process-group setup/teardown and a toy dataset for the DDP smoke test.
def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12345"
    dist.init_process_group("nccl", rank=rank, world_size=world_size)


class SampleDataset(Dataset):
    def __getitem__(self, index):
        return torch.tensor([index] * 10), torch.tensor([index + 1])

    def __len__(self):
        return 100


def cleanup():
    dist.destroy_process_group()
def ddp_example(rank, world_size):
    setup(rank, world_size)
    model = nn.Linear(10, 1).to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    dataset = SampleDataset()
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, sampler=sampler)
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.000001)
    for epoch in range(5):
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(rank), labels.to(rank)
            inputs = inputs.float()
            labels = labels.float()
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = torch.nn.functional.mse_loss(outputs, labels)
            loss.backward()
            optimizer.step()
    print(f"ddp_example training finished on rank {rank}")
    cleanup()


# Group 2: a short DistributedDataParallel training run over NCCL.
# Note: this spawns two ranks, one per GPU, so it assumes at least two GPUs are visible.
def test_group_2():
    world_size = 2
    torch.multiprocessing.spawn(ddp_example, args=(world_size,), nprocs=world_size, join=True)
if __name__ == "__main__":
    # run test group 1 only once.
    if not torch.distributed.is_initialized():
        test_group_1()
    test_group_2()