diff --git a/tests/experimental/agent_loop/agent_utils.py b/tests/experimental/agent_loop/agent_utils.py index 20e6848746a..4596236bc78 100644 --- a/tests/experimental/agent_loop/agent_utils.py +++ b/tests/experimental/agent_loop/agent_utils.py @@ -79,7 +79,7 @@ def init_agent_loop_manager(config: DictConfig) -> AgentLoopManager | RayWorkerG config=config, rm_resource_pool=rm_resource_pool, ) - agent_loop_manager = AgentLoopManager( + agent_loop_manager = AgentLoopManager.create( config=config, worker_group=actor_rollout_wg, reward_loop_worker_handles=reward_loop_manager.reward_loop_workers, diff --git a/tests/experimental/agent_loop/test_agent_loop_extra_fields_schema_on_cpu.py b/tests/experimental/agent_loop/test_agent_loop_extra_fields_schema_on_cpu.py index f8bda825ab2..e5d296a8756 100644 --- a/tests/experimental/agent_loop/test_agent_loop_extra_fields_schema_on_cpu.py +++ b/tests/experimental/agent_loop/test_agent_loop_extra_fields_schema_on_cpu.py @@ -147,7 +147,10 @@ async def test_agent_loop_extra_fields_schema_stable_for_training_concat_on_cpu( # Minimal config surface used by the agent loops. config = OmegaConf.create( { - "actor_rollout_ref": {"rollout": {"prompt_length": 16, "response_length": 16}}, + "actor_rollout_ref": { + "rollout": {"prompt_length": 16, "response_length": 16, "multi_turn": {"tool_config_path": None}}, + "model": {}, + }, "data": { "tool_config_path": None, "apply_chat_template_kwargs": {}, @@ -160,7 +163,7 @@ async def test_agent_loop_extra_fields_schema_stable_for_training_concat_on_cpu( processor = None trainer_config = DictConfigWrap(config) - dataset_config = DictConfigWrap(config.data) + data_config = DictConfigWrap(config.data) single_turn = SingleTurnAgentLoop( trainer_config=trainer_config, @@ -168,7 +171,7 @@ async def test_agent_loop_extra_fields_schema_stable_for_training_concat_on_cpu( tokenizer=tokenizer, processor=processor, dataset_cls=RLHFDataset, - dataset_config=dataset_config, + data_config=data_config, ) partial_single_turn = PartialSingleTurnAgentLoop( trainer_config=trainer_config, @@ -176,7 +179,7 @@ async def test_agent_loop_extra_fields_schema_stable_for_training_concat_on_cpu( tokenizer=tokenizer, processor=processor, dataset_cls=RLHFDataset, - dataset_config=dataset_config, + data_config=data_config, ) raw_prompt = [{"role": "user", "content": "hi"}] diff --git a/tests/experimental/reward_loop/test_agent_reward_loop_colocate.py b/tests/experimental/reward_loop/test_agent_reward_loop_colocate.py index 0e4e6b93683..0ea96dca409 100644 --- a/tests/experimental/reward_loop/test_agent_reward_loop_colocate.py +++ b/tests/experimental/reward_loop/test_agent_reward_loop_colocate.py @@ -98,7 +98,10 @@ def test_agent_reward_loop_standalone(): ) actor_rollout_wg.init_model() - agent_loop_manager = AgentLoopManager(config, worker_group=actor_rollout_wg) + agent_loop_manager = AgentLoopManager.create( + config=config, + worker_group=actor_rollout_wg, + ) # sleep rollout replicas checkpoint_manager = CheckpointEngineManager( config=omega_conf_to_dataclass(config.actor_rollout_ref.rollout.checkpoint_engine), diff --git a/tests/experimental/reward_loop/test_agent_reward_loop_standalone.py b/tests/experimental/reward_loop/test_agent_reward_loop_standalone.py index bd9011b9874..80a0945bec7 100644 --- a/tests/experimental/reward_loop/test_agent_reward_loop_standalone.py +++ b/tests/experimental/reward_loop/test_agent_reward_loop_standalone.py @@ -56,6 +56,7 @@ def test_agent_reward_loop_standalone(): config.actor_rollout_ref.rollout.prompt_length = 1024 config.actor_rollout_ref.rollout.response_length = 4096 config.actor_rollout_ref.rollout.skip_tokenizer_init = True + config.actor_rollout_ref.rollout.nnodes = 1 config.trainer.n_gpus_per_node = 4 config.trainer.nnodes = 1 @@ -76,8 +77,9 @@ def test_agent_reward_loop_standalone(): # 1. init reward model manager reward_loop_manager = RewardLoopManager(config) - agent_loop_manager = AgentLoopManager( - config=config, reward_loop_worker_handles=reward_loop_manager.reward_loop_workers + agent_loop_manager = AgentLoopManager.create( + config=config, + reward_loop_worker_handles=reward_loop_manager.reward_loop_workers, ) # 2. init test data diff --git a/verl/experimental/agent_loop/agent_loop.py b/verl/experimental/agent_loop/agent_loop.py index b591d093696..c60baa6abbb 100644 --- a/verl/experimental/agent_loop/agent_loop.py +++ b/verl/experimental/agent_loop/agent_loop.py @@ -35,24 +35,32 @@ from verl.experimental.agent_loop.utils import resolve_config_path from verl.protocol import DataProto from verl.single_controller.ray.base import RayResourcePool, RayWorkerGroup -from verl.utils import hf_processor, hf_tokenizer from verl.utils.chat_template import initialize_system_prompt +from verl.utils.config import omega_conf_to_dataclass from verl.utils.dataset.rl_dataset import RLHFDataset, get_dataset_class -from verl.utils.fs import copy_to_local from verl.utils.model import compute_position_id_with_mask -from verl.utils.ray_utils import get_event_loop +from verl.utils.ray_utils import auto_await, get_event_loop from verl.utils.rollout_trace import ( RolloutTraceConfig, rollout_trace_attr, rollout_trace_op, ) from verl.utils.transferqueue_utils import tqbridge +from verl.workers.config import HFModelConfig, RolloutConfig from verl.workers.rollout.replica import TokenOutput, get_rollout_replica_class logger = logging.getLogger(__file__) logger.setLevel(os.getenv("VERL_LOGGING_LEVEL", "WARN")) +def _get_rollout_and_model_config(config: DictConfig) -> tuple[DictConfig, DictConfig]: + # TODO: backward compatibility, remove this once we switch to new trainer. + if config.get("actor_rollout_ref"): + return config.actor_rollout_ref.rollout, config.actor_rollout_ref.model + else: + return config.rollout, config.model + + class AsyncLLMServerManager: """ A class to manage multiple OpenAI compatible LLM servers. This class provides @@ -64,7 +72,7 @@ def __init__(self, config: DictConfig, server_handles: list[ray.actor.ActorHandl """Initialize the AsyncLLMServerManager. Args: - config (DictConfig): YAML config. + config (DictConfig): whole config for main entrypoint. server_handles (List[ray.actor.ActorHandle]): OpenAI compatible LLM server actor handles. max_cache_size (int, optional): max cache size for request_id to server mapping. Defaults to 10000. """ @@ -190,7 +198,16 @@ def __init__(self, config: DictConfig): class AgentLoopBase(ABC): """An agent loop takes an input message, chat with OpenAI compatible LLM server and interact with various - environments.""" + environments. + + Args: + trainer_config (DictConfig): whole config for main entrypoint. + server_manager (AsyncLLMServerManager): OpenAI compatible LLM server manager. + tokenizer (AutoTokenizer): Tokenizer for tokenize messages. + processor (AutoProcessor): Processor for process messages. + dataset_cls (type[Dataset]): Dataset class for creating dataset, Defaults to RLHFDataset. + data_config (DictConfigWrap): Dataset config. + """ def __init__( self, @@ -199,26 +216,17 @@ def __init__( tokenizer: AutoTokenizer, processor: AutoProcessor, dataset_cls: type[RLHFDataset], - dataset_config: DictConfigWrap, + data_config: DictConfigWrap, **kwargs, ): - """Initialize agent loop, each sample will have its own loop instance. - - Args: - trainer_config (DictConfigWrap): trainer config. - server_manager (AsyncLLMServerManager): OpenAI compatible LLM server manager. - tokenizer (AutoTokenizer): Tokenizer for tokenize messages. - processor (AutoProcessor): Processor for process messages. - dataset_cls (type[Dataset]): Dataset class for creating dataset, Defaults to RLHFDataset. - dataset_config (DictConfigWrap): Dataset config. - """ self.config = trainer_config.config + self.rollout_config, _ = _get_rollout_and_model_config(self.config) self.server_manager = server_manager self.tokenizer = tokenizer self.processor = processor self.dataset_cls = dataset_cls - self.dataset_config = dataset_config.config - self.apply_chat_template_kwargs = self.dataset_config.get("apply_chat_template_kwargs", {}) + self.data_config = data_config.config + self.apply_chat_template_kwargs = self.data_config.get("apply_chat_template_kwargs", {}) self.system_prompt = initialize_system_prompt(self.tokenizer, **self.apply_chat_template_kwargs) self.loop = get_event_loop() @@ -234,7 +242,7 @@ async def process_vision_info(self, messages: list[dict]) -> dict: multi_modal_data = {} if self.processor is not None: images, videos = await self.dataset_cls.process_vision_info( - messages, image_patch_size=self.processor.image_processor.patch_size, config=self.dataset_config + messages, image_patch_size=self.processor.image_processor.patch_size, config=self.data_config ) if images is not None: multi_modal_data["images"] = images @@ -342,7 +350,13 @@ def decorator(subclass: type[AgentLoopBase]) -> type[AgentLoopBase]: class AgentLoopWorker: - """Agent loop worker takes a batch of messages and run each message in an agent loop.""" + """Agent loop worker takes a batch of messages and run each message in an agent loop. + + Args: + config (DictConfig): whole config for main entrypoint. + server_handles (List[ray.actor.ActorHandle]): OpenAI compatible LLM server actor handles. + reward_loop_worker_handles (List[ray.actor.ActorHandle]): Actor handles for streaming reward computation. + """ def __init__( self, @@ -350,13 +364,10 @@ def __init__( server_handles: list[ray.actor.ActorHandle], reward_loop_worker_handles: list[ray.actor.ActorHandle] = None, ): - """Initialize agent loop manager. - Args: - config (DictConfig): YAML config. - server_handles (List[ray.actor.ActorHandle]): OpenAI compatible LLM server actor handles. - reward_loop_worker_handles (List[ray.actor.ActorHandle]): Actor handles for streaming reward computation. - """ self.config = config + rollout_config, model_config = _get_rollout_and_model_config(config) + self.rollout_config: RolloutConfig = omega_conf_to_dataclass(rollout_config) + self.model_config: HFModelConfig = omega_conf_to_dataclass(model_config) # for recipe to change if not hasattr(self, "server_manager"): @@ -365,27 +376,24 @@ def __init__( self.dataset_cls = get_dataset_class(config.data) self.reward_loop_worker_handles = reward_loop_worker_handles - model_path = config.actor_rollout_ref.model.path - self.model_name = "/".join(model_path.split("/")[-2:]) - local_path = copy_to_local(config.actor_rollout_ref.model.path) - self.tokenizer = hf_tokenizer(local_path, trust_remote_code=True) - self.processor = hf_processor(local_path, trust_remote_code=True) + self.tokenizer = self.model_config.tokenizer + self.processor = self.model_config.processor - agent_loop_config_path = config.actor_rollout_ref.rollout.agent.agent_loop_config_path + agent_loop_config_path = self.rollout_config.agent.agent_loop_config_path if agent_loop_config_path: resolved_path = resolve_config_path(agent_loop_config_path) agent_loop_configs = OmegaConf.load(resolved_path) for agent_loop_config in agent_loop_configs: _agent_loop_registry[agent_loop_config.name] = agent_loop_config - if self.config.actor_rollout_ref.model.get("custom_chat_template", None) is not None: - if self.processor is not None: - self.processor.chat_template = self.config.actor_rollout_ref.model.custom_chat_template - self.tokenizer.chat_template = self.config.actor_rollout_ref.model.custom_chat_template + if self.model_config.get("custom_chat_template", None) is not None: + if self.model_config.processor is not None: + self.model_config.processor.chat_template = self.model_config.custom_chat_template + self.model_config.tokenizer.chat_template = self.model_config.custom_chat_template - trace_config = self.config.actor_rollout_ref.rollout.get("trace", {}) + trace_config = self.rollout_config.trace RolloutTraceConfig.init( - self.config.trainer.project_name, - self.config.trainer.experiment_name, + self.rollout_config.trace.project_name, + self.rollout_config.trace.experiment_name, trace_config.get("backend"), trace_config.get("token2text", False), trace_config.get("max_samples_per_step_per_worker", None), @@ -413,7 +421,7 @@ async def generate_sequences(self, batch: DataProto) -> DataProto: responses: |<- LLM generation ->|<- tool_calls ->|<- LLM generation ->|<- padding ->| response_mask: | 1, 1, 1, ..., 1, 1 | 0, 0, .., 0, 0 | 1, 1, 1, ..., 1, 1 | 0, 0, ..., 0| """ - config = self.config.actor_rollout_ref.rollout + config = self.rollout_config sampling_params = dict( temperature=config.temperature, top_p=config.top_p, @@ -502,7 +510,7 @@ async def _run_agent_loop( tokenizer=self.tokenizer, processor=self.processor, dataset_cls=self.dataset_cls, - dataset_config=DictConfigWrap(self.config.data), + data_config=DictConfigWrap(self.config.data), ) output: AgentLoopOutput = await agent_loop.run(sampling_params, **kwargs) return await self._agent_loop_postprocess(output, **kwargs) @@ -536,7 +544,7 @@ async def _agent_loop_postprocess(self, output, **kwargs) -> _InternalAgentLoopO prompt_output = self.tokenizer.pad( {"input_ids": output.prompt_ids}, padding="max_length", - max_length=self.config.actor_rollout_ref.rollout.prompt_length, + max_length=self.rollout_config.prompt_length, return_tensors="pt", return_attention_mask=True, ) @@ -548,7 +556,7 @@ async def _agent_loop_postprocess(self, output, **kwargs) -> _InternalAgentLoopO response_output = self.tokenizer.pad( {"input_ids": output.response_ids}, padding="max_length", - max_length=self.config.actor_rollout_ref.rollout.response_length, + max_length=self.rollout_config.response_length, return_tensors="pt", return_attention_mask=True, ) @@ -559,7 +567,7 @@ async def _agent_loop_postprocess(self, output, **kwargs) -> _InternalAgentLoopO response_mask_output = self.tokenizer.pad( {"input_ids": output.response_mask}, padding="max_length", - max_length=self.config.actor_rollout_ref.rollout.response_length, + max_length=self.rollout_config.response_length, return_tensors="pt", return_attention_mask=False, ) @@ -568,7 +576,7 @@ async def _agent_loop_postprocess(self, output, **kwargs) -> _InternalAgentLoopO response_logprobs = None if output.response_logprobs is not None: - pad_size = self.config.actor_rollout_ref.rollout.response_length - len(output.response_logprobs) + pad_size = self.rollout_config.response_length - len(output.response_logprobs) response_logprobs = torch.tensor(output.response_logprobs + [0.0] * pad_size).unsqueeze(0) response_mask = response_mask_output["input_ids"] * response_output["attention_mask"] @@ -846,7 +854,17 @@ async def get_trajectory_info(step, index, validate): class AgentLoopManager: - """Agent loop manager that manages a group of agent loop workers.""" + """Agent loop manager that manages a group of agent loop workers. + + - if worker_group is not None, rollout server is in hybrid mode, share GPUs with training engine. + - otherwise, rollout server is in standalone mode, use separate GPUs, e.g., one-step-off/fully async training. + + Args: + config (DictConfig): whole config for main entrypoint. + worker_group (RayWorkerGroup): ActorRolloutRef worker group for hybrid mode; None for standalone mode. + rollout_resource_pool (RayResourcePool): Resource pool for hybrid mode, only used by TensorRT-LLM. + reward_loop_worker_handles (List[ray.actor.ActorHandle]): Actor handles for streaming reward computation. + """ def __init__( self, @@ -855,63 +873,70 @@ def __init__( rollout_resource_pool: RayResourcePool = None, reward_loop_worker_handles: list[ray.actor.ActorHandle] = None, ): - """Initialize agent loop manager. - - Args: - config (DictConfig): trainer config. - worker_group (RayWorkerGroup): ActorRolloutRef worker group for hybrid mode; None for standalone mode. - rollout_resource_pool (RayResourcePool): Resource pool for actor rollout (Colocate or Standalone mode). - reward_loop_worker_handles (List[ray.actor.ActorHandle]): Actor handles for streaming reward computation. - """ self.config = config + self.rollout_config, self.model_config = _get_rollout_and_model_config(config) self.worker_group = worker_group + self.rollout_resource_pool = rollout_resource_pool self.reward_loop_worker_handles = reward_loop_worker_handles + assert worker_group is not None or self.rollout_config.nnodes > 0, "nnodes must be > 0 in standalone mode" + # for recipe to change if not hasattr(self, "rollout_replica_class"): - self.rollout_replica_class = get_rollout_replica_class(self.config.actor_rollout_ref.rollout.name) + self.rollout_replica_class = get_rollout_replica_class(self.rollout_config.name) if not hasattr(self, "agent_loop_workers_class"): self.agent_loop_workers_class = ray.remote(AgentLoopWorker) - self._initialize_llm_servers(rollout_resource_pool) - self._init_agent_loop_workers() + @classmethod + @auto_await + async def create( + cls, + config: DictConfig, + worker_group: RayWorkerGroup = None, + rollout_resource_pool: RayResourcePool = None, + reward_loop_worker_handles: list[ray.actor.ActorHandle] = None, + ): + """Create agent loop manager.""" + instance = cls(config, worker_group, rollout_resource_pool, reward_loop_worker_handles) + await instance._initialize_llm_servers() + await instance._init_agent_loop_workers() + return instance - def _initialize_llm_servers(self, rollout_resource_pool: RayResourcePool): + async def _initialize_llm_servers(self): rollout_world_size = ( - self.config.actor_rollout_ref.rollout.tensor_model_parallel_size - * self.config.actor_rollout_ref.rollout.data_parallel_size - * self.config.actor_rollout_ref.rollout.pipeline_model_parallel_size + self.rollout_config.tensor_model_parallel_size + * self.rollout_config.data_parallel_size + * self.rollout_config.pipeline_model_parallel_size ) world_size = ( self.worker_group.world_size if self.worker_group - else self.config.trainer.n_gpus_per_node * self.config.trainer.nnodes + else self.rollout_config.n_gpus_per_node * self.rollout_config.nnodes ) num_replicas = world_size // rollout_world_size - rollout_config = self.config.actor_rollout_ref.rollout - model_config = self.config.actor_rollout_ref.model self.rollout_replicas = [ self.rollout_replica_class( replica_rank=replica_rank, - config=rollout_config, - model_config=model_config, - gpus_per_node=self.config.trainer.n_gpus_per_node, + config=self.rollout_config, + model_config=self.model_config, + gpus_per_node=self.rollout_config.n_gpus_per_node, ) for replica_rank in range(num_replicas) ] - if self.worker_group and rollout_config.name != "trtllm": - self._run_all([server.init_hybrid(self.worker_group) for server in self.rollout_replicas]) - elif self.worker_group and rollout_config.name == "trtllm": - self._run_all( - [ - server.init_hybrid_colocated(self.worker_group, rollout_resource_pool) + if self.worker_group and self.rollout_config.name != "trtllm": + await asyncio.gather(*[server.init_hybrid(self.worker_group) for server in self.rollout_replicas]) + # TODO: unify trtllm to init_hybrid + elif self.worker_group and self.rollout_config.name == "trtllm": + await asyncio.gather( + *[ + server.init_hybrid_colocated(self.worker_group, self.rollout_resource_pool) for server in self.rollout_replicas ] ) else: - self._run_all([server.init_standalone() for server in self.rollout_replicas]) + await asyncio.gather(*[server.init_standalone() for server in self.rollout_replicas]) self.server_handles = [server._server_handle for server in self.rollout_replicas] self.server_addresses = [server._server_address for server in self.rollout_replicas] @@ -919,14 +944,14 @@ def _initialize_llm_servers(self, rollout_resource_pool: RayResourcePool): print(f"AgentLoopManager: {self.server_addresses}") # Update Prometheus configuration with server addresses - if rollout_config.prometheus.enable: - if rollout_config.disable_log_stats: + if self.rollout_config.prometheus.enable: + if self.rollout_config.disable_log_stats: raise ValueError("PROMETHEUS needs disable_log_stats==False, but it is currently True.") - update_prometheus_config(rollout_config.prometheus, self.server_addresses, rollout_config.name) + update_prometheus_config(self.rollout_config.prometheus, self.server_addresses, self.rollout_config.name) - def _init_agent_loop_workers(self): + async def _init_agent_loop_workers(self): self.agent_loop_workers = [] - num_workers = self.config.actor_rollout_ref.rollout.agent.num_workers + num_workers = self.rollout_config.agent.num_workers node_ids = [node["NodeID"] for node in ray.nodes() if node["Alive"] and node["Resources"].get("CPU", 0) > 0] for i in range(num_workers): @@ -941,7 +966,8 @@ def _init_agent_loop_workers(self): ).remote(self.config, self.server_handles, self.reward_loop_worker_handles) ) - def generate_sequences(self, prompts: DataProto) -> DataProto: + @auto_await + async def generate_sequences(self, prompts: DataProto) -> DataProto: """Split input batch and dispatch to agent loop workers. Args: @@ -952,8 +978,8 @@ def generate_sequences(self, prompts: DataProto) -> DataProto: """ chunkes = prompts.chunk(len(self.agent_loop_workers)) - outputs = ray.get( - [ + outputs = await asyncio.gather( + *[ worker.generate_sequences.remote(chunk) for worker, chunk in zip(self.agent_loop_workers, chunkes, strict=True) ] @@ -994,20 +1020,17 @@ def _performance_metrics(self, metrics: list[list[dict[str, str]]], output: Data return timing - def clear_kv_cache(self): + @auto_await + async def clear_kv_cache(self): """Clear all rollout kv cache, but don`t sleep.""" - self._run_all([replica.clear_kv_cache() for replica in self.rollout_replicas]) + await asyncio.gather(*[replica.clear_kv_cache() for replica in self.rollout_replicas]) - def start_profile(self, **kwargs): + @auto_await + async def start_profile(self, **kwargs): """Start profiling on all rollout replicas.""" - self._run_all([replica.start_profile(**kwargs) for replica in self.rollout_replicas]) + await asyncio.gather(*[replica.start_profile(**kwargs) for replica in self.rollout_replicas]) - def stop_profile(self): + @auto_await + async def stop_profile(self): """Stop profiling on all rollout replicas.""" - self._run_all([replica.stop_profile() for replica in self.rollout_replicas]) - - def _run_all(self, tasks: list[asyncio.Task]): - async def run_all(): - await asyncio.gather(*tasks) - - asyncio.run(run_all()) + await asyncio.gather(*[replica.stop_profile() for replica in self.rollout_replicas]) diff --git a/verl/experimental/agent_loop/single_turn_agent_loop.py b/verl/experimental/agent_loop/single_turn_agent_loop.py index 0bd893ae4ad..6ad3aa429b3 100644 --- a/verl/experimental/agent_loop/single_turn_agent_loop.py +++ b/verl/experimental/agent_loop/single_turn_agent_loop.py @@ -29,8 +29,8 @@ class SingleTurnAgentLoop(AgentLoopBase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.prompt_length = self.config.actor_rollout_ref.rollout.prompt_length - self.response_length = self.config.actor_rollout_ref.rollout.response_length + self.prompt_length = self.rollout_config.prompt_length + self.response_length = self.rollout_config.response_length async def run(self, sampling_params: dict[str, Any], **kwargs) -> AgentLoopOutput: messages = list(kwargs["raw_prompt"]) diff --git a/verl/experimental/agent_loop/tool_agent_loop.py b/verl/experimental/agent_loop/tool_agent_loop.py index d8cf336bdb1..c649a2fc3fd 100644 --- a/verl/experimental/agent_loop/tool_agent_loop.py +++ b/verl/experimental/agent_loop/tool_agent_loop.py @@ -21,13 +21,10 @@ import torch from PIL import Image -from transformers import AutoProcessor, AutoTokenizer from verl.experimental.agent_loop.agent_loop import ( AgentLoopBase, AgentLoopOutput, - AsyncLLMServerManager, - DictConfigWrap, register, ) from verl.experimental.agent_loop.tool_parser import FunctionCall, ToolParser @@ -96,37 +93,27 @@ def __init__( @register("tool_agent") class ToolAgentLoop(AgentLoopBase): - def __init__( - self, - trainer_config: DictConfigWrap, - server_manager: AsyncLLMServerManager, - tokenizer: AutoTokenizer, - processor: AutoProcessor, - **kwargs, - ): - super().__init__(trainer_config, server_manager, tokenizer, processor, **kwargs) - config = trainer_config.config + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) # Initialize tools from config file - self.max_user_turns = config.actor_rollout_ref.rollout.multi_turn.max_user_turns - self.max_assistant_turns = config.actor_rollout_ref.rollout.multi_turn.max_assistant_turns - self.max_parallel_calls = config.actor_rollout_ref.rollout.multi_turn.max_parallel_calls - self.max_tool_response_length = config.actor_rollout_ref.rollout.multi_turn.max_tool_response_length - self.tool_response_truncate_side = config.actor_rollout_ref.rollout.multi_turn.tool_response_truncate_side - tool_config_path = config.actor_rollout_ref.rollout.multi_turn.tool_config_path + self.max_user_turns = self.rollout_config.multi_turn.max_user_turns + self.max_assistant_turns = self.rollout_config.multi_turn.max_assistant_turns + self.max_parallel_calls = self.rollout_config.multi_turn.max_parallel_calls + self.max_tool_response_length = self.rollout_config.multi_turn.max_tool_response_length + self.tool_response_truncate_side = self.rollout_config.multi_turn.tool_response_truncate_side + tool_config_path = self.rollout_config.multi_turn.tool_config_path tool_list = initialize_tools_from_config(tool_config_path) if tool_config_path else [] self.tools = {tool.name: tool for tool in tool_list} self.tool_schemas = [tool.tool_schema.model_dump(exclude_unset=True, exclude_none=True) for tool in tool_list] - self.tool_parser = ToolParser.get_tool_parser( - config.actor_rollout_ref.rollout.multi_turn.format, self.tokenizer - ) - self.tool_parser_name = config.actor_rollout_ref.rollout.multi_turn.format + self.tool_parser = ToolParser.get_tool_parser(self.rollout_config.multi_turn.format, self.tokenizer) + self.tool_parser_name = self.rollout_config.multi_turn.format - self.prompt_length = config.actor_rollout_ref.rollout.prompt_length - self.response_length = config.actor_rollout_ref.rollout.response_length + self.prompt_length = self.rollout_config.prompt_length + self.response_length = self.rollout_config.response_length # Initialize interactions from config file - self.interaction_config_file = config.actor_rollout_ref.rollout.multi_turn.interaction_config_path + self.interaction_config_file = self.rollout_config.multi_turn.interaction_config_path if self.interaction_config_file: self.interaction_map: dict[str, BaseInteraction] = self._initialize_interactions( self.interaction_config_file diff --git a/verl/experimental/fully_async_policy/agent_loop/agent_loop.py b/verl/experimental/fully_async_policy/agent_loop/agent_loop.py index 9240000c61c..88a012224eb 100644 --- a/verl/experimental/fully_async_policy/agent_loop/agent_loop.py +++ b/verl/experimental/fully_async_policy/agent_loop/agent_loop.py @@ -28,11 +28,11 @@ AsyncLLMServerManager, DictConfigWrap, _agent_loop_registry, + _get_rollout_and_model_config, get_trajectory_info, ) -from verl.experimental.agent_loop.prometheus_utils import update_prometheus_config from verl.protocol import DataProto -from verl.single_controller.ray import RayWorkerGroup +from verl.single_controller.ray import RayResourcePool, RayWorkerGroup from verl.utils.rollout_trace import ( rollout_trace_attr, rollout_trace_op, @@ -102,7 +102,7 @@ async def generate_sequences_no_post( Returns: list[AgentLoopOutput]: List of agent loop outputs, one per sample in the batch. """ - config = self.config.actor_rollout_ref.rollout + config = self.rollout_config sampling_params = dict( temperature=config.temperature, top_p=config.top_p, @@ -191,7 +191,7 @@ async def _partial_run_agent_loop( tokenizer=self.tokenizer, processor=self.processor, dataset_cls=self.dataset_cls, - dataset_config=DictConfigWrap(config=self.config.data), + data_config=DictConfigWrap(config=self.config.data), ) output: AgentLoopOutput = await agent_loop.run( sampling_params, cancellation_event=self.cancellation_event, **kwargs @@ -219,15 +219,17 @@ def __init__( self, config: DictConfig, worker_group: RayWorkerGroup = None, + rollout_resource_pool: RayResourcePool = None, reward_loop_worker_handles: list[ray.actor.ActorHandle] = None, ): self.config = config + self.rollout_config, self.model_config = _get_rollout_and_model_config(config) self.worker_group = worker_group self.reward_loop_worker_handles = reward_loop_worker_handles self.agent_loop_workers_class = FullyAsyncAgentLoopWorker # Select rollout replica class based on rollout name - rollout_name = config.actor_rollout_ref.rollout.name + rollout_name = self.rollout_config.name if rollout_name == "sglang": from verl.experimental.fully_async_policy.sglang_rollout.sglang_async_server import FullyAsyncSGLangReplica @@ -246,63 +248,6 @@ def __init__( self.server_addresses = None self.agent_loop_workers = None - @classmethod - async def create( - cls, - config: DictConfig, - worker_group: RayWorkerGroup = None, - reward_loop_worker_handles: list[ray.actor.ActorHandle] = None, - ): - instance = cls(config, worker_group, reward_loop_worker_handles) - await instance._async_init() - return instance - - async def _async_init(self): - await self._initialize_llm_servers_async() - self._init_agent_loop_workers() - - async def _initialize_llm_servers_async(self): - rollout_world_size = ( - self.config.actor_rollout_ref.rollout.tensor_model_parallel_size - * self.config.actor_rollout_ref.rollout.data_parallel_size - * self.config.actor_rollout_ref.rollout.pipeline_model_parallel_size - ) - world_size = ( - self.worker_group.world_size - if self.worker_group - else self.config.rollout.n_gpus_per_node * self.config.rollout.nnodes - ) - num_replicas = world_size // rollout_world_size - - rollout_config = self.config.actor_rollout_ref.rollout - model_config = self.config.actor_rollout_ref.model - self.rollout_replicas = [ - self.rollout_replica_class( - replica_rank=replica_rank, - config=rollout_config, - model_config=model_config, - gpus_per_node=self.config.rollout.n_gpus_per_node, - ) - for replica_rank in range(num_replicas) - ] - - if self.worker_group: - await asyncio.gather(*[server.init_hybrid(self.worker_group) for server in self.rollout_replicas]) - else: - await asyncio.gather(*[server.init_standalone() for server in self.rollout_replicas]) - - self.server_handles = [server._server_handle for server in self.rollout_replicas] - self.server_addresses = [server._server_address for server in self.rollout_replicas] - - print(f"AgentLoopManager: {self.server_addresses}") - # Update Prometheus configuration with server addresses - if rollout_config.prometheus.enable: - if rollout_config.disable_log_stats: - raise ValueError("PROMETHEUS needs disable_log_stats==False, but it is currently True.") - await asyncio.to_thread( - update_prometheus_config, rollout_config.prometheus, self.server_addresses, rollout_config.name - ) - async def generate_single_sample_async( self, sample: DataProto, diff --git a/verl/experimental/fully_async_policy/agent_loop/partial_single_turn_agent_loop.py b/verl/experimental/fully_async_policy/agent_loop/partial_single_turn_agent_loop.py index b0aef45bd67..6982184f8f6 100644 --- a/verl/experimental/fully_async_policy/agent_loop/partial_single_turn_agent_loop.py +++ b/verl/experimental/fully_async_policy/agent_loop/partial_single_turn_agent_loop.py @@ -30,9 +30,9 @@ class PartialSingleTurnAgentLoop(AgentLoopBase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.prompt_length = self.config.actor_rollout_ref.rollout.prompt_length - self.response_length = self.config.actor_rollout_ref.rollout.response_length - self.apply_chat_template_kwargs = self.config.data.get("apply_chat_template_kwargs", {}) + self.prompt_length = self.rollout_config.prompt_length + self.response_length = self.rollout_config.response_length + self.apply_chat_template_kwargs = self.data_config.get("apply_chat_template_kwargs", {}) async def run(self, sampling_params: dict[str, Any], **kwargs) -> AgentLoopOutput: output: Optional[AgentLoopOutput] = kwargs.get("output", None) diff --git a/verl/experimental/fully_async_policy/agent_loop/partial_tool_agent_loop.py b/verl/experimental/fully_async_policy/agent_loop/partial_tool_agent_loop.py index 0082fc13bc8..370587f0364 100644 --- a/verl/experimental/fully_async_policy/agent_loop/partial_tool_agent_loop.py +++ b/verl/experimental/fully_async_policy/agent_loop/partial_tool_agent_loop.py @@ -33,9 +33,9 @@ class AsyncPartialToolAgentLoop(ToolAgentLoop): """ - def __init__(self, trainer_config, **kwargs): - super().__init__(trainer_config, **kwargs) - self.enable_partial_rollout = trainer_config.config.async_training.get("partial_rollout", False) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.enable_partial_rollout = self.config.async_training.get("partial_rollout", False) # async def run(self, sampling_params: dict[str, Any], **kwargs) -> AgentLoopOutput: async def run( diff --git a/verl/experimental/fully_async_policy/fully_async_main.py b/verl/experimental/fully_async_policy/fully_async_main.py index fe43abb1b6e..4e9e509475f 100644 --- a/verl/experimental/fully_async_policy/fully_async_main.py +++ b/verl/experimental/fully_async_policy/fully_async_main.py @@ -287,6 +287,9 @@ def main(config): from time import time start_time = time() + # TODO: unify rollout config with actor_rollout_ref + config.actor_rollout_ref.rollout.nnodes = config.rollout.nnodes + config.actor_rollout_ref.rollout.n_gpus_per_node = config.rollout.n_gpus_per_node run_ppo(config, task_runner_class=FullyAsyncTaskRunner) print(f"total time: {time() - start_time:.2f} seconds") diff --git a/verl/experimental/one_step_off_policy/agent_loop/agent_loop.py b/verl/experimental/one_step_off_policy/agent_loop/agent_loop.py index 2ae476df4da..85455d655b2 100644 --- a/verl/experimental/one_step_off_policy/agent_loop/agent_loop.py +++ b/verl/experimental/one_step_off_policy/agent_loop/agent_loop.py @@ -18,9 +18,7 @@ import ray from verl.experimental.agent_loop.agent_loop import AgentLoopManager -from verl.experimental.agent_loop.prometheus_utils import update_prometheus_config from verl.protocol import DataProto -from verl.single_controller.ray import RayResourcePool logger = logging.getLogger(__file__) logger.setLevel(os.getenv("VERL_LOGGING_LEVEL", "WARN")) @@ -56,54 +54,6 @@ async def generate_sequences_async(self, prompts: DataProto) -> DataProto: output.meta_info = {"timing": timing, **outputs[0].meta_info} return output - def _initialize_llm_servers(self, rollout_resource_pool: RayResourcePool): - rollout_world_size = ( - self.config.actor_rollout_ref.rollout.tensor_model_parallel_size - * self.config.actor_rollout_ref.rollout.data_parallel_size - * self.config.actor_rollout_ref.rollout.pipeline_model_parallel_size - ) - world_size = ( - self.worker_group.world_size - if self.worker_group - else self.config.rollout.n_gpus_per_node * self.config.rollout.nnodes - ) - num_replicas = world_size // rollout_world_size - - rollout_config = self.config.actor_rollout_ref.rollout - model_config = self.config.actor_rollout_ref.model - self.rollout_replicas = [ - self.rollout_replica_class( - replica_rank=replica_rank, - config=rollout_config, - model_config=model_config, - gpus_per_node=self.config.rollout.n_gpus_per_node, - ) - for replica_rank in range(num_replicas) - ] - - if self.worker_group and rollout_config.name != "trtllm": - self._run_all([server.init_hybrid(self.worker_group) for server in self.rollout_replicas]) - elif self.worker_group and rollout_config.name == "trtllm": - self._run_all( - [ - server.init_hybrid_colocated(self.worker_group, rollout_resource_pool) - for server in self.rollout_replicas - ] - ) - else: - self._run_all([server.init_standalone() for server in self.rollout_replicas]) - - self.server_handles = [server._server_handle for server in self.rollout_replicas] - self.server_addresses = [server._server_address for server in self.rollout_replicas] - - print(f"AgentLoopManager: {self.server_addresses}") - - # Update Prometheus configuration with server addresses - if rollout_config.prometheus.enable: - if rollout_config.disable_log_stats: - raise ValueError("PROMETHEUS needs disable_log_stats==False, but it is currently True.") - update_prometheus_config(rollout_config.prometheus, self.server_addresses, rollout_config.name) - async def wake_up(self): await asyncio.gather(*[replica.wake_up() for replica in self.rollout_replicas]) diff --git a/verl/experimental/one_step_off_policy/main_ppo.py b/verl/experimental/one_step_off_policy/main_ppo.py index 2c2fe6108ea..0c6ecaedf0e 100644 --- a/verl/experimental/one_step_off_policy/main_ppo.py +++ b/verl/experimental/one_step_off_policy/main_ppo.py @@ -182,6 +182,10 @@ def main(config): # Automatically set `config.trainer.device = npu` when running on Ascend NPU. auto_set_device(config) + # TODO: unify rollout config with actor_rollout_ref + config.actor_rollout_ref.rollout.nnodes = config.rollout.nnodes + config.actor_rollout_ref.rollout.n_gpus_per_node = config.rollout.n_gpus_per_node + run_ppo(config, task_runner_class=OneStepTaskRunner) print(f"total time: {time() - start_time:.2f} seconds") diff --git a/verl/experimental/one_step_off_policy/ray_trainer.py b/verl/experimental/one_step_off_policy/ray_trainer.py index 70a2a3d3d90..144632dead5 100644 --- a/verl/experimental/one_step_off_policy/ray_trainer.py +++ b/verl/experimental/one_step_off_policy/ray_trainer.py @@ -182,7 +182,7 @@ def _init_async_rollout_manager(self): from verl.experimental.one_step_off_policy.agent_loop import OneStepOffAgentLoopManager self.async_rollout_mode = True - self.async_rollout_manager = OneStepOffAgentLoopManager( + self.async_rollout_manager = OneStepOffAgentLoopManager.create( config=self.config, reward_loop_worker_handles=reward_loop_worker_handles ) diff --git a/verl/experimental/transfer_queue/ray_trainer.py b/verl/experimental/transfer_queue/ray_trainer.py index 96c6d181334..dfb2e721d66 100644 --- a/verl/experimental/transfer_queue/ray_trainer.py +++ b/verl/experimental/transfer_queue/ray_trainer.py @@ -817,7 +817,7 @@ def init_workers(self): reward_loop_worker_handles = ( self.reward_loop_manager.reward_loop_workers if enable_agent_reward_loop else None ) - self.async_rollout_manager = AgentLoopManager( + self.async_rollout_manager = AgentLoopManager.create( config=self.config, worker_group=self.actor_rollout_wg, reward_loop_worker_handles=reward_loop_worker_handles, diff --git a/verl/trainer/config/_generated_ppo_megatron_trainer.yaml b/verl/trainer/config/_generated_ppo_megatron_trainer.yaml index ea60c881619..09391ec6af3 100644 --- a/verl/trainer/config/_generated_ppo_megatron_trainer.yaml +++ b/verl/trainer/config/_generated_ppo_megatron_trainer.yaml @@ -216,6 +216,8 @@ actor_rollout_ref: _target_: verl.workers.config.RolloutConfig name: ??? mode: async + nnodes: 0 + n_gpus_per_node: ${oc.select:trainer.n_gpus_per_node,8} temperature: 1.0 top_k: -1 top_p: 1 @@ -290,6 +292,8 @@ actor_rollout_ref: engine_kwargs: {} trace: _target_: verl.workers.config.TraceConfig + project_name: ${oc.select:trainer.project_name,null} + experiment_name: ${oc.select:trainer.experiment_name,null} backend: null token2text: false max_samples_per_step_per_worker: null diff --git a/verl/trainer/config/_generated_ppo_torchtitan_trainer.yaml b/verl/trainer/config/_generated_ppo_torchtitan_trainer.yaml index b9a8b3aaf84..b923da853ec 100644 --- a/verl/trainer/config/_generated_ppo_torchtitan_trainer.yaml +++ b/verl/trainer/config/_generated_ppo_torchtitan_trainer.yaml @@ -205,6 +205,8 @@ actor_rollout_ref: _target_: verl.workers.config.RolloutConfig name: ??? mode: async + nnodes: 0 + n_gpus_per_node: ${oc.select:trainer.n_gpus_per_node,8} temperature: 1.0 top_k: -1 top_p: 1 @@ -279,6 +281,8 @@ actor_rollout_ref: engine_kwargs: {} trace: _target_: verl.workers.config.TraceConfig + project_name: ${oc.select:trainer.project_name,null} + experiment_name: ${oc.select:trainer.experiment_name,null} backend: null token2text: false max_samples_per_step_per_worker: null diff --git a/verl/trainer/config/_generated_ppo_trainer.yaml b/verl/trainer/config/_generated_ppo_trainer.yaml index 6b97103ae9f..1cdc21b1ec8 100644 --- a/verl/trainer/config/_generated_ppo_trainer.yaml +++ b/verl/trainer/config/_generated_ppo_trainer.yaml @@ -204,6 +204,8 @@ actor_rollout_ref: _target_: verl.workers.config.RolloutConfig name: ??? mode: async + nnodes: 0 + n_gpus_per_node: ${oc.select:trainer.n_gpus_per_node,8} temperature: 1.0 top_k: -1 top_p: 1 @@ -278,6 +280,8 @@ actor_rollout_ref: engine_kwargs: {} trace: _target_: verl.workers.config.TraceConfig + project_name: ${oc.select:trainer.project_name,null} + experiment_name: ${oc.select:trainer.experiment_name,null} backend: null token2text: false max_samples_per_step_per_worker: null diff --git a/verl/trainer/config/_generated_ppo_veomni_trainer.yaml b/verl/trainer/config/_generated_ppo_veomni_trainer.yaml index 4528e0d667d..ccaf6582902 100644 --- a/verl/trainer/config/_generated_ppo_veomni_trainer.yaml +++ b/verl/trainer/config/_generated_ppo_veomni_trainer.yaml @@ -186,6 +186,8 @@ actor_rollout_ref: _target_: verl.workers.config.RolloutConfig name: ??? mode: async + nnodes: 0 + n_gpus_per_node: ${oc.select:trainer.n_gpus_per_node,8} temperature: 1.0 top_k: -1 top_p: 1 @@ -260,6 +262,8 @@ actor_rollout_ref: engine_kwargs: {} trace: _target_: verl.workers.config.TraceConfig + project_name: ${oc.select:trainer.project_name,null} + experiment_name: ${oc.select:trainer.experiment_name,null} backend: null token2text: false max_samples_per_step_per_worker: null diff --git a/verl/trainer/config/rollout/rollout.yaml b/verl/trainer/config/rollout/rollout.yaml index e1a4d2dad6d..894538d1d87 100644 --- a/verl/trainer/config/rollout/rollout.yaml +++ b/verl/trainer/config/rollout/rollout.yaml @@ -7,6 +7,12 @@ name: ??? # sync: LLM, async: AsyncLLM mode: async +# Number of nodes for standalone rollout server, must be > 0 in one-step-off/fully async training. +nnodes: 0 + +# Number of GPUs per node for rollout server. +n_gpus_per_node: ${oc.select:trainer.n_gpus_per_node,8} + # Sampling temperature for rollout. temperature: 1.0 @@ -273,6 +279,12 @@ trace: # Required when using verl.utils.omega_conf_to_dataclass to instantiate dataclass configs _target_: verl.workers.config.TraceConfig + # Project name for experiment tracking (e.g., wandb) + project_name: ${oc.select:trainer.project_name,null} + + # Experiment name for run identification in tracking tools + experiment_name: ${oc.select:trainer.experiment_name,null} + # trace backend, support mlflow, weave backend: null diff --git a/verl/trainer/ppo/ray_trainer.py b/verl/trainer/ppo/ray_trainer.py index 9d6560881be..ae43d2bad5c 100644 --- a/verl/trainer/ppo/ray_trainer.py +++ b/verl/trainer/ppo/ray_trainer.py @@ -831,7 +831,7 @@ def init_workers(self): # if enable_agent_reward_loop, we directly pass reward_loop_workers to agent loop manager # to stream reward computation with actor rollout reward_loop_worker_handles = self.reward_loop_manager.reward_loop_workers if enable_agent_reward_loop else None - self.async_rollout_manager = AgentLoopManager( + self.async_rollout_manager = AgentLoopManager.create( config=self.config, worker_group=self.actor_rollout_wg, rollout_resource_pool=actor_rollout_resource_pool, diff --git a/verl/utils/ray_utils.py b/verl/utils/ray_utils.py index 5ba20649365..eff3d91085f 100644 --- a/verl/utils/ray_utils.py +++ b/verl/utils/ray_utils.py @@ -97,9 +97,13 @@ def get_event_loop(): def auto_await(func): """Auto await a coroutine function. - If the function is called in an async context (with a running event loop), - it will return the coroutine object. Otherwise, it will block the current thread - and run the coroutine until completion. + Handles three cases: + 1. When the decorated function is called with await: returns the coroutine + so the caller can await it. + 2. When called directly and there is no running event loop: runs the + coroutine with asyncio.run() and returns the result. + 3. When called directly and the event loop is already running: runs the + coroutine (e.g. in a thread pool to avoid deadlock) and returns the result. """ @functools.wraps(func) @@ -114,9 +118,22 @@ def wrapper(*args, **kwargs): except RuntimeError: loop = None - if loop and loop.is_running(): - return coro - else: + # Case 1: No running loop -> run with asyncio.run() + if loop is None: return asyncio.run(coro) + # Case 2: Running loop -> return coro if caller will await + caller_frame = inspect.currentframe() + if caller_frame is not None: + caller_frame = caller_frame.f_back + caller_is_async = caller_frame is not None and (caller_frame.f_code.co_flags & inspect.CO_COROUTINE) != 0 + if caller_is_async: + return coro + + # Case 3: Running loop -> run coro in thread pool + # (cannot block the loop thread without deadlock) + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + future = pool.submit(asyncio.run, coro) + return future.result() + return wrapper diff --git a/verl/workers/config/rollout.py b/verl/workers/config/rollout.py index 8d0d732e263..d1d5c8f1768 100644 --- a/verl/workers/config/rollout.py +++ b/verl/workers/config/rollout.py @@ -80,6 +80,8 @@ class AgentLoopConfig(BaseConfig): @dataclass class TraceConfig(BaseConfig): + project_name: Optional[str] = None + experiment_name: Optional[str] = None backend: Optional[str] = None token2text: bool = False max_samples_per_step_per_worker: Optional[int] = None @@ -138,6 +140,8 @@ class RolloutConfig(BaseConfig): name: Optional[str] = MISSING mode: str = "async" + nnodes: int = 0 + n_gpus_per_node: int = 8 temperature: float = 1.0 top_k: int = -1