1616import pytest
1717from prefect import flow
1818from prefect .client .orchestration import PrefectClient
19- from prefect .client .schemas .objects import FlowRun
19+ from prefect .client .schemas .actions import GlobalConcurrencyLimitCreate
20+ from prefect .client .schemas .objects import FlowRun , GlobalConcurrencyLimit
2021from prefect .states import Crashed
2122from pydantic_ai import Agent
2223
2930
3031
3132@pytest .fixture
32- async def crashed_lease_renewal_flow_run (prefect_client : PrefectClient ) -> FlowRun :
33- """Create a flow run that crashed due to lease renewal failure."""
33+ async def concurrency_limit (prefect_client : PrefectClient ) -> GlobalConcurrencyLimit :
34+ """Create a global concurrency limit that the flow was using."""
35+ limit_name = f"api-calls-{ uuid4 ().hex [:8 ]} "
36+ await prefect_client .create_global_concurrency_limit (
37+ concurrency_limit = GlobalConcurrencyLimitCreate (
38+ name = limit_name ,
39+ limit = 5 ,
40+ )
41+ )
42+ return await prefect_client .read_global_concurrency_limit_by_name (limit_name )
43+
3444
35- @flow (name = f"data-pipeline-{ uuid4 ().hex [:8 ]} " )
36- def data_pipeline ():
45+ @pytest .fixture
46+ async def crashed_lease_renewal_flow_run (
47+ prefect_client : PrefectClient ,
48+ concurrency_limit : GlobalConcurrencyLimit ,
49+ ) -> FlowRun :
50+ """Create a flow run that crashed due to lease renewal failure.
51+
52+ The flow was using a concurrency limit when its lease renewal failed,
53+ causing Prefect to crash the run to prevent over-allocation.
54+ """
55+
56+ @flow (name = f"api-consumer-{ uuid4 ().hex [:8 ]} " )
57+ def api_consumer ():
3758 return "completed"
3859
3960 # Run the flow to create a flow run
40- state = data_pipeline (return_state = True )
61+ state = api_consumer (return_state = True )
4162 flow_run = await prefect_client .read_flow_run (state .state_details .flow_run_id )
4263
4364 # Force to Crashed state with lease renewal error message
@@ -54,13 +75,15 @@ def data_pipeline():
5475async def test_diagnoses_lease_renewal_failure (
5576 simple_agent : Agent ,
5677 crashed_lease_renewal_flow_run : FlowRun ,
78+ concurrency_limit : GlobalConcurrencyLimit ,
5779 evaluate_response : Callable [[str , str ], Awaitable [None ]],
5880 tool_call_spy : ToolCallSpy ,
5981) -> None :
6082 """Test agent identifies concurrency lease renewal failure as crash cause."""
83+ # Don't mention concurrency - let the agent discover the cause from the crash state
6184 prompt = (
62- f"Why did my flow run '{ crashed_lease_renewal_flow_run .name } ' crash "
63- "unexpectedly during execution? It was running fine and then suddenly crashed. "
85+ f"My flow run '{ crashed_lease_renewal_flow_run .name } ' crashed unexpectedly. "
86+ "What happened? "
6487 )
6588
6689 async with simple_agent :
@@ -74,5 +97,4 @@ async def test_diagnoses_lease_renewal_failure(
7497 result .output ,
7598 )
7699
77- # Agent must use get_flow_runs to retrieve the crash details
78100 tool_call_spy .assert_tool_was_called ("get_flow_runs" )
0 commit comments