kin/tests/test_kin_135_escalation.py
2026-03-21 08:18:11 +02:00

580 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Tests for KIN-135: Task return escalation mechanism.
Covers:
1. Models: record_task_return — counter increment, return_number sequencing, invalid category
2. Models: get_task_returns — history retrieval
3. Models: check_return_pattern — pattern detection, false positives prevention
4. Models: edge case — return_count behavior after task completion
5. Runner: _save_return_analyst_output — escalation pipeline creation
6. Runner: gate cannot_close records return in standard pipeline; skips in escalation pipeline
7. Context builder: return_count and return_history in PM / return_analyst context
"""
import json
import pytest
from unittest.mock import patch, MagicMock
from core.db import init_db
from core import models
from core.models import RETURN_CATEGORIES
from core.context_builder import build_context
from agents.runner import _save_return_analyst_output, run_pipeline, _AUTO_RETURN_MAX
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
    """Yield a fresh in-memory DB seeded with one project and one task.

    The connection is closed automatically once the test completes.
    """
    db = init_db(":memory:")
    models.create_project(db, "p1", "Project One", "/tmp/p1", tech_stack=["python"])
    models.create_task(
        db, "P1-001", "p1", "Implement feature", brief={"route_type": "feature"}
    )
    yield db
    db.close()
@pytest.fixture
def conn_autocomplete():
    """Yield a fresh in-memory DB whose task runs in auto_complete mode.

    Identical to the ``conn`` fixture except the seeded task has
    execution_mode='auto_complete' (used by the KIN-136 gate tests).
    """
    db = init_db(":memory:")
    models.create_project(db, "p1", "Project One", "/tmp/p1", tech_stack=["python"])
    models.create_task(
        db, "P1-001", "p1", "Implement feature", brief={"route_type": "feature"}
    )
    models.update_task(db, "P1-001", execution_mode="auto_complete")
    yield db
    db.close()
def _mock_subprocess(agent_output: dict) -> MagicMock:
"""Build a subprocess.run mock returning agent_output in claude JSON format."""
m = MagicMock()
m.stdout = json.dumps({"result": json.dumps(agent_output, ensure_ascii=False)})
m.stderr = ""
m.returncode = 0
return m
# ===========================================================================
# Section 1: record_task_return
# ===========================================================================
class TestRecordTaskReturn:
    """record_task_return: counter increment, return_number sequencing, category validation."""

    def test_increments_return_count_on_first_return(self, conn):
        """After first return, task.return_count == 1."""
        models.record_task_return(conn, "P1-001", "requirements_unclear")
        task = models.get_task(conn, "P1-001")
        assert task["return_count"] == 1

    def test_increments_return_count_multiple_times(self, conn):
        """After three returns with different categories, task.return_count == 3."""
        for cat in ["requirements_unclear", "missing_context", "technical_blocker"]:
            models.record_task_return(conn, "P1-001", cat)
        task = models.get_task(conn, "P1-001")
        assert task["return_count"] == 3

    def test_assigns_sequential_return_numbers(self, conn):
        """return_number is assigned as 1, 2, 3 for successive calls."""
        models.record_task_return(conn, "P1-001", "requirements_unclear")
        models.record_task_return(conn, "P1-001", "missing_context")
        models.record_task_return(conn, "P1-001", "technical_blocker")
        returns = models.get_task_returns(conn, "P1-001")
        assert [r["return_number"] for r in returns] == [1, 2, 3]

    def test_invalid_category_defaults_to_missing_context(self, conn):
        """Unknown reason_category is silently coerced to 'missing_context' (fail-open)."""
        result = models.record_task_return(conn, "P1-001", "totally_unknown_reason")
        assert result["reason_category"] == "missing_context"

    def test_valid_category_stored_unchanged(self, conn):
        """Valid reason_category is stored as-is."""
        result = models.record_task_return(conn, "P1-001", "scope_too_large",
                                           reason_text="Too much scope")
        assert result["reason_category"] == "scope_too_large"

    def test_reason_text_stored_correctly(self, conn):
        """Provided reason_text is stored in the returned row."""
        result = models.record_task_return(conn, "P1-001", "missing_context",
                                           reason_text="No spec attached")
        assert result["reason_text"] == "No spec attached"

    def test_returned_by_stored_correctly(self, conn):
        """returned_by value is stored in the returned row."""
        result = models.record_task_return(conn, "P1-001", "requirements_unclear",
                                           returned_by="pm")
        assert result["returned_by"] == "pm"

    def test_returns_dict_with_task_id_and_return_number(self, conn):
        """Return value is a dict containing task_id and return_number."""
        result = models.record_task_return(conn, "P1-001", "requirements_unclear")
        assert result["task_id"] == "P1-001"
        assert result["return_number"] == 1

    def test_return_numbers_independent_per_task(self, conn):
        """Two different tasks have independent return_number sequences."""
        models.create_task(conn, "P1-002", "p1", "Another task")
        models.record_task_return(conn, "P1-001", "requirements_unclear")
        models.record_task_return(conn, "P1-001", "missing_context")
        models.record_task_return(conn, "P1-002", "technical_blocker")
        returns_001 = models.get_task_returns(conn, "P1-001")
        returns_002 = models.get_task_returns(conn, "P1-002")
        assert [r["return_number"] for r in returns_001] == [1, 2]
        assert returns_002[0]["return_number"] == 1

    def test_all_valid_return_categories_accepted(self, conn):
        """Every value in RETURN_CATEGORIES is accepted and stored without coercion."""
        # Fixed: dropped the f-prefix from a placeholder-free literal (ruff F541).
        models.create_task(conn, "P1-CAT", "p1", "Category test task")
        # Fixed: enumerate() index was never used — iterate categories directly (ruff B007).
        for cat in RETURN_CATEGORIES:
            result = models.record_task_return(conn, "P1-CAT", cat)
            assert result["reason_category"] == cat, f"Category '{cat}' should be stored as-is"
# ===========================================================================
# Section 2: get_task_returns
# ===========================================================================
class TestGetTaskReturns:
    """get_task_returns: history retrieval, ordering, and the limit parameter."""

    def test_empty_for_task_with_no_returns(self, conn):
        """Task without any returns → empty list."""
        assert models.get_task_returns(conn, "P1-001") == []

    def test_returns_history_in_ascending_order(self, conn):
        """Returns are ordered by return_number ASC."""
        recorded = ["requirements_unclear", "scope_too_large", "technical_blocker"]
        for category in recorded:
            models.record_task_return(conn, "P1-001", category)
        history = models.get_task_returns(conn, "P1-001")
        # List equality checks both length (3) and per-row category order.
        assert [row["reason_category"] for row in history] == recorded

    def test_limit_parameter_caps_results(self, conn):
        """limit parameter restricts the number of returned records."""
        for _ in range(5):
            models.record_task_return(conn, "P1-001", "missing_context")
        assert len(models.get_task_returns(conn, "P1-001", limit=3)) == 3
# ===========================================================================
# Section 3: check_return_pattern
# ===========================================================================
class TestCheckReturnPattern:
    """check_return_pattern: repeated-category detection without false positives."""

    def _record_all(self, conn, categories):
        """Record one return per category for the fixture task P1-001."""
        for category in categories:
            models.record_task_return(conn, "P1-001", category)

    def test_no_history_returns_no_pattern(self, conn):
        """Task with no returns → pattern_detected=False, dominant_category=None."""
        verdict = models.check_return_pattern(conn, "P1-001")
        assert verdict["pattern_detected"] is False
        assert verdict["dominant_category"] is None
        assert verdict["occurrences"] == 0

    def test_single_return_no_pattern(self, conn):
        """One return → pattern_detected=False (needs >= 2 same category)."""
        self._record_all(conn, ["requirements_unclear"])
        verdict = models.check_return_pattern(conn, "P1-001")
        assert verdict["pattern_detected"] is False
        assert verdict["occurrences"] == 1

    def test_same_category_twice_detects_pattern(self, conn):
        """Two returns with same category → pattern_detected=True."""
        self._record_all(conn, ["requirements_unclear"] * 2)
        verdict = models.check_return_pattern(conn, "P1-001")
        assert verdict["pattern_detected"] is True
        assert verdict["dominant_category"] == "requirements_unclear"
        assert verdict["occurrences"] == 2

    def test_same_category_three_times_detects_pattern(self, conn):
        """Three returns with same category → pattern_detected=True, occurrences=3."""
        self._record_all(conn, ["technical_blocker"] * 3)
        verdict = models.check_return_pattern(conn, "P1-001")
        assert verdict["pattern_detected"] is True
        assert verdict["dominant_category"] == "technical_blocker"
        assert verdict["occurrences"] == 3

    def test_all_different_categories_no_false_positive(self, conn):
        """All different categories → no pattern detected (no false positives)."""
        self._record_all(conn, [
            "requirements_unclear", "scope_too_large", "technical_blocker",
            "missing_context", "recurring_quality_fail",
        ])
        assert models.check_return_pattern(conn, "P1-001")["pattern_detected"] is False

    def test_two_different_categories_no_false_positive(self, conn):
        """Two returns with two different categories → no pattern (each appears once)."""
        self._record_all(conn, ["requirements_unclear", "scope_too_large"])
        assert models.check_return_pattern(conn, "P1-001")["pattern_detected"] is False

    def test_dominant_category_is_most_frequent(self, conn):
        """When multiple categories appear, dominant_category is the most frequent one."""
        self._record_all(conn, ["requirements_unclear", "requirements_unclear",
                                "missing_context"])
        verdict = models.check_return_pattern(conn, "P1-001")
        assert verdict["dominant_category"] == "requirements_unclear"
        assert verdict["occurrences"] == 2
# ===========================================================================
# Section 4: Edge case — return_count after task completion
# ===========================================================================
class TestReturnCountAfterTaskCompletion:
    """Edge case: return_count behavior around task completion."""

    def test_return_count_not_reset_when_task_set_to_done(self, conn):
        """return_count persists after task status transitions to 'done'.

        KIN-135 spec mentions a reset on success, but update_task() does not
        implement this. This test documents the current (non-resetting) behavior.
        """
        for category in ("requirements_unclear", "missing_context"):
            models.record_task_return(conn, "P1-001", category)
        models.update_task(conn, "P1-001", status="done")
        # Documents current behavior: counter persists (not reset to 0)
        assert models.get_task(conn, "P1-001")["return_count"] == 2

    def test_return_count_starts_at_zero_for_new_task(self, conn):
        """Newly created task has return_count == 0."""
        assert models.get_task(conn, "P1-001")["return_count"] == 0
# ===========================================================================
# Section 5: _save_return_analyst_output
# ===========================================================================
class TestSaveReturnAnalystOutput:
    """_save_return_analyst_output: escalation decisions, dept-head routing, fail-open parsing."""

    def _make_analyst_result(self, escalate: bool,
                             root_cause: str = "Recurring scope issue",
                             refined_brief: str = "Focus on X only") -> dict:
        """Build a mock return_analyst agent result dict."""
        payload = {
            "escalate_to_dept_head": escalate,
            "root_cause_analysis": root_cause,
            "refined_brief": refined_brief,
            "pattern_summary": "3 returns with similar root cause",
        }
        return {"success": True, "raw_output": json.dumps(payload)}

    def _save(self, conn, analyst_result):
        """Invoke the unit under test against the fixture task/project."""
        return _save_return_analyst_output(conn, "P1-001", "p1", analyst_result)

    def test_creates_escalation_pipeline_when_escalate_true(self, conn):
        """escalate_to_dept_head=true → escalation pipeline created, escalated=True."""
        outcome = self._save(conn, self._make_analyst_result(escalate=True))
        assert outcome["escalated"] is True
        assert outcome["escalation_pipeline_id"] is not None
        assert outcome["dept_head"] is not None

    def test_no_escalation_when_escalate_false(self, conn):
        """escalate_to_dept_head=false → no escalation pipeline, escalated=False."""
        outcome = self._save(conn, self._make_analyst_result(escalate=False))
        assert outcome["escalated"] is False
        assert outcome["escalation_pipeline_id"] is None
        assert outcome["dept_head"] is None

    def test_dept_head_sec_category_maps_to_security_head(self, conn):
        """SEC task category → dept_head is security_head."""
        models.update_task(conn, "P1-001", category="SEC")
        outcome = self._save(conn, self._make_analyst_result(escalate=True))
        assert outcome["dept_head"] == "security_head"

    def test_dept_head_ui_category_maps_to_frontend_head(self, conn):
        """UI task category → dept_head is frontend_head."""
        models.update_task(conn, "P1-001", category="UI")
        outcome = self._save(conn, self._make_analyst_result(escalate=True))
        assert outcome["dept_head"] == "frontend_head"

    def test_dept_head_api_category_maps_to_backend_head(self, conn):
        """API task category → dept_head is backend_head."""
        models.update_task(conn, "P1-001", category="API")
        outcome = self._save(conn, self._make_analyst_result(escalate=True))
        assert outcome["dept_head"] == "backend_head"

    def test_dept_head_infra_category_maps_to_infra_head(self, conn):
        """INFRA task category → dept_head is infra_head."""
        models.update_task(conn, "P1-001", category="INFRA")
        outcome = self._save(conn, self._make_analyst_result(escalate=True))
        assert outcome["dept_head"] == "infra_head"

    def test_unknown_category_defaults_to_cto_advisor(self, conn):
        """Unknown/missing task category defaults to cto_advisor."""
        # Task category not set (None by default)
        outcome = self._save(conn, self._make_analyst_result(escalate=True))
        assert outcome["dept_head"] == "cto_advisor"

    def test_escalation_pipeline_has_pipeline_type_escalation_in_db(self, conn):
        """Created escalation pipeline has pipeline_type='escalation' in the DB."""
        outcome = self._save(conn, self._make_analyst_result(escalate=True))
        row = conn.execute(
            "SELECT pipeline_type FROM pipelines WHERE id = ?",
            (outcome["escalation_pipeline_id"],),
        ).fetchone()
        assert row["pipeline_type"] == "escalation"

    def test_no_escalation_when_output_is_invalid_json(self, conn):
        """Non-parseable JSON output → fail-open, escalated=False."""
        outcome = self._save(conn, {"success": True, "raw_output": "not valid json {{{"})
        assert outcome["escalated"] is False

    def test_no_escalation_when_output_is_empty(self, conn):
        """Empty output → fail-open, escalated=False."""
        outcome = self._save(conn, {"success": True, "raw_output": ""})
        assert outcome["escalated"] is False

    def test_no_escalation_when_output_missing_field(self, conn):
        """Output without escalate_to_dept_head field → escalated=False."""
        partial = json.dumps({"root_cause_analysis": "some analysis",
                              "refined_brief": "refined"})
        outcome = self._save(conn, {"success": True, "raw_output": partial})
        assert outcome["escalated"] is False

    def test_accepts_dict_raw_output(self, conn):
        """raw_output as dict (not string) is handled via json.dumps path."""
        raw = {
            "escalate_to_dept_head": True,
            "root_cause_analysis": "cause",
            "refined_brief": "brief",
        }
        outcome = self._save(conn, {"success": True, "raw_output": raw})
        assert outcome["escalated"] is True
# ===========================================================================
# Section 6: Gate cannot_close — return recording in pipeline
# ===========================================================================
class TestGateCannotCloseRecordsReturn:
    """Gate cannot_close: return recording in standard vs. escalation pipelines.

    NOTE: @patch decorators apply bottom-up, so the mock arguments arrive in
    reverse order: mock_run = subprocess.run, mock_hooks = run_hooks,
    mock_followup = generate_followups.
    """

    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_gate_rejection_increments_return_count_in_standard_pipeline(
        self, mock_run, mock_hooks, mock_followup, conn_autocomplete
    ):
        """Gate cannot_close in standard pipeline → return_count increments on each auto-return.
        KIN-136: in auto_complete mode, gate rejection triggers auto-return up to _AUTO_RETURN_MAX
        times before escalating. Each auto-return increments return_count once; the final
        escalation via the human-escalation path also records one more return.
        Total: _AUTO_RETURN_MAX + 1 returns (3 auto-returns + 1 final escalation = 4).
        """
        conn = conn_autocomplete
        # Every reviewer invocation returns the same rejection, so the gate
        # fails persistently until the auto-return budget is exhausted.
        mock_run.return_value = _mock_subprocess(
            {"verdict": "changes_requested", "reason": "Missing tests"}
        )
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        steps = [{"role": "reviewer", "brief": "review"}]
        run_pipeline(conn, "P1-001", steps)
        task = models.get_task(conn, "P1-001")
        # KIN-136: _AUTO_RETURN_MAX auto-returns + 1 final escalation recording (git log: 2026-03-21)
        assert task["return_count"] == _AUTO_RETURN_MAX + 1, (
            "Gate rejection with persistent failure should increment return_count "
            f"_AUTO_RETURN_MAX+1 times ({_AUTO_RETURN_MAX + 1})"
        )

    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_gate_approval_does_not_increment_return_count(
        self, mock_run, mock_hooks, mock_followup, conn_autocomplete
    ):
        """Gate approved → task.return_count stays 0."""
        conn = conn_autocomplete
        mock_run.return_value = _mock_subprocess(
            {"verdict": "approved", "reason": ""}
        )
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        steps = [{"role": "reviewer", "brief": "review"}]
        run_pipeline(conn, "P1-001", steps)
        task = models.get_task(conn, "P1-001")
        assert task["return_count"] == 0, (
            "Gate approval should not increment return_count"
        )

    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_gate_rejection_records_return_with_recurring_quality_fail_category(
        self, mock_run, mock_hooks, mock_followup, conn_autocomplete
    ):
        """Gate rejection uses 'recurring_quality_fail' as reason_category.
        KIN-136: all auto-return records use the same category; final escalation also uses it.
        Total returns = _AUTO_RETURN_MAX + 1 (git log: 2026-03-21).
        """
        conn = conn_autocomplete
        mock_run.return_value = _mock_subprocess(
            {"verdict": "changes_requested", "reason": "Need unit tests"}
        )
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        steps = [{"role": "reviewer", "brief": "review"}]
        run_pipeline(conn, "P1-001", steps)
        returns = models.get_task_returns(conn, "P1-001")
        assert len(returns) == _AUTO_RETURN_MAX + 1
        assert all(r["reason_category"] == "recurring_quality_fail" for r in returns), (
            "Все записи возврата должны иметь категорию recurring_quality_fail"
        )

    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_gate_rejection_skips_return_recording_in_escalation_pipeline(
        self, mock_run, mock_hooks, mock_followup, conn_autocomplete
    ):
        """Gate rejection in escalation pipeline → return_count NOT incremented.
        Escalation pipelines must not trigger further return tracking to avoid
        infinite loops.
        """
        conn = conn_autocomplete
        mock_run.return_value = _mock_subprocess(
            {"verdict": "changes_requested", "reason": "Still not good"}
        )
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        # Intercept create_pipeline and update_pipeline to force pipeline_type='escalation'
        # so the runner believes it is already inside an escalation pipeline.
        _real_create = models.create_pipeline
        _real_update = models.update_pipeline

        def _create_as_escalation(db, *args, **kwargs):
            # Delegate to the real implementation, then rewrite the stored row
            # and the returned dict so both report pipeline_type='escalation'.
            result = _real_create(db, *args, **kwargs)
            db.execute(
                "UPDATE pipelines SET pipeline_type = 'escalation' WHERE id = ?",
                (result["id"],),
            )
            db.commit()
            return {**result, "pipeline_type": "escalation"}

        def _update_keeping_escalation(db, id, **kwargs):
            # Preserve the forced escalation type on every subsequent update.
            result = _real_update(db, id, **kwargs)
            return {**result, "pipeline_type": "escalation"} if result else result

        with patch("agents.runner.models.create_pipeline", side_effect=_create_as_escalation), \
             patch("agents.runner.models.update_pipeline", side_effect=_update_keeping_escalation):
            steps = [{"role": "reviewer", "brief": "review"}]
            run_pipeline(conn, "P1-001", steps)
        task = models.get_task(conn, "P1-001")
        assert task["return_count"] == 0, (
            "Gate rejection in escalation pipeline must NOT increment return_count"
        )
# ===========================================================================
# Section 7: Context builder — return_count in PM / return_analyst context
# ===========================================================================
class TestContextBuilderReturnHistory:
    """Context builder exposes return_count / return_history to PM and return_analyst."""

    def test_pm_context_has_return_count_zero_when_no_returns(self, conn):
        """PM context includes return_count=0 when task has no returns."""
        ctx = build_context(conn, "P1-001", "pm", "p1")
        assert ctx.get("return_count") == 0

    def test_pm_context_has_empty_return_history_when_no_returns(self, conn):
        """PM context includes return_history=[] when task has no returns."""
        ctx = build_context(conn, "P1-001", "pm", "p1")
        assert ctx.get("return_history") == []

    def test_pm_context_includes_return_count_when_returns_exist(self, conn):
        """PM context reflects actual return_count when returns have been recorded."""
        models.record_task_return(conn, "P1-001", "requirements_unclear")
        models.record_task_return(conn, "P1-001", "requirements_unclear")
        models.record_task_return(conn, "P1-001", "missing_context")
        ctx = build_context(conn, "P1-001", "pm", "p1")
        assert ctx["return_count"] == 3

    def test_pm_context_includes_return_history_when_returns_exist(self, conn):
        """PM context includes up to 5 most recent returns in return_history."""
        for _ in range(3):
            models.record_task_return(conn, "P1-001", "requirements_unclear")
        ctx = build_context(conn, "P1-001", "pm", "p1")
        assert len(ctx["return_history"]) == 3
        assert ctx["return_history"][0]["reason_category"] == "requirements_unclear"

    def test_return_analyst_context_has_return_count(self, conn):
        """return_analyst context includes return_count reflecting actual returns."""
        models.record_task_return(conn, "P1-001", "scope_too_large")
        models.record_task_return(conn, "P1-001", "scope_too_large")
        ctx = build_context(conn, "P1-001", "return_analyst", "p1")
        assert ctx["return_count"] == 2

    def test_return_analyst_context_has_full_return_history(self, conn):
        """return_analyst context includes full return history (limit=20)."""
        # Fixed: unused loop index `i` replaced with `_` (ruff B007),
        # matching the convention used elsewhere in this file.
        for _ in range(7):
            models.record_task_return(conn, "P1-001", "technical_blocker")
        ctx = build_context(conn, "P1-001", "return_analyst", "p1")
        assert len(ctx["return_history"]) == 7

    def test_non_pm_role_does_not_get_return_count(self, conn):
        """Non-PM roles (e.g., debugger) do not get return_count in context."""
        models.record_task_return(conn, "P1-001", "requirements_unclear")
        ctx = build_context(conn, "P1-001", "debugger", "p1")
        # debugger context should not include return_count
        assert "return_count" not in ctx