32 changes: 10 additions & 22 deletions tests/single_controller/test_split_resource_pool.py
@@ -51,18 +51,14 @@ def test_split_resource_pool_with_split_size():
     ray.init()
     # assume we have 2 nodes, with 4 GPUs each
     global_resource_pool = RayResourcePool(process_on_nodes=[4, 4])
-    global_resource_pool.get_placement_groups(device_name=get_device_name())
+    global_resource_pool.get_placement_groups()

     # first 4 gpus for actor_1, last 4 gpus for actor_2
     actor_1_resource_pool, actor_2_resource_pool = split_resource_pool(resource_pool=global_resource_pool, split_size=4)
     actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0)
     actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100)
-    actor_worker_1 = RayWorkerGroup(
-        resource_pool=actor_1_resource_pool, ray_cls_with_init=actor_cls_1, device_name=get_device_name()
-    )
-    actor_worker_2 = RayWorkerGroup(
-        resource_pool=actor_2_resource_pool, ray_cls_with_init=actor_cls_2, device_name=get_device_name()
-    )
+    actor_worker_1 = RayWorkerGroup(resource_pool=actor_1_resource_pool, ray_cls_with_init=actor_cls_1)
+    actor_worker_2 = RayWorkerGroup(resource_pool=actor_2_resource_pool, ray_cls_with_init=actor_cls_2)
     assert actor_worker_1.world_size == 4
     assert actor_worker_2.world_size == 4

@@ -79,7 +75,7 @@ def test_split_resource_pool_with_split_size_list():
     ray.init()
     # assume we have 4 nodes, with 2 GPUs each
     global_resource_pool = RayResourcePool(process_on_nodes=[2, 2, 2, 2])
-    global_resource_pool.get_placement_groups(device_name=get_device_name())
+    global_resource_pool.get_placement_groups()

     # first 2 gpus for actor_1, last 6 gpus for actor_2
     actor_1_resource_pool, actor_2_resource_pool = split_resource_pool(
@@ -88,12 +84,8 @@ def test_split_resource_pool_with_split_size_list():
     )
     actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0)
     actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100)
-    actor_worker_1 = RayWorkerGroup(
-        resource_pool=actor_1_resource_pool, ray_cls_with_init=actor_cls_1, device_name=get_device_name()
-    )
-    actor_worker_2 = RayWorkerGroup(
-        resource_pool=actor_2_resource_pool, ray_cls_with_init=actor_cls_2, device_name=get_device_name()
-    )
+    actor_worker_1 = RayWorkerGroup(resource_pool=actor_1_resource_pool, ray_cls_with_init=actor_cls_1)
+    actor_worker_2 = RayWorkerGroup(resource_pool=actor_2_resource_pool, ray_cls_with_init=actor_cls_2)
     assert actor_worker_1.world_size == 2
     assert actor_worker_2.world_size == 6

@@ -113,7 +105,7 @@ def test_split_resource_pool_with_split_size_list_cross_nodes():
     ray.init()
     # assume we have 2 nodes, with 4 GPUs each
     global_resource_pool = RayResourcePool(process_on_nodes=[4, 4])
-    global_resource_pool.get_placement_groups(device_name=get_device_name())
+    global_resource_pool.get_placement_groups()

     # first 2 gpus for actor_1, last 6 gpus for actor_2
     actor_1_resource_pool, actor_2_resource_pool = split_resource_pool(
@@ -122,12 +114,8 @@ def test_split_resource_pool_with_split_size_list_cross_nodes():
     )
     actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0)
     actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100)
-    actor_worker_1 = RayWorkerGroup(
-        resource_pool=actor_1_resource_pool, ray_cls_with_init=actor_cls_1, device_name=get_device_name()
-    )
-    actor_worker_2 = RayWorkerGroup(
-        resource_pool=actor_2_resource_pool, ray_cls_with_init=actor_cls_2, device_name=get_device_name()
-    )
+    actor_worker_1 = RayWorkerGroup(resource_pool=actor_1_resource_pool, ray_cls_with_init=actor_cls_1)
+    actor_worker_2 = RayWorkerGroup(resource_pool=actor_2_resource_pool, ray_cls_with_init=actor_cls_2)

     assert actor_worker_1.world_size == 2
     assert actor_worker_2.world_size == 6
@@ -149,7 +137,7 @@ def test_split_resource_pool_with_split_twice():

     # assume we have 4 nodes, with 2 GPUs each
     global_resource_pool = RayResourcePool(process_on_nodes=[2, 2, 2, 2])
-    global_resource_pool.get_placement_groups(device_name=get_device_name())
+    global_resource_pool.get_placement_groups()

     # actors with [2, 1, 1, 1, 1, 2] (split twice)
     rp_1, rp_2, rp_3 = split_resource_pool(
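For orientation, a minimal sketch of the simplified call pattern these tests exercise after this change: the device type is now inferred internally, so neither get_placement_groups nor RayWorkerGroup takes a device_name argument. DemoWorker and the import paths are assumptions for illustration (the real tests use their own Actor fixture), and running it requires a Ray cluster with 2 nodes of 4 accelerators each, as in the first test above.

# Hedged sketch only; DemoWorker and the import paths are assumed, not verbatim from the repo.
import ray

from verl.single_controller.base import Worker
from verl.single_controller.ray.base import (
    RayClassWithInitArgs,
    RayResourcePool,
    RayWorkerGroup,
    split_resource_pool,
)


@ray.remote
class DemoWorker(Worker):
    def __init__(self, worker_id: int):
        super().__init__()
        self.worker_id = worker_id


if __name__ == "__main__":
    ray.init()
    # Assumes a cluster with 2 nodes x 4 accelerators, mirroring the first test above.
    pool = RayResourcePool(process_on_nodes=[4, 4])
    pool.get_placement_groups()  # no device_name argument anymore

    first_half, second_half = split_resource_pool(resource_pool=pool, split_size=4)
    cls_1 = RayClassWithInitArgs(cls=DemoWorker, worker_id=0)
    cls_2 = RayClassWithInitArgs(cls=DemoWorker, worker_id=100)
    wg_1 = RayWorkerGroup(resource_pool=first_half, ray_cls_with_init=cls_1)
    wg_2 = RayWorkerGroup(resource_pool=second_half, ray_cls_with_init=cls_2)
    assert wg_1.world_size == 4 and wg_2.world_size == 4
    ray.shutdown()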
34 changes: 15 additions & 19 deletions verl/single_controller/ray/base.py
@@ -27,7 +27,7 @@
 from verl.protocol import DataProto, _padding_size_key
 from verl.single_controller.base import ClassWithInitArgs, ResourcePool, Worker, WorkerGroup
 from verl.single_controller.base.decorator import MAGIC_ATTR, Dispatch
-from verl.utils.device import get_device_name
+from verl.utils.device import get_device_name, get_resource_name
 from verl.utils.py_functional import temp_env_var

 __all__ = ["Worker"]
@@ -112,21 +112,17 @@ def __init__(
         self.detached = detached
         self.accelerator_type = accelerator_type

-    def get_placement_groups(self, strategy="STRICT_PACK", name=None, device_name="cuda"):
+    def get_placement_groups(self, strategy="STRICT_PACK", name=None):
         if self.pgs is not None:
             return self.pgs

         pg_name_prefix = (
             name if name else f"{self.name_prefix}verl_group_{'_'.join([str(count) for count in self._store])}:"
         )
         # print(f"pg_name_prefix = {pg_name_prefix}")
-        if device_name == "npu":
-            device_name = "NPU"
-        elif device_name == "cuda":
-            device_name = "GPU"

         bundle = {"CPU": self.max_colocate_count}
         if self.use_gpu:
+            device_name = get_resource_name()
             bundle[device_name] = 1
             if self.accelerator_type is not None:
                 bundle[self.accelerator_type] = 1e-4
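To make the hunk above concrete, here is a small self-contained sketch of how a per-process placement-group bundle is assembled once the Ray resource key comes from get_resource_name() instead of a device_name argument. The helper and the "Ascend910B" accelerator label are illustrative stand-ins, not the verl implementation.

# Illustrative stand-in for the bundle construction above; not the verl code itself.
from typing import Optional


def build_bundle(max_colocate_count: int, use_gpu: bool, resource_name: str,
                 accelerator_type: Optional[str] = None) -> dict:
    # Every process reserves CPU slots for colocated workers.
    bundle = {"CPU": max_colocate_count}
    if use_gpu:
        # resource_name is "GPU" on CUDA hosts and "NPU" on Ascend hosts.
        bundle[resource_name] = 1
        if accelerator_type is not None:
            # A tiny fractional amount constrains scheduling to a specific
            # accelerator model without consuming a whole resource unit.
            bundle[accelerator_type] = 1e-4
    return bundle


print(build_bundle(1, True, "GPU"))                # {'CPU': 1, 'GPU': 1}
print(build_bundle(1, True, "NPU", "Ascend910B"))  # {'CPU': 1, 'NPU': 1, 'Ascend910B': 0.0001}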
@@ -249,9 +245,7 @@ def merge_resource_pool(rp1: RayResourcePool, rp2: RayResourcePool) -> RayResour
     merged = type(rp1)(
         new_store, rp1.use_gpu, f"{rp1.name_prefix}_{rp2.name_prefix}", rp1.max_colocate_count, rp1.detached
     )
-    merged.pgs = rp1.get_placement_groups(device_name=get_device_name()) + rp2.get_placement_groups(
-        device_name=get_device_name()
-    )
+    merged.pgs = rp1.get_placement_groups() + rp2.get_placement_groups()

     return merged

@@ -293,7 +287,7 @@ def __call__(
         use_gpu: bool = True,
         num_gpus=1,
         sharing_with=None,
-        device_name="cuda",
+        **kwargs,
     ) -> Any:
         """Create and return a Ray actor with the configured options.

@@ -303,7 +297,7 @@
             use_gpu: Whether to use GPU resources
             num_gpus: Number of GPUs to allocate
            sharing_with: Actor to share resources with
-            device_name: Device for training
+            kwargs: Additional keyword arguments

         Returns:
             A Ray actor handle with the configured options
@@ -321,10 +315,12 @@
         }
         options.update(self._options)

-        if use_gpu and device_name == "cuda":
-            options["num_gpus"] = num_gpus
-        if use_gpu and device_name == "npu":
-            options["resources"] = {"NPU": num_gpus}
+        if use_gpu:
+            device_name = get_device_name()
+            if device_name == "cuda":
+                options["num_gpus"] = num_gpus
+            elif device_name == "npu":
+                options["resources"] = {"NPU": num_gpus}

         if len(self._additional_resource) > 1:
             for k, v in self._additional_resource.items():
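The branch above now resolves the device at call time rather than taking a device_name parameter. A minimal sketch of the resulting option selection, written as a plain function so it runs without Ray or verl installed; it assumes get_device_name() returns "cuda" or "npu" depending on the host.

# Simplified stand-in for the device-specific actor options chosen in __call__ above.
def device_options(use_gpu: bool, num_gpus: int, device_name: str) -> dict:
    options: dict = {}
    if use_gpu:
        if device_name == "cuda":
            # CUDA devices use Ray's native GPU accounting.
            options["num_gpus"] = num_gpus
        elif device_name == "npu":
            # Ascend NPUs are tracked as a custom Ray resource.
            options["resources"] = {"NPU": num_gpus}
    return options


assert device_options(True, 1, "cuda") == {"num_gpus": 1}
assert device_options(True, 1, "npu") == {"resources": {"NPU": 1}}
assert device_options(False, 1, "cuda") == {}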
@@ -380,7 +376,7 @@ def __init__(
         # if a WorkerGroup is spawned from Colocate WorkerGroup, this indicates which sub-class is binded to
         # this WorkerGroup.
         self.sub_cls_name = ""
-        self.device_name = kwargs.get("device_name", "cuda")
+        self.device_name = kwargs.get("device_name", get_device_name())
         self.profile_steps = kwargs.get("profile_steps", None)
         self.worker_nsight_options = kwargs.get("worker_nsight_options", None)
         self.customized_worker_env = kwargs.get("worker_env", {})
@@ -469,7 +465,7 @@ def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, d
         strategy = "PACK"
         if bin_pack:
             strategy = "STRICT_PACK"
-        pgs = resource_pool.get_placement_groups(strategy=strategy, device_name=self.device_name)
+        pgs = resource_pool.get_placement_groups(strategy=strategy)
         world_size = resource_pool.world_size
         self._world_size = world_size
         # cia.add_kwarg("_world_size", world_size)
@@ -505,7 +501,7 @@ def _init_with_subresource_pool(self, resource_pool, ray_cls_with_init, bin_pack
         strategy = "PACK"
         if bin_pack:
             strategy = "STRICT_PACK"
-        pgs = resource_pool.get_placement_groups(strategy=strategy, device_name=self.device_name)
+        pgs = resource_pool.get_placement_groups(strategy=strategy)
         world_size = resource_pool.world_size
         self._world_size = world_size

8 changes: 8 additions & 0 deletions verl/utils/device.py
@@ -15,6 +15,14 @@
 logger = logging.getLogger(__name__)


+def get_resource_name() -> str:
+    """Return the Ray resource name based on the device type.
+    Returns:
+        The Ray resource name string, either "GPU" or "NPU".
+    """
+    return "GPU" if is_cuda_available else "NPU"
+
+
 def is_torch_npu_available(check_device=True) -> bool:
     """Check if Ascend NPU is available for PyTorch operations.

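Taken together, the two helpers split responsibilities: get_device_name() answers "which torch device string do I use?" while the new get_resource_name() answers "which Ray resource key do I request?". A hedged usage sketch follows; it assumes verl is installed and the example outputs assume a CUDA host.

# Hedged sketch; assumes verl is installed and the host has CUDA devices.
from verl.utils.device import get_device_name, get_resource_name

device = get_device_name()      # e.g. "cuda" on a GPU host, "npu" on an Ascend host
resource = get_resource_name()  # "GPU" or "NPU", matching the Ray resource key

# Placement-group bundles are keyed by the Ray resource name, while torch-side
# code keeps using the lowercase device string.
bundle = {"CPU": 1, resource: 1}
print(device, bundle)           # e.g. cuda {'CPU': 1, 'GPU': 1}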