Skip to content

Instantly share code, notes, and snippets.

@currymj
Last active September 13, 2019 18:35
Show Gist options
  • Save currymj/34e9c949225512c89862a76356e74519 to your computer and use it in GitHub Desktop.
Save currymj/34e9c949225512c89862a76356e74519 to your computer and use it in GitHub Desktop.
example code to reproduce problem
#!/bin/bash
#SBATCH --ntasks 2
#SBATCH -N 2
#SBATCH --job-name=distributed-pbg
#SBATCH -o log-gloo-test-%j.out
# Batch entry point: exports the rendezvous host and the node list, then
# starts one ./distributed_runner.sh per task via srun.
source /etc/profile
export HDF5_USE_FILE_LOCKING='FALSE'
# The node executing this batch script is the host the workers connect to
# (exported as PARENT for the tasks started by srun).
PARENT=$(/bin/hostname -s)
export PARENT
#export MPORT=13001
# All other allocated nodes. The node list is quoted because its bracket
# syntax (e.g. node[1-2]) is also a shell glob pattern. grep matches whole
# lines as fixed strings (-x -F) so a parent named "node1" does not also
# remove "node10" from the list.
CHILDREN=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | grep -vxF -- "$PARENT")
export CHILDREN
export HOSTLIST="$PARENT $CHILDREN"
# Intentionally unquoted: word splitting prints the newline-separated
# host list on a single line.
echo $HOSTLIST
export WORLD_SIZE="$SLURM_NTASKS"
# Ask Slurm to reserve 30 ports for the step; the runner script reads them
# from SLURM_STEP_RESV_PORTS.
srun --resv-ports=30 ./distributed_runner.sh
#!/bin/bash
# Per-task launcher started by srun: logs the host name, exports the
# rendezvous port as MPORT, and runs the Python test with this task's id.
/bin/hostname -s
#ping -c 2 $PARENT
# Keep everything before the first '-' of SLURM_STEP_RESV_PORTS
# (presumably "first-last", so this is the first reserved port).
MPORT=${SLURM_STEP_RESV_PORTS%%-*}
export MPORT
printf '%s\n' "$MPORT"
printf '%s\n' "$SLURM_PROCID"
python test_proc.py $SLURM_PROCID
"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
import datetime
from torch.multiprocessing import Process
import argparse
import sys
import time
# Command-line interface: one positional integer, ``rank``. The __main__
# block reads it as ``args.rank``; the runner script passes $SLURM_PROCID
# in this position.
parser = argparse.ArgumentParser()
parser.add_argument('rank', type=int)
# NOTE: parsing happens at module level, so it runs (and reads sys.argv)
# whenever this module is executed or imported.
args = parser.parse_args()
def run(rank, size):
    """Placeholder workload: log "run" to stderr, then idle for ten seconds.

    Both ``rank`` and ``size`` are accepted so the function matches the
    ``fn(rank, size)`` call made by ``init_processes``; neither is used.
    """
    sys.stderr.write("run\n")
    time.sleep(10)
def init_processes(rank, size, fn, backend='gloo'):
    """Join the process group, run ``fn(rank, size)``, then leave the group.

    Args:
        rank: Rank of this process within the group.
        size: Total number of processes in the group (the world size).
        fn: Callable invoked as ``fn(rank, size)`` once the group is set up.
        backend: ``torch.distributed`` backend name; defaults to ``'gloo'``.

    Raises:
        KeyError: If ``PARENT`` or ``MPORT`` is missing from the environment.
    """
    # Rendezvous over TCP at the host and port taken from the environment.
    init_method = f"tcp://{os.environ['PARENT']}:{os.environ['MPORT']}"
    print(f"{rank} initializing", file=sys.stderr, flush=True)
    dist.init_process_group(
        backend,
        rank=rank,
        world_size=size,
        init_method=init_method,
        timeout=datetime.timedelta(minutes=1),
    )
    try:
        print(f"{rank} initialized", file=sys.stderr, flush=True)
        fn(rank, size)
    finally:
        # Tear the group down even when fn raises, so the group's resources
        # are released explicitly rather than left until interpreter exit.
        dist.destroy_process_group()
if __name__ == "__main__":
    # Number of worker processes started by this invocation.
    size = 2
    processes = []
    base_rank = args.rank
    base_world_size = int(os.environ['SLURM_NTASKS'])
    print("initializing procs", file=sys.stderr)
    # Invocation i owns the contiguous ranks [i*size, (i+1)*size); the
    # world size is therefore SLURM_NTASKS * size.
    for rank in range(base_rank * size, (base_rank + 1) * size):
        print(f"launching {rank}", file=sys.stderr)
        p = Process(target=init_processes, args=(rank, base_world_size * size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    # Surface worker failures: without this the script exits 0 even when a
    # worker crashed or timed out during initialization.
    failed = [p for p in processes if p.exitcode != 0]
    if failed:
        for p in failed:
            print(f"{p.name} exited with code {p.exitcode}", file=sys.stderr)
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment