Last active
February 28, 2024 05:43
-
-
Save woshiyyya/e0679f42b80339c964c16eb1372f7f11 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Repro: schedule a zero-resource task into a memory+GPU placement-group
bundle and inspect which node it lands on.

Requires an already-running Ray cluster that has a node with more than
40 GiB of memory and at least one GPU.
"""
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

# Snapshot every node's total resources keyed by node id, so the remote
# task can look up the resources of whichever node it was scheduled onto
# (the dict is captured by the task's closure).
node_resources = {}
for node in ray.nodes():
    print(node, "\n")
    node_resources[node["NodeID"]] = node["Resources"]

# Create a placement group with a single bundle that only fits on a node
# with at least 40 GiB of memory and one GPU.
pg = placement_group([{"memory": 40 * 1024**3, "GPU": 1.0}])
ray.get(pg.ready())


@ray.remote(num_cpus=0)
def my_task():
    """Print the node this task landed on and assert it has >40 GiB memory."""
    node_id = ray.get_runtime_context().get_node_id()
    node_ip = ray.util.get_node_ip_address()
    print("my task node_ip: ", node_ip)
    print("my task node id: ", node_id)
    print(node_resources[node_id])
    # NOTE(review): private Ray API — dumps the concrete resource ids
    # assigned to this worker process for debugging.
    resource_id_map = ray._private.worker.global_worker.core_worker.resource_ids()
    print("resource_id_map", resource_id_map)
    # The bundle demanded 40 GiB, so the chosen node's total memory must
    # exceed that amount.
    assert node_resources[node_id]["memory"] > 40 * 1024**3


# Run the task pinned to bundle 0 of the placement group, requesting no
# resources of its own.
ray.get(
    my_task.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=0,
            placement_group_capture_child_tasks=True,
        ),
        num_cpus=0,
        num_gpus=0,
        memory=0,
        resources={},
    ).remote()
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Ray Train release test: Colocate Trainer and Rank 0 worker | |
Setup: | |
- 1 x g4dn.4xlarge (16 CPU, 1 GPU, 64 GB Memory) | |
- 3 x g4dn.xlarge (4 CPU, 1 GPU, 16 GB memory) | |
Test owner: woshiyyya | |
""" | |
import ray | |
import ray.train | |
import pytest | |
from ray.train.data_parallel_trainer import DataParallelTrainer | |
from ray.train.backend import Backend, BackendConfig | |
from ray.train import ScalingConfig | |
ray.init() | |
# @pytest.mark.parametrize( | |
# "trainer_resources", [None, {"memory": 40 * 1024**3}, {"CPU": 10}] | |
# ) | |
# @pytest.mark.parametrize( | |
# "resources_per_worker_and_use_gpu", | |
# [ | |
# (None, True), | |
# ({"CPU": 1}, False), | |
# ({"GPU": 1}, True), | |
# ], | |
# ) | |
def main(
    trainer_resources,
    resources_per_worker_and_use_gpu,
):
    """Run a DataParallelTrainer at several worker counts and, via a custom
    backend hook, check that the rank-0 worker sits on the trainer's node.

    Args:
        trainer_resources: resources reserved for the trainer actor
            (e.g. ``{"memory": 40 * 1024**3}``) or ``None``.
        resources_per_worker_and_use_gpu: ``(resources_per_worker, use_gpu)``
            pair forwarded into the ``ScalingConfig``.
    """
    resources_per_worker, use_gpu = resources_per_worker_and_use_gpu

    # Dump the cluster topology for debugging.
    for node in ray.nodes():
        print(node, "\n")

    def train_func():
        pass

    class CustomBackend(Backend):
        # Hook that fires once the worker group is up; used here purely to
        # verify colocation of the trainer and the rank-0 worker.
        def on_training_start(self, worker_group, backend_config):
            # NOTE(review): despite the original variable name, this is the
            # trainer node's IP address, not a node id.
            trainer_ip = ray.util.get_node_ip_address()
            print("Trainer Node IP = ", trainer_ip)
            print("PLACEMENT GROUP = ", worker_group._placement_group, ray.util.get_current_placement_group().bundle_specs)
            for idx, w in enumerate(worker_group.workers):
                print(f"worker group i={idx}", w)

            def check_node_id():
                print(f"WORKER RANK {ray.train.get_context().get_world_rank()}, NODE IP = {ray.util.get_node_ip_address()}")
                # Rank 0 must be colocated with the trainer.
                if ray.train.get_context().get_world_rank() == 0:
                    assert trainer_ip == ray.util.get_node_ip_address()

            worker_group.execute(check_node_id)

    class CustomBackendConfig(BackendConfig):
        @property
        def backend_cls(self):
            return CustomBackend

    # Exercise several cluster-spanning worker counts with one fit() each.
    for num_workers in (1, 2, 4):
        scaling = ScalingConfig(
            num_workers=num_workers,
            use_gpu=use_gpu,
            trainer_resources=trainer_resources,
            resources_per_worker=resources_per_worker,
        )
        DataParallelTrainer(
            train_func,
            scaling_config=scaling,
            backend_config=CustomBackendConfig(),
        ).fit()
        print(f"PASS {num_workers}")


if __name__ == "__main__":
    main(trainer_resources={"memory": 40 * 1024**3}, resources_per_worker_and_use_gpu=(None, True))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Repro: bundle placement on a simulated 3-node cluster exposing a custom
"X" resource; check which node the task pinned to bundle 0 lands on."""
import ray
from ray.cluster_utils import Cluster
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# Simulated local cluster: three nodes with different amounts of the
# custom resource "X".
cluster = Cluster()
node_configs = [
    {"resources": {"X": 10}},
    {"resources": {"X": 15}},
    {"resources": {"X": 100}},
]
for config in node_configs:
    cluster.add_node(**config)
ray.init(address=cluster.address)

# Snapshot every node's total resources keyed by node id, so the remote
# task can look up the resources of whichever node it was scheduled onto.
node_resources = {}
for node in ray.nodes():
    print(node, "\n")
    node_resources[node["NodeID"]] = node["Resources"]

# Four bundles totalling 110 "X"; the assert below expects bundle 0 to
# land on the X=100 node — presumably {20} and {80} get packed together
# there while the {5} bundles fit on the smaller nodes (TODO confirm the
# PG strategy guarantees this packing).
pg = placement_group([{"X": 20}, {"X": 80}, {"X": 5}, {"X": 5}])
ray.get(pg.ready())


@ray.remote(num_cpus=0)
def my_task():
    """Assert this task (pinned to bundle 0) runs on the X=100 node."""
    node_id = ray.get_runtime_context().get_node_id()
    print("my task node id: ", node_id)
    print(node_resources[node_id])
    # NOTE(review): private Ray API — dumps the concrete resource ids
    # assigned to this worker process for debugging.
    resource_id_map = ray._private.worker.global_worker.core_worker.resource_ids()
    print("resource_id_map", resource_id_map)
    assert node_resources[node_id]["X"] == 100


# Run the task pinned to bundle 0 of the placement group, requesting no
# resources of its own.
ray.get(
    my_task.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=0,
            placement_group_capture_child_tasks=True,
        ),
        num_cpus=0,
        num_gpus=0,
        memory=0,
        resources={},
    ).remote()
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment