1 change: 1 addition & 0 deletions evals/README.md
@@ -54,6 +54,7 @@ Provider defaults:
| **late_runs/test_deployment_concurrency** | verifies agent can diagnose late runs caused by deployment concurrency limits | ✅ implemented | [#32](https://github.com/PrefectHQ/prefect-mcp-server/issues/32) |
| **late_runs/test_tag_concurrency** | verifies agent can diagnose late runs caused by tag-based concurrency limits | ✅ implemented | [#32](https://github.com/PrefectHQ/prefect-mcp-server/issues/32) |
| **late_runs/test_cancel_late_runs** | verifies agent can cancel all late runs for a deployment using the prefect CLI | ✅ implemented | - |
| **late_runs/test_no_late_runs** | negative case: verifies agent correctly identifies when there are NO late runs (doesn't hallucinate problems) | ✅ implemented | - |
| **test_create_reactive_automation** | verifies agent can create reactive automations | ✅ implemented | [#47](https://github.com/PrefectHQ/prefect-mcp-server/pull/47) |
| **test_trigger_deployment_run** | verifies agent can trigger deployment runs with custom parameters | ✅ implemented | - |
| **test_debug_automation_not_firing** | verifies agent can debug why an automation didn't fire due to threshold mismatch | ✅ implemented | [#62](https://github.com/PrefectHQ/prefect-mcp-server/issues/62) |
113 changes: 113 additions & 0 deletions evals/late_runs/test_no_late_runs.py
@@ -0,0 +1,113 @@
"""Negative case eval: agent should correctly identify when a deployment has NO late runs.

This tests that the agent doesn't hallucinate problems when everything is healthy.
The blog post "Demystifying Evals for AI Agents" emphasizes balanced problem sets:
"Test both the cases where a behavior should occur and where it shouldn't."

Note: We scope this to a specific deployment because the test harness is session-scoped
and other tests may create late runs. This is also more realistic - users often ask
about specific deployments.
"""

from collections.abc import Awaitable, Callable
from uuid import uuid4

import pytest
from prefect import flow
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.actions import WorkPoolCreate
from prefect.client.schemas.responses import DeploymentResponse
from prefect.states import Completed, Running
from pydantic_ai import Agent


@pytest.fixture
async def healthy_deployment(prefect_client: PrefectClient) -> DeploymentResponse:
"""Create a healthy deployment with NO late runs.

- Work pool with active workers (READY status)
- No concurrency limits blocking runs
- Flow runs in healthy states (Running, Completed)
"""
work_pool_name = f"healthy-pool-{uuid4().hex[:8]}"

# Create work pool with no concurrency limit
work_pool_create = WorkPoolCreate(
name=work_pool_name,
type="process",
description="Healthy work pool with active workers",
)
await prefect_client.create_work_pool(work_pool=work_pool_create)

# Send heartbeat to make it READY
await prefect_client.send_worker_heartbeat(
work_pool_name=work_pool_name,
worker_name=f"healthy-worker-{uuid4().hex[:8]}",
heartbeat_interval_seconds=30,
)

@flow(name=f"healthy-flow-{uuid4().hex[:8]}")
def healthy_flow():
return "success"

flow_id = await prefect_client.create_flow(healthy_flow)
deployment_id = await prefect_client.create_deployment(
flow_id=flow_id,
name=f"healthy-deployment-{uuid4().hex[:8]}",
work_pool_name=work_pool_name,
)
deployment = await prefect_client.read_deployment(deployment_id)

# Create flow runs in unambiguously healthy states
# Note: We intentionally omit Scheduled runs because an agent might reasonably
# flag "scheduled but not started" as concerning, even though it's not technically
# in the "Late" state. Running and Completed are unambiguously healthy.
healthy_states = [
("running-run", Running()),
("completed-run-1", Completed()),
("completed-run-2", Completed()),
]

for name_suffix, state in healthy_states:
flow_run = await prefect_client.create_flow_run_from_deployment(
deployment_id=deployment_id,
name=f"{name_suffix}-{uuid4().hex[:8]}",
)
await prefect_client.set_flow_run_state(
flow_run_id=flow_run.id, state=state, force=True
)

return deployment


async def test_no_late_runs_for_deployment(
simple_agent: Agent,
healthy_deployment: DeploymentResponse,
evaluate_response: Callable[[str, str], Awaitable[None]],
) -> None:
"""Agent should correctly identify that a specific deployment has no late runs.

This is a negative case - the agent should NOT hallucinate problems for this
deployment. We scope to a specific deployment because:
1. Other tests in the session may create late runs (shared prefect_test_harness)
2. This is more realistic - users often ask about specific deployments
"""
deployment_name = healthy_deployment.name

async with simple_agent:
result = await simple_agent.run(
f"Are there any late flow runs for the deployment '{deployment_name}'? "
"Check if any runs from this deployment have been scheduled for a while "
"but haven't started executing."
)

await evaluate_response(
f"""Does the response correctly indicate that deployment '{deployment_name}'
has NO late runs? The agent should NOT claim there are late runs for this
specific deployment. It's acceptable to say "no late runs found for this
deployment" or "runs for {deployment_name} appear healthy".

Note: The agent may mention late runs from OTHER deployments - that's fine.
The key is that it correctly identifies THIS deployment has no late runs.""",
result.output,
)
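The test above depends on an `evaluate_response` fixture typed as `Callable[[str, str], Awaitable[None]]`, i.e. an LLM-as-judge helper that takes a rubric and the agent's output. Its implementation is not part of this diff. For context, here is a minimal sketch of how such a judge fixture could be wired up with pydantic_ai, assuming a hypothetical `Verdict` output model and an `openai:gpt-4o` judge model (neither taken from the repository):

```python
# Hypothetical sketch; the real `evaluate_response` fixture lives in the evals
# conftest and may be implemented differently.
import pytest
from pydantic import BaseModel
from pydantic_ai import Agent


class Verdict(BaseModel):
    passed: bool
    reasoning: str


# The judge model string is an assumption; the repo's provider defaults may differ.
judge = Agent(
    "openai:gpt-4o",
    output_type=Verdict,
    system_prompt=(
        "You grade an AI agent's response against a rubric. "
        "Set passed=True only if the response satisfies the rubric."
    ),
)


@pytest.fixture
def evaluate_response():
    async def _evaluate(rubric: str, response: str) -> None:
        result = await judge.run(f"Rubric:\n{rubric}\n\nAgent response:\n{response}")
        assert result.output.passed, result.output.reasoning

    return _evaluate
```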
14 changes: 14 additions & 0 deletions evals/late_runs/test_work_pool_concurrency.py
@@ -11,6 +11,8 @@
from prefect.states import Late
from pydantic_ai import Agent

from evals._tools.spy import ToolCallSpy


class LateRunsScenario(NamedTuple):
"""Container for late runs scenario data."""
@@ -101,6 +103,7 @@ async def test_diagnoses_work_pool_concurrency(
reasoning_agent: Agent,
work_pool_concurrency_scenario: LateRunsScenario,
evaluate_response: Callable[[str, str], Awaitable[None]],
tool_call_spy: ToolCallSpy,
) -> None:
"""Test agent diagnoses late runs caused by work pool concurrency limit."""
work_pool_name = work_pool_concurrency_scenario.work_pool.name
@@ -111,6 +114,17 @@
been scheduled for a while but haven't begun execution."""
)

# State verification: agent response must mention the actual work pool name
# This catches cases where the LLM evaluation might pass on vague responses
assert work_pool_name in result.output, (
f"Response must mention the specific work pool '{work_pool_name}' "
f"but got: {result.output[:200]}..."
)

# Tool verification: agent should have inspected work pools
tool_call_spy.assert_tool_was_called("get_work_pools")

# LLM evaluation for response quality
Comment on lines +117 to +127

Collaborator (Author): Is this prescription actually necessary?

Contributor: The eval below already validates that the specific work pool is mentioned 🤷

await evaluate_response(
f"""Does this response specifically identify that work pool
'{work_pool_name}' has a concurrency limit of 1 that is causing late
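The new assertions in `test_work_pool_concurrency.py` rely on a `ToolCallSpy` imported from `evals._tools.spy`, whose implementation is not included in this diff. As a rough illustration of the contract the test exercises via `assert_tool_was_called("get_work_pools")`, a minimal spy might look like the sketch below; everything beyond the class name and that method is an assumption.

```python
# Illustrative sketch only; the actual ToolCallSpy in evals/_tools/spy.py may differ.
from dataclasses import dataclass, field


@dataclass
class ToolCallSpy:
    """Records tool names as the agent calls them, for post-run assertions."""

    calls: list[str] = field(default_factory=list)

    def record(self, tool_name: str) -> None:
        self.calls.append(tool_name)

    def assert_tool_was_called(self, tool_name: str) -> None:
        assert tool_name in self.calls, (
            f"expected tool {tool_name!r} to be called, saw: {self.calls}"
        )
```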