File tree Expand file tree Collapse file tree 1 file changed +7
-5
lines changed
verl/experimental/fully_async_policy Expand file tree Collapse file tree 1 file changed +7
-5
lines changed Original file line number Diff line number Diff line change @@ -519,14 +519,16 @@ async def _processor_worker(self):
519519 self .paused = True
520520 while self .active_tasks :
521521 async with self .lock :
522- # After acquiring the lock, the number of active_tasks may change, need to be verified again
523- if self .active_tasks :
524- done_tasks , self .active_tasks = await asyncio .wait (
525- self .active_tasks , return_when = asyncio .FIRST_COMPLETED
526- )
522+ tasks_to_wait = set (self .active_tasks ) if self .active_tasks else set ()
523+
524+ if tasks_to_wait :
525+ done_tasks , _ = await asyncio .wait (tasks_to_wait , return_when = asyncio .FIRST_COMPLETED )
527526 for task in done_tasks :
528527 await task
529528
529+ async with self .lock :
530+ self .active_tasks -= done_tasks
531+
530532 async with self .lock :
531533 while self .paused :
532534 self .idle_start_time = time .time ()
You can’t perform that action at this time.
0 commit comments