"""Tests for web/api.py — new task endpoints (pipeline, approve, reject, full).""" import pytest from pathlib import Path from unittest.mock import patch, MagicMock from fastapi.testclient import TestClient # Patch DB_PATH before importing app import web.api as api_module @pytest.fixture def client(tmp_path): db_path = tmp_path / "test.db" api_module.DB_PATH = db_path from web.api import app c = TestClient(app) # Seed data c.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"}) c.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"}) return c def test_get_task(client): r = client.get("/api/tasks/P1-001") assert r.status_code == 200 assert r.json()["title"] == "Fix bug" def test_get_task_not_found(client): r = client.get("/api/tasks/NOPE") assert r.status_code == 404 def test_task_pipeline_empty(client): r = client.get("/api/tasks/P1-001/pipeline") assert r.status_code == 200 assert r.json() == [] def test_task_pipeline_with_logs(client): # Insert agent logs directly from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.log_agent_run(conn, "p1", "debugger", "execute", task_id="P1-001", output_summary="Found bug", tokens_used=1000, duration_seconds=5, success=True) models.log_agent_run(conn, "p1", "tester", "execute", task_id="P1-001", output_summary="Tests pass", tokens_used=500, duration_seconds=3, success=True) conn.close() r = client.get("/api/tasks/P1-001/pipeline") assert r.status_code == 200 steps = r.json() assert len(steps) == 2 assert steps[0]["agent_role"] == "debugger" assert steps[0]["output_summary"] == "Found bug" assert steps[1]["agent_role"] == "tester" def test_task_full(client): r = client.get("/api/tasks/P1-001/full") assert r.status_code == 200 data = r.json() assert data["id"] == "P1-001" assert "pipeline_steps" in data assert "related_decisions" in data def test_task_full_not_found(client): r = client.get("/api/tasks/NOPE/full") assert r.status_code == 404 def test_approve_task(client): # First set task to review from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="review") conn.close() r = client.post("/api/tasks/P1-001/approve", json={}) assert r.status_code == 200 assert r.json()["status"] == "done" # Verify task is done r = client.get("/api/tasks/P1-001") assert r.json()["status"] == "done" def test_approve_with_decision(client): r = client.post("/api/tasks/P1-001/approve", json={ "decision_title": "Use AbortController", "decision_description": "Fix race condition with AbortController", "decision_type": "decision", }) assert r.status_code == 200 assert r.json()["decision"] is not None assert r.json()["decision"]["title"] == "Use AbortController" def test_approve_not_found(client): r = client.post("/api/tasks/NOPE/approve", json={}) assert r.status_code == 404 def test_approve_fires_task_done_hooks(client): """Ручной апрув задачи должен вызывать хуки с event='task_done'.""" from unittest.mock import patch with patch("core.hooks.run_hooks") as mock_hooks: mock_hooks.return_value = [] r = client.post("/api/tasks/P1-001/approve", json={}) assert r.status_code == 200 events_fired = [call[1].get("event") or call[0][3] for call in mock_hooks.call_args_list] assert "task_done" in events_fired def test_reject_task(client): from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="review") conn.close() r = client.post("/api/tasks/P1-001/reject", json={ "reason": "Didn't fix the root cause" }) assert r.status_code == 200 assert r.json()["status"] == "pending" # Verify task is pending with review reason r = client.get("/api/tasks/P1-001") data = r.json() assert data["status"] == "pending" assert data["review"]["rejected"] == "Didn't fix the root cause" def test_reject_not_found(client): r = client.post("/api/tasks/NOPE/reject", json={"reason": "bad"}) assert r.status_code == 404 def test_revise_task(client): from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="review") conn.close() r = client.post("/api/tasks/P1-001/revise", json={ "comment": "Доисследуй edge case с пустым массивом" }) assert r.status_code == 200 assert r.json()["status"] == "in_progress" # Verify task is in_progress with revise_comment stored conn = init_db(api_module.DB_PATH) row = conn.execute("SELECT status, revise_comment FROM tasks WHERE id = 'P1-001'").fetchone() conn.close() assert row["status"] == "in_progress" assert row["revise_comment"] == "Доисследуй edge case с пустым массивом" def test_revise_not_found(client): r = client.post("/api/tasks/NOPE/revise", json={"comment": "fix it"}) assert r.status_code == 404 def test_revise_task_response_includes_comment(client): """Ответ /revise содержит поле comment с переданным текстом.""" r = client.post("/api/tasks/P1-001/revise", json={"comment": "Уточни требования"}) assert r.status_code == 200 assert r.json()["comment"] == "Уточни требования" def test_revise_task_missing_comment_returns_422(client): """Запрос /revise без поля comment → 422 Unprocessable Entity (Pydantic validation).""" r = client.post("/api/tasks/P1-001/revise", json={}) assert r.status_code == 422 def test_task_pipeline_not_found(client): r = client.get("/api/tasks/NOPE/pipeline") assert r.status_code == 404 def test_running_endpoint_no_pipeline(client): r = client.get("/api/tasks/P1-001/running") assert r.status_code == 200 assert r.json()["running"] is False def test_running_endpoint_with_pipeline(client): from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.create_pipeline(conn, "P1-001", "p1", "debug", [{"role": "debugger"}]) conn.close() r = client.get("/api/tasks/P1-001/running") assert r.status_code == 200 assert r.json()["running"] is True def test_running_endpoint_not_found(client): r = client.get("/api/tasks/NOPE/running") assert r.status_code == 404 def test_run_sets_in_progress(client): """POST /run should set task to in_progress immediately.""" r = client.post("/api/tasks/P1-001/run") assert r.status_code == 202 r = client.get("/api/tasks/P1-001") assert r.json()["status"] == "in_progress" def test_run_not_found(client): r = client.post("/api/tasks/NOPE/run") assert r.status_code == 404 def test_run_returns_503_when_claude_not_authenticated(client): """KIN-083: /run возвращает 503 с claude_auth_required если claude не залогинен.""" from agents.runner import ClaudeAuthError with patch("agents.runner.check_claude_auth", side_effect=ClaudeAuthError("Claude CLI requires login. Run: claude login")): r = client.post("/api/tasks/P1-001/run") assert r.status_code == 503 body = r.json() assert body["detail"]["error"] == "claude_auth_required" assert body["detail"]["instructions"] == "Run: claude login" assert "login" in body["detail"]["message"].lower() def test_start_phase_returns_503_when_claude_not_authenticated(client): """KIN-083: /phases/start возвращает 503 с claude_auth_required если claude не залогинен.""" from agents.runner import ClaudeAuthError with patch("agents.runner.check_claude_auth", side_effect=ClaudeAuthError("Claude CLI requires login. Run: claude login")): r = client.post("/api/projects/p1/phases/start") assert r.status_code == 503 body = r.json() assert body["detail"]["error"] == "claude_auth_required" assert body["detail"]["instructions"] == "Run: claude login" assert "login" in body["detail"]["message"].lower() def test_run_kin_038_without_allow_write(client): """Регрессионный тест KIN-038: allow_write удалён из схемы, эндпоинт принимает запросы с пустым телом без этого параметра.""" r = client.post("/api/tasks/P1-001/run", json={}) assert r.status_code == 202 def test_run_with_empty_body(client): """POST /run with empty JSON body should be accepted.""" r = client.post("/api/tasks/P1-001/run", json={}) assert r.status_code == 202 def test_run_without_body(client): """POST /run without body should be backwards-compatible.""" r = client.post("/api/tasks/P1-001/run") assert r.status_code == 202 def test_project_summary_includes_review(client): from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="review") conn.close() r = client.get("/api/projects") projects = r.json() assert projects[0]["review_tasks"] == 1 def test_audit_not_found(client): r = client.post("/api/projects/NOPE/audit") assert r.status_code == 404 def test_audit_apply(client): """POST /audit/apply should mark tasks as done.""" r = client.post("/api/projects/p1/audit/apply", json={"task_ids": ["P1-001"]}) assert r.status_code == 200 assert r.json()["count"] == 1 assert "P1-001" in r.json()["updated"] # Verify task is done r = client.get("/api/tasks/P1-001") assert r.json()["status"] == "done" def test_audit_apply_not_found(client): r = client.post("/api/projects/NOPE/audit/apply", json={"task_ids": ["P1-001"]}) assert r.status_code == 404 def test_audit_apply_wrong_project(client): """Tasks not belonging to the project should be skipped.""" r = client.post("/api/projects/p1/audit/apply", json={"task_ids": ["WRONG-001"]}) assert r.status_code == 200 assert r.json()["count"] == 0 # --------------------------------------------------------------------------- # PATCH /api/tasks/{task_id} — смена статуса # --------------------------------------------------------------------------- def test_patch_task_status(client): """PATCH должен обновить статус и вернуть задачу.""" r = client.patch("/api/tasks/P1-001", json={"status": "review"}) assert r.status_code == 200 data = r.json() assert data["status"] == "review" assert data["id"] == "P1-001" def test_patch_task_status_persisted(client): """После PATCH повторный GET должен возвращать новый статус.""" client.patch("/api/tasks/P1-001", json={"status": "blocked"}) r = client.get("/api/tasks/P1-001") assert r.status_code == 200 assert r.json()["status"] == "blocked" @pytest.mark.parametrize("status", ["pending", "in_progress", "review", "done", "blocked", "decomposed", "cancelled"]) def test_patch_task_all_valid_statuses(client, status): """Все 7 допустимых статусов должны приниматься (включая decomposed).""" r = client.patch("/api/tasks/P1-001", json={"status": status}) assert r.status_code == 200 assert r.json()["status"] == status def test_patch_task_status_decomposed(client): """Регрессионный тест KIN-033: API принимает статус 'decomposed'.""" r = client.patch("/api/tasks/P1-001", json={"status": "decomposed"}) assert r.status_code == 200 assert r.json()["status"] == "decomposed" def test_patch_task_status_decomposed_persisted(client): """После установки 'decomposed' повторный GET возвращает этот статус.""" client.patch("/api/tasks/P1-001", json={"status": "decomposed"}) r = client.get("/api/tasks/P1-001") assert r.status_code == 200 assert r.json()["status"] == "decomposed" # --------------------------------------------------------------------------- # KIN-033 — единый источник истины для статусов # --------------------------------------------------------------------------- def test_api_valid_statuses_match_models(): """API использует models.VALID_TASK_STATUSES как единственный источник истины.""" from core import models import web.api as api_module assert api_module.VALID_STATUSES == set(models.VALID_TASK_STATUSES) def test_cli_valid_statuses_match_models(): """CLI использует models.VALID_TASK_STATUSES как единственный источник истины.""" from core import models from cli.main import task_update status_param = next(p for p in task_update.params if p.name == "status") cli_choices = set(status_param.type.choices) assert cli_choices == set(models.VALID_TASK_STATUSES) def test_cli_and_api_statuses_are_identical(): """Список статусов в CLI и API идентичен.""" from core import models import web.api as api_module from cli.main import task_update status_param = next(p for p in task_update.params if p.name == "status") cli_choices = set(status_param.type.choices) assert cli_choices == api_module.VALID_STATUSES assert "decomposed" in cli_choices assert "decomposed" in api_module.VALID_STATUSES def test_patch_task_invalid_status(client): """Недопустимый статус → 400.""" r = client.patch("/api/tasks/P1-001", json={"status": "flying"}) assert r.status_code == 400 def test_patch_task_not_found(client): """Несуществующая задача → 404.""" r = client.patch("/api/tasks/NOPE-999", json={"status": "done"}) assert r.status_code == 404 def test_patch_task_empty_body_returns_400(client): """PATCH с пустым телом (нет status и нет execution_mode) → 400.""" r = client.patch("/api/tasks/P1-001", json={}) assert r.status_code == 400 def test_patch_task_execution_mode_auto_complete_accepted(client): """KIN-063: execution_mode='auto_complete' принимается (200).""" r = client.patch("/api/tasks/P1-001", json={"execution_mode": "auto_complete"}) assert r.status_code == 200 assert r.json()["execution_mode"] == "auto_complete" def test_patch_task_execution_mode_auto_rejected(client): """KIN-063: старое значение 'auto' должно отклоняться (400) — Decision #29.""" r = client.patch("/api/tasks/P1-001", json={"execution_mode": "auto"}) assert r.status_code == 400 def test_patch_task_execution_mode_review_accepted(client): """KIN-074: execution_mode='review' принимается (200) — регрессия после фикса frontend.""" r = client.patch("/api/tasks/P1-001", json={"execution_mode": "review"}) assert r.status_code == 200 assert r.json()["execution_mode"] == "review" # --------------------------------------------------------------------------- # KIN-022 — blocked_reason: регрессионные тесты # --------------------------------------------------------------------------- def test_blocked_reason_saved_and_returned(client): """При переходе в blocked с blocked_reason поле сохраняется и отдаётся в GET.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="blocked", blocked_reason="Step 1/2 (debugger) failed") conn.close() r = client.get("/api/tasks/P1-001") assert r.status_code == 200 data = r.json() assert data["status"] == "blocked" assert data["blocked_reason"] == "Step 1/2 (debugger) failed" def test_blocked_reason_present_in_full(client): """blocked_reason также присутствует в /full эндпоинте.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="blocked", blocked_reason="tester agent crashed") conn.close() r = client.get("/api/tasks/P1-001/full") assert r.status_code == 200 data = r.json() assert data["status"] == "blocked" assert data["blocked_reason"] == "tester agent crashed" def test_blocked_reason_none_by_default(client): """Новая задача не имеет blocked_reason.""" r = client.get("/api/tasks/P1-001") assert r.status_code == 200 data = r.json() assert data["blocked_reason"] is None def test_blocked_without_reason_allowed(client): """Переход в blocked без причины допустим (reason=None).""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="blocked") conn.close() r = client.get("/api/tasks/P1-001") assert r.status_code == 200 data = r.json() assert data["status"] == "blocked" assert data["blocked_reason"] is None def test_blocked_reason_cleared_on_retry(client): """При повторном запуске (статус pending) blocked_reason сбрасывается.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="blocked", blocked_reason="failed once") models.update_task(conn, "P1-001", status="pending", blocked_reason=None) conn.close() r = client.get("/api/tasks/P1-001") assert r.status_code == 200 data = r.json() assert data["status"] == "pending" assert data["blocked_reason"] is None # --------------------------------------------------------------------------- # KIN-029 — DELETE /api/projects/{project_id}/decisions/{decision_id} # --------------------------------------------------------------------------- def test_delete_decision_ok(client): """Создаём decision через POST, удаляем DELETE → 200 с телом {"deleted": id}.""" r = client.post("/api/decisions", json={ "project_id": "p1", "type": "decision", "title": "Use SQLite", "description": "Chosen for simplicity", }) assert r.status_code == 200 decision_id = r.json()["id"] r = client.delete(f"/api/projects/p1/decisions/{decision_id}") assert r.status_code == 200 assert r.json() == {"deleted": decision_id} r = client.get("/api/decisions?project=p1") assert r.status_code == 200 ids = [d["id"] for d in r.json()] assert decision_id not in ids def test_delete_decision_not_found(client): """DELETE несуществующего decision → 404.""" r = client.delete("/api/projects/p1/decisions/99999") assert r.status_code == 404 def test_delete_decision_wrong_project(client): """DELETE decision с чужим project_id → 404 (не раскрываем существование).""" r = client.post("/api/decisions", json={ "project_id": "p1", "type": "decision", "title": "Cross-project check", "description": "Should not be deletable from p2", }) assert r.status_code == 200 decision_id = r.json()["id"] r = client.delete(f"/api/projects/p2/decisions/{decision_id}") assert r.status_code == 404 # Decision должен остаться нетронутым r = client.get("/api/decisions?project=p1") ids = [d["id"] for d in r.json()] assert decision_id in ids # --------------------------------------------------------------------------- # KIN-035 — регрессионный тест: смена статуса на cancelled # --------------------------------------------------------------------------- def test_patch_task_status_cancelled(client): """Регрессионный тест KIN-035: PATCH /api/tasks/{id} с status='cancelled' → 200.""" r = client.patch("/api/tasks/P1-001", json={"status": "cancelled"}) assert r.status_code == 200 assert r.json()["status"] == "cancelled" def test_patch_task_status_cancelled_persisted(client): """После установки 'cancelled' повторный GET возвращает этот статус.""" client.patch("/api/tasks/P1-001", json={"status": "cancelled"}) r = client.get("/api/tasks/P1-001") assert r.status_code == 200 assert r.json()["status"] == "cancelled" def test_cancelled_in_valid_statuses(): """'cancelled' присутствует в VALID_TASK_STATUSES модели и в VALID_STATUSES API.""" from core import models import web.api as api_module assert "cancelled" in models.VALID_TASK_STATUSES assert "cancelled" in api_module.VALID_STATUSES # --------------------------------------------------------------------------- # KIN-036 — регрессионный тест: --allow-write всегда в команде через web API # --------------------------------------------------------------------------- def test_run_always_includes_allow_write_when_body_false(client): """Регрессионный тест KIN-036: --allow-write присутствует в команде, даже если allow_write=False в теле запроса. Баг: условие `if body and body.allow_write` не добавляло флаг при allow_write=False, что приводило к блокировке агента на 300 с.""" from unittest.mock import patch, MagicMock with patch("web.api.subprocess.Popen") as mock_popen: mock_proc = MagicMock() mock_proc.pid = 12345 mock_popen.return_value = mock_proc r = client.post("/api/tasks/P1-001/run", json={"allow_write": False}) assert r.status_code == 202 cmd = mock_popen.call_args[0][0] assert "--allow-write" in cmd, ( "--allow-write обязан присутствовать всегда: без него агент зависает " "при попытке записи, потому что stdin=DEVNULL и нет интерактивного подтверждения" ) def test_run_always_includes_allow_write_without_body(client): """Регрессионный тест KIN-036: --allow-write присутствует даже без тела запроса.""" from unittest.mock import patch, MagicMock with patch("web.api.subprocess.Popen") as mock_popen: mock_proc = MagicMock() mock_proc.pid = 12345 mock_popen.return_value = mock_proc r = client.post("/api/tasks/P1-001/run") assert r.status_code == 202 cmd = mock_popen.call_args[0][0] assert "--allow-write" in cmd def test_run_sets_kin_noninteractive_env(client): """Регрессионный тест KIN-036: KIN_NONINTERACTIVE=1 всегда устанавливается при запуске через web API, что вместе с --allow-write предотвращает зависание.""" from unittest.mock import patch, MagicMock with patch("web.api.subprocess.Popen") as mock_popen: mock_proc = MagicMock() mock_proc.pid = 99 mock_popen.return_value = mock_proc r = client.post("/api/tasks/P1-001/run") assert r.status_code == 202 call_kwargs = mock_popen.call_args[1] env = call_kwargs.get("env", {}) assert env.get("KIN_NONINTERACTIVE") == "1" def test_run_sets_stdin_devnull(client): """Регрессионный тест KIN-036: stdin=DEVNULL всегда устанавливается, что является причиной, по которой --allow-write обязателен.""" import subprocess as _subprocess from unittest.mock import patch, MagicMock with patch("web.api.subprocess.Popen") as mock_popen: mock_proc = MagicMock() mock_proc.pid = 42 mock_popen.return_value = mock_proc r = client.post("/api/tasks/P1-001/run") assert r.status_code == 202 call_kwargs = mock_popen.call_args[1] assert call_kwargs.get("stdin") == _subprocess.DEVNULL # --------------------------------------------------------------------------- # KIN-040 — регрессионные тесты: удаление TaskRun / allow_write из схемы # --------------------------------------------------------------------------- def test_run_kin_040_no_taskrun_class(): """Регрессионный тест KIN-040: класс TaskRun удалён из web/api.py. allow_write больше не является частью схемы эндпоинта /run.""" import web.api as api_module assert not hasattr(api_module, "TaskRun"), ( "Класс TaskRun должен быть удалён из web/api.py (KIN-040)" ) def test_run_kin_040_allow_write_true_ignored(client): """Регрессионный тест KIN-040: allow_write=True в теле игнорируется (не 422). Эндпоинт не имеет body-параметра, поэтому FastAPI не валидирует тело.""" r = client.post("/api/tasks/P1-001/run", json={"allow_write": True}) assert r.status_code == 202 # --------------------------------------------------------------------------- # KIN-058 — регрессионный тест: stderr=DEVNULL у Popen в web API # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # KIN-020 — manual_escalation задачи: PATCH status='done' резолвит задачу # --------------------------------------------------------------------------- def test_patch_manual_escalation_task_to_done(client): """PATCH status='done' на manual_escalation задаче — статус обновляется — KIN-020.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.create_task(conn, "P1-002", "p1", "Fix .dockerignore manually", brief={"task_type": "manual_escalation", "source": "followup:P1-001", "description": "Ручное применение .dockerignore"}) conn.close() r = client.patch("/api/tasks/P1-002", json={"status": "done"}) assert r.status_code == 200 assert r.json()["status"] == "done" def test_manual_escalation_task_brief_preserved_after_patch(client): """PATCH не затирает brief.task_type — поле manual_escalation сохраняется — KIN-020.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.create_task(conn, "P1-002", "p1", "Fix manually", brief={"task_type": "manual_escalation", "source": "followup:P1-001"}) conn.close() client.patch("/api/tasks/P1-002", json={"status": "done"}) r = client.get("/api/tasks/P1-002") assert r.status_code == 200 assert r.json()["brief"]["task_type"] == "manual_escalation" def test_run_sets_stderr_devnull(client): """Регрессионный тест KIN-058: stderr=DEVNULL всегда устанавливается в Popen, чтобы stderr дочернего процесса не загрязнял логи uvicorn.""" import subprocess as _subprocess from unittest.mock import patch, MagicMock with patch("web.api.subprocess.Popen") as mock_popen: mock_proc = MagicMock() mock_proc.pid = 77 mock_popen.return_value = mock_proc r = client.post("/api/tasks/P1-001/run") assert r.status_code == 202 call_kwargs = mock_popen.call_args[1] assert call_kwargs.get("stderr") == _subprocess.DEVNULL, ( "Регрессия KIN-058: stderr у Popen должен быть DEVNULL, " "иначе вывод агента попадает в логи uvicorn" ) # --------------------------------------------------------------------------- # KIN-065 — PATCH /api/projects/{id} — autocommit_enabled toggle # --------------------------------------------------------------------------- def test_patch_project_autocommit_enabled_true(client): """PATCH с autocommit_enabled=true → 200, поле установлено в 1.""" r = client.patch("/api/projects/p1", json={"autocommit_enabled": True}) assert r.status_code == 200 assert r.json()["autocommit_enabled"] == 1 def test_patch_project_autocommit_enabled_false(client): """После включения PATCH с autocommit_enabled=false → 200, поле установлено в 0.""" client.patch("/api/projects/p1", json={"autocommit_enabled": True}) r = client.patch("/api/projects/p1", json={"autocommit_enabled": False}) assert r.status_code == 200 assert r.json()["autocommit_enabled"] == 0 def test_patch_project_autocommit_persisted_via_sql(client): """После PATCH autocommit_enabled=True прямой SQL подтверждает значение 1.""" client.patch("/api/projects/p1", json={"autocommit_enabled": True}) from core.db import init_db conn = init_db(api_module.DB_PATH) row = conn.execute("SELECT autocommit_enabled FROM projects WHERE id = 'p1'").fetchone() conn.close() assert row is not None assert row[0] == 1 def test_patch_project_autocommit_false_persisted_via_sql(client): """После PATCH autocommit_enabled=False прямой SQL подтверждает значение 0.""" client.patch("/api/projects/p1", json={"autocommit_enabled": True}) client.patch("/api/projects/p1", json={"autocommit_enabled": False}) from core.db import init_db conn = init_db(api_module.DB_PATH) row = conn.execute("SELECT autocommit_enabled FROM projects WHERE id = 'p1'").fetchone() conn.close() assert row is not None assert row[0] == 0 def test_patch_project_autocommit_null_before_first_update(client): """Новый проект имеет autocommit_enabled=NULL/0 (falsy) до первого обновления.""" client.post("/api/projects", json={"id": "p_new", "name": "New", "path": "/new"}) from core.db import init_db conn = init_db(api_module.DB_PATH) row = conn.execute("SELECT autocommit_enabled FROM projects WHERE id = 'p_new'").fetchone() conn.close() assert row is not None assert not row[0] # DEFAULT 0 или NULL — в любом случае falsy def test_patch_project_empty_body_returns_400(client): """PATCH проекта без полей → 400.""" r = client.patch("/api/projects/p1", json={}) assert r.status_code == 400 def test_patch_project_not_found(client): """PATCH несуществующего проекта → 404.""" r = client.patch("/api/projects/NOPE", json={"autocommit_enabled": True}) assert r.status_code == 404 def test_patch_project_autocommit_and_execution_mode_together(client): """PATCH с autocommit_enabled и execution_mode → оба поля обновлены.""" r = client.patch("/api/projects/p1", json={ "autocommit_enabled": True, "execution_mode": "auto_complete", }) assert r.status_code == 200 data = r.json() assert data["autocommit_enabled"] == 1 assert data["execution_mode"] == "auto_complete" def test_patch_project_returns_full_project_object(client): """PATCH возвращает полный объект проекта с id, name и autocommit_enabled.""" r = client.patch("/api/projects/p1", json={"autocommit_enabled": True}) assert r.status_code == 200 data = r.json() assert data["id"] == "p1" assert data["name"] == "P1" assert "autocommit_enabled" in data # --------------------------------------------------------------------------- # KIN-008 — PATCH priority и route_type задачи # --------------------------------------------------------------------------- def test_patch_task_priority(client): """PATCH priority задачи обновляет поле и возвращает задачу.""" r = client.patch("/api/tasks/P1-001", json={"priority": 3}) assert r.status_code == 200 assert r.json()["priority"] == 3 def test_patch_task_priority_persisted(client): """После PATCH priority повторный GET возвращает новое значение.""" client.patch("/api/tasks/P1-001", json={"priority": 7}) r = client.get("/api/tasks/P1-001") assert r.status_code == 200 assert r.json()["priority"] == 7 def test_patch_task_priority_invalid_zero(client): """PATCH с priority=0 → 400.""" r = client.patch("/api/tasks/P1-001", json={"priority": 0}) assert r.status_code == 400 def test_patch_task_priority_invalid_eleven(client): """PATCH с priority=11 → 400.""" r = client.patch("/api/tasks/P1-001", json={"priority": 11}) assert r.status_code == 400 def test_patch_task_route_type_set(client): """PATCH route_type сохраняет значение в brief.""" r = client.patch("/api/tasks/P1-001", json={"route_type": "feature"}) assert r.status_code == 200 data = r.json() assert data["brief"]["route_type"] == "feature" def test_patch_task_route_type_all_valid(client): """Все допустимые route_type принимаются.""" for rt in ("debug", "feature", "refactor", "hotfix"): r = client.patch("/api/tasks/P1-001", json={"route_type": rt}) assert r.status_code == 200, f"route_type={rt} rejected" assert r.json()["brief"]["route_type"] == rt def test_patch_task_route_type_invalid(client): """Недопустимый route_type → 400.""" r = client.patch("/api/tasks/P1-001", json={"route_type": "unknown"}) assert r.status_code == 400 def test_patch_task_route_type_clear(client): """PATCH route_type='' очищает поле из brief.""" client.patch("/api/tasks/P1-001", json={"route_type": "debug"}) r = client.patch("/api/tasks/P1-001", json={"route_type": ""}) assert r.status_code == 200 data = r.json() brief = data.get("brief") if brief: assert "route_type" not in brief def test_patch_task_route_type_merges_brief(client): """route_type сохраняется вместе с другими полями brief без перезаписи.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", brief={"extra": "data"}) conn.close() r = client.patch("/api/tasks/P1-001", json={"route_type": "hotfix"}) assert r.status_code == 200 brief = r.json()["brief"] assert brief["route_type"] == "hotfix" assert brief["extra"] == "data" def test_patch_task_priority_and_route_type_together(client): """PATCH может обновить priority и route_type одновременно.""" r = client.patch("/api/tasks/P1-001", json={"priority": 2, "route_type": "refactor"}) assert r.status_code == 200 data = r.json() assert data["priority"] == 2 assert data["brief"]["route_type"] == "refactor" def test_patch_task_empty_body_still_returns_400(client): """Пустое тело по-прежнему возвращает 400 (регрессия KIN-008).""" r = client.patch("/api/tasks/P1-001", json={}) assert r.status_code == 400 # PATCH /api/tasks/{id} — редактирование title и brief_text (KIN-015) def test_patch_task_title(client): """PATCH title обновляет заголовок задачи.""" r = client.patch("/api/tasks/P1-001", json={"title": "Новый заголовок"}) assert r.status_code == 200 assert r.json()["title"] == "Новый заголовок" def test_patch_task_title_persisted(client): """PATCH title сохраняется в БД.""" client.patch("/api/tasks/P1-001", json={"title": "Персистентный заголовок"}) r = client.get("/api/tasks/P1-001") assert r.json()["title"] == "Персистентный заголовок" def test_patch_task_title_empty_returns_400(client): """Пустой title → 400.""" r = client.patch("/api/tasks/P1-001", json={"title": " "}) assert r.status_code == 400 def test_patch_task_brief_text(client): """PATCH brief_text сохраняется в brief.text.""" r = client.patch("/api/tasks/P1-001", json={"brief_text": "Описание задачи"}) assert r.status_code == 200 assert r.json()["brief"]["text"] == "Описание задачи" def test_patch_task_brief_text_persisted(client): """PATCH brief_text сохраняется в БД.""" client.patch("/api/tasks/P1-001", json={"brief_text": "Сохранённое описание"}) r = client.get("/api/tasks/P1-001") assert r.json()["brief"]["text"] == "Сохранённое описание" def test_patch_task_brief_text_merges_route_type(client): """brief_text не перезаписывает route_type в brief.""" client.patch("/api/tasks/P1-001", json={"route_type": "feature"}) client.patch("/api/tasks/P1-001", json={"brief_text": "Описание"}) r = client.get("/api/tasks/P1-001") brief = r.json()["brief"] assert brief["text"] == "Описание" assert brief["route_type"] == "feature" def test_patch_task_title_and_brief_text_together(client): """PATCH может обновить title и brief_text одновременно.""" r = client.patch("/api/tasks/P1-001", json={"title": "Совместное", "brief_text": "и описание"}) assert r.status_code == 200 data = r.json() assert data["title"] == "Совместное" assert data["brief"]["text"] == "и описание" # --------------------------------------------------------------------------- # KIN-049 — Deploy: миграция, PATCH deploy_command, POST /deploy # --------------------------------------------------------------------------- def test_deploy_command_column_exists_in_schema(client): """Миграция: PRAGMA table_info(projects) подтверждает наличие deploy_command (decision #74).""" from core.db import init_db conn = init_db(api_module.DB_PATH) cols = {row[1] for row in conn.execute("PRAGMA table_info(projects)").fetchall()} conn.close() assert "deploy_command" in cols def test_patch_project_deploy_command_persisted_via_sql(client): """PATCH с deploy_command сохраняется в БД — прямой SQL (decision #55).""" client.patch("/api/projects/p1", json={"deploy_command": "echo hello"}) from core.db import init_db conn = init_db(api_module.DB_PATH) row = conn.execute("SELECT deploy_command FROM projects WHERE id = 'p1'").fetchone() conn.close() assert row is not None assert row[0] == "echo hello" def test_patch_project_deploy_command_returned_in_response(client): """После PATCH ответ содержит обновлённый deploy_command.""" r = client.patch("/api/projects/p1", json={"deploy_command": "git push origin main"}) assert r.status_code == 200 assert r.json()["deploy_command"] == "git push origin main" def test_patch_project_deploy_command_empty_string_clears_to_null(client): """PATCH с пустой строкой очищает deploy_command → NULL (decision #68).""" client.patch("/api/projects/p1", json={"deploy_command": "echo hello"}) client.patch("/api/projects/p1", json={"deploy_command": ""}) from core.db import init_db conn = init_db(api_module.DB_PATH) row = conn.execute("SELECT deploy_command FROM projects WHERE id = 'p1'").fetchone() conn.close() assert row[0] is None def test_deploy_project_executes_command_returns_stdout(client): """POST /deploy — команда echo → stdout присутствует в ответе.""" from unittest.mock import patch, MagicMock client.patch("/api/projects/p1", json={"deploy_command": "echo deployed"}) mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "deployed\n" mock_result.stderr = "" with patch("web.api.subprocess.run", return_value=mock_result): r = client.post("/api/projects/p1/deploy") assert r.status_code == 200 data = r.json() assert data["success"] is True assert data["exit_code"] == 0 assert "deployed" in data["stdout"] assert "duration_seconds" in data def test_deploy_project_without_deploy_command_returns_400(client): """POST /deploy для проекта без deploy_command → 400.""" r = client.post("/api/projects/p1/deploy") assert r.status_code == 400 def test_deploy_project_not_found_returns_404(client): """POST /deploy для несуществующего проекта → 404.""" r = client.post("/api/projects/NOPE/deploy") assert r.status_code == 404 def test_deploy_project_failed_command_returns_success_false(client): """POST /deploy — ненулевой exit_code → success=False (команда выполнилась, но упала).""" from unittest.mock import patch, MagicMock client.patch("/api/projects/p1", json={"deploy_command": "exit 1"}) mock_result = MagicMock() mock_result.returncode = 1 mock_result.stdout = "" mock_result.stderr = "error occurred" with patch("web.api.subprocess.run", return_value=mock_result): r = client.post("/api/projects/p1/deploy") assert r.status_code == 200 data = r.json() assert data["success"] is False assert data["exit_code"] == 1 assert "error occurred" in data["stderr"] def test_deploy_project_timeout_returns_504(client): """POST /deploy — timeout → 504.""" from unittest.mock import patch import subprocess client.patch("/api/projects/p1", json={"deploy_command": "sleep 100"}) with patch("web.api.subprocess.run", side_effect=subprocess.TimeoutExpired("sleep 100", 60)): r = client.post("/api/projects/p1/deploy") assert r.status_code == 504 def test_task_full_includes_project_deploy_command(client): """GET /api/tasks/{id}/full включает project_deploy_command из таблицы projects.""" client.patch("/api/projects/p1", json={"deploy_command": "git push"}) r = client.get("/api/tasks/P1-001/full") assert r.status_code == 200 data = r.json() assert "project_deploy_command" in data assert data["project_deploy_command"] == "git push" def test_task_full_project_deploy_command_none_when_not_set(client): """GET /api/tasks/{id}/full возвращает project_deploy_command=None когда не задана.""" r = client.get("/api/tasks/P1-001/full") assert r.status_code == 200 data = r.json() assert "project_deploy_command" in data assert data["project_deploy_command"] is None # --------------------------------------------------------------------------- # KIN-067 — PATCH obsidian_vault_path + sync/obsidian не возвращает 400 # --------------------------------------------------------------------------- def test_patch_project_obsidian_vault_path_persisted_via_sql(client): """PATCH с obsidian_vault_path сохраняется в БД — прямой SQL.""" r = client.patch("/api/projects/p1", json={"obsidian_vault_path": "/tmp/vault"}) assert r.status_code == 200 from core.db import init_db conn = init_db(api_module.DB_PATH) row = conn.execute("SELECT obsidian_vault_path FROM projects WHERE id = 'p1'").fetchone() conn.close() assert row is not None assert row[0] == "/tmp/vault" def test_patch_project_obsidian_vault_path_returned_in_response(client): """PATCH возвращает обновлённый obsidian_vault_path в ответе.""" r = client.patch("/api/projects/p1", json={"obsidian_vault_path": "/my/vault"}) assert r.status_code == 200 assert r.json()["obsidian_vault_path"] == "/my/vault" def test_sync_obsidian_without_vault_path_returns_400(client): """POST sync/obsidian без сохранённого vault_path → 400 Bad Request.""" r = client.post("/api/projects/p1/sync/obsidian") assert r.status_code == 400 def test_sync_obsidian_after_patch_vault_path_not_400(client, tmp_path): """Сценарий бага KIN-067: сначала PATCH vault_path, затем sync → не 400. Раньше runSync() вызывал sync/obsidian без предварительного сохранения пути, что приводило к 400. После фикса PATCH вызывается первым. """ vault = tmp_path / "vault" vault.mkdir() # Шаг 1: сохранить vault_path через PATCH (как теперь делает runSync) r = client.patch("/api/projects/p1", json={"obsidian_vault_path": str(vault)}) assert r.status_code == 200 # Шаг 2: запустить синхронизацию — не должно вернуть 400 r = client.post("/api/projects/p1/sync/obsidian") assert r.status_code != 400, f"Ожидался не 400, получен {r.status_code}: {r.text}" assert r.status_code == 200 def test_sync_obsidian_after_patch_returns_sync_result_fields(client, tmp_path): """После PATCH vault_path синхронизация возвращает поля exported_decisions и tasks_updated.""" vault = tmp_path / "vault" vault.mkdir() client.patch("/api/projects/p1", json={"obsidian_vault_path": str(vault)}) r = client.post("/api/projects/p1/sync/obsidian") assert r.status_code == 200 data = r.json() assert "exported_decisions" in data # --------------------------------------------------------------------------- # KIN-016 — GET /api/notifications — эскалации от заблокированных агентов # --------------------------------------------------------------------------- def test_kin016_notifications_empty_when_no_blocked_tasks(client): """KIN-016: GET /api/notifications возвращает [] когда нет заблокированных задач.""" r = client.get("/api/notifications") assert r.status_code == 200 assert r.json() == [] def test_kin016_notifications_returns_blocked_task_as_escalation(client): """KIN-016: заблокированная задача появляется в /api/notifications с корректными полями.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task( conn, "P1-001", status="blocked", blocked_reason="cannot access external API", blocked_at="2026-03-16T10:00:00", blocked_agent_role="debugger", blocked_pipeline_step="1", ) conn.close() r = client.get("/api/notifications") assert r.status_code == 200 items = r.json() assert len(items) == 1 item = items[0] assert item["task_id"] == "P1-001" assert item["agent_role"] == "debugger" assert item["reason"] == "cannot access external API" assert item["pipeline_step"] == "1" assert item["blocked_at"] == "2026-03-16T10:00:00" def test_kin016_notifications_contains_project_id_and_title(client): """KIN-016: уведомление содержит project_id и title задачи.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task(conn, "P1-001", status="blocked", blocked_reason="out of scope", blocked_agent_role="architect") conn.close() r = client.get("/api/notifications") assert r.status_code == 200 item = r.json()[0] assert item["project_id"] == "p1" assert item["title"] == "Fix bug" def test_kin016_notifications_filters_by_project_id(client): """KIN-016: ?project_id= фильтрует уведомления по проекту.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) # Создаём второй проект с заблокированной задачей models.create_project(conn, "p2", "P2", "/p2") models.create_task(conn, "P2-001", "p2", "Another task") models.update_task(conn, "P1-001", status="blocked", blocked_reason="reason A", blocked_agent_role="debugger") models.update_task(conn, "P2-001", status="blocked", blocked_reason="reason B", blocked_agent_role="tester") conn.close() r = client.get("/api/notifications?project_id=p1") assert r.status_code == 200 items = r.json() assert all(i["project_id"] == "p1" for i in items) assert len(items) == 1 assert items[0]["task_id"] == "P1-001" def test_kin016_notifications_only_returns_blocked_status(client): """KIN-016: задачи в статусе pending/review/done НЕ попадают в уведомления.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) # Задача остаётся в pending (дефолт) assert models.get_task(conn, "P1-001")["status"] == "pending" conn.close() r = client.get("/api/notifications") assert r.status_code == 200 assert r.json() == [] def test_kin016_pipeline_blocked_agent_stops_next_steps_integration(client): """KIN-016: после blocked пайплайна задача блокируется, /api/notifications показывает её. Интеграционный тест: pipeline → blocked → /api/notifications содержит task. """ import json from unittest.mock import patch, MagicMock blocked_output = json.dumps({ "result": json.dumps({"status": "blocked", "reason": "no repo access"}), }) mock_proc = MagicMock() mock_proc.pid = 123 with patch("web.api.subprocess.Popen") as mock_popen: mock_popen.return_value = mock_proc r = client.post("/api/tasks/P1-001/run") assert r.status_code == 202 # Вручную помечаем задачу blocked (имитируем результат пайплайна) from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task( conn, "P1-001", status="blocked", blocked_reason="no repo access", blocked_agent_role="debugger", blocked_pipeline_step="1", ) conn.close() r = client.get("/api/notifications") assert r.status_code == 200 items = r.json() assert len(items) == 1 assert items[0]["task_id"] == "P1-001" assert items[0]["reason"] == "no repo access" assert items[0]["agent_role"] == "debugger" # --------------------------------------------------------------------------- # KIN-BIZ-001 — telegram_sent из БД (не заглушка) # --------------------------------------------------------------------------- def test_kin_biz_001_telegram_sent_not_stub(client): """Регрессия KIN-BIZ-001: /api/notifications возвращает реальный telegram_sent из БД, не False-заглушку.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task( conn, "P1-001", status="blocked", blocked_reason="cannot access repo", blocked_agent_role="debugger", ) models.mark_telegram_sent(conn, "P1-001") conn.close() r = client.get("/api/notifications") assert r.status_code == 200 items = r.json() assert len(items) == 1 # Ключевая проверка: telegram_sent должен быть True из БД, не False-заглушка assert items[0]["telegram_sent"] is True def test_kin_biz_001_notifications_telegram_sent_false_when_not_sent(client): """KIN-BIZ-001: telegram_sent=False для задачи, где уведомление не отправлялось.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.update_task( conn, "P1-001", status="blocked", blocked_reason="no access", blocked_agent_role="tester", ) # Не вызываем mark_telegram_sent → telegram_sent остаётся 0 conn.close() r = client.get("/api/notifications") assert r.status_code == 200 items = r.json() assert len(items) == 1 assert items[0]["telegram_sent"] is False def test_kin_biz_001_telegram_sent_distinguishes_sent_and_not_sent(client): """KIN-BIZ-001: список уведомлений корректно различает sent/not-sent задачи.""" from core.db import init_db from core import models conn = init_db(api_module.DB_PATH) models.create_task(conn, "P1-002", "p1", "Another task") models.update_task( conn, "P1-001", status="blocked", blocked_reason="reason 1", blocked_agent_role="debugger", ) models.update_task( conn, "P1-002", status="blocked", blocked_reason="reason 2", blocked_agent_role="tester", ) # Telegram отправлен только для P1-001 models.mark_telegram_sent(conn, "P1-001") conn.close() r = client.get("/api/notifications") assert r.status_code == 200 items = r.json() assert len(items) == 2 by_id = {item["task_id"]: item for item in items} assert by_id["P1-001"]["telegram_sent"] is True assert by_id["P1-002"]["telegram_sent"] is False # --------------------------------------------------------------------------- # KIN-071: project_type и SSH-поля в API # --------------------------------------------------------------------------- def test_create_operations_project_with_ssh_fields(client): """KIN-071: POST /api/projects с project_type=operations и SSH-полями возвращает 200.""" r = client.post("/api/projects", json={ "id": "srv1", "name": "My Server", "project_type": "operations", "ssh_host": "10.0.0.1", "ssh_user": "root", "ssh_key_path": "~/.ssh/id_rsa", "ssh_proxy_jump": "jumpt", }) assert r.status_code == 200 data = r.json() assert data["project_type"] == "operations" assert data["path"] is None assert data["ssh_host"] == "10.0.0.1" assert data["ssh_user"] == "root" assert data["ssh_key_path"] == "~/.ssh/id_rsa" assert data["ssh_proxy_jump"] == "jumpt" def test_create_project_invalid_type_returns_400(client): """KIN-071: POST /api/projects с недопустимым project_type → 400.""" r = client.post("/api/projects", json={ "id": "bad", "name": "Bad", "path": "/bad", "project_type": "legacy", }) assert r.status_code == 400 def test_patch_project_invalid_type_returns_400(client): """KIN-071: PATCH /api/projects/{id} с недопустимым project_type → 400.""" r = client.patch("/api/projects/p1", json={"project_type": "invalid_type"}) assert r.status_code == 400 def test_create_operations_project_without_ssh_host_allowed(client): """Регрессионный тест KIN-ARCH-001: воспроизводит СЛОМАННОЕ поведение до фикса. До фикса: POST operations-проекта без ssh_host возвращал 200. После фикса: должен возвращать 422 (Pydantic model_validator). Этот тест НАМЕРЕННО проверяет, что старое поведение больше не существует. """ r = client.post("/api/projects", json={ "id": "srv2", "name": "Server No SSH", "project_type": "operations", }) # Фикс KIN-ARCH-001: был 200, стал 422 assert r.status_code == 422, ( "Регрессия KIN-ARCH-001: POST operations-проекта без ssh_host " "должен возвращать 422, а не 200" ) # --------------------------------------------------------------------------- # KIN-ARCH-001 — серверная валидация ssh_host для operations-проектов # --------------------------------------------------------------------------- def test_kin_arch_001_operations_without_ssh_host_returns_422(client): """Регрессионный тест KIN-ARCH-001: POST /api/projects с project_type='operations' и без ssh_host → 422 Unprocessable Entity.""" r = client.post("/api/projects", json={ "id": "ops_no_ssh", "name": "Ops Without SSH", "project_type": "operations", }) assert r.status_code == 422 def test_kin_arch_001_operations_with_empty_ssh_host_returns_422(client): """Регрессионный тест KIN-ARCH-001: пустая строка в ssh_host считается отсутствующим значением → 422.""" r = client.post("/api/projects", json={ "id": "ops_empty_ssh", "name": "Ops Empty SSH", "project_type": "operations", "ssh_host": "", }) assert r.status_code == 422 def test_kin_arch_001_operations_with_valid_ssh_host_returns_200(client): """Регрессионный тест KIN-ARCH-001: POST /api/projects с project_type='operations' и корректным ssh_host → 200, проект создаётся.""" r = client.post("/api/projects", json={ "id": "ops_with_ssh", "name": "Ops With SSH", "project_type": "operations", "ssh_host": "10.0.0.42", }) assert r.status_code == 200 data = r.json() assert data["project_type"] == "operations" assert data["path"] is None assert data["ssh_host"] == "10.0.0.42" def test_kin_arch_001_development_without_ssh_host_allowed(client): """Регрессионный тест KIN-ARCH-001: project_type='development' без ssh_host должен создаваться без ошибок — валидатор срабатывает только для operations.""" r = client.post("/api/projects", json={ "id": "dev_no_ssh", "name": "Dev No SSH", "path": "/dev", "project_type": "development", }) assert r.status_code == 200 assert r.json()["project_type"] == "development" def test_kin_arch_001_research_without_ssh_host_allowed(client): """Регрессионный тест KIN-ARCH-001: project_type='research' без ssh_host должен создаваться без ошибок.""" r = client.post("/api/projects", json={ "id": "res_no_ssh", "name": "Research No SSH", "path": "/research", "project_type": "research", }) assert r.status_code == 200 assert r.json()["project_type"] == "research" def test_kin_arch_001_422_error_message_mentions_ssh_host(client): """Регрессионный тест KIN-ARCH-001: тело 422-ответа содержит сообщение об ошибке с упоминанием ssh_host.""" r = client.post("/api/projects", json={ "id": "ops_err_msg", "name": "Check Error Message", "project_type": "operations", }) assert r.status_code == 422 body = r.json() # Pydantic возвращает detail со списком ошибок detail_str = str(body) assert "ssh_host" in detail_str def test_create_research_project_type_accepted(client): """KIN-071: project_type=research принимается API.""" r = client.post("/api/projects", json={ "id": "res1", "name": "Research Project", "path": "/research", "project_type": "research", }) assert r.status_code == 200 assert r.json()["project_type"] == "research" # --------------------------------------------------------------------------- # KIN-ARCH-003 — path nullable для operations-проектов # Исправляет баг: workaround с пустой строкой ("") для operations-проектов # --------------------------------------------------------------------------- def test_kin_arch_003_operations_project_without_path_returns_200(client): """KIN-ARCH-003: POST /api/projects с project_type='operations' без path → 200. До фикса: path="" передавался как workaround для NOT NULL constraint. После фикса: path не передаётся вовсе, сохраняется как NULL. """ r = client.post("/api/projects", json={ "id": "ops_null_path", "name": "Ops Null Path", "project_type": "operations", "ssh_host": "10.0.0.1", }) assert r.status_code == 200 data = r.json() assert data["path"] is None, ( "KIN-ARCH-003 регрессия: path должен быть NULL, а не пустой строкой" ) def test_kin_arch_003_development_project_without_path_returns_422(client): """KIN-ARCH-003: POST /api/projects с project_type='development' без path → 422. Pydantic validate_fields: path обязателен для non-operations проектов. """ r = client.post("/api/projects", json={ "id": "dev_no_path", "name": "Dev No Path", "project_type": "development", }) assert r.status_code == 422 def test_kin_arch_003_development_without_path_error_mentions_path(client): """KIN-ARCH-003: тело 422-ответа содержит упоминание об обязательности path.""" r = client.post("/api/projects", json={ "id": "dev_no_path_msg", "name": "Dev No Path Msg", "project_type": "development", }) assert r.status_code == 422 detail_str = str(r.json()) assert "path" in detail_str def test_kin_arch_003_deploy_operations_project_null_path_uses_cwd_none(client): """KIN-ARCH-003: deploy_project для operations-проекта с path=NULL не вызывает Path.exists() — передаёт cwd=None в subprocess.run.""" from unittest.mock import patch, MagicMock client.post("/api/projects", json={ "id": "ops_deploy_null", "name": "Ops Deploy Null Path", "project_type": "operations", "ssh_host": "10.0.0.1", }) client.patch("/api/projects/ops_deploy_null", json={"deploy_command": "echo ok"}) mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "ok\n" mock_result.stderr = "" with patch("subprocess.run", return_value=mock_result) as mock_run: r = client.post("/api/projects/ops_deploy_null/deploy") assert r.status_code == 200 call_kwargs = mock_run.call_args.kwargs assert call_kwargs.get("cwd") is None, ( "KIN-ARCH-003: для operations-проектов без path, cwd должен быть None" ) # --------------------------------------------------------------------------- # Bootstrap endpoint — KIN-081 # --------------------------------------------------------------------------- @pytest.fixture def bootstrap_client(tmp_path): """TestClient без seed-данных, с отдельным DB_PATH.""" db_path = tmp_path / "bs_test.db" api_module.DB_PATH = db_path from web.api import app return TestClient(app), tmp_path def test_bootstrap_endpoint_invalid_path_returns_400(bootstrap_client): """KIN-081: bootstrap возвращает 400 если путь не существует.""" client, _ = bootstrap_client r = client.post("/api/bootstrap", json={ "id": "newproj", "name": "New Project", "path": "/nonexistent/path/that/does/not/exist" }) assert r.status_code == 400 assert "not a directory" in r.json()["detail"].lower() def test_bootstrap_endpoint_duplicate_id_returns_409(bootstrap_client, tmp_path): """KIN-081: bootstrap возвращает 409 если проект с таким ID уже существует.""" client, _ = bootstrap_client proj_dir = tmp_path / "myproj" proj_dir.mkdir() # Create project first client.post("/api/projects", json={"id": "existing", "name": "Existing", "path": str(proj_dir)}) # Try bootstrap with same ID r = client.post("/api/bootstrap", json={ "id": "existing", "name": "Same ID", "path": str(proj_dir) }) assert r.status_code == 409 assert "already exists" in r.json()["detail"] def test_bootstrap_endpoint_rollback_on_save_error(bootstrap_client, tmp_path): """KIN-081: при ошибке в save_to_db проект удаляется (rollback), возвращается 500.""" client, _ = bootstrap_client proj_dir = tmp_path / "rollbackproj" proj_dir.mkdir() from core.db import init_db from core import models as _models def _save_create_then_fail(conn, project_id, name, path, *args, **kwargs): # Simulate partial write: project row created, then error _models.create_project(conn, project_id, name, path) raise RuntimeError("simulated DB error after project created") with patch("web.api.save_to_db", side_effect=_save_create_then_fail): r = client.post("/api/bootstrap", json={ "id": "rollbackproj", "name": "Rollback Test", "path": str(proj_dir) }) assert r.status_code == 500 assert "Bootstrap failed" in r.json()["detail"] # Project must NOT remain in DB (rollback was executed) conn = init_db(api_module.DB_PATH) assert _models.get_project(conn, "rollbackproj") is None conn.close() def test_bootstrap_endpoint_success(bootstrap_client, tmp_path): """KIN-081: успешный bootstrap возвращает 200 с project и counts.""" client, _ = bootstrap_client proj_dir = tmp_path / "goodproj" proj_dir.mkdir() (proj_dir / "requirements.txt").write_text("fastapi\n") with patch("web.api.find_vault_root", return_value=None): r = client.post("/api/bootstrap", json={ "id": "goodproj", "name": "Good Project", "path": str(proj_dir) }) assert r.status_code == 200 data = r.json() assert data["project"]["id"] == "goodproj" assert "modules_count" in data assert "decisions_count" in data def test_delete_project_ok(client): # Create a separate project to delete r = client.post("/api/projects", json={"id": "del1", "name": "Del1", "path": "/del1"}) assert r.status_code == 200 r = client.delete("/api/projects/del1") assert r.status_code == 204 assert r.content == b"" # Verify project is gone r = client.get("/api/projects/del1") assert r.status_code == 404 def test_delete_project_not_found(client): r = client.delete("/api/projects/99999") assert r.status_code == 404 assert "tasks_count" in data