Tutorial to set up Distributed Data Parallel (DDP) training in PyTorch using mpirun instead of torchrun

To launch distributed training in PyTorch with mpirun we have to:

  1. Configure a passwordless SSH connection between the nodes
  2. Set up the distributed environment inside the training script, in this case train.py
  3. Launch the training from the MASTER node with mpirun

For the first step, this is the pipeline:

# generate a public/private ssh key pair and make sure NOT to insert a passphrase

ssh-keygen -t rsa

# copy the public key 'id_rsa.pub' to the MASTER and SLAVE nodes

ssh-copy-id -i ~/.ssh/id_rsa.pub username@master && ssh-copy-id -i ~/.ssh/id_rsa.pub username@slave

# add the private key to the ssh agent on the MASTER node

ssh-add ~/.ssh/id_rsa

Then we need to correctly set up the training script in this way:

  1. Initialize the distributed process group
import os
import torch
import torch.distributed as dist

# mpirun (Open MPI) exposes rank information through these environment variables
rank = int(os.environ["OMPI_COMM_WORLD_RANK"])
local_rank = int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])
world_size = int(os.environ["OMPI_COMM_WORLD_SIZE"])

# bind this process to its own GPU before initializing NCCL
torch.cuda.set_device(local_rank)

# MASTER_ADDR and MASTER_PORT are read from the environment (exported via mpirun -x)
dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
  2. Wrap the model with DistributedDataParallel
# move the model to the local GPU before wrapping it
model = model.to(local_rank)

ddp_model = torch.nn.parallel.DistributedDataParallel(
    model,
    device_ids=[local_rank],
    output_device=local_rank,
)
  3. Use a DistributedSampler for the DataLoaders (see the combined training-loop sketch after this list)
from torch.utils.data import DataLoader, DistributedSampler

# each rank gets a different, non-overlapping shard of the dataset
train_sampler = DistributedSampler(dataset=train_ds)
val_sampler = DistributedSampler(dataset=val_ds)

train_dl = DataLoader(..., sampler=train_sampler)
val_dl = DataLoader(..., sampler=val_sampler)
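
Putting the pieces together: when shuffling is enabled, the sampler's epoch has to be updated at every epoch so that the shuffling order changes across epochs, and the process group should be destroyed at the end of the script. A minimal sketch, where num_epochs and train_one_epoch are hypothetical placeholders:

for epoch in range(num_epochs):
    # tell the DistributedSampler which epoch we are in so it reshuffles differently
    train_sampler.set_epoch(epoch)
    train_one_epoch(ddp_model, train_dl)  # hypothetical per-epoch training function

# wait for all ranks, then tear down the process group
dist.barrier()
dist.destroy_process_group()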

To log in a distributed setting you can follow one of these strategies:

A) log only from the process with global rank 0:

if rank == 0:
  # code to log whatever

B) reduce results on process with global rank 0:

dist.reduce(tensor_name_to_reduce, dst=0, op=dist.ReduceOp.SUM) # in-place operation, result valid only on rank 0

if rank == 0:
    avg = tensor_name_to_reduce.item() / world_size # average over the number of GPUs
    # code to log the aggregated value
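
For example, to log a validation loss averaged over all GPUs (a minimal sketch; local_val_loss is assumed to be the scalar loss computed locally on each rank):

# every rank builds a tensor on its own GPU with its local result
val_loss = torch.tensor(local_val_loss, device=f"cuda:{local_rank}")

# sum the values onto global rank 0
dist.reduce(val_loss, dst=0, op=dist.ReduceOp.SUM)

if rank == 0:
    print(f"avg val loss: {val_loss.item() / world_size:.4f}")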

Now to launch the training you can use the following command on the MASTER node:

mpirun -np 2 \
       -H xxx.xxx.xxx.xxx:1,xxx.xxx.xxx.xxy:1 \
       -x MASTER_ADDR=xxx.xxx.xxx.xxx \
       -x MASTER_PORT=1234 \
       -x PATH=$PATH:/path/to/venv/my_env/bin \
       -bind-to none -map-by slot \
       python train.py --additional_script_args

The arguments are:

  • np: total number of processes to launch (one per GPU), 2 in this case
  • H: list of hosts and available slots (:1 means one slot, i.e. one GPU, on that host); a hostfile can be used instead (--hostfile)
  • x: environment variables to export to the launched processes, in this case the master address & port and the PATH to the virtualenv, which has to be the same on both machines
  • bind-to none: disable CPU binding so each process can use multiple threads
  • map-by slot: how to map processes, in this case one process per slot (i.e. per GPU)
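
Before launching the full training, it can help to verify that mpirun, the SSH setup and the exported variables are wired correctly by running a tiny script with the same command (a minimal sketch; check_env.py is a hypothetical file name):

# check_env.py - print what every rank sees
import os
import socket

rank = int(os.environ["OMPI_COMM_WORLD_RANK"])
local_rank = int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])
world_size = int(os.environ["OMPI_COMM_WORLD_SIZE"])

print(f"host={socket.gethostname()} rank={rank} local_rank={local_rank} "
      f"world_size={world_size} "
      f"master={os.environ.get('MASTER_ADDR')}:{os.environ.get('MASTER_PORT')}")

If every rank prints the expected hostname, ranks and master address, the passwordless SSH connection and the environment forwarding are working.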