kin/tests/test_kin_135_escalation.py
2026-03-20 22:07:27 +02:00

566 lines
25 KiB
Python

"""
Tests for KIN-135: Task return escalation mechanism.
Covers:
1. Models: record_task_return — counter increment, return_number sequencing, invalid category
2. Models: get_task_returns — history retrieval
3. Models: check_return_pattern — pattern detection, false positives prevention
4. Models: edge case — return_count behavior after task completion
5. Runner: _save_return_analyst_output — escalation pipeline creation
6. Runner: gate cannot_close records return in standard pipeline; skips in escalation pipeline
7. Context builder: return_count and return_history in PM / return_analyst context
"""
import json
import pytest
from unittest.mock import patch, MagicMock
from core.db import init_db
from core import models
from core.models import RETURN_CATEGORIES
from core.context_builder import build_context
from agents.runner import _save_return_analyst_output, run_pipeline
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
"""Fresh in-memory DB with a project and task for each test."""
c = init_db(":memory:")
models.create_project(c, "p1", "Project One", "/tmp/p1", tech_stack=["python"])
models.create_task(c, "P1-001", "p1", "Implement feature",
brief={"route_type": "feature"})
yield c
c.close()
@pytest.fixture
def conn_autocomplete():
"""Fresh in-memory DB with auto_complete execution mode on the task."""
c = init_db(":memory:")
models.create_project(c, "p1", "Project One", "/tmp/p1", tech_stack=["python"])
models.create_task(c, "P1-001", "p1", "Implement feature",
brief={"route_type": "feature"})
models.update_task(c, "P1-001", execution_mode="auto_complete")
yield c
c.close()
def _mock_subprocess(agent_output: dict) -> MagicMock:
"""Build a subprocess.run mock returning agent_output in claude JSON format."""
m = MagicMock()
m.stdout = json.dumps({"result": json.dumps(agent_output, ensure_ascii=False)})
m.stderr = ""
m.returncode = 0
return m
# ===========================================================================
# Section 1: record_task_return
# ===========================================================================
class TestRecordTaskReturn:
def test_increments_return_count_on_first_return(self, conn):
"""After first return, task.return_count == 1."""
models.record_task_return(conn, "P1-001", "requirements_unclear")
task = models.get_task(conn, "P1-001")
assert task["return_count"] == 1
def test_increments_return_count_multiple_times(self, conn):
"""After three returns with different categories, task.return_count == 3."""
for cat in ["requirements_unclear", "missing_context", "technical_blocker"]:
models.record_task_return(conn, "P1-001", cat)
task = models.get_task(conn, "P1-001")
assert task["return_count"] == 3
def test_assigns_sequential_return_numbers(self, conn):
"""return_number is assigned as 1, 2, 3 for successive calls."""
models.record_task_return(conn, "P1-001", "requirements_unclear")
models.record_task_return(conn, "P1-001", "missing_context")
models.record_task_return(conn, "P1-001", "technical_blocker")
returns = models.get_task_returns(conn, "P1-001")
assert [r["return_number"] for r in returns] == [1, 2, 3]
def test_invalid_category_defaults_to_missing_context(self, conn):
"""Unknown reason_category is silently coerced to 'missing_context' (fail-open)."""
result = models.record_task_return(conn, "P1-001", "totally_unknown_reason")
assert result["reason_category"] == "missing_context"
def test_valid_category_stored_unchanged(self, conn):
"""Valid reason_category is stored as-is."""
result = models.record_task_return(conn, "P1-001", "scope_too_large",
reason_text="Too much scope")
assert result["reason_category"] == "scope_too_large"
def test_reason_text_stored_correctly(self, conn):
"""Provided reason_text is stored in the returned row."""
result = models.record_task_return(conn, "P1-001", "missing_context",
reason_text="No spec attached")
assert result["reason_text"] == "No spec attached"
def test_returned_by_stored_correctly(self, conn):
"""returned_by value is stored in the returned row."""
result = models.record_task_return(conn, "P1-001", "requirements_unclear",
returned_by="pm")
assert result["returned_by"] == "pm"
def test_returns_dict_with_task_id_and_return_number(self, conn):
"""Return value is a dict containing task_id and return_number."""
result = models.record_task_return(conn, "P1-001", "requirements_unclear")
assert result["task_id"] == "P1-001"
assert result["return_number"] == 1
def test_return_numbers_independent_per_task(self, conn):
"""Two different tasks have independent return_number sequences."""
models.create_task(conn, "P1-002", "p1", "Another task")
models.record_task_return(conn, "P1-001", "requirements_unclear")
models.record_task_return(conn, "P1-001", "missing_context")
models.record_task_return(conn, "P1-002", "technical_blocker")
returns_001 = models.get_task_returns(conn, "P1-001")
returns_002 = models.get_task_returns(conn, "P1-002")
assert [r["return_number"] for r in returns_001] == [1, 2]
assert returns_002[0]["return_number"] == 1
def test_all_valid_return_categories_accepted(self, conn):
"""Every value in RETURN_CATEGORIES is accepted and stored without coercion."""
models.create_task(conn, f"P1-CAT", "p1", "Category test task")
for i, cat in enumerate(RETURN_CATEGORIES):
result = models.record_task_return(conn, "P1-CAT", cat)
assert result["reason_category"] == cat, f"Category '{cat}' should be stored as-is"
# ===========================================================================
# Section 2: get_task_returns
# ===========================================================================
class TestGetTaskReturns:
def test_empty_for_task_with_no_returns(self, conn):
"""Task without any returns → empty list."""
result = models.get_task_returns(conn, "P1-001")
assert result == []
def test_returns_history_in_ascending_order(self, conn):
"""Returns are ordered by return_number ASC."""
models.record_task_return(conn, "P1-001", "requirements_unclear")
models.record_task_return(conn, "P1-001", "scope_too_large")
models.record_task_return(conn, "P1-001", "technical_blocker")
returns = models.get_task_returns(conn, "P1-001")
assert len(returns) == 3
assert returns[0]["reason_category"] == "requirements_unclear"
assert returns[1]["reason_category"] == "scope_too_large"
assert returns[2]["reason_category"] == "technical_blocker"
def test_limit_parameter_caps_results(self, conn):
"""limit parameter restricts the number of returned records."""
for _ in range(5):
models.record_task_return(conn, "P1-001", "missing_context")
returns = models.get_task_returns(conn, "P1-001", limit=3)
assert len(returns) == 3
# ===========================================================================
# Section 3: check_return_pattern
# ===========================================================================
class TestCheckReturnPattern:
def test_no_history_returns_no_pattern(self, conn):
"""Task with no returns → pattern_detected=False, dominant_category=None."""
result = models.check_return_pattern(conn, "P1-001")
assert result["pattern_detected"] is False
assert result["dominant_category"] is None
assert result["occurrences"] == 0
def test_single_return_no_pattern(self, conn):
"""One return → pattern_detected=False (needs >= 2 same category)."""
models.record_task_return(conn, "P1-001", "requirements_unclear")
result = models.check_return_pattern(conn, "P1-001")
assert result["pattern_detected"] is False
assert result["occurrences"] == 1
def test_same_category_twice_detects_pattern(self, conn):
"""Two returns with same category → pattern_detected=True."""
models.record_task_return(conn, "P1-001", "requirements_unclear")
models.record_task_return(conn, "P1-001", "requirements_unclear")
result = models.check_return_pattern(conn, "P1-001")
assert result["pattern_detected"] is True
assert result["dominant_category"] == "requirements_unclear"
assert result["occurrences"] == 2
def test_same_category_three_times_detects_pattern(self, conn):
"""Three returns with same category → pattern_detected=True, occurrences=3."""
for _ in range(3):
models.record_task_return(conn, "P1-001", "technical_blocker")
result = models.check_return_pattern(conn, "P1-001")
assert result["pattern_detected"] is True
assert result["dominant_category"] == "technical_blocker"
assert result["occurrences"] == 3
def test_all_different_categories_no_false_positive(self, conn):
"""All different categories → no pattern detected (no false positives)."""
categories = [
"requirements_unclear", "scope_too_large", "technical_blocker",
"missing_context", "recurring_quality_fail",
]
for cat in categories:
models.record_task_return(conn, "P1-001", cat)
result = models.check_return_pattern(conn, "P1-001")
assert result["pattern_detected"] is False
def test_two_different_categories_no_false_positive(self, conn):
"""Two returns with two different categories → no pattern (each appears once)."""
models.record_task_return(conn, "P1-001", "requirements_unclear")
models.record_task_return(conn, "P1-001", "scope_too_large")
result = models.check_return_pattern(conn, "P1-001")
assert result["pattern_detected"] is False
def test_dominant_category_is_most_frequent(self, conn):
"""When multiple categories appear, dominant_category is the most frequent one."""
models.record_task_return(conn, "P1-001", "requirements_unclear")
models.record_task_return(conn, "P1-001", "requirements_unclear")
models.record_task_return(conn, "P1-001", "missing_context")
result = models.check_return_pattern(conn, "P1-001")
assert result["dominant_category"] == "requirements_unclear"
assert result["occurrences"] == 2
# ===========================================================================
# Section 4: Edge case — return_count after task completion
# ===========================================================================
class TestReturnCountAfterTaskCompletion:
def test_return_count_not_reset_when_task_set_to_done(self, conn):
"""return_count persists after task status transitions to 'done'.
KIN-135 spec mentions a reset on success, but update_task() does not
implement this. This test documents the current (non-resetting) behavior.
"""
models.record_task_return(conn, "P1-001", "requirements_unclear")
models.record_task_return(conn, "P1-001", "missing_context")
models.update_task(conn, "P1-001", status="done")
task = models.get_task(conn, "P1-001")
# Documents current behavior: counter persists (not reset to 0)
assert task["return_count"] == 2
def test_return_count_starts_at_zero_for_new_task(self, conn):
"""Newly created task has return_count == 0."""
task = models.get_task(conn, "P1-001")
assert task["return_count"] == 0
# ===========================================================================
# Section 5: _save_return_analyst_output
# ===========================================================================
class TestSaveReturnAnalystOutput:
def _make_analyst_result(self, escalate: bool,
root_cause: str = "Recurring scope issue",
refined_brief: str = "Focus on X only") -> dict:
"""Build a mock return_analyst agent result dict."""
output = {
"escalate_to_dept_head": escalate,
"root_cause_analysis": root_cause,
"refined_brief": refined_brief,
"pattern_summary": "3 returns with similar root cause",
}
return {"success": True, "raw_output": json.dumps(output)}
def test_creates_escalation_pipeline_when_escalate_true(self, conn):
"""escalate_to_dept_head=true → escalation pipeline created, escalated=True."""
result = _save_return_analyst_output(
conn, "P1-001", "p1",
self._make_analyst_result(escalate=True),
)
assert result["escalated"] is True
assert result["escalation_pipeline_id"] is not None
assert result["dept_head"] is not None
def test_no_escalation_when_escalate_false(self, conn):
"""escalate_to_dept_head=false → no escalation pipeline, escalated=False."""
result = _save_return_analyst_output(
conn, "P1-001", "p1",
self._make_analyst_result(escalate=False),
)
assert result["escalated"] is False
assert result["escalation_pipeline_id"] is None
assert result["dept_head"] is None
def test_dept_head_sec_category_maps_to_security_head(self, conn):
"""SEC task category → dept_head is security_head."""
models.update_task(conn, "P1-001", category="SEC")
result = _save_return_analyst_output(
conn, "P1-001", "p1",
self._make_analyst_result(escalate=True),
)
assert result["dept_head"] == "security_head"
def test_dept_head_ui_category_maps_to_frontend_head(self, conn):
"""UI task category → dept_head is frontend_head."""
models.update_task(conn, "P1-001", category="UI")
result = _save_return_analyst_output(
conn, "P1-001", "p1",
self._make_analyst_result(escalate=True),
)
assert result["dept_head"] == "frontend_head"
def test_dept_head_api_category_maps_to_backend_head(self, conn):
"""API task category → dept_head is backend_head."""
models.update_task(conn, "P1-001", category="API")
result = _save_return_analyst_output(
conn, "P1-001", "p1",
self._make_analyst_result(escalate=True),
)
assert result["dept_head"] == "backend_head"
def test_dept_head_infra_category_maps_to_infra_head(self, conn):
"""INFRA task category → dept_head is infra_head."""
models.update_task(conn, "P1-001", category="INFRA")
result = _save_return_analyst_output(
conn, "P1-001", "p1",
self._make_analyst_result(escalate=True),
)
assert result["dept_head"] == "infra_head"
def test_unknown_category_defaults_to_cto_advisor(self, conn):
"""Unknown/missing task category defaults to cto_advisor."""
# Task category not set (None by default)
result = _save_return_analyst_output(
conn, "P1-001", "p1",
self._make_analyst_result(escalate=True),
)
assert result["dept_head"] == "cto_advisor"
def test_escalation_pipeline_has_pipeline_type_escalation_in_db(self, conn):
"""Created escalation pipeline has pipeline_type='escalation' in the DB."""
result = _save_return_analyst_output(
conn, "P1-001", "p1",
self._make_analyst_result(escalate=True),
)
pipeline_id = result["escalation_pipeline_id"]
row = conn.execute(
"SELECT pipeline_type FROM pipelines WHERE id = ?", (pipeline_id,)
).fetchone()
assert row["pipeline_type"] == "escalation"
def test_no_escalation_when_output_is_invalid_json(self, conn):
"""Non-parseable JSON output → fail-open, escalated=False."""
bad_result = {"success": True, "raw_output": "not valid json {{{"}
result = _save_return_analyst_output(conn, "P1-001", "p1", bad_result)
assert result["escalated"] is False
def test_no_escalation_when_output_is_empty(self, conn):
"""Empty output → fail-open, escalated=False."""
result = _save_return_analyst_output(
conn, "P1-001", "p1",
{"success": True, "raw_output": ""},
)
assert result["escalated"] is False
def test_no_escalation_when_output_missing_field(self, conn):
"""Output without escalate_to_dept_head field → escalated=False."""
output = {"root_cause_analysis": "some analysis", "refined_brief": "refined"}
result = _save_return_analyst_output(
conn, "P1-001", "p1",
{"success": True, "raw_output": json.dumps(output)},
)
assert result["escalated"] is False
def test_accepts_dict_raw_output(self, conn):
"""raw_output as dict (not string) is handled via json.dumps path."""
output_dict = {
"escalate_to_dept_head": True,
"root_cause_analysis": "cause",
"refined_brief": "brief",
}
result = _save_return_analyst_output(
conn, "P1-001", "p1",
{"success": True, "raw_output": output_dict},
)
assert result["escalated"] is True
# ===========================================================================
# Section 6: Gate cannot_close — return recording in pipeline
# ===========================================================================
class TestGateCannotCloseRecordsReturn:
@patch("core.followup.generate_followups")
@patch("agents.runner.run_hooks")
@patch("agents.runner.subprocess.run")
def test_gate_rejection_increments_return_count_in_standard_pipeline(
self, mock_run, mock_hooks, mock_followup, conn_autocomplete
):
"""Gate cannot_close in standard pipeline → task.return_count increases by 1."""
conn = conn_autocomplete
mock_run.return_value = _mock_subprocess(
{"verdict": "changes_requested", "reason": "Missing tests"}
)
mock_hooks.return_value = []
mock_followup.return_value = {"created": [], "pending_actions": []}
steps = [{"role": "reviewer", "brief": "review"}]
run_pipeline(conn, "P1-001", steps)
task = models.get_task(conn, "P1-001")
assert task["return_count"] == 1, (
"Gate rejection in standard pipeline should increment return_count"
)
@patch("core.followup.generate_followups")
@patch("agents.runner.run_hooks")
@patch("agents.runner.subprocess.run")
def test_gate_approval_does_not_increment_return_count(
self, mock_run, mock_hooks, mock_followup, conn_autocomplete
):
"""Gate approved → task.return_count stays 0."""
conn = conn_autocomplete
mock_run.return_value = _mock_subprocess(
{"verdict": "approved", "reason": ""}
)
mock_hooks.return_value = []
mock_followup.return_value = {"created": [], "pending_actions": []}
steps = [{"role": "reviewer", "brief": "review"}]
run_pipeline(conn, "P1-001", steps)
task = models.get_task(conn, "P1-001")
assert task["return_count"] == 0, (
"Gate approval should not increment return_count"
)
@patch("core.followup.generate_followups")
@patch("agents.runner.run_hooks")
@patch("agents.runner.subprocess.run")
def test_gate_rejection_records_return_with_recurring_quality_fail_category(
self, mock_run, mock_hooks, mock_followup, conn_autocomplete
):
"""Gate rejection uses 'recurring_quality_fail' as reason_category."""
conn = conn_autocomplete
mock_run.return_value = _mock_subprocess(
{"verdict": "changes_requested", "reason": "Need unit tests"}
)
mock_hooks.return_value = []
mock_followup.return_value = {"created": [], "pending_actions": []}
steps = [{"role": "reviewer", "brief": "review"}]
run_pipeline(conn, "P1-001", steps)
returns = models.get_task_returns(conn, "P1-001")
assert len(returns) == 1
assert returns[0]["reason_category"] == "recurring_quality_fail"
@patch("core.followup.generate_followups")
@patch("agents.runner.run_hooks")
@patch("agents.runner.subprocess.run")
def test_gate_rejection_skips_return_recording_in_escalation_pipeline(
self, mock_run, mock_hooks, mock_followup, conn_autocomplete
):
"""Gate rejection in escalation pipeline → return_count NOT incremented.
Escalation pipelines must not trigger further return tracking to avoid
infinite loops.
"""
conn = conn_autocomplete
mock_run.return_value = _mock_subprocess(
{"verdict": "changes_requested", "reason": "Still not good"}
)
mock_hooks.return_value = []
mock_followup.return_value = {"created": [], "pending_actions": []}
# Intercept create_pipeline and update_pipeline to force pipeline_type='escalation'
_real_create = models.create_pipeline
_real_update = models.update_pipeline
def _create_as_escalation(db, *args, **kwargs):
result = _real_create(db, *args, **kwargs)
db.execute(
"UPDATE pipelines SET pipeline_type = 'escalation' WHERE id = ?",
(result["id"],),
)
db.commit()
return {**result, "pipeline_type": "escalation"}
def _update_keeping_escalation(db, id, **kwargs):
result = _real_update(db, id, **kwargs)
return {**result, "pipeline_type": "escalation"} if result else result
with patch("agents.runner.models.create_pipeline", side_effect=_create_as_escalation), \
patch("agents.runner.models.update_pipeline", side_effect=_update_keeping_escalation):
steps = [{"role": "reviewer", "brief": "review"}]
run_pipeline(conn, "P1-001", steps)
task = models.get_task(conn, "P1-001")
assert task["return_count"] == 0, (
"Gate rejection in escalation pipeline must NOT increment return_count"
)
# ===========================================================================
# Section 7: Context builder — return_count in PM / return_analyst context
# ===========================================================================
class TestContextBuilderReturnHistory:
def test_pm_context_has_return_count_zero_when_no_returns(self, conn):
"""PM context includes return_count=0 when task has no returns."""
ctx = build_context(conn, "P1-001", "pm", "p1")
assert ctx.get("return_count") == 0
def test_pm_context_has_empty_return_history_when_no_returns(self, conn):
"""PM context includes return_history=[] when task has no returns."""
ctx = build_context(conn, "P1-001", "pm", "p1")
assert ctx.get("return_history") == []
def test_pm_context_includes_return_count_when_returns_exist(self, conn):
"""PM context reflects actual return_count when returns have been recorded."""
models.record_task_return(conn, "P1-001", "requirements_unclear")
models.record_task_return(conn, "P1-001", "requirements_unclear")
models.record_task_return(conn, "P1-001", "missing_context")
ctx = build_context(conn, "P1-001", "pm", "p1")
assert ctx["return_count"] == 3
def test_pm_context_includes_return_history_when_returns_exist(self, conn):
"""PM context includes up to 5 most recent returns in return_history."""
for _ in range(3):
models.record_task_return(conn, "P1-001", "requirements_unclear")
ctx = build_context(conn, "P1-001", "pm", "p1")
assert len(ctx["return_history"]) == 3
assert ctx["return_history"][0]["reason_category"] == "requirements_unclear"
def test_return_analyst_context_has_return_count(self, conn):
"""return_analyst context includes return_count reflecting actual returns."""
models.record_task_return(conn, "P1-001", "scope_too_large")
models.record_task_return(conn, "P1-001", "scope_too_large")
ctx = build_context(conn, "P1-001", "return_analyst", "p1")
assert ctx["return_count"] == 2
def test_return_analyst_context_has_full_return_history(self, conn):
"""return_analyst context includes full return history (limit=20)."""
for i in range(7):
models.record_task_return(conn, "P1-001", "technical_blocker")
ctx = build_context(conn, "P1-001", "return_analyst", "p1")
assert len(ctx["return_history"]) == 7
def test_non_pm_role_does_not_get_return_count(self, conn):
"""Non-PM roles (e.g., debugger) do not get return_count in context."""
models.record_task_return(conn, "P1-001", "requirements_unclear")
ctx = build_context(conn, "P1-001", "debugger", "p1")
# debugger context should not include return_count
assert "return_count" not in ctx