1414from uuid import uuid4
1515
1616import pytest
17- from prefect import flow
17+ from prefect import flow , get_run_logger
1818from prefect .client .orchestration import PrefectClient
1919from prefect .client .schemas .actions import GlobalConcurrencyLimitCreate
2020from prefect .client .schemas .objects import FlowRun , GlobalConcurrencyLimit
2121from prefect .states import Crashed
2222from pydantic_ai import Agent
2323
24- from evals ._tools .spy import ToolCallSpy
25-
2624LEASE_RENEWAL_ERROR = (
2725 "Concurrency lease renewal failed - slots are no longer reserved. "
2826 "Terminating execution to prevent over-allocation."
3129
3230@pytest .fixture
3331async def concurrency_limit (prefect_client : PrefectClient ) -> GlobalConcurrencyLimit :
34- """Create a global concurrency limit that the flow was using ."""
35- limit_name = f"api-calls -{ uuid4 ().hex [:8 ]} "
32+ """Create a global concurrency limit."""
33+ limit_name = f"database-connections -{ uuid4 ().hex [:8 ]} "
3634 await prefect_client .create_global_concurrency_limit (
3735 concurrency_limit = GlobalConcurrencyLimitCreate (
3836 name = limit_name ,
39- limit = 5 ,
37+ limit = 1 ,
4038 )
4139 )
4240 return await prefect_client .read_global_concurrency_limit_by_name (limit_name )
4341
4442
4543@pytest .fixture
46- async def crashed_lease_renewal_flow_run (
44+ async def crashed_flow_run (
4745 prefect_client : PrefectClient ,
4846 concurrency_limit : GlobalConcurrencyLimit ,
4947) -> FlowRun :
5048 """Create a flow run that crashed due to lease renewal failure.
5149
52- The flow was using a concurrency limit when its lease renewal failed ,
53- causing Prefect to crash the run to prevent over-allocation .
50+ The flow logs show it acquired a concurrency slot before crashing ,
51+ matching the real user experience from GitHub issues .
5452 """
5553
56- @flow (name = f"api-consumer-{ uuid4 ().hex [:8 ]} " )
57- def api_consumer ():
54+ @flow (name = f"db-sync-job-{ uuid4 ().hex [:8 ]} " )
55+ def db_sync_job ():
56+ logger = get_run_logger ()
57+ logger .info (f"Acquired concurrency slot for '{ concurrency_limit .name } '" )
58+ logger .info ("Starting database sync operation" )
59+ logger .info ("Processing batch 1 of 5" )
60+ logger .info ("Processing batch 2 of 5" )
5861 return "completed"
5962
60- # Run the flow to create a flow run
61- state = api_consumer (return_state = True )
63+ state = db_sync_job (return_state = True )
6264 flow_run = await prefect_client .read_flow_run (state .state_details .flow_run_id )
6365
64- # Force to Crashed state with lease renewal error message
66+ # Simulate the crash that occurs when lease renewal fails
6567 crashed_state = Crashed (message = LEASE_RENEWAL_ERROR )
6668 await prefect_client .set_flow_run_state (
6769 flow_run_id = flow_run .id ,
@@ -74,15 +76,12 @@ def api_consumer():
7476
7577async def test_diagnoses_lease_renewal_failure (
7678 simple_agent : Agent ,
77- crashed_lease_renewal_flow_run : FlowRun ,
78- concurrency_limit : GlobalConcurrencyLimit ,
79+ crashed_flow_run : FlowRun ,
7980 evaluate_response : Callable [[str , str ], Awaitable [None ]],
80- tool_call_spy : ToolCallSpy ,
8181) -> None :
8282 """Test agent identifies concurrency lease renewal failure as crash cause."""
8383 prompt = (
84- f"My flow run '{ crashed_lease_renewal_flow_run .name } ' crashed unexpectedly. "
85- "What happened?"
84+ f"My flow run '{ crashed_flow_run .name } ' crashed unexpectedly. What happened?"
8685 )
8786
8887 async with simple_agent :
@@ -95,5 +94,3 @@ async def test_diagnoses_lease_renewal_failure(
9594 "terminated because the lease could not be renewed." ,
9695 result .output ,
9796 )
98-
99- tool_call_spy .assert_tool_was_called ("get_flow_runs" )
0 commit comments