@anj-s
Last active April 28, 2021 15:02
Repro rpc_sync segmentation fault
# Example repro for failing to profile a callback.
import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import os
import argparse
import subprocess

RPC_PORT = 25001


def get_init_urls(args):
    if args.slurm_job:
        node_list = os.environ.get("SLURM_JOB_NODELIST")
        hostnames = subprocess.check_output(
            ["scontrol", "show", "hostnames", node_list]
        )
        master_host = hostnames.split()[0].decode("utf-8")
        # Each worker has its own process group, hence we create
        # a local URL for PG connections.
        url_rpc = f"tcp://{master_host}:{RPC_PORT}"
    else:
        url_rpc = f"tcp://localhost:{RPC_PORT}"
    return url_rpc


def run_worker(rank, args, world_size):
    url_rpc = get_init_urls(args)

    rpc_backend_options = rpc.TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = url_rpc
    # Map local device 0 to device 0 on the peer so CUDA tensors can be
    # sent over RPC.
    if rank == 0:
        rpc_backend_options.set_device_map("worker1", {0: 0})
    else:
        rpc_backend_options.set_device_map("worker0", {0: 0})

    print(f"run_worker {rank} with world size {world_size}")
    rpc.init_rpc(
        f"worker{rank}",
        rank=rank,
        world_size=world_size,
        rpc_backend_options=rpc_backend_options,
    )

    # Testing!
    a = torch.ones((1, 1))
    if args.use_cuda_tensors:
        a = a.to("cuda:0")
    b = torch.ones((1, 1))
    if args.use_cuda_tensors:
        b = b.to("cuda:0")
    if rank == 0:
        val = torch.distributed.rpc.rpc_sync(
            "worker1", torch.add, args=(a, b))
        print(f"val {val}")
        # Works 04/17
        # (fairscale) anj@devfair0443:~/examples$ srun python playground/herring/rpc_perf.py --use_cuda_tensors
        # run_worker 1 with world size 2
        # run_worker 0 with world size 2
        # val tensor([[2.]], device='cuda:0')

    # Block until all RPCs finish.
    rpc.shutdown()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Performance analysis for PyTorch APIs."
    )
    parser.add_argument(
        "--num_devices",
        type=int,
        default=8,
        help="""The number of GPUs per machine.""",
    )
    parser.add_argument("--use_cuda_tensors", action="store_true", default=False)
    parser.add_argument("--test_single_node", action="store_true", default=False)
    args = parser.parse_args()

    if "SLURM_NODEID" in os.environ:
        args.slurm_job = True
        os.environ["TP_SOCKET_IFNAME"] = "front0"
        os.environ["GLOO_SOCKET_IFNAME"] = "front0"
    else:
        args.slurm_job = False

    # os.environ["TP_VERBOSE_LOGGING"] = "1"
    gpus_per_node = args.num_devices
    if args.slurm_job:
        node_id = int(os.environ.get("SLURM_NODEID"))
        num_nodes = int(os.environ.get("SLURM_NNODES"))
        world_size = num_nodes
    else:
        # Hardcoded to test on a local devfair machine.
        world_size = 2
        args.num_devices = 2
        num_nodes = 1
    args.num_nodes = num_nodes
    args.world_size = world_size

    if args.slurm_job and not args.test_single_node:
        # TODO(anj-s): Make sure there are at least 2 nodes.
        run_worker(node_id, args, world_size)
    else:
        # Single-node path: spawn two local worker processes.
        mp.spawn(run_worker, args=(args, 2), nprocs=2, join=True)
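
A minimal sketch of how the repro is launched, assuming the file is saved as rpc_perf.py (the path shown in the logged comment above is playground/herring/rpc_perf.py) and that at least one CUDA device is available when --use_cuda_tensors is passed:

    # Local run: spawns two worker processes on one machine via mp.spawn.
    python rpc_perf.py --use_cuda_tensors

    # SLURM run: one worker per node; the script sets TP_SOCKET_IFNAME and
    # GLOO_SOCKET_IFNAME to "front0" when SLURM_NODEID is present.
    srun python playground/herring/rpc_perf.py --use_cuda_tensors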