diff --git a/tests/test_kin_135_escalation.py b/tests/test_kin_135_escalation.py
new file mode 100644
index 0000000..b77a7bf
--- /dev/null
+++ b/tests/test_kin_135_escalation.py
@@ -0,0 +1,566 @@
+"""
+Tests for KIN-135: Task return escalation mechanism.
+
+Covers:
+1. Models: record_task_return — counter increment, return_number sequencing, invalid category
+2. Models: get_task_returns — history retrieval
+3. Models: check_return_pattern — pattern detection, false-positive prevention
+4. Models: edge case — return_count behavior after task completion
+5. Runner: _save_return_analyst_output — escalation pipeline creation
+6. Runner: gate cannot_close records a return in a standard pipeline; skips it in an escalation pipeline
+7. Context builder: return_count and return_history in PM / return_analyst context
+"""
+
+import json
+import pytest
+from unittest.mock import patch, MagicMock
+
+from core.db import init_db
+from core import models
+from core.models import RETURN_CATEGORIES
+from core.context_builder import build_context
+from agents.runner import _save_return_analyst_output, run_pipeline
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+@pytest.fixture
+def conn():
+    """Fresh in-memory DB with a project and task for each test."""
+    c = init_db(":memory:")
+    models.create_project(c, "p1", "Project One", "/tmp/p1", tech_stack=["python"])
+    models.create_task(c, "P1-001", "p1", "Implement feature",
+                       brief={"route_type": "feature"})
+    yield c
+    c.close()
+
+
+@pytest.fixture
+def conn_autocomplete():
+    """Fresh in-memory DB with auto_complete execution mode on the task."""
+    c = init_db(":memory:")
+    models.create_project(c, "p1", "Project One", "/tmp/p1", tech_stack=["python"])
+    models.create_task(c, "P1-001", "p1", "Implement feature",
+                       brief={"route_type": "feature"})
+    models.update_task(c, "P1-001", execution_mode="auto_complete")
+    yield c
+    c.close()
+
+
+def _mock_subprocess(agent_output: dict) -> MagicMock:
+    """Build a subprocess.run mock returning agent_output in claude JSON format."""
+    m = MagicMock()
+    m.stdout = json.dumps({"result": json.dumps(agent_output, ensure_ascii=False)})
+    m.stderr = ""
+    m.returncode = 0
+    return m
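+# For orientation, _mock_subprocess produces a doubly-encoded payload: stdout is
+# a JSON object whose "result" field holds the agent's own JSON serialized as a
+# string. That this mirrors the real claude CLI envelope is assumed from the
+# runner's parsing, not verified here. Example:
+#
+#     _mock_subprocess({"verdict": "approved"}).stdout
+#     # -> '{"result": "{\\"verdict\\": \\"approved\\"}"}'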
+# ===========================================================================
+# Section 1: record_task_return
+# ===========================================================================
+
+class TestRecordTaskReturn:
+
+    def test_increments_return_count_on_first_return(self, conn):
+        """After first return, task.return_count == 1."""
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        task = models.get_task(conn, "P1-001")
+        assert task["return_count"] == 1
+
+    def test_increments_return_count_multiple_times(self, conn):
+        """After three returns with different categories, task.return_count == 3."""
+        for cat in ["requirements_unclear", "missing_context", "technical_blocker"]:
+            models.record_task_return(conn, "P1-001", cat)
+        task = models.get_task(conn, "P1-001")
+        assert task["return_count"] == 3
+
+    def test_assigns_sequential_return_numbers(self, conn):
+        """return_number is assigned as 1, 2, 3 for successive calls."""
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        models.record_task_return(conn, "P1-001", "missing_context")
+        models.record_task_return(conn, "P1-001", "technical_blocker")
+        returns = models.get_task_returns(conn, "P1-001")
+        assert [r["return_number"] for r in returns] == [1, 2, 3]
+
+    def test_invalid_category_defaults_to_missing_context(self, conn):
+        """Unknown reason_category is silently coerced to 'missing_context' (fail-open)."""
+        result = models.record_task_return(conn, "P1-001", "totally_unknown_reason")
+        assert result["reason_category"] == "missing_context"
+
+    def test_valid_category_stored_unchanged(self, conn):
+        """Valid reason_category is stored as-is."""
+        result = models.record_task_return(conn, "P1-001", "scope_too_large",
+                                           reason_text="Too much scope")
+        assert result["reason_category"] == "scope_too_large"
+
+    def test_reason_text_stored_correctly(self, conn):
+        """Provided reason_text is stored in the returned row."""
+        result = models.record_task_return(conn, "P1-001", "missing_context",
+                                           reason_text="No spec attached")
+        assert result["reason_text"] == "No spec attached"
+
+    def test_returned_by_stored_correctly(self, conn):
+        """returned_by value is stored in the returned row."""
+        result = models.record_task_return(conn, "P1-001", "requirements_unclear",
+                                           returned_by="pm")
+        assert result["returned_by"] == "pm"
+
+    def test_returns_dict_with_task_id_and_return_number(self, conn):
+        """Return value is a dict containing task_id and return_number."""
+        result = models.record_task_return(conn, "P1-001", "requirements_unclear")
+        assert result["task_id"] == "P1-001"
+        assert result["return_number"] == 1
+
+    def test_return_numbers_independent_per_task(self, conn):
+        """Two different tasks have independent return_number sequences."""
+        models.create_task(conn, "P1-002", "p1", "Another task")
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        models.record_task_return(conn, "P1-001", "missing_context")
+        models.record_task_return(conn, "P1-002", "technical_blocker")
+
+        returns_001 = models.get_task_returns(conn, "P1-001")
+        returns_002 = models.get_task_returns(conn, "P1-002")
+
+        assert [r["return_number"] for r in returns_001] == [1, 2]
+        assert returns_002[0]["return_number"] == 1
+
+    def test_all_valid_return_categories_accepted(self, conn):
+        """Every value in RETURN_CATEGORIES is accepted and stored without coercion."""
+        models.create_task(conn, "P1-CAT", "p1", "Category test task")
+        for cat in RETURN_CATEGORIES:
+            result = models.record_task_return(conn, "P1-CAT", cat)
+            assert result["reason_category"] == cat, f"Category '{cat}' should be stored as-is"
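+# The contract pinned down above, as a sketch. This is an assumption about the
+# shape of core.models.record_task_return, not a copy of it; the two helper
+# names in the body are placeholders:
+#
+#     def record_task_return(conn, task_id, reason_category,
+#                            reason_text=None, returned_by=None):
+#         if reason_category not in RETURN_CATEGORIES:
+#             reason_category = "missing_context"            # fail-open coercion
+#         n = previous_max_return_number(conn, task_id) + 1  # per-task sequence
+#         insert_return_row_and_bump_task_counter(...)
+#         return {"task_id": task_id, "return_number": n,
+#                 "reason_category": reason_category,
+#                 "reason_text": reason_text, "returned_by": returned_by}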
"P1-001", "missing_context") + returns = models.get_task_returns(conn, "P1-001", limit=3) + assert len(returns) == 3 + + +# =========================================================================== +# Section 3: check_return_pattern +# =========================================================================== + +class TestCheckReturnPattern: + + def test_no_history_returns_no_pattern(self, conn): + """Task with no returns → pattern_detected=False, dominant_category=None.""" + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is False + assert result["dominant_category"] is None + assert result["occurrences"] == 0 + + def test_single_return_no_pattern(self, conn): + """One return → pattern_detected=False (needs >= 2 same category).""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is False + assert result["occurrences"] == 1 + + def test_same_category_twice_detects_pattern(self, conn): + """Two returns with same category → pattern_detected=True.""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "requirements_unclear") + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is True + assert result["dominant_category"] == "requirements_unclear" + assert result["occurrences"] == 2 + + def test_same_category_three_times_detects_pattern(self, conn): + """Three returns with same category → pattern_detected=True, occurrences=3.""" + for _ in range(3): + models.record_task_return(conn, "P1-001", "technical_blocker") + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is True + assert result["dominant_category"] == "technical_blocker" + assert result["occurrences"] == 3 + + def test_all_different_categories_no_false_positive(self, conn): + """All different categories → no pattern detected (no false positives).""" + categories = [ + "requirements_unclear", "scope_too_large", "technical_blocker", + "missing_context", "recurring_quality_fail", + ] + for cat in categories: + models.record_task_return(conn, "P1-001", cat) + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is False + + def test_two_different_categories_no_false_positive(self, conn): + """Two returns with two different categories → no pattern (each appears once).""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "scope_too_large") + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is False + + def test_dominant_category_is_most_frequent(self, conn): + """When multiple categories appear, dominant_category is the most frequent one.""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "missing_context") + result = models.check_return_pattern(conn, "P1-001") + assert result["dominant_category"] == "requirements_unclear" + assert result["occurrences"] == 2 + + +# =========================================================================== +# Section 4: Edge case — return_count after task completion +# =========================================================================== + +class TestReturnCountAfterTaskCompletion: + + def test_return_count_not_reset_when_task_set_to_done(self, 
+# ===========================================================================
+# Section 3: check_return_pattern
+# ===========================================================================
+
+class TestCheckReturnPattern:
+
+    def test_no_history_returns_no_pattern(self, conn):
+        """Task with no returns → pattern_detected=False, dominant_category=None."""
+        result = models.check_return_pattern(conn, "P1-001")
+        assert result["pattern_detected"] is False
+        assert result["dominant_category"] is None
+        assert result["occurrences"] == 0
+
+    def test_single_return_no_pattern(self, conn):
+        """One return → pattern_detected=False (needs >= 2 same category)."""
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        result = models.check_return_pattern(conn, "P1-001")
+        assert result["pattern_detected"] is False
+        assert result["occurrences"] == 1
+
+    def test_same_category_twice_detects_pattern(self, conn):
+        """Two returns with same category → pattern_detected=True."""
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        result = models.check_return_pattern(conn, "P1-001")
+        assert result["pattern_detected"] is True
+        assert result["dominant_category"] == "requirements_unclear"
+        assert result["occurrences"] == 2
+
+    def test_same_category_three_times_detects_pattern(self, conn):
+        """Three returns with same category → pattern_detected=True, occurrences=3."""
+        for _ in range(3):
+            models.record_task_return(conn, "P1-001", "technical_blocker")
+        result = models.check_return_pattern(conn, "P1-001")
+        assert result["pattern_detected"] is True
+        assert result["dominant_category"] == "technical_blocker"
+        assert result["occurrences"] == 3
+
+    def test_all_different_categories_no_false_positive(self, conn):
+        """All different categories → no pattern detected (no false positives)."""
+        categories = [
+            "requirements_unclear", "scope_too_large", "technical_blocker",
+            "missing_context", "recurring_quality_fail",
+        ]
+        for cat in categories:
+            models.record_task_return(conn, "P1-001", cat)
+        result = models.check_return_pattern(conn, "P1-001")
+        assert result["pattern_detected"] is False
+
+    def test_two_different_categories_no_false_positive(self, conn):
+        """Two returns with two different categories → no pattern (each appears once)."""
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        models.record_task_return(conn, "P1-001", "scope_too_large")
+        result = models.check_return_pattern(conn, "P1-001")
+        assert result["pattern_detected"] is False
+
+    def test_dominant_category_is_most_frequent(self, conn):
+        """When multiple categories appear, dominant_category is the most frequent one."""
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        models.record_task_return(conn, "P1-001", "missing_context")
+        result = models.check_return_pattern(conn, "P1-001")
+        assert result["dominant_category"] == "requirements_unclear"
+        assert result["occurrences"] == 2
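+# The detection rule these tests pin down reduces to a frequency count — a
+# minimal sketch, assuming a Counter-style implementation (the real code in
+# core/models.py may differ). Note that occurrences counts the dominant
+# category, not the total number of returns:
+#
+#     from collections import Counter
+#     counts = Counter(r["reason_category"] for r in returns)
+#     dominant, occurrences = counts.most_common(1)[0] if counts else (None, 0)
+#     pattern = {"pattern_detected": occurrences >= 2,
+#                "dominant_category": dominant,
+#                "occurrences": occurrences}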
"infra_head" + + def test_unknown_category_defaults_to_cto_advisor(self, conn): + """Unknown/missing task category defaults to cto_advisor.""" + # Task category not set (None by default) + result = _save_return_analyst_output( + conn, "P1-001", "p1", + self._make_analyst_result(escalate=True), + ) + assert result["dept_head"] == "cto_advisor" + + def test_escalation_pipeline_has_pipeline_type_escalation_in_db(self, conn): + """Created escalation pipeline has pipeline_type='escalation' in the DB.""" + result = _save_return_analyst_output( + conn, "P1-001", "p1", + self._make_analyst_result(escalate=True), + ) + pipeline_id = result["escalation_pipeline_id"] + row = conn.execute( + "SELECT pipeline_type FROM pipelines WHERE id = ?", (pipeline_id,) + ).fetchone() + assert row["pipeline_type"] == "escalation" + + def test_no_escalation_when_output_is_invalid_json(self, conn): + """Non-parseable JSON output → fail-open, escalated=False.""" + bad_result = {"success": True, "raw_output": "not valid json {{{"} + result = _save_return_analyst_output(conn, "P1-001", "p1", bad_result) + assert result["escalated"] is False + + def test_no_escalation_when_output_is_empty(self, conn): + """Empty output → fail-open, escalated=False.""" + result = _save_return_analyst_output( + conn, "P1-001", "p1", + {"success": True, "raw_output": ""}, + ) + assert result["escalated"] is False + + def test_no_escalation_when_output_missing_field(self, conn): + """Output without escalate_to_dept_head field → escalated=False.""" + output = {"root_cause_analysis": "some analysis", "refined_brief": "refined"} + result = _save_return_analyst_output( + conn, "P1-001", "p1", + {"success": True, "raw_output": json.dumps(output)}, + ) + assert result["escalated"] is False + + def test_accepts_dict_raw_output(self, conn): + """raw_output as dict (not string) is handled via json.dumps path.""" + output_dict = { + "escalate_to_dept_head": True, + "root_cause_analysis": "cause", + "refined_brief": "brief", + } + result = _save_return_analyst_output( + conn, "P1-001", "p1", + {"success": True, "raw_output": output_dict}, + ) + assert result["escalated"] is True + + +# =========================================================================== +# Section 6: Gate cannot_close — return recording in pipeline +# =========================================================================== + +class TestGateCannotCloseRecordsReturn: + + @patch("core.followup.generate_followups") + @patch("agents.runner.run_hooks") + @patch("agents.runner.subprocess.run") + def test_gate_rejection_increments_return_count_in_standard_pipeline( + self, mock_run, mock_hooks, mock_followup, conn_autocomplete + ): + """Gate cannot_close in standard pipeline → task.return_count increases by 1.""" + conn = conn_autocomplete + mock_run.return_value = _mock_subprocess( + {"verdict": "changes_requested", "reason": "Missing tests"} + ) + mock_hooks.return_value = [] + mock_followup.return_value = {"created": [], "pending_actions": []} + + steps = [{"role": "reviewer", "brief": "review"}] + run_pipeline(conn, "P1-001", steps) + + task = models.get_task(conn, "P1-001") + assert task["return_count"] == 1, ( + "Gate rejection in standard pipeline should increment return_count" + ) + + @patch("core.followup.generate_followups") + @patch("agents.runner.run_hooks") + @patch("agents.runner.subprocess.run") + def test_gate_approval_does_not_increment_return_count( + self, mock_run, mock_hooks, mock_followup, conn_autocomplete + ): + """Gate approved → task.return_count stays 
0.""" + conn = conn_autocomplete + mock_run.return_value = _mock_subprocess( + {"verdict": "approved", "reason": ""} + ) + mock_hooks.return_value = [] + mock_followup.return_value = {"created": [], "pending_actions": []} + + steps = [{"role": "reviewer", "brief": "review"}] + run_pipeline(conn, "P1-001", steps) + + task = models.get_task(conn, "P1-001") + assert task["return_count"] == 0, ( + "Gate approval should not increment return_count" + ) + + @patch("core.followup.generate_followups") + @patch("agents.runner.run_hooks") + @patch("agents.runner.subprocess.run") + def test_gate_rejection_records_return_with_recurring_quality_fail_category( + self, mock_run, mock_hooks, mock_followup, conn_autocomplete + ): + """Gate rejection uses 'recurring_quality_fail' as reason_category.""" + conn = conn_autocomplete + mock_run.return_value = _mock_subprocess( + {"verdict": "changes_requested", "reason": "Need unit tests"} + ) + mock_hooks.return_value = [] + mock_followup.return_value = {"created": [], "pending_actions": []} + + steps = [{"role": "reviewer", "brief": "review"}] + run_pipeline(conn, "P1-001", steps) + + returns = models.get_task_returns(conn, "P1-001") + assert len(returns) == 1 + assert returns[0]["reason_category"] == "recurring_quality_fail" + + @patch("core.followup.generate_followups") + @patch("agents.runner.run_hooks") + @patch("agents.runner.subprocess.run") + def test_gate_rejection_skips_return_recording_in_escalation_pipeline( + self, mock_run, mock_hooks, mock_followup, conn_autocomplete + ): + """Gate rejection in escalation pipeline → return_count NOT incremented. + + Escalation pipelines must not trigger further return tracking to avoid + infinite loops. + """ + conn = conn_autocomplete + mock_run.return_value = _mock_subprocess( + {"verdict": "changes_requested", "reason": "Still not good"} + ) + mock_hooks.return_value = [] + mock_followup.return_value = {"created": [], "pending_actions": []} + + # Intercept create_pipeline and update_pipeline to force pipeline_type='escalation' + _real_create = models.create_pipeline + _real_update = models.update_pipeline + + def _create_as_escalation(db, *args, **kwargs): + result = _real_create(db, *args, **kwargs) + db.execute( + "UPDATE pipelines SET pipeline_type = 'escalation' WHERE id = ?", + (result["id"],), + ) + db.commit() + return {**result, "pipeline_type": "escalation"} + + def _update_keeping_escalation(db, id, **kwargs): + result = _real_update(db, id, **kwargs) + return {**result, "pipeline_type": "escalation"} if result else result + + with patch("agents.runner.models.create_pipeline", side_effect=_create_as_escalation), \ + patch("agents.runner.models.update_pipeline", side_effect=_update_keeping_escalation): + steps = [{"role": "reviewer", "brief": "review"}] + run_pipeline(conn, "P1-001", steps) + + task = models.get_task(conn, "P1-001") + assert task["return_count"] == 0, ( + "Gate rejection in escalation pipeline must NOT increment return_count" + ) + + +# =========================================================================== +# Section 7: Context builder — return_count in PM / return_analyst context +# =========================================================================== + +class TestContextBuilderReturnHistory: + + def test_pm_context_has_return_count_zero_when_no_returns(self, conn): + """PM context includes return_count=0 when task has no returns.""" + ctx = build_context(conn, "P1-001", "pm", "p1") + assert ctx.get("return_count") == 0 + + def 
+# ===========================================================================
+# Section 7: Context builder — return_count in PM / return_analyst context
+# ===========================================================================
+
+class TestContextBuilderReturnHistory:
+
+    def test_pm_context_has_return_count_zero_when_no_returns(self, conn):
+        """PM context includes return_count=0 when task has no returns."""
+        ctx = build_context(conn, "P1-001", "pm", "p1")
+        assert ctx.get("return_count") == 0
+
+    def test_pm_context_has_empty_return_history_when_no_returns(self, conn):
+        """PM context includes return_history=[] when task has no returns."""
+        ctx = build_context(conn, "P1-001", "pm", "p1")
+        assert ctx.get("return_history") == []
+
+    def test_pm_context_includes_return_count_when_returns_exist(self, conn):
+        """PM context reflects actual return_count when returns have been recorded."""
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        models.record_task_return(conn, "P1-001", "missing_context")
+
+        ctx = build_context(conn, "P1-001", "pm", "p1")
+        assert ctx["return_count"] == 3
+
+    def test_pm_context_includes_return_history_when_returns_exist(self, conn):
+        """PM context includes up to 5 most recent returns in return_history."""
+        for _ in range(3):
+            models.record_task_return(conn, "P1-001", "requirements_unclear")
+
+        ctx = build_context(conn, "P1-001", "pm", "p1")
+        assert len(ctx["return_history"]) == 3
+        assert ctx["return_history"][0]["reason_category"] == "requirements_unclear"
+
+    def test_return_analyst_context_has_return_count(self, conn):
+        """return_analyst context includes return_count reflecting actual returns."""
+        models.record_task_return(conn, "P1-001", "scope_too_large")
+        models.record_task_return(conn, "P1-001", "scope_too_large")
+
+        ctx = build_context(conn, "P1-001", "return_analyst", "p1")
+        assert ctx["return_count"] == 2
+
+    def test_return_analyst_context_has_full_return_history(self, conn):
+        """return_analyst context includes full return history (limit=20)."""
+        for _ in range(7):
+            models.record_task_return(conn, "P1-001", "technical_blocker")
+
+        ctx = build_context(conn, "P1-001", "return_analyst", "p1")
+        assert len(ctx["return_history"]) == 7
+
+    def test_non_pm_role_does_not_get_return_count(self, conn):
+        """Non-PM roles (e.g., debugger) do not get return_count in context."""
+        models.record_task_return(conn, "P1-001", "requirements_unclear")
+        ctx = build_context(conn, "P1-001", "debugger", "p1")
+        # debugger context should not include return_count
+        assert "return_count" not in ctx