50 changes: 50 additions & 0 deletions examples/gdpo_trainer/run_gdpo.sh
@@ -0,0 +1,50 @@
export DATA_DIR="verl/dataset/rlla_4k"
export BASE_MODEL="Qwen/Qwen2.5-1.5B-Instruct"
export EXPERIMENT_NAME="qwen2.5-1.5B-GDPO"
export CKPT_DIR="verl/results/gdpo"
export LOG_PATH="${CKPT_DIR}/${EXPERIMENT_NAME}.log" # log destination for `tee` below; adjust as needed

PROJECT_DIR="$(pwd)"

trainer_n_gpus_per_node=8
trainer_nnodes=1

python3 -u -m verl.trainer.main_ppo \
algorithm.adv_estimator=gdpo \
data.train_files=$DATA_DIR/train.parquet \
data.val_files=$DATA_DIR/test.parquet \
data.train_batch_size=32 \
data.val_batch_size=16 \
data.max_prompt_length=2048 \
data.max_response_length=1024 \
actor_rollout_ref.model.path=$BASE_MODEL \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.model.use_remove_padding=True \
actor_rollout_ref.actor.ppo_mini_batch_size=4 \
actor_rollout_ref.actor.use_dynamic_bsz=True \
actor_rollout_ref.actor.use_kl_loss=False \
actor_rollout_ref.actor.kl_loss_coef=0.001 \
actor_rollout_ref.actor.kl_loss_type=low_var_kl \
actor_rollout_ref.model.enable_gradient_checkpointing=True \
actor_rollout_ref.actor.fsdp_config.param_offload=False \
actor_rollout_ref.actor.fsdp_config.grad_offload=False \
actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \
actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
actor_rollout_ref.rollout.name=vllm \
actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \
actor_rollout_ref.rollout.n=4 \
actor_rollout_ref.ref.fsdp_config.param_offload=True \
algorithm.kl_ctrl.kl_coef=0.001 \
reward.custom_reward_function.path="$PROJECT_DIR/verl/experimental/reward_loop/reward_manager/gdpo.py" \
reward.custom_reward_function.name=compute_score \
trainer.critic_warmup=0 \
trainer.logger=['console'] \
trainer.project_name=Var_inspect \
trainer.n_gpus_per_node=$trainer_n_gpus_per_node \
trainer.experiment_name=$EXPERIMENT_NAME \
trainer.nnodes=$trainer_nnodes \
trainer.save_freq=5 \
trainer.test_freq=10 \
trainer.default_local_dir=$CKPT_DIR \
trainer.total_epochs=15 \
trainer.val_before_train=False 2>&1 | tee ${LOG_PATH}
102 changes: 102 additions & 0 deletions verl/experimental/reward_loop/reward_manager/gdpo.py
@@ -0,0 +1,102 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect

from verl import DataProto
from verl.experimental.reward_loop.reward_manager import register
from verl.experimental.reward_loop.reward_manager.base import RewardManagerBase
from verl.utils.reward_score import default_compute_score


@register("gdpo")
class GDPOdRewardManager(RewardManagerBase):
Reviewer comment (Contributor, severity: high):

There appears to be a typo in the class name GDPOdRewardManager. It should likely be GDPORewardManager to align with the algorithm name ("gdpo") it is registered for. This will improve clarity and prevent confusion.

Suggested change
class GDPOdRewardManager(RewardManagerBase):
class GDPORewardManager(RewardManagerBase):

"""GDPO Reward Manager."""

def __init__(self, config, tokenizer, compute_score, reward_router_address=None, reward_model_tokenizer=None):
super().__init__(config, tokenizer, compute_score)
self.compute_score = compute_score or default_compute_score
self.is_async_reward_score = inspect.iscoroutinefunction(self.compute_score)

# GDPO Reward Config
self.max_resp_len = config.reward.get("reward_kwargs", {}).get("max_resp_len", None)

self.reward_router_address = reward_router_address
self.reward_model_tokenizer = reward_model_tokenizer

async def run_single(self, data: DataProto) -> dict:
assert len(data) == 1, "Only support single data item"
data_item = data[0]
response_ids = data_item.batch["responses"]
response_length = response_ids.shape[-1]
valid_response_length = data_item.batch["attention_mask"][-response_length:].sum()
valid_response_ids = response_ids[:valid_response_length]

data_source = data_item.non_tensor_batch["data_source"]
ground_truth = data_item.non_tensor_batch["reward_model"]["ground_truth"]
extra_info = data_item.non_tensor_batch.get("extra_info", {})

response_str = await self.loop.run_in_executor(
None, lambda: self.tokenizer.decode(valid_response_ids, skip_special_tokens=True)
)
extra_reward_kwargs = (
{
"reward_router_address": self.reward_router_address,
"reward_model_tokenizer": self.reward_model_tokenizer,
}
if self.reward_router_address is not None
else {}
)
if self.is_async_reward_score:
result = await self.compute_score(
data_source=data_source,
solution_str=response_str,
ground_truth=ground_truth,
extra_info=extra_info,
**extra_reward_kwargs,
)
else:
result = await self.loop.run_in_executor(
None,
lambda: self.compute_score(
data_source=data_source,
solution_str=response_str,
ground_truth=ground_truth,
extra_info=extra_info,
**extra_reward_kwargs,
),
)

# result = {
# "score": score,
# "score_list": [format_score, correctness_score],
# }

# returns {"reward_score": reward, "reward_extra_info": reward_extra_info}
# where reward_extra_info = {"score": score, "score_list": [format_score, correctness_score]}

reward_extra_info = {}

score: float
if isinstance(result, dict):
score = result["score"]
for key, value in result.items():
reward_extra_info[key] = value
else:
score = result
reward_extra_info["acc"] = score
Comment on lines +92 to +98
Reviewer comment (Contributor, severity: critical):

The current logic for processing the result from compute_score is incorrect when the function returns a tuple, as verl.utils.reward_score.rlla.compute_score does. This will assign a tuple to the score variable, which will cause a runtime error later when it's expected to be a float (e.g., in torch.tensor(scores)). You need to handle the tuple case explicitly to correctly extract the score and other reward components.

Suggested change
if isinstance(result, dict):
score = result["score"]
for key, value in result.items():
reward_extra_info[key] = value
else:
score = result
reward_extra_info["acc"] = score
if isinstance(result, dict):
score = result["score"]
reward_extra_info.update(result)
elif isinstance(result, tuple) and len(result) == 4:
score, format_score, correctness_score, length_score = result
reward_extra_info["score"] = score
reward_extra_info["format_score"] = format_score
reward_extra_info["correctness_score"] = correctness_score
reward_extra_info["length_score"] = length_score
else:
score = result
reward_extra_info["acc"] = score


reward = score

return {"reward_score": reward, "reward_extra_info": reward_extra_info}
68 changes: 68 additions & 0 deletions verl/trainer/ppo/core_algos.py
@@ -96,6 +96,7 @@ class AdvantageEstimator(str, Enum):

GAE = "gae"
GRPO = "grpo"
GDPO = "gdpo"
REINFORCE_PLUS_PLUS = "reinforce_plus_plus"
REINFORCE_PLUS_PLUS_BASELINE = "reinforce_plus_plus_baseline"
REMAX = "remax"
@@ -2370,3 +2371,70 @@ def compute_policy_loss_bypass_mode(
pg_metrics.update(rollout_metrics)

return pg_loss, pg_metrics


@register_adv_est(AdvantageEstimator.GDPO) # or simply: @register_adv_est("gdpo")
def compute_gdpo_outcome_advantage(
token_level_rewards: torch.Tensor,
response_mask: torch.Tensor,
index: np.ndarray,
epsilon: float = 1e-6,
norm_adv_by_std_in_grpo: bool = True,
config: Optional[AlgoConfig] = None,
score_list: Optional[list[torch.Tensor]] = None,
) -> tuple[torch.Tensor, torch.Tensor]:
"""
Compute advantage for GDPO, operating only on Outcome reward
(with only one scalar reward for each response).

Args:
token_level_rewards: `(torch.Tensor)`
shape is (bs, response_length)
response_mask: `(torch.Tensor)`
shape is (bs, response_length)
index: `(np.ndarray)`
index array for grouping
epsilon: `(float)`
small value to avoid division by zero
norm_adv_by_std_in_grpo: `(bool)`
whether to scale the GRPO advantage
config: `(Optional[AlgoConfig])`
algorithm configuration object
score_list: `(Optional[list[torch.Tensor]])`
per-component token-level score tensors for GDPO, one per reward component

Note:
Ref GDPO (https://arxiv.org/abs/2601.05242).

Returns:
advantages: `(torch.Tensor)`
shape is (bs, response_length)
Returns: `(torch.Tensor)`
shape is (bs, response_length)
"""
if score_list is None:
score_list = [token_level_rewards]
# debug: no per-component scores were provided, fall back to the aggregate reward
print("GDPO: no multi-score found, falling back to token_level_rewards")
new_advantage = None
for token_level_scores in score_list:

normalized_score, _ = compute_grpo_outcome_advantage(
token_level_rewards=token_level_scores,
response_mask=response_mask,
index=index,
epsilon=epsilon,
norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo,
config=config,
)

if new_advantage is None:
new_advantage = normalized_score
else:
new_advantage += normalized_score

advantages = verl_F.masked_whiten(new_advantage, response_mask) * response_mask

return advantages, advantages
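To make the component-wise normalization concrete, here is a self-contained toy (not verl code) with a single prompt group and two reward components; the real function above additionally whitens the summed advantage with verl_F.masked_whiten:

import torch

# one prompt group of four responses, 3 response tokens each
response_mask = torch.ones(4, 3)
format_scores = torch.tensor([1.0, 0.0, 1.0, 1.0])
correct_scores = torch.tensor([1.0, 0.0, 0.0, 1.0])

def group_normalize(scores, eps=1e-6):
    # GRPO-style outcome normalization within the (single) group
    return (scores - scores.mean()) / (scores.std() + eps)

# core GDPO step: normalize each component separately, then sum
combined = group_normalize(format_scores) + group_normalize(correct_scores)
# broadcast each per-response advantage over its response tokens
advantages = combined.unsqueeze(-1) * response_mask
print(advantages)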
52 changes: 52 additions & 0 deletions verl/trainer/ppo/ray_trainer.py
@@ -211,6 +211,58 @@ def compute_advantage(
rollout_is_weights = data.batch.get("rollout_is_weights", None)
adv_kwargs["rollout_is_weights"] = rollout_is_weights

if adv_estimator == AdvantageEstimator.GDPO:
assert "score_list" in data.non_tensor_batch, (
"GDPO needs multiple score components. "
"Please set reward.custom_reward_function.path to point at gdpo.py, or "
"change the reward function so that it computes multiple scores."
)

# data.batch layout from rollout:
#   prompts:        [bsz, prompt_length]
#   responses:      [bsz, response_length]
#   response_mask:  [bsz, response_length]
#   input_ids:      [bsz, prompt_length + response_length]
#   attention_mask: [bsz, prompt_length + response_length]
#   position_ids:   [bsz, 3, prompt_length + response_length] or [bsz, prompt_length + response_length]
score_list = []
multi_score_tensor = torch.tensor(
data.non_tensor_batch["score_list"], dtype=torch.float32
) # [bsz, score_num, 1]
print(f"GDPO: multi_score_tensor shape: {multi_score_tensor.shape}")

prompt_length = data.batch["prompts"].size(1)
# index of the last valid response token for each sample
last_token_idx = data.batch["attention_mask"][:, prompt_length:].sum(dim=1) - 1
for i in range(multi_score_tensor.shape[1]):
rm_score = multi_score_tensor[:, i].squeeze(-1) # [bsz]
rm_scores = torch.zeros_like(data.batch["response_mask"], dtype=torch.float32)
rm_scores[torch.arange(rm_scores.size(0)), last_token_idx] = rm_score
score_list.append(rm_scores)

adv_kwargs["score_list"] = score_list

# e.g. score_list = np.array([[format_score, correct_score], ...]) collected from reward_extra_infos
# calculate advantage estimator
advantages, returns = adv_estimator_fn(**adv_kwargs)
data.batch["advantages"] = advantages
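As a sanity check on the placement logic above, a standalone toy (not verl code) showing how scalar per-component scores land on each sample's last valid response token:

import torch

attention_mask = torch.tensor([[1, 1, 1, 1, 1, 0],
                               [1, 1, 1, 1, 0, 0]])  # prompt_length = 2
prompt_length = 2
scores = torch.tensor([0.5, 1.0])  # one component's scalar score per sample

# index of the last valid response token for each sample
last_token_idx = attention_mask[:, prompt_length:].sum(dim=1) - 1  # tensor([2, 1])
rm_scores = torch.zeros(2, attention_mask.size(1) - prompt_length)
rm_scores[torch.arange(2), last_token_idx] = scores
print(rm_scores)
# tensor([[0.0, 0.0, 0.5, 0.0],
#         [0.0, 1.0, 0.0, 0.0]])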