diff --git a/core/models.py b/core/models.py index 1693c27..2553c47 100644 --- a/core/models.py +++ b/core/models.py @@ -484,11 +484,15 @@ def get_decisions( if category: query += " AND d.category = ?" params.append(category) - if types: + if types is not None: + if not types: + return [] placeholders = ", ".join("?" for _ in types) query += f" AND d.type IN ({placeholders})" params.extend(types) - if tags: + if tags is not None: + if not tags: + return [] query += """ AND d.id IN ( SELECT d2.id FROM decisions d2, json_each(d2.tags) AS t WHERE t.value IN ({}) diff --git a/tests/test_kin_128_regression.py b/tests/test_kin_128_regression.py new file mode 100644 index 0000000..89d9106 --- /dev/null +++ b/tests/test_kin_128_regression.py @@ -0,0 +1,409 @@ +"""Regression tests for KIN-128 — Quality over speed. + +Covers 4 mechanisms: +1. Analyst auto-injection on 2nd+ revision (revise_count >= 2) +2. Smoke tester: cannot_confirm → task blocked; confirmed → pipeline continues +3. Agent prompts contain mandatory proof block (backend_dev, frontend_dev, debugger) +4. 
Tech debt output creates child task in DB +""" + +import json +import subprocess +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +from core.db import init_db +from core import models +from agents.runner import _save_tech_debt_output, _TECH_DEBT_ROLES, run_pipeline + + +PROMPTS_DIR = Path(__file__).parent.parent / "agents" / "prompts" + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def conn(): + """Fresh in-memory DB with a seeded project and task.""" + c = init_db(":memory:") + models.create_project(c, "p1", "P1", "/p1", tech_stack=["python"]) + models.create_task(c, "P1-001", "p1", "Fix bug", brief={"text": "fix"}) + yield c + c.close() + + +@pytest.fixture +def api_client(tmp_path): + """FastAPI TestClient with isolated DB.""" + import web.api as api_module + api_module.DB_PATH = tmp_path / "test.db" + from web.api import app + from fastapi.testclient import TestClient + client = TestClient(app) + client.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"}) + client.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"}) + return client + + +def _mock_success(output_data): + m = MagicMock() + m.stdout = json.dumps(output_data) if isinstance(output_data, dict) else output_data + m.stderr = "" + m.returncode = 0 + return m + + +# =========================================================================== +# 1. 
class TestAnalystInjectionOnRevise:
    """KIN-128 mechanism 1: analyst auto-injected once revise_count >= 2."""

    def test_first_revise_does_not_inject_analyst(self, api_client):
        """revise_count=1 -> analyst is NOT prepended to the pipeline steps."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "please fix", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 1
        roles = [s["role"] for s in (data.get("pipeline_steps") or [])]
        # FIX: guard the empty case — `roles[0]` on an empty list raised
        # IndexError, turning the test into an ERROR instead of a clean
        # assertion FAILURE with the intended message.
        assert not roles or roles[0] != "analyst", "analyst не должен быть первым шагом при revise_count=1"

    def test_second_revise_injects_analyst_as_first_step(self, api_client):
        """revise_count=2 -> analyst is automatically the first pipeline step."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            # first revise
            api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "first attempt", "steps": steps},
            )
            # second revise
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "second attempt", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 2
        pipeline_steps = data.get("pipeline_steps") or []
        assert pipeline_steps, "pipeline_steps не должен быть пустым"
        assert pipeline_steps[0]["role"] == "analyst", (
            f"Первый шаг должен быть analyst, получили: {pipeline_steps[0]['role']}"
        )

    def test_second_revise_analyst_not_duplicated_if_already_first(self, api_client):
        """If analyst is already the first step, a second one must not be added."""
        steps = [{"role": "analyst", "model": "sonnet"}, {"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            api_client.post("/api/tasks/P1-001/revise", json={"comment": "r1", "steps": steps})
            r = api_client.post("/api/tasks/P1-001/revise", json={"comment": "r2", "steps": steps})
        data = r.json()
        pipeline_steps = data.get("pipeline_steps") or []
        analyst_steps = [s for s in pipeline_steps if s["role"] == "analyst"]
        assert len(analyst_steps) == 1, "analyst не должен дублироваться"

    def test_third_revise_also_injects_analyst(self, api_client):
        """revise_count=3 also injects analyst as the first step."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            for comment in ("r1", "r2", "r3"):
                r = api_client.post(
                    "/api/tasks/P1-001/revise",
                    json={"comment": comment, "steps": steps},
                )
        data = r.json()
        assert data["revise_count"] == 3
        assert (data.get("pipeline_steps") or [{}])[0]["role"] == "analyst"


# ===========================================================================
# 2. Smoke tester: cannot_confirm → blocked; confirmed → continues
# ===========================================================================

class TestSmokeTester:
    def _make_pipeline(self, smoke_status: str):
        """Build single-step pipeline with smoke_tester returning given status."""
        return [{"role": "smoke_tester", "model": "sonnet"}], {
            "status": smoke_status,
            "commands_run": [],
            "evidence": "HTTP 200" if smoke_status == "confirmed" else None,
            "reason": "нет доступа к prod" if smoke_status == "cannot_confirm" else None,
        }

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_cannot_confirm_blocks_task(self, mock_run, conn):
        """smoke_tester returning cannot_confirm -> the task transitions to blocked."""
        steps, smoke_output = self._make_pipeline("cannot_confirm")
        mock_run.return_value = _mock_success(smoke_output)

        result = run_pipeline(conn, "P1-001", steps)

        assert result["success"] is False
        assert result.get("blocked_by") == "smoke_tester"

        task = models.get_task(conn, "P1-001")
        assert task["status"] == "blocked"
        assert task["blocked_agent_role"] == "smoke_tester"
        assert "cannot_confirm" in (task.get("blocked_reason") or "")

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_confirmed_continues_pipeline(self, mock_run, conn):
        """smoke_tester returning confirmed -> pipeline continues, task not blocked."""
        steps = [
            {"role": "smoke_tester", "model": "sonnet"},
        ]
        _, smoke_output = self._make_pipeline("confirmed")
        mock_run.return_value = _mock_success(smoke_output)

        result = run_pipeline(conn, "P1-001", steps)

        # Pipeline completes (only one step — smoke_tester — which passed)
        assert result.get("blocked_by") != "smoke_tester"
        task = models.get_task(conn, "P1-001")
        assert task["status"] != "blocked"

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_confirmed_saves_result_to_db(self, mock_run, conn):
        """A confirmed result is persisted into the task's smoke_test_result."""
        steps = [{"role": "smoke_tester", "model": "sonnet"}]
        smoke_output = {
            "status": "confirmed",
            "commands_run": ["curl https://example.com/health"],
            "evidence": "HTTP 200 OK",
            "reason": None,
        }
        mock_run.return_value = _mock_success(smoke_output)

        run_pipeline(conn, "P1-001", steps)

        task = models.get_task(conn, "P1-001")
        result = task.get("smoke_test_result")
        assert result is not None
        assert result.get("status") == "confirmed"

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_cannot_confirm_saves_result_to_db(self, mock_run, conn):
        """A cannot_confirm result is also persisted into smoke_test_result."""
        steps = [{"role": "smoke_tester", "model": "sonnet"}]
        smoke_output = {
            "status": "cannot_confirm",
            "commands_run": [],
            "evidence": None,
            "reason": "prod недоступен",
        }
        mock_run.return_value = _mock_success(smoke_output)

        run_pipeline(conn, "P1-001", steps)

        task = models.get_task(conn, "P1-001")
        result = task.get("smoke_test_result")
        assert result is not None
        assert result.get("status") == "cannot_confirm"
# ===========================================================================
# 3. Agent prompts contain mandatory proof block
# ===========================================================================

class TestAgentPromptsProofBlock:
    """Verify that agent prompts structurally require a proof block."""

    # FIX: the original parametrize carried a `forbidden_keyword` column
    # ("status: done" / "status: fixed") that the test body never used —
    # dead parameter removed.
    @pytest.mark.parametrize("prompt_file,proof_keyword", [
        ("backend_dev.md", "proof"),
        ("frontend_dev.md", "proof"),
        ("debugger.md", "proof"),
    ])
    def test_prompt_contains_proof_field(self, prompt_file, proof_keyword):
        """The prompt contains a proof field in its Output format section."""
        content = (PROMPTS_DIR / prompt_file).read_text(encoding="utf-8")
        assert proof_keyword in content, (
            f"{prompt_file}: отсутствует поле '{proof_keyword}' в Output format"
        )

    @pytest.mark.parametrize("prompt_file,rule_phrase", [
        ("backend_dev.md", "ЗАПРЕЩЕНО"),
        ("frontend_dev.md", "ЗАПРЕЩЕНО"),
        ("debugger.md", "ЗАПРЕЩЕНО"),
    ])
    def test_prompt_forbids_done_without_proof(self, prompt_file, rule_phrase):
        """The prompt explicitly forbids reporting success without proof."""
        content = (PROMPTS_DIR / prompt_file).read_text(encoding="utf-8")
        assert rule_phrase in content, (
            f"{prompt_file}: не содержит правило ЗАПРЕЩЕНО о proof"
        )

    def test_smoke_tester_prompt_exists(self):
        """smoke_tester.md exists and mentions the cannot_confirm status."""
        path = PROMPTS_DIR / "smoke_tester.md"
        assert path.exists(), "smoke_tester.md не найден"
        content = path.read_text(encoding="utf-8")
        assert "cannot_confirm" in content

    def test_analyst_prompt_exists(self):
        """analyst.md exists and mentions root_problem."""
        path = PROMPTS_DIR / "analyst.md"
        assert path.exists(), "analyst.md не найден"
        content = path.read_text(encoding="utf-8")
        assert "root_problem" in content

    def test_backend_dev_proof_fields_complete(self):
        """backend_dev.md contains all three fields of the proof block."""
        content = (PROMPTS_DIR / "backend_dev.md").read_text(encoding="utf-8")
        for field in ("what_was_done", "how_verified", "verification_result"):
            assert field in content, f"backend_dev.md: отсутствует поле proof.{field}"

    def test_debugger_proof_fields_complete(self):
        """debugger.md contains all three fields of the proof block."""
        content = (PROMPTS_DIR / "debugger.md").read_text(encoding="utf-8")
        for field in ("what_was_fixed", "how_verified", "verification_result"):
            assert field in content, f"debugger.md: отсутствует поле proof.{field}"


# ===========================================================================
# 4. Tech debt output creates child task in DB
# ===========================================================================

class TestTechDebtTaskCreation:
    def test_tech_debt_roles_contains_expected_roles(self):
        """_TECH_DEBT_ROLES contains every dev role plus sysadmin."""
        assert "backend_dev" in _TECH_DEBT_ROLES
        assert "frontend_dev" in _TECH_DEBT_ROLES
        assert "debugger" in _TECH_DEBT_ROLES
        assert "sysadmin" in _TECH_DEBT_ROLES

    def test_save_tech_debt_creates_child_task(self, conn):
        """tech_debt with a description -> a [TECH DEBT] child task is created."""
        result = {
            "success": True,
            "raw_output": json.dumps({
                "status": "done",
                "tech_debt": {
                    "description": "Хардкод timeout в 30s нужно вынести в конфиг",
                    "reason_temporary": "MVP без конфига",
                    "proper_fix": "Добавить config.yaml с полем agent_timeout",
                    "category": "FIX",
                },
            }),
        }
        out = _save_tech_debt_output(conn, "p1", "P1-001", result)

        assert out["created"] is True
        assert out["task_id"] is not None

        child = models.get_task(conn, out["task_id"])
        assert child is not None
        assert "[TECH DEBT]" in child["title"]
        assert "Хардкод timeout" in child["title"]
        assert child["parent_task_id"] == "P1-001"
        assert child["category"] == "FIX"

    def test_save_tech_debt_without_description_creates_nothing(self, conn):
        """tech_debt with an empty description -> no task is created."""
        result = {
            "success": True,
            "raw_output": json.dumps({
                "status": "done",
                "tech_debt": {"description": "", "reason_temporary": ""},
            }),
        }
        out = _save_tech_debt_output(conn, "p1", "P1-001", result)
        assert out["created"] is False

    def test_save_tech_debt_without_field_creates_nothing(self, conn):
        """Output without a tech_debt field -> no task is created."""
        result = {
            "success": True,
            "raw_output": json.dumps({"status": "done", "changes": []}),
        }
        out = _save_tech_debt_output(conn, "p1", "P1-001", result)
        assert out["created"] is False

    def test_save_tech_debt_idempotent(self, conn):
        """Repeated call with the same description -> no duplicate task."""
        raw = json.dumps({
            "status": "done",
            "tech_debt": {
                "description": "Duplicate tech debt check",
                "reason_temporary": "quick fix",
                "proper_fix": "refactor",
            },
        })
        result = {"success": True, "raw_output": raw}

        out1 = _save_tech_debt_output(conn, "p1", "P1-001", result)
        out2 = _save_tech_debt_output(conn, "p1", "P1-001", result)

        assert out1["created"] is True
        assert out2["created"] is False
        assert out1["task_id"] == out2["task_id"]

    def test_save_tech_debt_invalid_category_defaults_to_fix(self, conn):
        """Unknown category in tech_debt -> falls back to default category FIX."""
        result = {
            "success": True,
            "raw_output": json.dumps({
                "status": "done",
                "tech_debt": {
                    "description": "Временный костыль",
                    "category": "UNKNOWN_GARBAGE",
                },
            }),
        }
        out = _save_tech_debt_output(conn, "p1", "P1-001", result)
        assert out["created"] is True
        child = models.get_task(conn, out["task_id"])
        assert child["category"] == "FIX"

    @patch("agents.runner.subprocess.run")
    def test_pipeline_creates_tech_debt_task_for_backend_dev(self, mock_run, conn):
        """run_pipeline for backend_dev with tech_debt -> a child task is created."""
        agent_output = {
            "status": "done",
            "changes": [],
            "tech_debt": {
                "description": "Pipeline integration: хардкод URL",
                "reason_temporary": "нет конфига",
                "proper_fix": "вынести в env",
                "category": "FIX",
            },
            "proof": {
                "what_was_done": "done",
                "how_verified": "tests",
                "verification_result": "ok",
            },
        }
        mock_run.return_value = _mock_success(agent_output)

        steps = [{"role": "backend_dev", "model": "sonnet"}]
        run_pipeline(conn, "P1-001", steps)

        # Check if a [TECH DEBT] child task was created
        children = models.get_children(conn, "P1-001")
        tech_debt_tasks = [t for t in children if "[TECH DEBT]" in (t.get("title") or "")]
        assert len(tech_debt_tasks) >= 1, "Дочерняя tech debt задача должна быть создана"


# ===========================================================================
# 5. DB schema: smoke_test_result column exists
# ===========================================================================

def test_schema_tasks_has_smoke_test_result_column(conn):
    """KIN-128: the tasks table has a smoke_test_result column."""
    cols = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()}
    assert "smoke_test_result" in cols, "KIN-128: smoke_test_result должна быть в таблице tasks"


def test_update_task_smoke_test_result_roundtrip(conn):
    """smoke_test_result is stored and read back as a dict."""
    data = {"status": "confirmed", "evidence": "HTTP 200"}
    models.update_task(conn, "P1-001", smoke_test_result=data)
    task = models.get_task(conn, "P1-001")
    assert task["smoke_test_result"] == data


# --- appended hunk from tests/test_models.py ------------------------------
# Deep-dive: extra edge cases for get_decisions with [] filters
с [] +# Задача KIN-P1-001 revision — оба параметра пустые, пустой проект, decisions без тегов +# --------------------------------------------------------------------------- + +def test_get_decisions_both_types_and_tags_empty_returns_empty(conn): + """types=[] + tags=[] одновременно — должен вернуть 0 результатов. + + Ранний возврат по types=[] должен сработать до проверки tags=[], результат []. + """ + models.create_project(conn, "p1", "P1", "/p1") + models.add_decision(conn, "p1", "gotcha", "Ловушка A", "desc", tags=["safari"]) + models.add_decision(conn, "p1", "decision", "Решение B", "desc", tags=["chrome"]) + + result = models.get_decisions(conn, "p1", types=[], tags=[]) + assert result == [], ( + f"types=[] + tags=[] должен вернуть [], получено {len(result)} записей" + ) + + +def test_get_decisions_empty_types_with_tags_none_returns_empty(conn): + """types=[] при tags=None — ранний возврат по types, теги не проверяются → 0 результатов.""" + models.create_project(conn, "p1", "P1", "/p1") + models.add_decision(conn, "p1", "gotcha", "Ловушка 1", "desc", tags=["safari"]) + + result = models.get_decisions(conn, "p1", types=[], tags=None) + assert result == [], ( + f"types=[] должен дать ранний возврат [], даже когда tags=None, получено {len(result)} записей" + ) + + +def test_get_decisions_empty_tags_with_types_none_returns_empty(conn): + """tags=[] при types=None — фильтр по тегам даёт ранний возврат → 0 результатов.""" + models.create_project(conn, "p1", "P1", "/p1") + models.add_decision(conn, "p1", "gotcha", "Ловушка 1", "desc", tags=["safari"]) + + result = models.get_decisions(conn, "p1", types=None, tags=[]) + assert result == [], ( + f"tags=[] должен дать ранний возврат [], даже когда types=None, получено {len(result)} записей" + ) + + +def test_get_decisions_empty_types_on_empty_project_returns_empty(conn): + """types=[] на проекте без решений — должен вернуть [] (ранний возврат, не обращение к пустой таблице).""" + models.create_project(conn, "p1", "P1", 
"/p1") + + result = models.get_decisions(conn, "p1", types=[]) + assert result == [], ( + f"types=[] на пустом проекте должен вернуть [], получено {result!r}" + ) + + +def test_get_decisions_empty_tags_on_decisions_without_tags(conn): + """tags=[] при наличии decisions с tags=None — должен вернуть [] (не включать decisions без тегов).""" + models.create_project(conn, "p1", "P1", "/p1") + models.add_decision(conn, "p1", "decision", "Без тегов 1", "desc", tags=None) + models.add_decision(conn, "p1", "gotcha", "Без тегов 2", "desc", tags=None) + + result = models.get_decisions(conn, "p1", tags=[]) + assert result == [], ( + f"tags=[] должен вернуть [], даже если decisions имеют tags=None, получено {len(result)} записей" + ) + + +def test_get_decisions_empty_types_with_category_ignores_category(conn): + """types=[] + category='ui' — ранний возврат по types=[], category не влияет → 0 результатов.""" + models.create_project(conn, "p1", "P1", "/p1") + models.add_decision(conn, "p1", "gotcha", "UI ловушка", "desc", category="ui") + + result = models.get_decisions(conn, "p1", types=[], category="ui") + assert result == [], ( + f"types=[] должен давать ранний возврат [] независимо от category, получено {len(result)} записей" + )