""" Regression tests for KIN-091: (1) Revise button — feedback loop, revise_count, target_role, max limit (2) Auto-test before review — _run_project_tests, fix loop, block on exhaustion (3) Spec-driven workflow — route exists and has correct steps in specialists.yaml (4) Git worktrees — create/merge/cleanup/ensure_gitignore with mocked subprocess (5) Auto-trigger pipeline — task with label 'auto' triggers pipeline on creation """ import json import subprocess import pytest from pathlib import Path from unittest.mock import patch, MagicMock, call import web.api as api_module # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def client(tmp_path): db_path = tmp_path / "test.db" api_module.DB_PATH = db_path from web.api import app from fastapi.testclient import TestClient c = TestClient(app) c.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/tmp/p1"}) c.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"}) return c @pytest.fixture def conn(): from core.db import init_db from core import models c = init_db(":memory:") models.create_project(c, "vdol", "ВДОЛЬ", "~/projects/vdolipoperek", tech_stack=["vue3"]) models.create_task(c, "VDOL-001", "vdol", "Fix bug", brief={"route_type": "debug"}) yield c c.close() # --------------------------------------------------------------------------- # (1) Revise button — revise_count, target_role, max limit # --------------------------------------------------------------------------- class TestReviseEndpoint: def test_revise_increments_revise_count(self, client): """revise_count начинается с 0 и увеличивается на 1 при каждом вызове.""" r = client.post("/api/tasks/P1-001/revise", json={"comment": "ещё раз"}) assert r.status_code == 200 assert r.json()["revise_count"] == 1 r = client.post("/api/tasks/P1-001/revise", json={"comment": "и ещё"}) assert r.status_code == 200 assert r.json()["revise_count"] == 2 def test_revise_stores_target_role(self, client): """target_role сохраняется в задаче в БД.""" from core.db import init_db from core import models r = client.post("/api/tasks/P1-001/revise", json={ "comment": "доработай бэкенд", "target_role": "backend_dev", }) assert r.status_code == 200 conn = init_db(api_module.DB_PATH) row = conn.execute( "SELECT revise_target_role FROM tasks WHERE id = 'P1-001'" ).fetchone() conn.close() assert row["revise_target_role"] == "backend_dev" def test_revise_target_role_builds_short_steps(self, client): """Если передан target_role, pipeline_steps = [target_role, reviewer].""" r = client.post("/api/tasks/P1-001/revise", json={ "comment": "фикс", "target_role": "frontend_dev", }) assert r.status_code == 200 steps = r.json()["pipeline_steps"] roles = [s["role"] for s in steps] assert roles == ["frontend_dev", "reviewer"] def test_revise_max_count_exceeded_returns_400(self, client): """После 5 ревизий следующий вызов возвращает 400.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", revise_count=5) conn.close() r = client.post("/api/tasks/P1-001/revise", json={"comment": "6-й"}) assert r.status_code == 400 assert "Max revisions" in r.json()["detail"] def test_revise_sets_status_in_progress(self, client): """После /revise задача переходит в статус in_progress.""" r = client.post("/api/tasks/P1-001/revise", json={"comment": "исправь"}) assert r.status_code == 200 assert r.json()["status"] == "in_progress" def test_revise_only_visible_for_review_done_tasks(self, client): """Задача со статусом 'review' возвращает 200, а не 404.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="review") conn.close() r = client.post("/api/tasks/P1-001/revise", json={"comment": "review→revise"}) assert r.status_code == 200 def test_revise_done_task_allowed(self, client): """Задача со статусом 'done' тоже может быть ревизована.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="done") conn.close() r = client.post("/api/tasks/P1-001/revise", json={"comment": "done→revise"}) assert r.status_code == 200 assert r.json()["status"] == "in_progress" # --------------------------------------------------------------------------- # (2) Auto-test before review — _run_project_tests, fix loop, block # --------------------------------------------------------------------------- class TestRunProjectTests: def test_returns_success_when_make_exits_0(self): """_run_project_tests возвращает success=True при returncode=0.""" from agents.runner import _run_project_tests mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "All tests passed." mock_result.stderr = "" with patch("agents.runner.subprocess.run", return_value=mock_result): result = _run_project_tests("/fake/path") assert result["success"] is True assert "All tests passed." in result["output"] def test_returns_failure_when_make_exits_nonzero(self): """_run_project_tests возвращает success=False при returncode!=0.""" from agents.runner import _run_project_tests mock_result = MagicMock() mock_result.returncode = 2 mock_result.stdout = "" mock_result.stderr = "FAILED 3 tests" with patch("agents.runner.subprocess.run", return_value=mock_result): result = _run_project_tests("/fake/path") assert result["success"] is False assert "FAILED" in result["output"] def test_handles_make_not_found(self): """_run_project_tests возвращает success=False если make не найден.""" from agents.runner import _run_project_tests with patch("agents.runner.subprocess.run", side_effect=FileNotFoundError): result = _run_project_tests("/fake/path") assert result["success"] is False assert result["returncode"] == 127 def test_handles_timeout(self): """_run_project_tests возвращает success=False при таймауте.""" from agents.runner import _run_project_tests with patch("agents.runner.subprocess.run", side_effect=subprocess.TimeoutExpired(cmd="make", timeout=120)): result = _run_project_tests("/fake/path", timeout=120) assert result["success"] is False assert result["returncode"] == 124 def test_custom_test_command_used(self): """KIN-ARCH-008: _run_project_tests вызывает subprocess с переданной командой.""" from agents.runner import _run_project_tests mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "2 passed" mock_result.stderr = "" with patch("agents.runner.subprocess.run", return_value=mock_result) as mock_sp, \ patch("agents.runner.shutil.which", return_value=None): _run_project_tests("/fake/path", test_command="pytest -v") called_cmd = mock_sp.call_args[0][0] assert called_cmd[0] == "pytest" assert "-v" in called_cmd def test_default_test_command_is_make_test(self): """KIN-ARCH-008: без test_command параметра вызывается 'make test'.""" from agents.runner import _run_project_tests mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "OK" mock_result.stderr = "" with patch("agents.runner.subprocess.run", return_value=mock_result) as mock_sp, \ patch("agents.runner.shutil.which", return_value=None): _run_project_tests("/fake/path") called_cmd = mock_sp.call_args[0][0] assert called_cmd[0] == "make" assert "test" in called_cmd def test_custom_command_not_found_returns_127(self): """KIN-ARCH-008: кастомная команда не найдена → returncode 127.""" from agents.runner import _run_project_tests with patch("agents.runner.subprocess.run", side_effect=FileNotFoundError), \ patch("agents.runner.shutil.which", return_value=None): result = _run_project_tests("/fake/path", test_command="nonexistent-cmd --flag") assert result["success"] is False assert result["returncode"] == 127 def test_run_project_tests_empty_string_returns_failure(self): """KIN-ARCH-008 AC#7: пустая строка test_command возвращает returncode -1, success=False.""" from agents.runner import _run_project_tests result = _run_project_tests("/fake/path", test_command="") assert result["success"] is False assert result["returncode"] == -1 assert "Empty test_command" in result["output"] def test_returncode_127_output_contains_not_found(self): """KIN-ARCH-008 AC#7: при returncode 127 вывод содержит 'not found' для диагностики.""" from agents.runner import _run_project_tests with patch("agents.runner.subprocess.run", side_effect=FileNotFoundError), \ patch("agents.runner.shutil.which", return_value=None): result = _run_project_tests("/fake/path", test_command="badcmd") assert "not found" in result["output"].lower() def _mock_success(output="done"): m = MagicMock() m.stdout = json.dumps({"result": output}) m.stderr = "" m.returncode = 0 return m def _mock_failure(msg="error"): m = MagicMock() m.stdout = "" m.stderr = msg m.returncode = 1 return m class TestAutoTestInPipeline: """Pipeline с auto_test_enabled: тесты запускаются автоматически после dev-шага.""" @patch("agents.runner._run_autocommit") @patch("agents.runner._run_project_tests") @patch("agents.runner.subprocess.run") def test_auto_test_passes_pipeline_continues( self, mock_run, mock_tests, mock_autocommit, conn ): """Если авто-тест проходит — pipeline завершается успешно.""" from agents.runner import run_pipeline from core import models mock_run.return_value = _mock_success() mock_tests.return_value = {"success": True, "output": "OK", "returncode": 0} models.update_project(conn, "vdol", auto_test_enabled=True, test_command="make test") steps = [{"role": "backend_dev", "brief": "implement"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True mock_tests.assert_called_once() @patch("agents.runner._run_autocommit") @patch("agents.runner._run_project_tests") @patch("agents.runner.subprocess.run") def test_auto_test_disabled_not_called( self, mock_run, mock_tests, mock_autocommit, conn ): """Если auto_test_enabled=False — make test не вызывается.""" from agents.runner import run_pipeline from core import models mock_run.return_value = _mock_success() # auto_test_enabled по умолчанию 0 steps = [{"role": "backend_dev", "brief": "implement"}] run_pipeline(conn, "VDOL-001", steps) mock_tests.assert_not_called() @patch("agents.runner._run_autocommit") @patch("agents.runner._run_project_tests") @patch("agents.runner.subprocess.run") def test_auto_test_fail_triggers_fix_loop( self, mock_run, mock_tests, mock_autocommit, conn ): """Если авто-тест падает — запускается fixer агент и тесты перезапускаются.""" from agents.runner import run_pipeline from core import models import os mock_run.return_value = _mock_success() # First test call fails, second passes mock_tests.side_effect = [ {"success": False, "output": "FAILED: test_foo", "returncode": 1}, {"success": True, "output": "OK", "returncode": 0}, ] models.update_project(conn, "vdol", auto_test_enabled=True, test_command="make test") with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "3"}): steps = [{"role": "backend_dev", "brief": "implement"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True # _run_project_tests called twice: initial check + after fix assert mock_tests.call_count == 2 # subprocess.run called at least twice: backend_dev + fixer backend_dev assert mock_run.call_count >= 2 @patch("agents.runner._run_autocommit") @patch("agents.runner._run_project_tests") @patch("agents.runner.subprocess.run") def test_auto_test_exhausted_blocks_task( self, mock_run, mock_tests, mock_autocommit, conn ): """Если авто-тест падает max_attempts раз — задача блокируется.""" from agents.runner import run_pipeline from core import models import os mock_run.return_value = _mock_success() # Тест всегда падает mock_tests.return_value = {"success": False, "output": "FAILED", "returncode": 1} models.update_project(conn, "vdol", auto_test_enabled=True, test_command="make test") with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "2"}): steps = [{"role": "backend_dev", "brief": "implement"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is False task = models.get_task(conn, "VDOL-001") assert task["status"] == "blocked" assert "Auto-test" in (task.get("blocked_reason") or "") @patch("agents.runner._run_autocommit") @patch("agents.runner._run_project_tests") @patch("agents.runner.subprocess.run") def test_auto_test_not_triggered_for_non_dev_roles( self, mock_run, mock_tests, mock_autocommit, conn ): """auto_test запускается только для backend_dev/frontend_dev, не для debugger.""" from agents.runner import run_pipeline from core import models mock_run.return_value = _mock_success() models.update_project(conn, "vdol", auto_test_enabled=True, test_command="make test") steps = [{"role": "debugger", "brief": "find"}] run_pipeline(conn, "VDOL-001", steps) mock_tests.assert_not_called() @patch("agents.runner._run_autocommit") @patch("agents.runner._run_project_tests") @patch("agents.runner.subprocess.run") def test_auto_test_uses_project_test_command( self, mock_run, mock_tests, mock_autocommit, conn ): """KIN-ARCH-008: pipeline передаёт project.test_command в _run_project_tests.""" from agents.runner import run_pipeline from core import models mock_run.return_value = _mock_success() mock_tests.return_value = {"success": True, "output": "OK", "returncode": 0} models.update_project(conn, "vdol", auto_test_enabled=True, test_command="npm test") steps = [{"role": "backend_dev", "brief": "implement"}] run_pipeline(conn, "VDOL-001", steps) mock_tests.assert_called_once() called_test_command = mock_tests.call_args[0][1] assert called_test_command == "npm test" @patch("agents.runner._run_autocommit") @patch("agents.runner._run_project_tests") @patch("agents.runner.subprocess.run") def test_auto_test_returncode_127_blocks_task( self, mock_run, mock_tests, mock_autocommit, conn ): """KIN-ARCH-008 AC#7: returncode 127 (команда не найдена) исчерпывает попытки и блокирует задачу.""" from agents.runner import run_pipeline from core import models import os mock_run.return_value = _mock_success() # Команда не найдена — всегда 127, always fails mock_tests.return_value = {"success": False, "output": "badcmd not found in PATH", "returncode": 127} models.update_project(conn, "vdol", auto_test_enabled=True, test_command="badcmd") with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "1"}): steps = [{"role": "backend_dev", "brief": "implement"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is False task = models.get_task(conn, "VDOL-001") assert task["status"] == "blocked" # --------------------------------------------------------------------------- # (3) Spec-driven workflow route # --------------------------------------------------------------------------- class TestSpecDrivenRoute: def _load_specialists(self): import yaml spec_path = Path(__file__).parent.parent / "agents" / "specialists.yaml" with open(spec_path) as f: return yaml.safe_load(f) def test_spec_driven_route_exists(self): """Маршрут spec_driven должен быть объявлен в specialists.yaml.""" data = self._load_specialists() assert "spec_driven" in data.get("routes", {}) def test_spec_driven_route_steps_order(self): """spec_driven route: шаги [constitution, spec, architect, task_decomposer].""" data = self._load_specialists() steps = data["routes"]["spec_driven"]["steps"] assert steps == ["constitution", "spec", "architect", "task_decomposer"] def test_spec_driven_all_roles_exist(self): """Все роли в spec_driven route должны быть объявлены в specialists.""" data = self._load_specialists() specialists = data.get("specialists", {}) for role in data["routes"]["spec_driven"]["steps"]: assert role in specialists, f"Role '{role}' missing from specialists" def test_constitution_role_has_output_schema(self): """constitution должен иметь output_schema (principles, constraints, goals).""" data = self._load_specialists() schema = data["specialists"]["constitution"].get("output_schema", {}) assert "principles" in schema assert "constraints" in schema assert "goals" in schema def test_spec_role_has_output_schema(self): """spec должен иметь output_schema (overview, features, api_contracts).""" data = self._load_specialists() schema = data["specialists"]["spec"].get("output_schema", {}) assert "overview" in schema assert "features" in schema assert "api_contracts" in schema # --------------------------------------------------------------------------- # (4) Git worktrees — create / merge / cleanup / ensure_gitignore # --------------------------------------------------------------------------- class TestCreateWorktree: def test_create_worktree_success(self, tmp_path): """create_worktree возвращает путь при успешном git worktree add.""" from core.worktree import create_worktree mock_r = MagicMock() mock_r.returncode = 0 mock_r.stderr = "" with patch("core.worktree.subprocess.run", return_value=mock_r): path = create_worktree(str(tmp_path), "TASK-001", "backend_dev") assert path is not None assert "TASK-001-backend_dev" in path def test_create_worktree_git_failure_returns_none(self, tmp_path): """create_worktree возвращает None если git worktree add провалился.""" from core.worktree import create_worktree mock_r = MagicMock() mock_r.returncode = 128 mock_r.stderr = "fatal: branch already exists" with patch("core.worktree.subprocess.run", return_value=mock_r): path = create_worktree(str(tmp_path), "TASK-001", "backend_dev") assert path is None def test_create_worktree_exception_returns_none(self, tmp_path): """create_worktree возвращает None при неожиданном исключении (не поднимает).""" from core.worktree import create_worktree with patch("core.worktree.subprocess.run", side_effect=OSError("no git")): path = create_worktree(str(tmp_path), "TASK-001", "backend_dev") assert path is None def test_create_worktree_branch_name_sanitized(self, tmp_path): """Слэши и пробелы в имени шага заменяются на _.""" from core.worktree import create_worktree mock_r = MagicMock() mock_r.returncode = 0 mock_r.stderr = "" calls_made = [] def capture(*args, **kwargs): calls_made.append(args[0]) return mock_r with patch("core.worktree.subprocess.run", side_effect=capture): create_worktree(str(tmp_path), "TASK-001", "step/with spaces") assert calls_made cmd = calls_made[0] branch = cmd[cmd.index("-b") + 1] assert "/" not in branch assert " " not in branch class TestMergeWorktree: def test_merge_success_returns_merged_files(self, tmp_path): """merge_worktree возвращает success=True и список файлов при успешном merge.""" from core.worktree import merge_worktree worktree = str(tmp_path / "TASK-001-backend_dev") merge_ok = MagicMock(returncode=0, stdout="", stderr="") diff_ok = MagicMock(returncode=0, stdout="src/api.py\nsrc/models.py\n", stderr="") with patch("core.worktree.subprocess.run", side_effect=[merge_ok, diff_ok]): result = merge_worktree(worktree, str(tmp_path)) assert result["success"] is True assert "src/api.py" in result["merged_files"] assert result["conflicts"] == [] def test_merge_conflict_returns_conflict_list(self, tmp_path): """merge_worktree возвращает success=False и список конфликтных файлов.""" from core.worktree import merge_worktree worktree = str(tmp_path / "TASK-001-backend_dev") merge_fail = MagicMock(returncode=1, stdout="", stderr="CONFLICT") conflict_files = MagicMock(returncode=0, stdout="src/models.py\n", stderr="") abort = MagicMock(returncode=0) with patch("core.worktree.subprocess.run", side_effect=[merge_fail, conflict_files, abort]): result = merge_worktree(worktree, str(tmp_path)) assert result["success"] is False assert "src/models.py" in result["conflicts"] def test_merge_exception_returns_success_false(self, tmp_path): """merge_worktree никогда не поднимает исключение.""" from core.worktree import merge_worktree with patch("core.worktree.subprocess.run", side_effect=OSError("git died")): result = merge_worktree("/fake/wt", str(tmp_path)) assert result["success"] is False assert "error" in result class TestCleanupWorktree: def test_cleanup_calls_worktree_remove_and_branch_delete(self, tmp_path): """cleanup_worktree вызывает git worktree remove и git branch -D.""" from core.worktree import cleanup_worktree calls = [] def capture(*args, **kwargs): calls.append(args[0]) return MagicMock(returncode=0) with patch("core.worktree.subprocess.run", side_effect=capture): cleanup_worktree("/fake/path/TASK-branch", str(tmp_path)) assert len(calls) == 2 # первый: worktree remove assert "worktree" in calls[0] assert "remove" in calls[0] # второй: branch -D assert "branch" in calls[1] assert "-D" in calls[1] def test_cleanup_never_raises(self, tmp_path): """cleanup_worktree не поднимает исключение при ошибке.""" from core.worktree import cleanup_worktree with patch("core.worktree.subprocess.run", side_effect=OSError("crashed")): cleanup_worktree("/fake/wt", str(tmp_path)) # должно пройти тихо class TestEnsureGitignore: def test_adds_entry_to_existing_gitignore(self, tmp_path): """ensure_gitignore добавляет .kin_worktrees/ в существующий .gitignore.""" from core.worktree import ensure_gitignore gi = tmp_path / ".gitignore" gi.write_text("*.pyc\n__pycache__/\n") ensure_gitignore(str(tmp_path)) assert ".kin_worktrees/" in gi.read_text() def test_creates_gitignore_if_missing(self, tmp_path): """ensure_gitignore создаёт .gitignore если его нет.""" from core.worktree import ensure_gitignore ensure_gitignore(str(tmp_path)) gi = tmp_path / ".gitignore" assert gi.exists() assert ".kin_worktrees/" in gi.read_text() def test_skips_if_entry_already_present(self, tmp_path): """ensure_gitignore не дублирует запись.""" from core.worktree import ensure_gitignore gi = tmp_path / ".gitignore" gi.write_text(".kin_worktrees/\n") ensure_gitignore(str(tmp_path)) content = gi.read_text() assert content.count(".kin_worktrees/") == 1 def test_never_raises_on_permission_error(self, tmp_path): """ensure_gitignore не поднимает исключение при ошибке записи.""" from core.worktree import ensure_gitignore with patch("core.worktree.Path.open", side_effect=PermissionError): ensure_gitignore(str(tmp_path)) # должно пройти тихо # --------------------------------------------------------------------------- # (5) Auto-trigger pipeline — label 'auto' # --------------------------------------------------------------------------- class TestAutoTrigger: def test_task_with_auto_label_triggers_pipeline(self, client): """Создание задачи с label 'auto' запускает pipeline в фоне.""" with patch("web.api._launch_pipeline_subprocess") as mock_launch: r = client.post("/api/tasks", json={ "project_id": "p1", "title": "Auto task", "labels": ["auto"], }) assert r.status_code == 200 mock_launch.assert_called_once() called_task_id = mock_launch.call_args[0][0] assert called_task_id.startswith("P1-") def test_task_without_auto_label_does_not_trigger(self, client): """Создание задачи без label 'auto' НЕ запускает pipeline.""" with patch("web.api._launch_pipeline_subprocess") as mock_launch: r = client.post("/api/tasks", json={ "project_id": "p1", "title": "Manual task", "labels": ["feature"], }) assert r.status_code == 200 mock_launch.assert_not_called() def test_task_without_labels_does_not_trigger(self, client): """Создание задачи без labels вообще НЕ запускает pipeline.""" with patch("web.api._launch_pipeline_subprocess") as mock_launch: r = client.post("/api/tasks", json={ "project_id": "p1", "title": "Plain task", }) assert r.status_code == 200 mock_launch.assert_not_called() def test_task_with_auto_among_multiple_labels_triggers(self, client): """Задача с несколькими метками включая 'auto' запускает pipeline.""" with patch("web.api._launch_pipeline_subprocess") as mock_launch: r = client.post("/api/tasks", json={ "project_id": "p1", "title": "Multi-label auto task", "labels": ["feature", "auto", "backend"], }) assert r.status_code == 200 mock_launch.assert_called_once() # --------------------------------------------------------------------------- # (6) KIN-ARCH-010: Дедупликация задач в task_decomposer при повторном запуске # --------------------------------------------------------------------------- class TestDecomposerDeduplication: """KIN-ARCH-010: повторный вызов _save_decomposer_output не создаёт дубли.""" def test_double_call_with_same_data_creates_only_one_child_task(self, conn): """Регрессия: вызов _save_decomposer_output дважды с одинаковыми данными должен создать ровно одну дочернюю задачу, а не две. Broken behavior: без проверки дублей второй вызов добавлял бы вторую задачу с тем же title + parent_task_id — в БД оказывалось бы 2 строки вместо 1. """ from agents.runner import _save_decomposer_output decomposer_output = { "raw_output": json.dumps({ "tasks": [ { "title": "Implement login endpoint", "brief": "POST /api/auth/login", "priority": 3, } ] }) } r1 = _save_decomposer_output(conn, "vdol", "VDOL-001", decomposer_output) r2 = _save_decomposer_output(conn, "vdol", "VDOL-001", decomposer_output) assert r1["created"] == 1 assert r1["skipped"] == 0 assert r2["created"] == 0 assert r2["skipped"] == 1 children = conn.execute( "SELECT id FROM tasks WHERE parent_task_id = ?", ("VDOL-001",), ).fetchall() assert len(children) == 1, ( f"Ожидалась 1 дочерняя задача, получено {len(children)}" )