diff --git a/examples/gdpo_trainer/run_qwen1_5b_gdpo.sh b/examples/gdpo_trainer/run_qwen1_5b_gdpo.sh
new file mode 100644
index 00000000000..1b882404c5a
--- /dev/null
+++ b/examples/gdpo_trainer/run_qwen1_5b_gdpo.sh
@@ -0,0 +1,58 @@
+export DATA_DIR="verl/dataset/rlla_4k"
+export BASE_MODEL="Qwen/Qwen2.5-1.5B-Instruct"
+export EXPERIMENT_NAME="qwen2.5-1.5B-GDPO"
+export CKPT_DIR="verl/results/gdpo"
+export LOG_PATH="${CKPT_DIR}/${EXPERIMENT_NAME}.log"  # consumed by `tee` below; was previously unset
+
+# Env variables for computing score in rlla.py
+export REFINEDREWARD=0
+export COARSEREWARD=0
+export CORRECTMAX1=0
+export MAX1STEP30MAX3=0
+export SCHEDULEREWARD=0
+export SCHEDULELENGTH=0
+
+PROJECT_DIR="$(pwd)"
+
+trainer_n_gpus_per_node=8
+trainer_nnodes=1
+
+python3 -u -m verl.trainer.main_ppo \
+    algorithm.adv_estimator=gdpo \
+    data.train_files=$DATA_DIR/train.parquet \
+    data.val_files=$DATA_DIR/test.parquet \
+    data.train_batch_size=32 \
+    data.val_batch_size=16 \
+    data.max_prompt_length=2048 \
+    data.max_response_length=1024 \
+    actor_rollout_ref.model.path=$BASE_MODEL \
+    actor_rollout_ref.actor.optim.lr=1e-6 \
+    actor_rollout_ref.model.use_remove_padding=True \
+    actor_rollout_ref.actor.ppo_mini_batch_size=4 \
+    actor_rollout_ref.actor.use_dynamic_bsz=True \
+    actor_rollout_ref.actor.use_kl_loss=False \
+    actor_rollout_ref.actor.kl_loss_coef=0.001 \
+    actor_rollout_ref.actor.kl_loss_type=low_var_kl \
+    actor_rollout_ref.model.enable_gradient_checkpointing=True \
+    actor_rollout_ref.actor.fsdp_config.param_offload=False \
+    actor_rollout_ref.actor.fsdp_config.grad_offload=False \
+    actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \
+    actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
+    actor_rollout_ref.rollout.name=vllm \
+    actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \
+    actor_rollout_ref.rollout.n=4 \
+    actor_rollout_ref.ref.fsdp_config.param_offload=True \
+    algorithm.kl_ctrl.kl_coef=0.001 \
+    reward.custom_reward_function.path="$PROJECT_DIR/verl/utils/reward_score/rlla.py" \
+    reward.custom_reward_function.name=compute_score \
+    trainer.critic_warmup=0 \
+    trainer.logger=['console'] \
+    trainer.project_name=Var_inspect \
+    trainer.n_gpus_per_node=$trainer_n_gpus_per_node \
+    trainer.experiment_name=$EXPERIMENT_NAME \
+    trainer.nnodes=$trainer_nnodes \
+    trainer.save_freq=5 \
+    trainer.test_freq=10 \
+    trainer.default_local_dir=$CKPT_DIR \
+    trainer.total_epochs=15 \
+    trainer.val_before_train=False 2>&1 | tee ${LOG_PATH}
\ No newline at end of file
diff --git a/verl/experimental/reward_loop/reward_manager/gdpo.py b/verl/experimental/reward_loop/reward_manager/gdpo.py
new file mode 100644
index 00000000000..a7a1ef98bcb
--- /dev/null
+++ b/verl/experimental/reward_loop/reward_manager/gdpo.py
@@ -0,0 +1,94 @@
+# Copyright 2024 Bytedance Ltd. and/or its affiliates
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import inspect
+
+from verl import DataProto
+from verl.experimental.reward_loop.reward_manager import register
+from verl.experimental.reward_loop.reward_manager.base import RewardManagerBase
+from verl.utils.reward_score import default_compute_score
+
+
+@register("gdpo")
+class GDPORewardManager(RewardManagerBase):
+    """GDPO Reward Manager."""
+
+    def __init__(self, config, tokenizer, compute_score, reward_router_address=None, reward_model_tokenizer=None):
+        super().__init__(config, tokenizer, compute_score)
+        self.compute_score = compute_score or default_compute_score
+        self.is_async_reward_score = inspect.iscoroutinefunction(self.compute_score)
+
+        # GDPO reward config
+        self.max_resp_len = config.reward.get("reward_kwargs", {}).get("max_resp_len", None)
+
+        self.reward_router_address = reward_router_address
+        self.reward_model_tokenizer = reward_model_tokenizer
+
+    async def run_single(self, data: DataProto) -> dict:
+        assert len(data) == 1, "Only support a single data item"
+        data_item = data[0]
+        response_ids = data_item.batch["responses"]
+        response_length = response_ids.shape[-1]
+        valid_response_length = data_item.batch["attention_mask"][-response_length:].sum()
+        valid_response_ids = response_ids[:valid_response_length]
+
+        data_source = data_item.non_tensor_batch["data_source"]
+        ground_truth = data_item.non_tensor_batch["reward_model"]["ground_truth"]
+        extra_info = data_item.non_tensor_batch.get("extra_info", {})
+
+        response_str = await self.loop.run_in_executor(
+            None, lambda: self.tokenizer.decode(valid_response_ids, skip_special_tokens=True)
+        )
+        extra_reward_kwargs = (
+            {
+                "reward_router_address": self.reward_router_address,
+                "reward_model_tokenizer": self.reward_model_tokenizer,
+            }
+            if self.reward_router_address is not None
+            else {}
+        )
+        if self.is_async_reward_score:
+            result = await self.compute_score(
+                data_source=data_source,
+                solution_str=response_str,
+                ground_truth=ground_truth,
+                extra_info=extra_info,
+                **extra_reward_kwargs,
+            )
+        else:
+            result = await self.loop.run_in_executor(
+                None,
+                lambda: self.compute_score(
+                    data_source=data_source,
+                    solution_str=response_str,
+                    ground_truth=ground_truth,
+                    extra_info=extra_info,
+                    **extra_reward_kwargs,
+                ),
+            )
+
+        reward_extra_info = {}
+
+        score: float
+        if isinstance(result, dict):
+            score = result["score"]
+            reward_extra_info.update(result)
+        else:
+            score = result
+            reward_extra_info["acc"] = score
+
+        reward = score
+
+        return {"reward_score": reward, "reward_extra_info": reward_extra_info}
diff --git a/verl/trainer/ppo/core_algos.py b/verl/trainer/ppo/core_algos.py
index b222dc1b705..0a7cc95c18f 100644
--- a/verl/trainer/ppo/core_algos.py
+++ b/verl/trainer/ppo/core_algos.py
@@ -96,6 +96,7 @@ class AdvantageEstimator(str, Enum):
 
     GAE = "gae"
     GRPO = "grpo"
+    GDPO = "gdpo"
     REINFORCE_PLUS_PLUS = "reinforce_plus_plus"
     REINFORCE_PLUS_PLUS_BASELINE = "reinforce_plus_plus_baseline"
     REMAX = "remax"
@@ -2370,3 +2371,69 @@ def compute_policy_loss_bypass_mode(
     pg_metrics.update(rollout_metrics)
 
     return pg_loss, pg_metrics
+
+
+@register_adv_est(AdvantageEstimator.GDPO)  # or simply: @register_adv_est("gdpo")
+def compute_gdpo_outcome_advantage(
+    token_level_rewards: torch.Tensor,
+    response_mask: torch.Tensor,
+    index: np.ndarray,
+    epsilon: float = 1e-6,
+    norm_adv_by_std_in_grpo: bool = True,
+    config: Optional[AlgoConfig] = None,
+    score_list: Optional[list[torch.Tensor]] = None,
+) -> tuple[torch.Tensor, torch.Tensor]:
+    """
+    Compute advantage for GDPO with multiple scalar rewards for each response
+    (each reward dimension is group-normalized first, then the dimensions are summed).
+
+    Args:
+        token_level_rewards: `(torch.Tensor)`
+            shape is (bs, response_length)
+        response_mask: `(torch.Tensor)`
+            shape is (bs, response_length)
+        index: `(np.ndarray)`
+            index array for grouping responses by prompt
+        epsilon: `(float)`
+            small value to avoid division by zero
+        norm_adv_by_std_in_grpo: `(bool)`
+            whether to scale the GRPO advantage by the group std
+        config: `(Optional[AlgoConfig])`
+            algorithm configuration object
+        score_list: `(Optional[list[torch.Tensor]])`
+            per-dimension scores for GDPO
+
+    Note:
+        Ref GDPO (https://arxiv.org/abs/2601.05242).
+
+    Returns:
+        advantages: `(torch.Tensor)`
+            shape is (bs, response_length)
+        returns: `(torch.Tensor)`
+            shape is (bs, response_length)
+    """
+    if score_list is None:  # single-score fallback
+        score_list = [token_level_rewards]
+
+    new_advantage = None
+    for token_level_scores in score_list:
+        # group-normalize each reward dimension independently, GRPO-style
+        normalized_score, _ = compute_grpo_outcome_advantage(
+            token_level_rewards=token_level_scores,
+            response_mask=response_mask,
+            index=index,
+            epsilon=epsilon,
+            norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo,
+            config=config,
+        )
+
+        if new_advantage is None:
+            new_advantage = normalized_score
+        else:
+            new_advantage += normalized_score
+
+    # whiten the summed advantage over valid response tokens
+    advantages = verl_F.masked_whiten(new_advantage, response_mask) * response_mask
+
+    return advantages, advantages
diff --git a/verl/trainer/ppo/ray_trainer.py b/verl/trainer/ppo/ray_trainer.py
index 9d6560881be..278d5278b0d 100644
--- a/verl/trainer/ppo/ray_trainer.py
+++ b/verl/trainer/ppo/ray_trainer.py
@@ -211,6 +211,30 @@ def compute_advantage(
     rollout_is_weights = data.batch.get("rollout_is_weights", None)
     adv_kwargs["rollout_is_weights"] = rollout_is_weights
 
+    if adv_estimator == AdvantageEstimator.GDPO:
+        assert "score_list" in data.non_tensor_batch, (
+            "GDPO needs multiple scores per sample. "
+            "Please point reward.custom_reward_function.path to a reward function that "
+            "returns a score_list, e.g. verl/utils/reward_score/rlla.py."
+        )
+
+        score_list = []
+        multi_score_tensor = torch.tensor(
+            data.non_tensor_batch["score_list"], dtype=torch.float32
+        )  # [bsz, score_num, 1]
+
+        prompt_length = data.batch["prompts"].size(1)
+        # index of the last valid response token for each sample
+        last_token_idx = data.batch["attention_mask"][:, prompt_length:].sum(dim=1) - 1
+        for i in range(multi_score_tensor.shape[1]):
+            rm_score = multi_score_tensor[:, i].reshape(-1)
+            rm_scores = torch.zeros_like(data.batch["response_mask"], dtype=torch.float32)
+            # place each scalar score on the last valid response token
+            rm_scores[torch.arange(rm_scores.size(0)), last_token_idx] = rm_score
+            score_list.append(rm_scores)
+
+        adv_kwargs["score_list"] = score_list
+
     # calculate advantage estimator
     advantages, returns = adv_estimator_fn(**adv_kwargs)
     data.batch["advantages"] = advantages
diff --git a/verl/utils/reward_score/rlla.py b/verl/utils/reward_score/rlla.py
new file mode 100644
index 00000000000..a96dcad1c6a
--- /dev/null
+++ b/verl/utils/reward_score/rlla.py
@@ -0,0 +1,324 @@
+# Copyright 2024 Bytedance Ltd. and/or its affiliates
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+import re
+from collections import Counter
+
+
+def match_score(list1, list2):
+    """Compute a similarity score considering element frequency, ignoring order."""
+    if list1 == list2:
+        return 1.0
+
+    if os.getenv("REFINEDREWARD", "0") == "1":
+        print("REFINEDREWARD is set to 1, so strict match is used")
+        # the exact-match case returned above, so any remaining pair scores 0
+        return 0.0
+
+    if not list1 or not list2:
+        return 0.0
+
+    count1 = Counter(list1)  # frequency count for list1
+    count2 = Counter(list2)  # frequency count for list2
+
+    intersection = sum(min(count1[k], count2[k]) for k in count1.keys() & count2.keys())
+    max_possible = len(list1) + len(list2) - intersection
+
+    return intersection / max_possible if max_possible > 0 else 0.0
+
+
+# customized reward function: format
+def customize_format_reward_func(completions, answer, step, max_possible_reward, min_possible_reward, **kwargs):
+    if os.getenv("MAX1STEP30MAX3", "0") == "1":
+        print("MAX1STEP30MAX3 is set to 1, so max 1 -> 30 steps -> max 3")
+        if step >= 30:
+            max_possible_reward = max_possible_reward / 2
+            min_possible_reward = min_possible_reward / 2
+
+    # schedule reward
+    if os.getenv("SCHEDULEREWARD", "0") == "1":
+        print("SCHEDULEREWARD is set to 1, so schedule reward is used")
+        max_possible_reward = 2 - (2 - max_possible_reward) * step / 150
+        min_possible_reward = -2 + (2 + min_possible_reward) * step / 150
+        max_possible_reward = max(max_possible_reward, 1.0)
+        min_possible_reward = min(min_possible_reward, -1.0)
+
+    rewards = []
+    responses = [completion[0]["content"] for completion in completions]
+
+    print("\n======= Answer =======")
+    print(answer[0])
+    print("\n======= Responses =======")
+    for idx, response in enumerate(responses):
+        print(f"*** Response {idx + 1} ***\n{response}")
+
+    # expected layout: a <think> block, then <tool_call> and/or <response> blocks,
+    # mirroring whichever tags appear in the ground-truth answer
+    for response, ans in zip(responses, answer, strict=False):
+        reward = min_possible_reward
+        if "<tool_call>" in ans and "<response>" not in ans:
+            pattern = r"^<think>.*?</think>\n<tool_call>.*?</tool_call>$"
+            if (
+                re.search(pattern, response, re.DOTALL)
+                and response.count("<tool_call>") == 1
+                and response.count("</tool_call>") == 1
+            ):
+                reward = max_possible_reward
+        elif "<tool_call>" not in ans and "<response>" in ans:
+            pattern = r"^<think>.*?</think>\n<response>\n.*?\n</response>$"
+            if (
+                re.search(pattern, response, re.DOTALL)
+                and response.count("<response>") == 1
+                and response.count("</response>") == 1
+            ):
+                reward = max_possible_reward
+        elif "<tool_call>" in ans and "<response>" in ans:
+            pattern = r"^<think>.*?</think>\n<tool_call>\n.*?\n</tool_call>\n<response>.*?</response>$"
+            if (
+                re.search(pattern, response, re.DOTALL)
+                and response.count("<tool_call>") == 1
+                and response.count("</tool_call>") == 1
+                and response.count("<response>") == 1
+                and response.count("</response>") == 1
+            ):
+                reward = max_possible_reward
+        else:
+            pattern = r"^<think>.*?</think>$"
+            if re.search(pattern, response, re.DOTALL):
+                reward = max_possible_reward
+
+        rewards.append(reward)
+
+    print("\n======= Reward for format =======")
+    print("Reward function for format is called ...")
+    print(rewards)
+    return rewards
+
+
+# customized reward function: length
+def customize_length_reward_func(completions, answer, step, max_possible_reward, min_possible_reward, **kwargs):
+    """Reward function that gives higher scores to longer <think> sections."""
+    # schedule length
+    if os.getenv("SCHEDULELENGTH", "0") == "1":
+        print("SCHEDULELENGTH is set to 1, so schedule max reward for length is used")
+        max_reward_len = (640 - 384) * step / 105 + 384
+    else:
+        max_reward_len = 512
+
+    responses = [completion[0]["content"] for completion in completions]
+    rewards = []
+
+    for response, ans in zip(responses, answer, strict=False):
+        if "<think>" not in response or "</think>" not in response:
+            rewards.append(min_possible_reward)
+            continue
+        think_responses = response.split("<think>")[-1].split("</think>")[0].strip()
+        reward = round(len(think_responses.split()) / max_reward_len, 2)
+        reward = min(reward, 1.0)
+
+        final_reward = reward * (max_possible_reward - min_possible_reward) + min_possible_reward
+        rewards.append(final_reward)
+
+    print("\n======= Reward for length =======")
+    print("Reward function for length is called ...")
+    print(rewards)
+    return rewards
+
+
+def compute_tool_call_reward(gt_tools, pd_tools, max_possible_reward, min_possible_reward):
+    if gt_tools == pd_tools:
+        print("Max possible score:", "Exact Match!")
+        print("Score:", max_possible_reward)
+        return max_possible_reward
+
+    if os.getenv("COARSEREWARD", "0") == "1":
+        print("COARSEREWARD is set to 1, so coarse reward is used")
+        # the exact-match case returned above, so everything else gets the minimum
+        return min_possible_reward
+
+    gt_names = [tool["name"] for tool in gt_tools]
+    pd_names = [tool["name"] for tool in pd_tools]
+    score = match_score(list(gt_names), list(pd_names))
+
+    local_max_possible = 1.0
+    used_pd_indices = set()  # keep track of matched pd_tools
+
+    for gt_tool in gt_tools:
+        gt_name = gt_tool["name"]
+        gt_params = gt_tool["parameters"]
+
+        if os.getenv("INTERMEDIATEREWARD", "0") == "1":
+            print("INTERMEDIATEREWARD is set to 1, so local max possible is changed")
+            local_max_possible += 1.0
+        else:
+            local_max_possible += 1.0 + len(gt_params)
+
+        best_match = None
+        best_match_score = 0.0
+        best_match_index = -1
+
+        # find the best matching unused pd_tool with the same name
+        for i, pd_tool in enumerate(pd_tools):
+            if i in used_pd_indices or pd_tool["name"] != gt_name:
+                continue
+
+            if os.getenv("INTERMEDIATEREWARD", "0") == "1":
+                if gt_tool == pd_tool:
+                    best_match = pd_tool
+                    best_match_index = i
+                    best_match_score = 1.0
+                    break
+                else:
+                    continue
+
+            pd_params = pd_tool["parameters"]
+            param_score = match_score(list(gt_params.keys()), list(pd_params.keys()))
+
+            # correctness score for parameter values
+            correctness_score = sum(1.0 for k, v in gt_params.items() if k in pd_params and pd_params[k] == v)
+
+            total_score = param_score + correctness_score
+
+            if total_score > best_match_score:
+                best_match_score = total_score
+                best_match = pd_tool
+                best_match_index = i
+
+        if best_match:
+            used_pd_indices.add(best_match_index)
+            score += best_match_score
+
+    print()
+    print("Max possible score:", local_max_possible)
+    print("Score:", score)
+
+    return (max_possible_reward - min_possible_reward) * score / local_max_possible + min_possible_reward
+
+
+# customized reward function: tool call correctness
+def customize_correctness_reward_tool(completions, answer, step, max_possible_reward, min_possible_reward, **kwargs):
+    if os.getenv("MAX1STEP30MAX3", "0") == "1":
+        print("MAX1STEP30MAX3 is set to 1, so max 1 -> 30 steps -> max 3")
+        if step < 30:
+            max_possible_reward = max_possible_reward / 3
+            min_possible_reward = min_possible_reward / 3
+
+    if os.getenv("SCHEDULEREWARD", "0") == "1":
+        print("SCHEDULEREWARD is set to 1, so schedule reward is used")
+        max_possible_reward = (max_possible_reward - 2) * step / 150 + 2
+        min_possible_reward = (min_possible_reward + 2) * step / 150 - 2
+        max_possible_reward = min(max_possible_reward, 3.0)
+        min_possible_reward = max(min_possible_reward, -3.0)
+
+    responses = [completion[0]["content"] for completion in completions]
+    rewards = []
+
+    for response, ans in zip(responses, answer, strict=False):
+        reward = 0.0
+
+        if "<tool_call>" not in ans:
+            # if "<tool_call>" not in response and "</tool_call>" not in response:
+            #     reward = max_possible_reward
+            # else:
+            #     reward = min_possible_reward
+            rewards.append(reward)
+            continue
+
+        gt_tool_call = ans.split("<tool_call>")[1].split("</tool_call>")[0].strip()
+        gt_tools = gt_tool_call.split("\n")
+        gt_tools = [json.loads(tool) for tool in gt_tools]  # each dict contains "name" and "parameters"
+
+        try:
+            # Treated as a constraint during training: if the format is not correct,
+            # directly give the lowest possible score.
+            assert "<tool_call>" in response
+            assert "</tool_call>" in response
+            pd_tools = response.split("<tool_call>")[1].split("</tool_call>")[0].strip().split("\n")
+            pd_tools = [json.loads(tool) for tool in pd_tools]
+            reward = compute_tool_call_reward(gt_tools, pd_tools, max_possible_reward, min_possible_reward)
+        except Exception:
+            reward = min_possible_reward
+
+        rewards.append(reward)
+
+    print("\n======= Reward for tool call correctness =======")
+    print("Reward function for correctness is called ...")
+    print(rewards)
+    return rewards
+
+
+def compute_score(solution_str, ground_truth, step=0):
+    """Scoring function for the RLLA tool-calling data.
+
+    Computes a format score and a tool-call correctness score, then returns both the
+    summed scalar `score` and the per-dimension `score_list` consumed by the GDPO
+    advantage estimator.
+
+    Args:
+        solution_str: the solution text
+        ground_truth: the ground truth
+        step: the current training step, used by the reward schedules
+    """
+    exp_name = str(os.getenv("EXPERIMENT_NAME", ""))
+    if "llama" in exp_name:
+        predict_str = (
+            solution_str.split("<|start_header_id|>assistant<|end_header_id|>")[-1].split("<|eot_id|>")[0].strip()
+        )
+    elif "qwen" in exp_name:
+        predict_str = solution_str.split("<|im_start|>assistant")[-1].split("<|im_end|>")[0].strip()
+    else:
+        raise NotImplementedError(f"Unknown model name: {exp_name}")
+
+    if os.getenv("CORRECTMAX1", "0") == "1":
+        print("CORRECTMAX1 is set to 1, so max score is set to 1")
+        tool_max_possible = 1.0
+        tool_min_possible = -1.0
+    else:
+        tool_max_possible = 3.0
+        tool_min_possible = -3.0
+
+    format_max_possible = 1.0
+    format_min_possible = 0.0
+
+    completions = [[{"role": "assistant", "content": predict_str}]]
+    answer = [ground_truth]
+
+    format_score = customize_format_reward_func(completions, answer, step, format_max_possible, format_min_possible)[0]
+    correctness_score = customize_correctness_reward_tool(
+        completions, answer, step, tool_max_possible, tool_min_possible
+    )[0]
+
+    score = format_score + correctness_score
+
+    result = {
+        "score": score,
+        "score_list": [format_score, correctness_score],
+    }
+
+    return result
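
Reviewer note: the core difference from GRPO is the ordering of normalization and summation, which is why compute_score must return a score_list rather than a single scalar. Below is a minimal, self-contained sketch of that ordering; group_normalize and the example scores are illustrative stand-ins, not verl APIs.

import torch

def group_normalize(scores: torch.Tensor, eps: float = 1e-6) -> torch.Tensor:
    # normalize scalar scores within one rollout group (responses to the same prompt)
    return (scores - scores.mean()) / (scores.std() + eps)

# one prompt, a group of 4 rollouts, scored along two dimensions
# (e.g. format in [0, 1] and tool-call correctness in [-3, 3], as in rlla.py)
format_scores = torch.tensor([1.0, 1.0, 0.0, 1.0])
correct_scores = torch.tensor([3.0, -3.0, -3.0, 1.2])

# GRPO-style: sum the dimensions first, then normalize once;
# the larger-scale correctness dimension dominates the combined advantage
grpo_adv = group_normalize(format_scores + correct_scores)

# GDPO-style (normalization first, then sum): every reward dimension
# contributes to the advantage at a comparable scale
gdpo_adv = group_normalize(format_scores) + group_normalize(correct_scores)

print("GRPO-style:", grpo_adv)
print("GDPO-style:", gdpo_adv)

In compute_gdpo_outcome_advantage the per-dimension normalization is delegated to compute_grpo_outcome_advantage over the group index, and the summed result is then masked-whitened over valid response tokens.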