"""Regression tests for KIN-128 — Quality over speed.

Covers 4 mechanisms:
1. Analyst auto-injection on 2nd+ revision (revise_count >= 2)
2. Smoke tester: cannot_confirm → task blocked; confirmed → pipeline continues
3. Agent prompts contain mandatory proof block (backend_dev, frontend_dev, debugger)
4. Tech debt output creates child task in DB
"""

import json
import subprocess
from pathlib import Path
from unittest.mock import patch, MagicMock

import pytest

from core.db import init_db
from core import models
from agents.runner import _save_tech_debt_output, _TECH_DEBT_ROLES, run_pipeline

PROMPTS_DIR = Path(__file__).parent.parent / "agents" / "prompts"


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def conn():
    """Fresh in-memory DB with a seeded project and task.

    Yields the connection returned by ``init_db(":memory:")`` after creating
    project ``p1`` and task ``P1-001``. The connection is closed on teardown,
    including when seeding itself raises.
    """
    c = init_db(":memory:")
    try:
        models.create_project(c, "p1", "P1", "/p1", tech_stack=["python"])
        models.create_task(c, "P1-001", "p1", "Fix bug", brief={"text": "fix"})
        yield c
    finally:
        c.close()


@pytest.fixture
def api_client(tmp_path, monkeypatch):
    """FastAPI TestClient with isolated DB.

    Points ``web.api.DB_PATH`` at a file inside ``tmp_path`` and seeds one
    project and one task through the HTTP API. The override goes through
    ``monkeypatch`` so the previous ``DB_PATH`` value is restored after the
    test instead of leaking into tests that run later.
    """
    import web.api as api_module

    # raising=False keeps the old "plain assignment" semantics: the attribute
    # is set even if the module did not define it beforehand.
    monkeypatch.setattr(api_module, "DB_PATH", tmp_path / "test.db", raising=False)

    from web.api import app
    from fastapi.testclient import TestClient

    client = TestClient(app)
    client.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"})
    client.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"})
    return client


def _mock_success(output_data):
    """Build a MagicMock that looks like a successful ``subprocess.run`` result.

    ``output_data`` becomes ``stdout``: dicts and lists are JSON-encoded, any
    other value (typically an already-serialised string) is used as is.
    ``stderr`` is empty and ``returncode`` is 0.
    """
    m = MagicMock()
    if isinstance(output_data, (dict, list)):
        m.stdout = json.dumps(output_data)
    else:
        m.stdout = output_data
    m.stderr = ""
    m.returncode = 0
    return m
# ===========================================================================
# 1. Analyst injection on 2nd+ revision
# ===========================================================================

class TestAnalystInjectionOnRevise:
    """Revise endpoint: analyst is expected as the first step from the 2nd revision on."""

    def test_first_revise_does_not_inject_analyst(self, api_client):
        """revise_count=1 → analyst is NOT added to the pipeline steps."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "please fix", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 1
        roles = [s["role"] for s in (data.get("pipeline_steps") or [])]
        # Fail with a readable message instead of an IndexError when the
        # response carries no steps at all.
        assert roles, "pipeline_steps не должен быть пустым"
        assert roles[0] != "analyst", "analyst не должен быть первым шагом при revise_count=1"

    def test_second_revise_injects_analyst_as_first_step(self, api_client):
        """revise_count=2 → analyst automatically becomes the first pipeline step."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            # first revise
            api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "first attempt", "steps": steps},
            )
            # second revise
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "second attempt", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 2
        pipeline_steps = data.get("pipeline_steps") or []
        assert pipeline_steps, "pipeline_steps не должен быть пустым"
        assert pipeline_steps[0]["role"] == "analyst", (
            f"Первый шаг должен быть analyst, получили: {pipeline_steps[0]['role']}"
        )

    def test_second_revise_analyst_not_duplicated_if_already_first(self, api_client):
        """If analyst is already the first step, a second one must not be added."""
        steps = [
            {"role": "analyst", "model": "sonnet"},
            {"role": "backend_dev", "model": "sonnet"},
        ]
        with patch("web.api._launch_pipeline_subprocess"):
            api_client.post("/api/tasks/P1-001/revise", json={"comment": "r1", "steps": steps})
            r = api_client.post("/api/tasks/P1-001/revise", json={"comment": "r2", "steps": steps})
        data = r.json()
        pipeline_steps = data.get("pipeline_steps") or []
        analyst_steps = [s for s in pipeline_steps if s["role"] == "analyst"]
        assert len(analyst_steps) == 1, "analyst не должен дублироваться"

    def test_third_revise_also_injects_analyst(self, api_client):
        """revise_count=3 injects analyst as well."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            for comment in ("r1", "r2", "r3"):
                r = api_client.post(
                    "/api/tasks/P1-001/revise",
                    json={"comment": comment, "steps": steps},
                )
        data = r.json()
        assert data["revise_count"] == 3
        pipeline_steps = data.get("pipeline_steps") or []
        # Explicit emptiness check replaces the old `or [{}]` fallback, which
        # surfaced an empty pipeline as an opaque KeyError.
        assert pipeline_steps, "pipeline_steps не должен быть пустым"
        assert pipeline_steps[0]["role"] == "analyst"

    @patch("agents.runner.subprocess.run")
    def test_pipeline_with_analyst_first_executes_analyst_as_first_agent(self, mock_run, conn):
        """Integration test: run_pipeline with analyst as the first step runs analyst first.

        Exercises the pipeline through run_pipeline() directly, without mocking
        _launch_pipeline_subprocess — unlike the other tests of this class,
        which only inspect the JSON response of the API.

        Expected order of subprocess.run calls (the auth check is mocked by
        conftest and therefore does not occupy a slot):
            call[0] — first pipeline agent (must be analyst)
            call[1] — learning extractor (auto-learning after the pipeline ends)
        """
        analyst_output = {
            "status": "done",
            "root_problem": "Предыдущий подход не учитывал корневую причину X",
            "suggested_approach": "Переработать алгоритм с учётом Y",
            "confidence": "high",
        }
        mock_run.return_value = _mock_success(analyst_output)

        # Steps that revise_task() builds on revision 2+ (after analyst injection)
        analyst_step = {
            "role": "analyst",
            "model": "sonnet",
            "brief": "Задача вернулась на ревизию №2. Проведи анализ.",
        }
        steps = [analyst_step]

        run_pipeline(conn, "P1-001", steps)

        # conftest._mock_check_claude_auth auto-mocks check_claude_auth for all
        # tests, so the auth check does NOT take a slot in call_args_list.
        assert mock_run.call_count >= 1, (
            f"Ожидалось >= 1 вызова subprocess.run (analyst), "
            f"получили {mock_run.call_count}"
        )

        # First call (index 0): cmd = [claude, "-p", prompt, ...]
        first_agent_cmd = list(mock_run.call_args_list[0].args[0])
        assert "-p" in first_agent_cmd, "Claude CLI cmd должна содержать -p"
        prompt_idx = first_agent_cmd.index("-p") + 1
        first_prompt = first_agent_cmd[prompt_idx]

        # agents/prompts/analyst.md starts with "You are an Analyst for the Kin..."
        assert "Analyst" in first_prompt, (
            f"Первый выполненный агент должен быть analyst. "
            f"Промпт начинается с: {first_prompt[:200]}"
        )


# ===========================================================================
# 2. Smoke tester: cannot_confirm → blocked; confirmed → continues
# ===========================================================================

class TestSmokeTester:
    """run_pipeline behaviour for a single smoke_tester step."""

    def _make_pipeline(self, smoke_status: str):
        """Build single-step pipeline with smoke_tester returning given status.

        Returns a ``(steps, smoke_output)`` tuple: the step list to pass to
        run_pipeline and the dict the mocked agent should emit.
        """
        return [{"role": "smoke_tester", "model": "sonnet"}], {
            "status": smoke_status,
            "commands_run": [],
            "evidence": "HTTP 200" if smoke_status == "confirmed" else None,
            "reason": "нет доступа к prod" if smoke_status == "cannot_confirm" else None,
        }

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_cannot_confirm_blocks_task(self, mock_run, conn):
        """smoke_tester with cannot_confirm → the task moves to blocked."""
        steps, smoke_output = self._make_pipeline("cannot_confirm")
        mock_run.return_value = _mock_success(smoke_output)

        result = run_pipeline(conn, "P1-001", steps)

        assert result["success"] is False
        assert result.get("blocked_by") == "smoke_tester"

        task = models.get_task(conn, "P1-001")
        assert task["status"] == "blocked"
        assert task["blocked_agent_role"] == "smoke_tester"
        assert "cannot_confirm" in (task.get("blocked_reason") or "")

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_confirmed_continues_pipeline(self, mock_run, conn):
        """smoke_tester with confirmed → the pipeline continues, the task is not blocked."""
        # The helper already returns the single smoke_tester step; reuse it
        # instead of repeating the literal.
        steps, smoke_output = self._make_pipeline("confirmed")
        mock_run.return_value = _mock_success(smoke_output)

        result = run_pipeline(conn, "P1-001", steps)

        # Pipeline completes (only one step — smoke_tester — which passed)
        assert result.get("blocked_by") != "smoke_tester"
        task = models.get_task(conn, "P1-001")
        assert task["status"] != "blocked"

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_confirmed_saves_result_to_db(self, mock_run, conn):
        """smoke_tester confirmed → the result is stored in smoke_test_result."""
        steps = [{"role": "smoke_tester", "model": "sonnet"}]
        smoke_output = {
            "status": "confirmed",
            "commands_run": ["curl https://example.com/health"],
            "evidence": "HTTP 200 OK",
            "reason": None,
        }
        mock_run.return_value = _mock_success(smoke_output)

        run_pipeline(conn, "P1-001", steps)

        task = models.get_task(conn, "P1-001")
        result = task.get("smoke_test_result")
        assert result is not None
        assert result.get("status") == "confirmed"

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_cannot_confirm_saves_result_to_db(self, mock_run, conn):
        """smoke_tester cannot_confirm → the result is stored in smoke_test_result too."""
        steps = [{"role": "smoke_tester", "model": "sonnet"}]
        smoke_output = {
            "status": "cannot_confirm",
            "commands_run": [],
            "evidence": None,
            "reason": "prod недоступен",
        }
        mock_run.return_value = _mock_success(smoke_output)

        run_pipeline(conn, "P1-001", steps)

        task = models.get_task(conn, "P1-001")
        result = task.get("smoke_test_result")
        assert result is not None
        assert result.get("status") == "cannot_confirm"
# ===========================================================================
# 3. Agent prompts contain mandatory proof block
# ===========================================================================

def _read_prompt(file_name):
    """Return the UTF-8 text of the prompt file ``file_name`` under PROMPTS_DIR."""
    return (PROMPTS_DIR / file_name).read_text(encoding="utf-8")


class TestAgentPromptsProofBlock:
    """Verify that agent prompts structurally require a proof block."""

    @pytest.mark.parametrize("prompt_file,forbidden_keyword,proof_keyword", [
        ("backend_dev.md", "status: done", "proof"),
        ("frontend_dev.md", "status: done", "proof"),
        ("debugger.md", "status: fixed", "proof"),
    ])
    def test_prompt_contains_proof_field(self, prompt_file, forbidden_keyword, proof_keyword):
        """The prompt text mentions the proof keyword."""
        # forbidden_keyword is part of the parametrization but is not checked here.
        prompt_text = _read_prompt(prompt_file)
        assert proof_keyword in prompt_text, (
            f"{prompt_file}: отсутствует поле '{proof_keyword}' в Output format"
        )

    @pytest.mark.parametrize("prompt_file,rule_phrase", [
        ("backend_dev.md", "ЗАПРЕЩЕНО"),
        ("frontend_dev.md", "ЗАПРЕЩЕНО"),
        ("debugger.md", "ЗАПРЕЩЕНО"),
    ])
    def test_prompt_forbids_done_without_proof(self, prompt_file, rule_phrase):
        """The prompt text contains the explicit prohibition phrase."""
        prompt_text = _read_prompt(prompt_file)
        assert rule_phrase in prompt_text, (
            f"{prompt_file}: не содержит правило ЗАПРЕЩЕНО о proof"
        )

    def test_smoke_tester_prompt_exists(self):
        """smoke_tester.md exists and mentions cannot_confirm."""
        prompt_path = PROMPTS_DIR / "smoke_tester.md"
        assert prompt_path.exists(), "smoke_tester.md не найден"
        assert "cannot_confirm" in prompt_path.read_text(encoding="utf-8")

    def test_analyst_prompt_exists(self):
        """analyst.md exists and mentions root_problem."""
        prompt_path = PROMPTS_DIR / "analyst.md"
        assert prompt_path.exists(), "analyst.md не найден"
        assert "root_problem" in prompt_path.read_text(encoding="utf-8")

    def test_backend_dev_proof_fields_complete(self):
        """backend_dev.md names all three proof sub-fields."""
        prompt_text = _read_prompt("backend_dev.md")
        required = ("what_was_done", "how_verified", "verification_result")
        for key in required:
            assert key in prompt_text, f"backend_dev.md: отсутствует поле proof.{key}"

    def test_debugger_proof_fields_complete(self):
        """debugger.md names all three proof sub-fields."""
        prompt_text = _read_prompt("debugger.md")
        required = ("what_was_fixed", "how_verified", "verification_result")
        for key in required:
            assert key in prompt_text, f"debugger.md: отсутствует поле proof.{key}"


# ===========================================================================
# 4. Tech debt output creates child task in DB
# ===========================================================================

class TestTechDebtTaskCreation:
    """_save_tech_debt_output / run_pipeline: tech_debt output becomes a child task."""

    def test_tech_debt_roles_contains_expected_roles(self):
        """_TECH_DEBT_ROLES includes the three dev roles and sysadmin."""
        for role in ("backend_dev", "frontend_dev", "debugger", "sysadmin"):
            assert role in _TECH_DEBT_ROLES

    def test_save_tech_debt_creates_child_task(self, conn):
        """tech_debt with a description → a [TECH DEBT] child task is created."""
        tech_debt = {
            "description": "Хардкод timeout в 30s нужно вынести в конфиг",
            "reason_temporary": "MVP без конфига",
            "proper_fix": "Добавить config.yaml с полем agent_timeout",
            "category": "FIX",
        }
        agent_result = {
            "success": True,
            "raw_output": json.dumps({"status": "done", "tech_debt": tech_debt}),
        }

        saved = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)

        assert saved["created"] is True
        assert saved["task_id"] is not None
        debt_task = models.get_task(conn, saved["task_id"])
        assert debt_task is not None
        title = debt_task["title"]
        assert "[TECH DEBT]" in title
        assert "Хардкод timeout" in title
        assert debt_task["parent_task_id"] == "P1-001"
        assert debt_task["category"] == "FIX"

    def test_save_tech_debt_without_description_creates_nothing(self, conn):
        """tech_debt with an empty description → no task is created."""
        empty_debt = {"description": "", "reason_temporary": ""}
        agent_result = {
            "success": True,
            "raw_output": json.dumps({"status": "done", "tech_debt": empty_debt}),
        }

        saved = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)

        assert saved["created"] is False

    def test_save_tech_debt_without_field_creates_nothing(self, conn):
        """Output without a tech_debt field → no task is created."""
        agent_result = {
            "success": True,
            "raw_output": json.dumps({"status": "done", "changes": []}),
        }

        saved = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)

        assert saved["created"] is False

    def test_save_tech_debt_idempotent(self, conn):
        """A repeated call with the same description does not create a duplicate."""
        tech_debt = {
            "description": "Duplicate tech debt check",
            "reason_temporary": "quick fix",
            "proper_fix": "refactor",
        }
        agent_result = {
            "success": True,
            "raw_output": json.dumps({"status": "done", "tech_debt": tech_debt}),
        }

        first = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)
        second = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)

        assert first["created"] is True
        assert second["created"] is False
        assert first["task_id"] == second["task_id"]

    def test_save_tech_debt_invalid_category_defaults_to_fix(self, conn):
        """An unknown tech_debt category → the child task gets category FIX."""
        tech_debt = {
            "description": "Временный костыль",
            "category": "UNKNOWN_GARBAGE",
        }
        agent_result = {
            "success": True,
            "raw_output": json.dumps({"status": "done", "tech_debt": tech_debt}),
        }

        saved = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)

        assert saved["created"] is True
        debt_task = models.get_task(conn, saved["task_id"])
        assert debt_task["category"] == "FIX"

    @patch("agents.runner.subprocess.run")
    def test_pipeline_creates_tech_debt_task_for_backend_dev(self, mock_run, conn):
        """run_pipeline for backend_dev with tech_debt → a child task is created."""
        tech_debt = {
            "description": "Pipeline integration: хардкод URL",
            "reason_temporary": "нет конфига",
            "proper_fix": "вынести в env",
            "category": "FIX",
        }
        proof = {
            "what_was_done": "done",
            "how_verified": "tests",
            "verification_result": "ok",
        }
        agent_output = {
            "status": "done",
            "changes": [],
            "tech_debt": tech_debt,
            "proof": proof,
        }
        mock_run.return_value = _mock_success(agent_output)

        run_pipeline(conn, "P1-001", [{"role": "backend_dev", "model": "sonnet"}])

        # At least one child of P1-001 must carry the [TECH DEBT] marker in its title.
        has_tech_debt_child = any(
            "[TECH DEBT]" in (child.get("title") or "")
            for child in models.get_children(conn, "P1-001")
        )
        assert has_tech_debt_child, "Дочерняя tech debt задача должна быть создана"


# ===========================================================================
# 5. DB schema: smoke_test_result column exists
# ===========================================================================

def test_schema_tasks_has_smoke_test_result_column(conn):
    """KIN-128: the tasks table has a smoke_test_result column."""
    table_info = conn.execute("PRAGMA table_info(tasks)").fetchall()
    # Index 1 of each PRAGMA table_info row is the column name.
    column_names = {row[1] for row in table_info}
    assert "smoke_test_result" in column_names, "KIN-128: smoke_test_result должна быть в таблице tasks"


def test_update_task_smoke_test_result_roundtrip(conn):
    """smoke_test_result is written and read back as an equal dict."""
    payload = {"status": "confirmed", "evidence": "HTTP 200"}
    models.update_task(conn, "P1-001", smoke_test_result=payload)
    stored = models.get_task(conn, "P1-001")["smoke_test_result"]
    assert stored == payload