Repro rpc_sync segmentation fault
# Example repro for failing to profile a callback.
import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp

import os
import argparse
import subprocess

RPC_PORT = 25001


def get_init_urls(args):
    if args.slurm_job:
        node_list = os.environ.get("SLURM_JOB_NODELIST")
        hostnames = subprocess.check_output(
            ["scontrol", "show", "hostnames", node_list]
        )
        master_host = hostnames.split()[0].decode("utf-8")
        # Each worker has its own process group, hence we create
        # a local URL for PG connections.
        url_rpc = f"tcp://{master_host}:{RPC_PORT}"
    else:
        url_rpc = f"tcp://localhost:{RPC_PORT}"
    return url_rpc


def run_worker(rank, args, world_size):
    url_rpc = get_init_urls(args)
    rpc_backend_options = rpc.TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = url_rpc
    # Map local cuda:0 to cuda:0 on the peer so CUDA tensors can be sent directly.
    if rank == 0:
        rpc_backend_options.set_device_map("worker1", {0: 0})
    else:
        rpc_backend_options.set_device_map("worker0", {0: 0})
    print(f"run_worker {rank} with world size {world_size}")
    rpc.init_rpc(
        f"worker{rank}",
        rank=rank,
        world_size=world_size,
        rpc_backend_options=rpc_backend_options,
    )

    # Testing!
    a = torch.ones((1, 1))
    if args.use_cuda_tensors:
        a = a.to("cuda:0")
    b = torch.ones((1, 1))
    if args.use_cuda_tensors:
        b = b.to("cuda:0")
    if rank == 0:
        val = torch.distributed.rpc.rpc_sync(
            "worker1", torch.add, args=(a, b))
        print(f"val {val}")

    # Works 04/17
    # (fairscale) anj@devfair0443:~/examples$ srun python playground/herring/rpc_perf.py --use_cuda_tensors
    # run_worker 1 with world size 2
    # run_worker 0 with world size 2
    # val tensor([[2.]], device='cuda:0')

    # Block until all RPCs finish.
    rpc.shutdown()
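
# --- Not part of the original gist: a minimal sketch (assumption) of wrapping
# --- the same rpc_sync call in the autograd profiler, since the top comment
# --- mentions failing to profile a callback. The helper name profiled_rpc_add
# --- and the use_cuda flag are illustrative only.
def profiled_rpc_add(a, b):
    with torch.autograd.profiler.profile(use_cuda=True) as prof:
        val = rpc.rpc_sync("worker1", torch.add, args=(a, b))
    # Print per-op timings recorded on the caller side.
    print(prof.key_averages().table(sort_by="cpu_time_total"))
    return val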

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Performance analysis for PyTorch APIs."
    )
    parser.add_argument(
        "--num_devices",
        type=int,
        default=8,
        help="""The number of GPUs per machine.""",
    )
    parser.add_argument("--use_cuda_tensors", action="store_true", default=False)
    parser.add_argument("--test_single_node", action="store_true", default=False)
    args = parser.parse_args()

    if "SLURM_NODEID" in os.environ:
        args.slurm_job = True
        os.environ["TP_SOCKET_IFNAME"] = "front0"
        os.environ["GLOO_SOCKET_IFNAME"] = "front0"
    else:
        args.slurm_job = False
    # os.environ["TP_VERBOSE_LOGGING"] = "1"

    gpus_per_node = args.num_devices
    if args.slurm_job:
        node_id = int(os.environ.get("SLURM_NODEID"))
        num_nodes = int(os.environ.get("SLURM_NNODES"))
        world_size = num_nodes
    else:
        # Hardcoded to test on a local devfair machine.
        world_size = 2
        args.num_devices = 2
        num_nodes = 1
    args.num_nodes = num_nodes
    args.world_size = world_size

    if args.slurm_job and not args.test_single_node:
        # TODO(anj-s): Make sure there are at least 2 nodes.
        run_worker(node_id, args, world_size)
    else:
        # Local fallback: spawn two worker processes on this machine.
        mp.spawn(run_worker, args=(args, 2), nprocs=2, join=True)
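
As the inline comment records, the repro was launched under SLURM with srun python playground/herring/rpc_perf.py --use_cuda_tensors, one process per node with the world size taken from SLURM_NNODES. Outside SLURM the script falls back to mp.spawn with two local workers, so running it directly with --use_cuda_tensors on a single machine should exercise the same rpc_sync path; the single-entry device map {0: 0} means both workers share cuda:0.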