@woshiyyya · Last active February 28, 2024
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

# Record each node's total resources, keyed by node ID.
node_resources = {}
for node in ray.nodes():
    print(node, "\n")
    node_resources[node["NodeID"]] = node["Resources"]

# Create a placement group with a single bundle: 40 GiB of memory and 1 GPU.
pg = placement_group([{"memory": 40 * 1024**3, "GPU": 1.0}])
ray.get(pg.ready())


@ray.remote(num_cpus=0)
def my_task():
    node_id = ray.get_runtime_context().get_node_id()
    node_ip = ray.util.get_node_ip_address()
    print("my task node_ip: ", node_ip)
    print("my task node id: ", node_id)
    print(node_resources[node_id])
    # Inspect the resource IDs assigned to this worker (private API).
    resource_id_map = ray._private.worker.global_worker.core_worker.resource_ids()
    print("resource_id_map", resource_id_map)
    # The bundle requires 40 GiB of memory, so the task must land on a node
    # with more than that.
    assert node_resources[node_id]["memory"] > 40 * 1024**3


ray.get(
    my_task.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=0,
            placement_group_capture_child_tasks=True,
        ),
        num_cpus=0,
        num_gpus=0,
        memory=0,
        resources={},
    ).remote()
)
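
The assertion in the task above only passes if the single 40 GiB + 1 GPU bundle was scheduled onto a node with more than 40 GiB of memory. As a minimal driver-side sketch (the exact keys in the returned dict vary across Ray versions), the placement-group table can be printed to see how the bundles were placed:

# Sketch: inspect the placement group from the driver. `placement_group_table`
# is a public Ray utility; treat the specific dict keys as version-dependent.
from ray.util.placement_group import placement_group_table

pg_info = placement_group_table(pg)
print("placement group state:", pg_info.get("state"))
print("bundle specs:", pg_info.get("bundles"))
print(pg_info)  # full table, including any bundle-to-node mapping your version exposes
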
"""Ray Train release test: Colocate Trainer and Rank 0 worker
Setup:
- 1 x g4dn.4xlarge (16 CPU, 1 GPU, 64 GB Memory)
- 3 x g4dn.xlarge (4 CPU, 1 GPU, 16 GB memory)
Test owner: woshiyyya
"""
import ray
import ray.train
import pytest
from ray.train.data_parallel_trainer import DataParallelTrainer
from ray.train.backend import Backend, BackendConfig
from ray.train import ScalingConfig
ray.init()
# @pytest.mark.parametrize(
# "trainer_resources", [None, {"memory": 40 * 1024**3}, {"CPU": 10}]
# )
# @pytest.mark.parametrize(
# "resources_per_worker_and_use_gpu",
# [
# (None, True),
# ({"CPU": 1}, False),
# ({"GPU": 1}, True),
# ],
# )
def main(
trainer_resources,
resources_per_worker_and_use_gpu,
):
resources_per_worker, use_gpu = resources_per_worker_and_use_gpu
for node in ray.nodes():
print(node, "\n")
def train_func():
# print(f"WORKER RANK {ray.train.get_context().get_world_rank()}, NODE IP = {ray.util.get_node_ip_address()}")
# if ray.train.get_context().get_world_rank() == 0:
# assert "10.0.50.83" == ray.util.get_node_ip_address()
pass
class CustomBackend(Backend):
def on_training_start(self, worker_group, backend_config):
trainer_node_id = ray.util.get_node_ip_address()
print("Trainer Node IP = ", trainer_node_id)
# assert trainer_node_id == "10.0.50.83"
print("PLACEMENT GROUP = ", worker_group._placement_group, ray.util.get_current_placement_group().bundle_specs)
for i, worker in enumerate(worker_group.workers):
print(f"worker group i={i}", worker)
def check_node_id():
print(f"WORKER RANK {ray.train.get_context().get_world_rank()}, NODE IP = {ray.util.get_node_ip_address()}")
if ray.train.get_context().get_world_rank() == 0:
assert trainer_node_id == ray.util.get_node_ip_address()
worker_group.execute(check_node_id)
class CustomBackendConfig(BackendConfig):
@property
def backend_cls(self):
return CustomBackend
for num_workers in [1, 2, 4]:
scale_config = ScalingConfig(
num_workers=num_workers,
use_gpu=use_gpu,
trainer_resources=trainer_resources,
resources_per_worker=resources_per_worker,
)
trainer = DataParallelTrainer(
train_func,
scaling_config=scale_config,
backend_config=CustomBackendConfig(),
)
trainer.fit()
print(f"PASS {num_workers}")
if __name__ == "__main__":
main(trainer_resources={"memory": 40 * 1024**3}, resources_per_worker_and_use_gpu=(None, True))
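
This release test checks that Ray Train colocates the trainer with the rank-0 worker: the trainer_resources request of 40 GiB of memory only fits on the g4dn.4xlarge, and the trainer and worker bundles all live in the same placement group. To see which bundles a given ScalingConfig produces, a minimal sketch, assuming the Ray 2.x as_placement_group_factory() API:

# Sketch: print the placement-group bundles implied by a ScalingConfig.
# Assumes Ray 2.x, where ScalingConfig exposes as_placement_group_factory();
# the first bundle is typically the trainer's, followed by one bundle per worker.
from ray.train import ScalingConfig

scale_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
    trainer_resources={"memory": 40 * 1024**3},
)
pgf = scale_config.as_placement_group_factory()
print(pgf.bundles)
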
import ray
from ray.cluster_utils import Cluster
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# Start a local multi-node cluster whose nodes carry different amounts of the
# custom resource "X".
cluster = Cluster()
node_configs = []
node_configs += [{"resources": {"X": 10}}]
node_configs += [{"resources": {"X": 15}}]
node_configs += [{"resources": {"X": 100}}]
for config in node_configs:
    cluster.add_node(**config)

ray.init(address=cluster.address)

# Record each node's total resources, keyed by node ID.
node_resources = {}
for node in ray.nodes():
    print(node, "\n")
    node_resources[node["NodeID"]] = node["Resources"]

# Create a placement group. The {"X": 20} and {"X": 80} bundles only fit on the
# node with X=100; the two {"X": 5} bundles can go on any node.
pg = placement_group([{"X": 20}, {"X": 80}, {"X": 5}, {"X": 5}])
ray.get(pg.ready())


@ray.remote(num_cpus=0)
def my_task():
    node_id = ray.get_runtime_context().get_node_id()
    print("my task node id: ", node_id)
    print(node_resources[node_id])
    # Inspect the resource IDs assigned to this worker (private API).
    resource_id_map = ray._private.worker.global_worker.core_worker.resource_ids()
    print("resource_id_map", resource_id_map)
    # Bundle 0 ({"X": 20}) is expected to be placed on the X=100 node.
    assert node_resources[node_id]["X"] == 100


ray.get(
    my_task.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=0,
            placement_group_capture_child_tasks=True,
        ),
        num_cpus=0,
        num_gpus=0,
        memory=0,
        resources={},
    ).remote()
)
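
Continuing from the script above (this reuses its pg, node_resources, and PlacementGroupSchedulingStrategy), a minimal sketch that probes every bundle index and reports the node each one resolved to: the {"X": 80} bundle should also land on the X=100 node, while the two {"X": 5} bundles may land anywhere.

# Sketch: launch one zero-resource probe task per bundle index and report
# which node it was scheduled on.
@ray.remote(num_cpus=0)
def report_node(bundle_index):
    return bundle_index, ray.get_runtime_context().get_node_id()


refs = [
    report_node.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=i,
        ),
    ).remote(i)
    for i in range(4)
]
for bundle_index, node_id in ray.get(refs):
    print(f"bundle {bundle_index} -> node {node_id}, resources: {node_resources[node_id]}")
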