"""
Regression tests for KIN-091:

(1) Revise button — feedback loop, revise_count, target_role, max limit
(2) Auto-test before review — _run_project_tests, fix loop, block on exhaustion
(3) Spec-driven workflow — route exists and has correct steps in specialists.yaml
(4) Git worktrees — create/merge/cleanup/ensure_gitignore with mocked subprocess
(5) Auto-trigger pipeline — task with label 'auto' triggers pipeline on creation
"""

import json
import subprocess

import pytest

from pathlib import Path
from unittest.mock import patch, MagicMock, call

import web.api as api_module

# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def client(tmp_path):
    """FastAPI test client backed by a fresh SQLite DB in tmp_path.

    Pre-seeds one project ("p1") and one task (first task id will be "P1-001",
    which the tests below rely on).
    """
    db_path = tmp_path / "test.db"
    # Repoint the API module at the temporary DB before the app handles requests.
    api_module.DB_PATH = db_path
    from web.api import app
    from fastapi.testclient import TestClient
    c = TestClient(app)
    c.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/tmp/p1"})
    c.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"})
    return c

@pytest.fixture
def conn():
    """In-memory DB connection seeded with project 'vdol' and task 'VDOL-001'."""
    from core.db import init_db
    from core import models
    c = init_db(":memory:")
    models.create_project(c, "vdol", "ВДОЛЬ", "~/projects/vdolipoperek",
                          tech_stack=["vue3"])
    models.create_task(c, "VDOL-001", "vdol", "Fix bug",
                       brief={"route_type": "debug"})
    yield c
    # Close the connection after the test to release the in-memory DB.
    c.close()

# ---------------------------------------------------------------------------
# (1) Revise button — revise_count, target_role, max limit
# ---------------------------------------------------------------------------

class TestReviseEndpoint:
    def test_revise_increments_revise_count(self, client):
        """revise_count starts at 0 and is incremented by 1 on each call."""
        r = client.post("/api/tasks/P1-001/revise", json={"comment": "ещё раз"})
        assert r.status_code == 200
        assert r.json()["revise_count"] == 1

        r = client.post("/api/tasks/P1-001/revise", json={"comment": "и ещё"})
        assert r.status_code == 200
        assert r.json()["revise_count"] == 2

    def test_revise_stores_target_role(self, client):
        """target_role is persisted on the task row in the DB."""
        from core.db import init_db
        from core import models
        r = client.post("/api/tasks/P1-001/revise", json={
            "comment": "доработай бэкенд",
            "target_role": "backend_dev",
        })
        assert r.status_code == 200

        # Read straight from the DB to verify persistence, not just the response.
        conn = init_db(api_module.DB_PATH)
        row = conn.execute(
            "SELECT revise_target_role FROM tasks WHERE id = 'P1-001'"
        ).fetchone()
        conn.close()
        assert row["revise_target_role"] == "backend_dev"

    def test_revise_target_role_builds_short_steps(self, client):
        """When target_role is given, pipeline_steps = [target_role, reviewer]."""
        r = client.post("/api/tasks/P1-001/revise", json={
            "comment": "фикс",
            "target_role": "frontend_dev",
        })
        assert r.status_code == 200
        steps = r.json()["pipeline_steps"]
        roles = [s["role"] for s in steps]
        assert roles == ["frontend_dev", "reviewer"]

    def test_revise_max_count_exceeded_returns_400(self, client):
        """After 5 revisions the next call returns 400."""
        from core.db import init_db
        from core import models
        conn = init_db(api_module.DB_PATH)
        models.update_task(conn, "P1-001", revise_count=5)
        conn.close()

        r = client.post("/api/tasks/P1-001/revise", json={"comment": "6-й"})
        assert r.status_code == 400
        assert "Max revisions" in r.json()["detail"]

    def test_revise_sets_status_in_progress(self, client):
        """After /revise the task transitions to status in_progress."""
        r = client.post("/api/tasks/P1-001/revise", json={"comment": "исправь"})
        assert r.status_code == 200
        assert r.json()["status"] == "in_progress"

    def test_revise_only_visible_for_review_done_tasks(self, client):
        """A task in status 'review' returns 200, not 404."""
        from core.db import init_db
        from core import models
        conn = init_db(api_module.DB_PATH)
        models.update_task(conn, "P1-001", status="review")
        conn.close()

        r = client.post("/api/tasks/P1-001/revise", json={"comment": "review→revise"})
        assert r.status_code == 200

    def test_revise_done_task_allowed(self, client):
        """A task in status 'done' can be revised as well."""
        from core.db import init_db
        from core import models
        conn = init_db(api_module.DB_PATH)
        models.update_task(conn, "P1-001", status="done")
        conn.close()

        r = client.post("/api/tasks/P1-001/revise", json={"comment": "done→revise"})
        assert r.status_code == 200
        assert r.json()["status"] == "in_progress"

# ---------------------------------------------------------------------------
# (2) Auto-test before review — _run_project_tests, fix loop, block
# ---------------------------------------------------------------------------

class TestRunProjectTests:
    def test_returns_success_when_make_exits_0(self):
        """_run_project_tests returns success=True when returncode == 0."""
        from agents.runner import _run_project_tests
        mock_result = MagicMock()
        mock_result.returncode = 0
        mock_result.stdout = "All tests passed."
        mock_result.stderr = ""
        with patch("agents.runner.subprocess.run", return_value=mock_result):
            result = _run_project_tests("/fake/path")
        assert result["success"] is True
        assert "All tests passed." in result["output"]

    def test_returns_failure_when_make_exits_nonzero(self):
        """_run_project_tests returns success=False when returncode != 0."""
        from agents.runner import _run_project_tests
        mock_result = MagicMock()
        mock_result.returncode = 2
        mock_result.stdout = ""
        mock_result.stderr = "FAILED 3 tests"
        with patch("agents.runner.subprocess.run", return_value=mock_result):
            result = _run_project_tests("/fake/path")
        assert result["success"] is False
        assert "FAILED" in result["output"]

    def test_handles_make_not_found(self):
        """_run_project_tests returns success=False when make is not installed."""
        from agents.runner import _run_project_tests
        with patch("agents.runner.subprocess.run", side_effect=FileNotFoundError):
            result = _run_project_tests("/fake/path")
        assert result["success"] is False
        # 127 is the conventional shell exit code for "command not found".
        assert result["returncode"] == 127

    def test_handles_timeout(self):
        """_run_project_tests returns success=False on a timeout."""
        from agents.runner import _run_project_tests
        with patch("agents.runner.subprocess.run",
                   side_effect=subprocess.TimeoutExpired(cmd="make", timeout=120)):
            result = _run_project_tests("/fake/path", timeout=120)
        assert result["success"] is False
        # 124 is the conventional exit code for a timed-out command (cf. timeout(1)).
        assert result["returncode"] == 124

    def test_custom_test_command_used(self):
        """KIN-ARCH-008: _run_project_tests invokes subprocess with the given command."""
        from agents.runner import _run_project_tests
        mock_result = MagicMock()
        mock_result.returncode = 0
        mock_result.stdout = "2 passed"
        mock_result.stderr = ""
        with patch("agents.runner.subprocess.run", return_value=mock_result) as mock_sp, \
             patch("agents.runner.shutil.which", return_value=None):
            _run_project_tests("/fake/path", test_command="pytest -v")
        # First positional argument of subprocess.run is the argv list.
        called_cmd = mock_sp.call_args[0][0]
        assert called_cmd[0] == "pytest"
        assert "-v" in called_cmd

    def test_default_test_command_is_make_test(self):
        """KIN-ARCH-008: without a test_command parameter, 'make test' is invoked."""
        from agents.runner import _run_project_tests
        mock_result = MagicMock()
        mock_result.returncode = 0
        mock_result.stdout = "OK"
        mock_result.stderr = ""
        with patch("agents.runner.subprocess.run", return_value=mock_result) as mock_sp, \
             patch("agents.runner.shutil.which", return_value=None):
            _run_project_tests("/fake/path")
        called_cmd = mock_sp.call_args[0][0]
        assert called_cmd[0] == "make"
        assert "test" in called_cmd

    def test_custom_command_not_found_returns_127(self):
        """KIN-ARCH-008: custom command not found → returncode 127."""
        from agents.runner import _run_project_tests
        with patch("agents.runner.subprocess.run", side_effect=FileNotFoundError), \
             patch("agents.runner.shutil.which", return_value=None):
            result = _run_project_tests("/fake/path", test_command="nonexistent-cmd --flag")
        assert result["success"] is False
        assert result["returncode"] == 127

    def test_run_project_tests_empty_string_returns_failure(self):
        """KIN-ARCH-008 AC#7: an empty test_command yields returncode -1, success=False."""
        from agents.runner import _run_project_tests
        result = _run_project_tests("/fake/path", test_command="")
        assert result["success"] is False
        assert result["returncode"] == -1
        assert "Empty test_command" in result["output"]

    def test_returncode_127_output_contains_not_found(self):
        """KIN-ARCH-008 AC#7: with returncode 127 the output contains 'not found' for diagnostics."""
        from agents.runner import _run_project_tests
        with patch("agents.runner.subprocess.run", side_effect=FileNotFoundError), \
             patch("agents.runner.shutil.which", return_value=None):
            result = _run_project_tests("/fake/path", test_command="badcmd")
        assert "not found" in result["output"].lower()

def _mock_success(output="done"):
|
||
m = MagicMock()
|
||
m.stdout = json.dumps({"result": output})
|
||
m.stderr = ""
|
||
m.returncode = 0
|
||
return m
|
||
|
||
|
||
def _mock_failure(msg="error"):
|
||
m = MagicMock()
|
||
m.stdout = ""
|
||
m.stderr = msg
|
||
m.returncode = 1
|
||
return m
|
||
|
||
|
||
class TestAutoTestInPipeline:
    """Pipeline with auto_test_enabled: tests run automatically after a dev step."""

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_project_tests")
    @patch("agents.runner.subprocess.run")
    def test_auto_test_passes_pipeline_continues(
        self, mock_run, mock_tests, mock_autocommit, conn
    ):
        """If the auto-test passes, the pipeline finishes successfully."""
        from agents.runner import run_pipeline
        from core import models
        mock_run.return_value = _mock_success()
        mock_tests.return_value = {"success": True, "output": "OK", "returncode": 0}
        models.update_project(conn, "vdol", auto_test_enabled=True)

        steps = [{"role": "backend_dev", "brief": "implement"}]
        result = run_pipeline(conn, "VDOL-001", steps)

        assert result["success"] is True
        mock_tests.assert_called_once()

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_project_tests")
    @patch("agents.runner.subprocess.run")
    def test_auto_test_disabled_not_called(
        self, mock_run, mock_tests, mock_autocommit, conn
    ):
        """If auto_test_enabled=False, make test is never invoked."""
        from agents.runner import run_pipeline
        from core import models
        mock_run.return_value = _mock_success()
        # auto_test_enabled defaults to 0

        steps = [{"role": "backend_dev", "brief": "implement"}]
        run_pipeline(conn, "VDOL-001", steps)

        mock_tests.assert_not_called()

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_project_tests")
    @patch("agents.runner.subprocess.run")
    def test_auto_test_fail_triggers_fix_loop(
        self, mock_run, mock_tests, mock_autocommit, conn
    ):
        """If the auto-test fails, a fixer agent runs and the tests are re-run."""
        from agents.runner import run_pipeline
        from core import models
        import os
        mock_run.return_value = _mock_success()
        # First test call fails, second passes
        mock_tests.side_effect = [
            {"success": False, "output": "FAILED: test_foo", "returncode": 1},
            {"success": True, "output": "OK", "returncode": 0},
        ]
        models.update_project(conn, "vdol", auto_test_enabled=True)

        with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "3"}):
            steps = [{"role": "backend_dev", "brief": "implement"}]
            result = run_pipeline(conn, "VDOL-001", steps)

        assert result["success"] is True
        # _run_project_tests called twice: initial check + after fix
        assert mock_tests.call_count == 2
        # subprocess.run called at least twice: backend_dev + fixer backend_dev
        assert mock_run.call_count >= 2

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_project_tests")
    @patch("agents.runner.subprocess.run")
    def test_auto_test_exhausted_blocks_task(
        self, mock_run, mock_tests, mock_autocommit, conn
    ):
        """If the auto-test fails max_attempts times, the task gets blocked."""
        from agents.runner import run_pipeline
        from core import models
        import os

        mock_run.return_value = _mock_success()
        # The test always fails
        mock_tests.return_value = {"success": False, "output": "FAILED", "returncode": 1}
        models.update_project(conn, "vdol", auto_test_enabled=True)

        with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "2"}):
            steps = [{"role": "backend_dev", "brief": "implement"}]
            result = run_pipeline(conn, "VDOL-001", steps)

        assert result["success"] is False
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "blocked"
        assert "Auto-test" in (task.get("blocked_reason") or "")

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_project_tests")
    @patch("agents.runner.subprocess.run")
    def test_auto_test_not_triggered_for_non_dev_roles(
        self, mock_run, mock_tests, mock_autocommit, conn
    ):
        """auto_test runs only for backend_dev/frontend_dev, not for debugger."""
        from agents.runner import run_pipeline
        from core import models
        mock_run.return_value = _mock_success()
        models.update_project(conn, "vdol", auto_test_enabled=True)

        steps = [{"role": "debugger", "brief": "find"}]
        run_pipeline(conn, "VDOL-001", steps)

        mock_tests.assert_not_called()

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_project_tests")
    @patch("agents.runner.subprocess.run")
    def test_auto_test_uses_project_test_command(
        self, mock_run, mock_tests, mock_autocommit, conn
    ):
        """KIN-ARCH-008: the pipeline passes project.test_command to _run_project_tests."""
        from agents.runner import run_pipeline
        from core import models
        mock_run.return_value = _mock_success()
        mock_tests.return_value = {"success": True, "output": "OK", "returncode": 0}
        models.update_project(conn, "vdol", auto_test_enabled=True, test_command="npm test")

        steps = [{"role": "backend_dev", "brief": "implement"}]
        run_pipeline(conn, "VDOL-001", steps)

        mock_tests.assert_called_once()
        # Second positional arg of _run_project_tests is expected to be the command.
        called_test_command = mock_tests.call_args[0][1]
        assert called_test_command == "npm test"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_project_tests")
    @patch("agents.runner.subprocess.run")
    def test_auto_test_returncode_127_blocks_task(
        self, mock_run, mock_tests, mock_autocommit, conn
    ):
        """KIN-ARCH-008 AC#7: returncode 127 (command not found) exhausts attempts and blocks the task."""
        from agents.runner import run_pipeline
        from core import models
        import os

        mock_run.return_value = _mock_success()
        # Command not found — always 127, always fails
        mock_tests.return_value = {"success": False, "output": "badcmd not found in PATH", "returncode": 127}
        models.update_project(conn, "vdol", auto_test_enabled=True, test_command="badcmd")

        with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "1"}):
            steps = [{"role": "backend_dev", "brief": "implement"}]
            result = run_pipeline(conn, "VDOL-001", steps)

        assert result["success"] is False
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "blocked"

# ---------------------------------------------------------------------------
# (3) Spec-driven workflow route
# ---------------------------------------------------------------------------

class TestSpecDrivenRoute:
    def _load_specialists(self):
        """Parse and return agents/specialists.yaml as a dict."""
        import yaml
        spec_file = Path(__file__).parent.parent / "agents" / "specialists.yaml"
        with open(spec_file) as fh:
            return yaml.safe_load(fh)

    def test_spec_driven_route_exists(self):
        """The spec_driven route must be declared in specialists.yaml."""
        config = self._load_specialists()
        assert "spec_driven" in config.get("routes", {})

    def test_spec_driven_route_steps_order(self):
        """spec_driven route steps: [constitution, spec, architect, task_decomposer]."""
        config = self._load_specialists()
        expected = ["constitution", "spec", "architect", "task_decomposer"]
        assert config["routes"]["spec_driven"]["steps"] == expected

    def test_spec_driven_all_roles_exist(self):
        """Every role in the spec_driven route must be declared under specialists."""
        config = self._load_specialists()
        declared = config.get("specialists", {})
        for role in config["routes"]["spec_driven"]["steps"]:
            assert role in declared, f"Role '{role}' missing from specialists"

    def test_constitution_role_has_output_schema(self):
        """constitution must define an output_schema (principles, constraints, goals)."""
        config = self._load_specialists()
        schema = config["specialists"]["constitution"].get("output_schema", {})
        for key in ("principles", "constraints", "goals"):
            assert key in schema

    def test_spec_role_has_output_schema(self):
        """spec must define an output_schema (overview, features, api_contracts)."""
        config = self._load_specialists()
        schema = config["specialists"]["spec"].get("output_schema", {})
        for key in ("overview", "features", "api_contracts"):
            assert key in schema

# ---------------------------------------------------------------------------
# (4) Git worktrees — create / merge / cleanup / ensure_gitignore
# ---------------------------------------------------------------------------

class TestCreateWorktree:
    def test_create_worktree_success(self, tmp_path):
        """create_worktree returns the path when git worktree add succeeds."""
        from core.worktree import create_worktree
        mock_r = MagicMock()
        mock_r.returncode = 0
        mock_r.stderr = ""
        with patch("core.worktree.subprocess.run", return_value=mock_r):
            path = create_worktree(str(tmp_path), "TASK-001", "backend_dev")
        assert path is not None
        assert "TASK-001-backend_dev" in path

    def test_create_worktree_git_failure_returns_none(self, tmp_path):
        """create_worktree returns None when git worktree add fails."""
        from core.worktree import create_worktree
        mock_r = MagicMock()
        mock_r.returncode = 128
        mock_r.stderr = "fatal: branch already exists"
        with patch("core.worktree.subprocess.run", return_value=mock_r):
            path = create_worktree(str(tmp_path), "TASK-001", "backend_dev")
        assert path is None

    def test_create_worktree_exception_returns_none(self, tmp_path):
        """create_worktree returns None on an unexpected exception (never raises)."""
        from core.worktree import create_worktree
        with patch("core.worktree.subprocess.run", side_effect=OSError("no git")):
            path = create_worktree(str(tmp_path), "TASK-001", "backend_dev")
        assert path is None

    def test_create_worktree_branch_name_sanitized(self, tmp_path):
        """Slashes and spaces in the step name are replaced with _."""
        from core.worktree import create_worktree
        mock_r = MagicMock()
        mock_r.returncode = 0
        mock_r.stderr = ""
        calls_made = []
        def capture(*args, **kwargs):
            # Record the argv list of every subprocess.run invocation.
            calls_made.append(args[0])
            return mock_r
        with patch("core.worktree.subprocess.run", side_effect=capture):
            create_worktree(str(tmp_path), "TASK-001", "step/with spaces")
        assert calls_made
        cmd = calls_made[0]
        # The branch name is the argument following the "-b" flag.
        branch = cmd[cmd.index("-b") + 1]
        assert "/" not in branch
        assert " " not in branch

class TestMergeWorktree:
    def test_merge_success_returns_merged_files(self, tmp_path):
        """merge_worktree returns success=True and the file list on a clean merge."""
        from core.worktree import merge_worktree
        worktree = str(tmp_path / "TASK-001-backend_dev")

        # side_effect order: the merge command, then the diff listing merged files.
        merge_ok = MagicMock(returncode=0, stdout="", stderr="")
        diff_ok = MagicMock(returncode=0, stdout="src/api.py\nsrc/models.py\n", stderr="")

        with patch("core.worktree.subprocess.run", side_effect=[merge_ok, diff_ok]):
            result = merge_worktree(worktree, str(tmp_path))

        assert result["success"] is True
        assert "src/api.py" in result["merged_files"]
        assert result["conflicts"] == []

    def test_merge_conflict_returns_conflict_list(self, tmp_path):
        """merge_worktree returns success=False and the list of conflicting files."""
        from core.worktree import merge_worktree
        worktree = str(tmp_path / "TASK-001-backend_dev")

        # side_effect order: failing merge, conflict-file listing, merge --abort.
        merge_fail = MagicMock(returncode=1, stdout="", stderr="CONFLICT")
        conflict_files = MagicMock(returncode=0, stdout="src/models.py\n", stderr="")
        abort = MagicMock(returncode=0)

        with patch("core.worktree.subprocess.run",
                   side_effect=[merge_fail, conflict_files, abort]):
            result = merge_worktree(worktree, str(tmp_path))

        assert result["success"] is False
        assert "src/models.py" in result["conflicts"]

    def test_merge_exception_returns_success_false(self, tmp_path):
        """merge_worktree never raises."""
        from core.worktree import merge_worktree
        with patch("core.worktree.subprocess.run", side_effect=OSError("git died")):
            result = merge_worktree("/fake/wt", str(tmp_path))
        assert result["success"] is False
        assert "error" in result

class TestCleanupWorktree:
    def test_cleanup_calls_worktree_remove_and_branch_delete(self, tmp_path):
        """cleanup_worktree invokes git worktree remove and git branch -D."""
        from core.worktree import cleanup_worktree
        calls = []
        def capture(*args, **kwargs):
            # Record the argv list of every subprocess.run invocation.
            calls.append(args[0])
            return MagicMock(returncode=0)
        with patch("core.worktree.subprocess.run", side_effect=capture):
            cleanup_worktree("/fake/path/TASK-branch", str(tmp_path))
        assert len(calls) == 2
        # first call: worktree remove
        assert "worktree" in calls[0]
        assert "remove" in calls[0]
        # second call: branch -D
        assert "branch" in calls[1]
        assert "-D" in calls[1]

    def test_cleanup_never_raises(self, tmp_path):
        """cleanup_worktree does not raise on errors."""
        from core.worktree import cleanup_worktree
        with patch("core.worktree.subprocess.run", side_effect=OSError("crashed")):
            cleanup_worktree("/fake/wt", str(tmp_path))  # must complete silently

class TestEnsureGitignore:
    def test_adds_entry_to_existing_gitignore(self, tmp_path):
        """ensure_gitignore appends .kin_worktrees/ to an existing .gitignore."""
        from core.worktree import ensure_gitignore
        gitignore = tmp_path / ".gitignore"
        gitignore.write_text("*.pyc\n__pycache__/\n")
        ensure_gitignore(str(tmp_path))
        assert ".kin_worktrees/" in gitignore.read_text()

    def test_creates_gitignore_if_missing(self, tmp_path):
        """ensure_gitignore creates .gitignore when it does not exist."""
        from core.worktree import ensure_gitignore
        ensure_gitignore(str(tmp_path))
        gitignore = tmp_path / ".gitignore"
        assert gitignore.exists()
        assert ".kin_worktrees/" in gitignore.read_text()

    def test_skips_if_entry_already_present(self, tmp_path):
        """ensure_gitignore does not duplicate an existing entry."""
        from core.worktree import ensure_gitignore
        gitignore = tmp_path / ".gitignore"
        gitignore.write_text(".kin_worktrees/\n")
        ensure_gitignore(str(tmp_path))
        assert gitignore.read_text().count(".kin_worktrees/") == 1

    def test_never_raises_on_permission_error(self, tmp_path):
        """ensure_gitignore swallows write errors instead of raising."""
        from core.worktree import ensure_gitignore
        with patch("core.worktree.Path.open", side_effect=PermissionError):
            ensure_gitignore(str(tmp_path))  # must complete silently

# ---------------------------------------------------------------------------
# (5) Auto-trigger pipeline — label 'auto'
# ---------------------------------------------------------------------------

class TestAutoTrigger:
    def test_task_with_auto_label_triggers_pipeline(self, client):
        """Creating a task labeled 'auto' launches the pipeline in the background."""
        with patch("web.api._launch_pipeline_subprocess") as mock_launch:
            r = client.post("/api/tasks", json={
                "project_id": "p1",
                "title": "Auto task",
                "labels": ["auto"],
            })
            assert r.status_code == 200
            mock_launch.assert_called_once()
            # First positional arg of the launcher is the new task id.
            called_task_id = mock_launch.call_args[0][0]
            assert called_task_id.startswith("P1-")

    def test_task_without_auto_label_does_not_trigger(self, client):
        """Creating a task without the 'auto' label does NOT launch the pipeline."""
        with patch("web.api._launch_pipeline_subprocess") as mock_launch:
            r = client.post("/api/tasks", json={
                "project_id": "p1",
                "title": "Manual task",
                "labels": ["feature"],
            })
            assert r.status_code == 200
            mock_launch.assert_not_called()

    def test_task_without_labels_does_not_trigger(self, client):
        """Creating a task with no labels at all does NOT launch the pipeline."""
        with patch("web.api._launch_pipeline_subprocess") as mock_launch:
            r = client.post("/api/tasks", json={
                "project_id": "p1",
                "title": "Plain task",
            })
            assert r.status_code == 200
            mock_launch.assert_not_called()

    def test_task_with_auto_among_multiple_labels_triggers(self, client):
        """A task with several labels including 'auto' launches the pipeline."""
        with patch("web.api._launch_pipeline_subprocess") as mock_launch:
            r = client.post("/api/tasks", json={
                "project_id": "p1",
                "title": "Multi-label auto task",
                "labels": ["feature", "auto", "backend"],
            })
            assert r.status_code == 200
            mock_launch.assert_called_once()

# ---------------------------------------------------------------------------
# (6) KIN-ARCH-010: task deduplication in task_decomposer on repeated runs
# ---------------------------------------------------------------------------

class TestDecomposerDeduplication:
    """KIN-ARCH-010: calling _save_decomposer_output twice must not create duplicates."""

    def test_double_call_with_same_data_creates_only_one_child_task(self, conn):
        """Regression: calling _save_decomposer_output twice with identical data
        must create exactly one child task, not two.

        Broken behavior: without the duplicate check the second call would add
        another task with the same title + parent_task_id — leaving 2 rows in
        the DB instead of 1.
        """
        from agents.runner import _save_decomposer_output

        decomposer_output = {
            "raw_output": json.dumps({
                "tasks": [
                    {
                        "title": "Implement login endpoint",
                        "brief": "POST /api/auth/login",
                        "priority": 3,
                    }
                ]
            })
        }

        r1 = _save_decomposer_output(conn, "vdol", "VDOL-001", decomposer_output)
        r2 = _save_decomposer_output(conn, "vdol", "VDOL-001", decomposer_output)

        assert r1["created"] == 1
        assert r1["skipped"] == 0
        assert r2["created"] == 0
        assert r2["skipped"] == 1

        children = conn.execute(
            "SELECT id FROM tasks WHERE parent_task_id = ?",
            ("VDOL-001",),
        ).fetchall()
        assert len(children) == 1, (
            f"Ожидалась 1 дочерняя задача, получено {len(children)}"
        )