Skip to content

Instantly share code, notes, and snippets.

@roman-4erkasov
Last active February 6, 2023 20:07
Show Gist options
  • Save roman-4erkasov/49ba6dcdfcca33999d17b88378765253 to your computer and use it in GitHub Desktop.
import os
import torch as th
import torch.distributed as dist
import torch.multiprocessing as mp
def run(rank: int, value: float, src: int, dst: int) -> None:
    """Exchange one scalar tensor with peer ranks over torch.distributed.

    Posts a non-blocking send of ``value`` to rank ``dst``, then blocks
    receiving from rank ``src`` into a *separate* buffer, and finally
    waits for the outstanding send to complete.

    Args:
        rank: This process's rank (used only in log messages).
        value: Scalar placed in the tensor that is sent to ``dst``.
        src: Rank to receive a tensor from.
        dst: Rank to send the tensor to.
    """
    send_buf = th.FloatTensor([value])
    print(f"[rk={rank}] tensor before send-recv: {send_buf}")
    req = dist.isend(tensor=send_buf, dst=dst)
    print(f"[rk={rank}] after isend")
    # BUG FIX: the original received into the very tensor backing the
    # still-pending isend; mutating a send buffer before req.wait() is
    # undefined behavior. Receive into a fresh buffer instead.
    recv_buf = th.zeros(1)
    dist.recv(tensor=recv_buf, src=src)
    print(f"[rk={rank}] after recv")
    req.wait()
    print(f"[rk={rank}] after wait")
    print(f"[rk={rank}] tensor after send-recv: {recv_buf}")
def init_process(
    rank: int,
    world_size: int = 3,
    init_method: str = "file:///home/fatuus/torch_dist/init_process_group_2",
) -> None:
    """Join the gloo process group and run the ring send/recv exchange.

    Each rank talks to exactly one peer: ``(rank + 1) % world_size`` is
    used as both the send destination and the receive source, matching
    the original hard-coded rank pairs (0<->1, 1<->2, 2<->0).

    Args:
        rank: Rank of this process; must be in ``[0, world_size)``.
        world_size: Total number of participating processes
            (default 3, as in the original script).
        init_method: Rendezvous URL — a file:// path shared by all ranks.

    Raises:
        ValueError: If ``rank`` is outside ``[0, world_size)``.
    """
    # Validate up front instead of the original bare `raise Exception()`
    # after an if/elif ladder.
    if not 0 <= rank < world_size:
        raise ValueError(f"rank must be in [0, {world_size}), got {rank}")
    dist.init_process_group(
        "gloo",
        rank=rank,
        world_size=world_size,
        init_method=init_method,
    )
    # Every branch of the original ladder used peer == (rank + 1) % 3;
    # computing it generalizes to any world size.
    peer = (rank + 1) % world_size
    run(rank=rank, value=float(rank), src=peer, dst=peer)
if __name__ == "__main__":
    # "spawn" starts each worker in a fresh interpreter, which is the
    # safe start method for torch.distributed children.
    mp.set_start_method("spawn")
    workers = []
    for worker_rank in range(3):
        proc = mp.Process(target=init_process, args=(worker_rank,))
        proc.start()
        workers.append(proc)
    # Block until every worker has finished its exchange.
    for proc in workers:
        proc.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment