kin/tests/test_api.py

2444 lines
96 KiB
Python
Raw Normal View History

"""Tests for web/api.py — new task endpoints (pipeline, approve, reject, full)."""
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
from fastapi.testclient import TestClient
# Patch DB_PATH before importing app
import web.api as api_module
@pytest.fixture
def client(tmp_path):
db_path = tmp_path / "test.db"
api_module.DB_PATH = db_path
from web.api import app
c = TestClient(app)
# Seed data
c.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"})
c.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"})
return c
def test_get_task(client):
r = client.get("/api/tasks/P1-001")
assert r.status_code == 200
assert r.json()["title"] == "Fix bug"
def test_get_task_not_found(client):
r = client.get("/api/tasks/NOPE")
assert r.status_code == 404
def test_task_pipeline_empty(client):
r = client.get("/api/tasks/P1-001/pipeline")
assert r.status_code == 200
assert r.json() == []
def test_task_pipeline_with_logs(client):
# Insert agent logs directly
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.log_agent_run(conn, "p1", "debugger", "execute",
task_id="P1-001", output_summary="Found bug",
tokens_used=1000, duration_seconds=5, success=True)
models.log_agent_run(conn, "p1", "tester", "execute",
task_id="P1-001", output_summary="Tests pass",
tokens_used=500, duration_seconds=3, success=True)
conn.close()
r = client.get("/api/tasks/P1-001/pipeline")
assert r.status_code == 200
steps = r.json()
assert len(steps) == 2
assert steps[0]["agent_role"] == "debugger"
assert steps[0]["output_summary"] == "Found bug"
assert steps[1]["agent_role"] == "tester"
def test_task_full(client):
r = client.get("/api/tasks/P1-001/full")
assert r.status_code == 200
data = r.json()
assert data["id"] == "P1-001"
assert "pipeline_steps" in data
assert "related_decisions" in data
def test_task_full_not_found(client):
r = client.get("/api/tasks/NOPE/full")
assert r.status_code == 404
def test_approve_task(client):
# First set task to review
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(conn, "P1-001", status="review")
conn.close()
r = client.post("/api/tasks/P1-001/approve", json={})
assert r.status_code == 200
assert r.json()["status"] == "done"
# Verify task is done
r = client.get("/api/tasks/P1-001")
assert r.json()["status"] == "done"
def test_approve_with_decision(client):
r = client.post("/api/tasks/P1-001/approve", json={
"decision_title": "Use AbortController",
"decision_description": "Fix race condition with AbortController",
"decision_type": "decision",
})
assert r.status_code == 200
assert r.json()["decision"] is not None
assert r.json()["decision"]["title"] == "Use AbortController"
def test_approve_not_found(client):
r = client.post("/api/tasks/NOPE/approve", json={})
assert r.status_code == 404
def test_approve_fires_task_done_hooks(client):
"""Ручной апрув задачи должен вызывать хуки с event='task_done'."""
from unittest.mock import patch
with patch("core.hooks.run_hooks") as mock_hooks:
mock_hooks.return_value = []
r = client.post("/api/tasks/P1-001/approve", json={})
assert r.status_code == 200
events_fired = [call[1].get("event") or call[0][3]
for call in mock_hooks.call_args_list]
assert "task_done" in events_fired
def test_reject_task(client):
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(conn, "P1-001", status="review")
conn.close()
r = client.post("/api/tasks/P1-001/reject", json={
"reason": "Didn't fix the root cause"
})
assert r.status_code == 200
assert r.json()["status"] == "pending"
# Verify task is pending with review reason
r = client.get("/api/tasks/P1-001")
data = r.json()
assert data["status"] == "pending"
assert data["review"]["rejected"] == "Didn't fix the root cause"
def test_reject_not_found(client):
r = client.post("/api/tasks/NOPE/reject", json={"reason": "bad"})
assert r.status_code == 404
def test_revise_task(client):
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(conn, "P1-001", status="review")
conn.close()
r = client.post("/api/tasks/P1-001/revise", json={
"comment": "Доисследуй edge case с пустым массивом"
})
assert r.status_code == 200
assert r.json()["status"] == "in_progress"
# Verify task is in_progress with revise_comment stored
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT status, revise_comment FROM tasks WHERE id = 'P1-001'").fetchone()
conn.close()
assert row["status"] == "in_progress"
assert row["revise_comment"] == "Доисследуй edge case с пустым массивом"
def test_revise_not_found(client):
r = client.post("/api/tasks/NOPE/revise", json={"comment": "fix it"})
assert r.status_code == 404
def test_revise_task_response_includes_comment(client):
"""Ответ /revise содержит поле comment с переданным текстом."""
r = client.post("/api/tasks/P1-001/revise", json={"comment": "Уточни требования"})
assert r.status_code == 200
assert r.json()["comment"] == "Уточни требования"
def test_revise_task_missing_comment_returns_422(client):
"""Запрос /revise без поля comment → 422 Unprocessable Entity (Pydantic validation)."""
r = client.post("/api/tasks/P1-001/revise", json={})
assert r.status_code == 422
def test_task_pipeline_not_found(client):
r = client.get("/api/tasks/NOPE/pipeline")
assert r.status_code == 404
def test_running_endpoint_no_pipeline(client):
r = client.get("/api/tasks/P1-001/running")
assert r.status_code == 200
assert r.json()["running"] is False
def test_running_endpoint_with_pipeline(client):
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.create_pipeline(conn, "P1-001", "p1", "debug",
[{"role": "debugger"}])
conn.close()
r = client.get("/api/tasks/P1-001/running")
assert r.status_code == 200
assert r.json()["running"] is True
def test_running_endpoint_not_found(client):
r = client.get("/api/tasks/NOPE/running")
assert r.status_code == 404
def test_run_sets_in_progress(client):
"""POST /run should set task to in_progress immediately."""
r = client.post("/api/tasks/P1-001/run")
assert r.status_code == 202
r = client.get("/api/tasks/P1-001")
assert r.json()["status"] == "in_progress"
def test_run_not_found(client):
r = client.post("/api/tasks/NOPE/run")
assert r.status_code == 404
def test_run_returns_503_when_claude_not_authenticated(client):
"""KIN-083: /run возвращает 503 с claude_auth_required если claude не залогинен."""
from agents.runner import ClaudeAuthError
with patch("agents.runner.check_claude_auth", side_effect=ClaudeAuthError("Claude CLI requires login. Run: claude login")):
r = client.post("/api/tasks/P1-001/run")
assert r.status_code == 503
body = r.json()
assert body["detail"]["error"] == "claude_auth_required"
assert body["detail"]["instructions"] == "Run: claude login"
assert "login" in body["detail"]["message"].lower()
def test_start_phase_returns_503_when_claude_not_authenticated(client):
"""KIN-083: /phases/start возвращает 503 с claude_auth_required если claude не залогинен."""
from agents.runner import ClaudeAuthError
with patch("agents.runner.check_claude_auth", side_effect=ClaudeAuthError("Claude CLI requires login. Run: claude login")):
r = client.post("/api/projects/p1/phases/start")
assert r.status_code == 503
body = r.json()
assert body["detail"]["error"] == "claude_auth_required"
assert body["detail"]["instructions"] == "Run: claude login"
assert "login" in body["detail"]["message"].lower()
def test_run_kin_038_without_allow_write(client):
"""Регрессионный тест KIN-038: allow_write удалён из схемы,
эндпоинт принимает запросы с пустым телом без этого параметра."""
r = client.post("/api/tasks/P1-001/run", json={})
assert r.status_code == 202
def test_run_with_empty_body(client):
"""POST /run with empty JSON body should be accepted."""
r = client.post("/api/tasks/P1-001/run", json={})
assert r.status_code == 202
def test_run_without_body(client):
"""POST /run without body should be backwards-compatible."""
r = client.post("/api/tasks/P1-001/run")
assert r.status_code == 202
def test_project_summary_includes_review(client):
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(conn, "P1-001", status="review")
conn.close()
r = client.get("/api/projects")
projects = r.json()
assert projects[0]["review_tasks"] == 1
def test_audit_not_found(client):
r = client.post("/api/projects/NOPE/audit")
assert r.status_code == 404
def test_audit_apply(client):
"""POST /audit/apply should mark tasks as done."""
r = client.post("/api/projects/p1/audit/apply",
json={"task_ids": ["P1-001"]})
assert r.status_code == 200
assert r.json()["count"] == 1
assert "P1-001" in r.json()["updated"]
# Verify task is done
r = client.get("/api/tasks/P1-001")
assert r.json()["status"] == "done"
def test_audit_apply_not_found(client):
r = client.post("/api/projects/NOPE/audit/apply",
json={"task_ids": ["P1-001"]})
assert r.status_code == 404
def test_audit_apply_wrong_project(client):
"""Tasks not belonging to the project should be skipped."""
r = client.post("/api/projects/p1/audit/apply",
json={"task_ids": ["WRONG-001"]})
assert r.status_code == 200
assert r.json()["count"] == 0
# ---------------------------------------------------------------------------
# PATCH /api/tasks/{task_id} — смена статуса
# ---------------------------------------------------------------------------
def test_patch_task_status(client):
"""PATCH должен обновить статус и вернуть задачу."""
r = client.patch("/api/tasks/P1-001", json={"status": "review"})
assert r.status_code == 200
data = r.json()
assert data["status"] == "review"
assert data["id"] == "P1-001"
def test_patch_task_status_persisted(client):
"""После PATCH повторный GET должен возвращать новый статус."""
client.patch("/api/tasks/P1-001", json={"status": "blocked"})
r = client.get("/api/tasks/P1-001")
assert r.status_code == 200
assert r.json()["status"] == "blocked"
@pytest.mark.parametrize("status", ["pending", "in_progress", "review", "done", "blocked", "decomposed", "cancelled"])
def test_patch_task_all_valid_statuses(client, status):
"""Все 7 допустимых статусов должны приниматься (включая decomposed)."""
r = client.patch("/api/tasks/P1-001", json={"status": status})
assert r.status_code == 200
assert r.json()["status"] == status
def test_patch_task_status_decomposed(client):
"""Регрессионный тест KIN-033: API принимает статус 'decomposed'."""
r = client.patch("/api/tasks/P1-001", json={"status": "decomposed"})
assert r.status_code == 200
assert r.json()["status"] == "decomposed"
def test_patch_task_status_decomposed_persisted(client):
"""После установки 'decomposed' повторный GET возвращает этот статус."""
client.patch("/api/tasks/P1-001", json={"status": "decomposed"})
r = client.get("/api/tasks/P1-001")
assert r.status_code == 200
assert r.json()["status"] == "decomposed"
# ---------------------------------------------------------------------------
# KIN-033 — единый источник истины для статусов
# ---------------------------------------------------------------------------
def test_api_valid_statuses_match_models():
"""API использует models.VALID_TASK_STATUSES как единственный источник истины."""
from core import models
import web.api as api_module
assert api_module.VALID_STATUSES == set(models.VALID_TASK_STATUSES)
def test_cli_valid_statuses_match_models():
"""CLI использует models.VALID_TASK_STATUSES как единственный источник истины."""
from core import models
from cli.main import task_update
status_param = next(p for p in task_update.params if p.name == "status")
cli_choices = set(status_param.type.choices)
assert cli_choices == set(models.VALID_TASK_STATUSES)
def test_cli_and_api_statuses_are_identical():
"""Список статусов в CLI и API идентичен."""
from core import models
import web.api as api_module
from cli.main import task_update
status_param = next(p for p in task_update.params if p.name == "status")
cli_choices = set(status_param.type.choices)
assert cli_choices == api_module.VALID_STATUSES
assert "decomposed" in cli_choices
assert "decomposed" in api_module.VALID_STATUSES
def test_patch_task_invalid_status(client):
"""Недопустимый статус → 400."""
r = client.patch("/api/tasks/P1-001", json={"status": "flying"})
assert r.status_code == 400
def test_patch_task_not_found(client):
"""Несуществующая задача → 404."""
r = client.patch("/api/tasks/NOPE-999", json={"status": "done"})
assert r.status_code == 404
def test_patch_task_empty_body_returns_400(client):
"""PATCH с пустым телом (нет status и нет execution_mode) → 400."""
r = client.patch("/api/tasks/P1-001", json={})
assert r.status_code == 400
def test_patch_task_execution_mode_auto_complete_accepted(client):
"""KIN-063: execution_mode='auto_complete' принимается (200)."""
r = client.patch("/api/tasks/P1-001", json={"execution_mode": "auto_complete"})
assert r.status_code == 200
assert r.json()["execution_mode"] == "auto_complete"
def test_patch_task_execution_mode_auto_rejected(client):
"""KIN-063: старое значение 'auto' должно отклоняться (400) — Decision #29."""
r = client.patch("/api/tasks/P1-001", json={"execution_mode": "auto"})
assert r.status_code == 400
def test_patch_task_execution_mode_review_accepted(client):
"""KIN-074: execution_mode='review' принимается (200) — регрессия после фикса frontend."""
r = client.patch("/api/tasks/P1-001", json={"execution_mode": "review"})
assert r.status_code == 200
assert r.json()["execution_mode"] == "review"
# ---------------------------------------------------------------------------
# KIN-022 — blocked_reason: регрессионные тесты
# ---------------------------------------------------------------------------
def test_blocked_reason_saved_and_returned(client):
"""При переходе в blocked с blocked_reason поле сохраняется и отдаётся в GET."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(conn, "P1-001", status="blocked",
blocked_reason="Step 1/2 (debugger) failed")
conn.close()
r = client.get("/api/tasks/P1-001")
assert r.status_code == 200
data = r.json()
assert data["status"] == "blocked"
assert data["blocked_reason"] == "Step 1/2 (debugger) failed"
def test_blocked_reason_present_in_full(client):
"""blocked_reason также присутствует в /full эндпоинте."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(conn, "P1-001", status="blocked",
blocked_reason="tester agent crashed")
conn.close()
r = client.get("/api/tasks/P1-001/full")
assert r.status_code == 200
data = r.json()
assert data["status"] == "blocked"
assert data["blocked_reason"] == "tester agent crashed"
def test_blocked_reason_none_by_default(client):
"""Новая задача не имеет blocked_reason."""
r = client.get("/api/tasks/P1-001")
assert r.status_code == 200
data = r.json()
assert data["blocked_reason"] is None
def test_blocked_without_reason_allowed(client):
"""Переход в blocked без причины допустим (reason=None)."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(conn, "P1-001", status="blocked")
conn.close()
r = client.get("/api/tasks/P1-001")
assert r.status_code == 200
data = r.json()
assert data["status"] == "blocked"
assert data["blocked_reason"] is None
def test_blocked_reason_cleared_on_retry(client):
"""При повторном запуске (статус pending) blocked_reason сбрасывается."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(conn, "P1-001", status="blocked",
blocked_reason="failed once")
models.update_task(conn, "P1-001", status="pending", blocked_reason=None)
conn.close()
r = client.get("/api/tasks/P1-001")
assert r.status_code == 200
data = r.json()
assert data["status"] == "pending"
assert data["blocked_reason"] is None
# ---------------------------------------------------------------------------
# KIN-029 — DELETE /api/projects/{project_id}/decisions/{decision_id}
# ---------------------------------------------------------------------------
def test_delete_decision_ok(client):
"""Создаём decision через POST, удаляем DELETE → 200 с телом {"deleted": id}."""
r = client.post("/api/decisions", json={
"project_id": "p1",
"type": "decision",
"title": "Use SQLite",
"description": "Chosen for simplicity",
})
assert r.status_code == 200
decision_id = r.json()["id"]
r = client.delete(f"/api/projects/p1/decisions/{decision_id}")
assert r.status_code == 200
assert r.json() == {"deleted": decision_id}
r = client.get("/api/decisions?project=p1")
assert r.status_code == 200
ids = [d["id"] for d in r.json()]
assert decision_id not in ids
def test_delete_decision_not_found(client):
"""DELETE несуществующего decision → 404."""
r = client.delete("/api/projects/p1/decisions/99999")
assert r.status_code == 404
def test_delete_decision_wrong_project(client):
"""DELETE decision с чужим project_id → 404 (не раскрываем существование)."""
r = client.post("/api/decisions", json={
"project_id": "p1",
"type": "decision",
"title": "Cross-project check",
"description": "Should not be deletable from p2",
})
assert r.status_code == 200
decision_id = r.json()["id"]
r = client.delete(f"/api/projects/p2/decisions/{decision_id}")
assert r.status_code == 404
# Decision должен остаться нетронутым
r = client.get("/api/decisions?project=p1")
ids = [d["id"] for d in r.json()]
assert decision_id in ids
# ---------------------------------------------------------------------------
# KIN-035 — регрессионный тест: смена статуса на cancelled
# ---------------------------------------------------------------------------
def test_patch_task_status_cancelled(client):
"""Регрессионный тест KIN-035: PATCH /api/tasks/{id} с status='cancelled' → 200."""
r = client.patch("/api/tasks/P1-001", json={"status": "cancelled"})
assert r.status_code == 200
assert r.json()["status"] == "cancelled"
def test_patch_task_status_cancelled_persisted(client):
"""После установки 'cancelled' повторный GET возвращает этот статус."""
client.patch("/api/tasks/P1-001", json={"status": "cancelled"})
r = client.get("/api/tasks/P1-001")
assert r.status_code == 200
assert r.json()["status"] == "cancelled"
def test_cancelled_in_valid_statuses():
"""'cancelled' присутствует в VALID_TASK_STATUSES модели и в VALID_STATUSES API."""
from core import models
import web.api as api_module
assert "cancelled" in models.VALID_TASK_STATUSES
assert "cancelled" in api_module.VALID_STATUSES
# ---------------------------------------------------------------------------
# KIN-036 — регрессионный тест: --allow-write всегда в команде через web API
# ---------------------------------------------------------------------------
def test_run_always_includes_allow_write_when_body_false(client):
"""Регрессионный тест KIN-036: --allow-write присутствует в команде,
даже если allow_write=False в теле запроса.
Баг: условие `if body and body.allow_write` не добавляло флаг при
allow_write=False, что приводило к блокировке агента на 300 с."""
from unittest.mock import patch, MagicMock
with patch("web.api.subprocess.Popen") as mock_popen:
mock_proc = MagicMock()
mock_proc.pid = 12345
mock_popen.return_value = mock_proc
r = client.post("/api/tasks/P1-001/run", json={"allow_write": False})
assert r.status_code == 202
cmd = mock_popen.call_args[0][0]
assert "--allow-write" in cmd, (
"--allow-write обязан присутствовать всегда: без него агент зависает "
"при попытке записи, потому что stdin=DEVNULL и нет интерактивного подтверждения"
)
def test_run_always_includes_allow_write_without_body(client):
"""Регрессионный тест KIN-036: --allow-write присутствует даже без тела запроса."""
from unittest.mock import patch, MagicMock
with patch("web.api.subprocess.Popen") as mock_popen:
mock_proc = MagicMock()
mock_proc.pid = 12345
mock_popen.return_value = mock_proc
r = client.post("/api/tasks/P1-001/run")
assert r.status_code == 202
cmd = mock_popen.call_args[0][0]
assert "--allow-write" in cmd
def test_run_sets_kin_noninteractive_env(client):
"""Регрессионный тест KIN-036: KIN_NONINTERACTIVE=1 всегда устанавливается
при запуске через web API, что вместе с --allow-write предотвращает зависание."""
from unittest.mock import patch, MagicMock
with patch("web.api.subprocess.Popen") as mock_popen:
mock_proc = MagicMock()
mock_proc.pid = 99
mock_popen.return_value = mock_proc
r = client.post("/api/tasks/P1-001/run")
assert r.status_code == 202
call_kwargs = mock_popen.call_args[1]
env = call_kwargs.get("env", {})
assert env.get("KIN_NONINTERACTIVE") == "1"
def test_run_sets_stdin_devnull(client):
"""Регрессионный тест KIN-036: stdin=DEVNULL всегда устанавливается,
что является причиной, по которой --allow-write обязателен."""
import subprocess as _subprocess
from unittest.mock import patch, MagicMock
with patch("web.api.subprocess.Popen") as mock_popen:
mock_proc = MagicMock()
mock_proc.pid = 42
mock_popen.return_value = mock_proc
r = client.post("/api/tasks/P1-001/run")
assert r.status_code == 202
call_kwargs = mock_popen.call_args[1]
assert call_kwargs.get("stdin") == _subprocess.DEVNULL
# ---------------------------------------------------------------------------
# KIN-040 — регрессионные тесты: удаление TaskRun / allow_write из схемы
# ---------------------------------------------------------------------------
def test_run_kin_040_no_taskrun_class():
"""Регрессионный тест KIN-040: класс TaskRun удалён из web/api.py.
allow_write больше не является частью схемы эндпоинта /run."""
import web.api as api_module
assert not hasattr(api_module, "TaskRun"), (
"Класс TaskRun должен быть удалён из web/api.py (KIN-040)"
)
def test_run_kin_040_allow_write_true_ignored(client):
"""Регрессионный тест KIN-040: allow_write=True в теле игнорируется (не 422).
Эндпоинт не имеет body-параметра, поэтому FastAPI не валидирует тело."""
r = client.post("/api/tasks/P1-001/run", json={"allow_write": True})
assert r.status_code == 202
# ---------------------------------------------------------------------------
# KIN-058 — регрессионный тест: stderr=DEVNULL у Popen в web API
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# KIN-020 — manual_escalation задачи: PATCH status='done' резолвит задачу
# ---------------------------------------------------------------------------
def test_patch_manual_escalation_task_to_done(client):
"""PATCH status='done' на manual_escalation задаче — статус обновляется — KIN-020."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.create_task(conn, "P1-002", "p1", "Fix .dockerignore manually",
brief={"task_type": "manual_escalation",
"source": "followup:P1-001",
"description": "Ручное применение .dockerignore"})
conn.close()
r = client.patch("/api/tasks/P1-002", json={"status": "done"})
assert r.status_code == 200
assert r.json()["status"] == "done"
def test_manual_escalation_task_brief_preserved_after_patch(client):
"""PATCH не затирает brief.task_type — поле manual_escalation сохраняется — KIN-020."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.create_task(conn, "P1-002", "p1", "Fix manually",
brief={"task_type": "manual_escalation",
"source": "followup:P1-001"})
conn.close()
client.patch("/api/tasks/P1-002", json={"status": "done"})
r = client.get("/api/tasks/P1-002")
assert r.status_code == 200
assert r.json()["brief"]["task_type"] == "manual_escalation"
def test_run_sets_stderr_devnull(client):
"""Регрессионный тест KIN-058: stderr=DEVNULL всегда устанавливается в Popen,
чтобы stderr дочернего процесса не загрязнял логи uvicorn."""
import subprocess as _subprocess
from unittest.mock import patch, MagicMock
with patch("web.api.subprocess.Popen") as mock_popen:
mock_proc = MagicMock()
mock_proc.pid = 77
mock_popen.return_value = mock_proc
r = client.post("/api/tasks/P1-001/run")
assert r.status_code == 202
call_kwargs = mock_popen.call_args[1]
assert call_kwargs.get("stderr") == _subprocess.DEVNULL, (
"Регрессия KIN-058: stderr у Popen должен быть DEVNULL, "
"иначе вывод агента попадает в логи uvicorn"
)
# ---------------------------------------------------------------------------
# KIN-065 — PATCH /api/projects/{id} — autocommit_enabled toggle
# ---------------------------------------------------------------------------
def test_patch_project_autocommit_enabled_true(client):
"""PATCH с autocommit_enabled=true → 200, поле установлено в 1."""
r = client.patch("/api/projects/p1", json={"autocommit_enabled": True})
assert r.status_code == 200
assert r.json()["autocommit_enabled"] == 1
def test_patch_project_autocommit_enabled_false(client):
"""После включения PATCH с autocommit_enabled=false → 200, поле установлено в 0."""
client.patch("/api/projects/p1", json={"autocommit_enabled": True})
r = client.patch("/api/projects/p1", json={"autocommit_enabled": False})
assert r.status_code == 200
assert r.json()["autocommit_enabled"] == 0
def test_patch_project_autocommit_persisted_via_sql(client):
"""После PATCH autocommit_enabled=True прямой SQL подтверждает значение 1."""
client.patch("/api/projects/p1", json={"autocommit_enabled": True})
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT autocommit_enabled FROM projects WHERE id = 'p1'").fetchone()
conn.close()
assert row is not None
assert row[0] == 1
def test_patch_project_autocommit_false_persisted_via_sql(client):
"""После PATCH autocommit_enabled=False прямой SQL подтверждает значение 0."""
client.patch("/api/projects/p1", json={"autocommit_enabled": True})
client.patch("/api/projects/p1", json={"autocommit_enabled": False})
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT autocommit_enabled FROM projects WHERE id = 'p1'").fetchone()
conn.close()
assert row is not None
assert row[0] == 0
def test_patch_project_autocommit_null_before_first_update(client):
"""Новый проект имеет autocommit_enabled=NULL/0 (falsy) до первого обновления."""
client.post("/api/projects", json={"id": "p_new", "name": "New", "path": "/new"})
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT autocommit_enabled FROM projects WHERE id = 'p_new'").fetchone()
conn.close()
assert row is not None
assert not row[0] # DEFAULT 0 или NULL — в любом случае falsy
def test_patch_project_empty_body_returns_400(client):
"""PATCH проекта без полей → 400."""
r = client.patch("/api/projects/p1", json={})
assert r.status_code == 400
def test_patch_project_not_found(client):
"""PATCH несуществующего проекта → 404."""
r = client.patch("/api/projects/NOPE", json={"autocommit_enabled": True})
assert r.status_code == 404
def test_patch_project_autocommit_and_execution_mode_together(client):
"""PATCH с autocommit_enabled и execution_mode → оба поля обновлены."""
r = client.patch("/api/projects/p1", json={
"autocommit_enabled": True,
"execution_mode": "auto_complete",
})
assert r.status_code == 200
data = r.json()
assert data["autocommit_enabled"] == 1
assert data["execution_mode"] == "auto_complete"
def test_patch_project_returns_full_project_object(client):
"""PATCH возвращает полный объект проекта с id, name и autocommit_enabled."""
r = client.patch("/api/projects/p1", json={"autocommit_enabled": True})
assert r.status_code == 200
data = r.json()
assert data["id"] == "p1"
assert data["name"] == "P1"
assert "autocommit_enabled" in data
# ---------------------------------------------------------------------------
# KIN-008 — PATCH priority и route_type задачи
# ---------------------------------------------------------------------------
def test_patch_task_priority(client):
"""PATCH priority задачи обновляет поле и возвращает задачу."""
r = client.patch("/api/tasks/P1-001", json={"priority": 3})
assert r.status_code == 200
assert r.json()["priority"] == 3
def test_patch_task_priority_persisted(client):
"""После PATCH priority повторный GET возвращает новое значение."""
client.patch("/api/tasks/P1-001", json={"priority": 7})
r = client.get("/api/tasks/P1-001")
assert r.status_code == 200
assert r.json()["priority"] == 7
def test_patch_task_priority_invalid_zero(client):
"""PATCH с priority=0 → 400."""
r = client.patch("/api/tasks/P1-001", json={"priority": 0})
assert r.status_code == 400
def test_patch_task_priority_invalid_eleven(client):
"""PATCH с priority=11 → 400."""
r = client.patch("/api/tasks/P1-001", json={"priority": 11})
assert r.status_code == 400
def test_patch_task_route_type_set(client):
"""PATCH route_type сохраняет значение в brief."""
r = client.patch("/api/tasks/P1-001", json={"route_type": "feature"})
assert r.status_code == 200
data = r.json()
assert data["brief"]["route_type"] == "feature"
def test_patch_task_route_type_all_valid(client):
"""Все допустимые route_type принимаются."""
for rt in ("debug", "feature", "refactor", "hotfix"):
r = client.patch("/api/tasks/P1-001", json={"route_type": rt})
assert r.status_code == 200, f"route_type={rt} rejected"
assert r.json()["brief"]["route_type"] == rt
def test_patch_task_route_type_invalid(client):
"""Недопустимый route_type → 400."""
r = client.patch("/api/tasks/P1-001", json={"route_type": "unknown"})
assert r.status_code == 400
def test_patch_task_route_type_clear(client):
"""PATCH route_type='' очищает поле из brief."""
client.patch("/api/tasks/P1-001", json={"route_type": "debug"})
r = client.patch("/api/tasks/P1-001", json={"route_type": ""})
assert r.status_code == 200
data = r.json()
brief = data.get("brief")
if brief:
assert "route_type" not in brief
def test_patch_task_route_type_merges_brief(client):
"""route_type сохраняется вместе с другими полями brief без перезаписи."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(conn, "P1-001", brief={"extra": "data"})
conn.close()
r = client.patch("/api/tasks/P1-001", json={"route_type": "hotfix"})
assert r.status_code == 200
brief = r.json()["brief"]
assert brief["route_type"] == "hotfix"
assert brief["extra"] == "data"
def test_patch_task_priority_and_route_type_together(client):
"""PATCH может обновить priority и route_type одновременно."""
r = client.patch("/api/tasks/P1-001", json={"priority": 2, "route_type": "refactor"})
assert r.status_code == 200
data = r.json()
assert data["priority"] == 2
assert data["brief"]["route_type"] == "refactor"
def test_patch_task_empty_body_still_returns_400(client):
"""Пустое тело по-прежнему возвращает 400 (регрессия KIN-008)."""
r = client.patch("/api/tasks/P1-001", json={})
assert r.status_code == 400
# PATCH /api/tasks/{id} — редактирование title и brief_text (KIN-015)
def test_patch_task_title(client):
"""PATCH title обновляет заголовок задачи."""
r = client.patch("/api/tasks/P1-001", json={"title": "Новый заголовок"})
assert r.status_code == 200
assert r.json()["title"] == "Новый заголовок"
def test_patch_task_title_persisted(client):
"""PATCH title сохраняется в БД."""
client.patch("/api/tasks/P1-001", json={"title": "Персистентный заголовок"})
r = client.get("/api/tasks/P1-001")
assert r.json()["title"] == "Персистентный заголовок"
def test_patch_task_title_empty_returns_400(client):
"""Пустой title → 400."""
r = client.patch("/api/tasks/P1-001", json={"title": " "})
assert r.status_code == 400
def test_patch_task_brief_text(client):
"""PATCH brief_text сохраняется в brief.text."""
r = client.patch("/api/tasks/P1-001", json={"brief_text": "Описание задачи"})
assert r.status_code == 200
assert r.json()["brief"]["text"] == "Описание задачи"
def test_patch_task_brief_text_persisted(client):
"""PATCH brief_text сохраняется в БД."""
client.patch("/api/tasks/P1-001", json={"brief_text": "Сохранённое описание"})
r = client.get("/api/tasks/P1-001")
assert r.json()["brief"]["text"] == "Сохранённое описание"
def test_patch_task_brief_text_merges_route_type(client):
"""brief_text не перезаписывает route_type в brief."""
client.patch("/api/tasks/P1-001", json={"route_type": "feature"})
client.patch("/api/tasks/P1-001", json={"brief_text": "Описание"})
r = client.get("/api/tasks/P1-001")
brief = r.json()["brief"]
assert brief["text"] == "Описание"
assert brief["route_type"] == "feature"
def test_patch_task_title_and_brief_text_together(client):
"""PATCH может обновить title и brief_text одновременно."""
r = client.patch("/api/tasks/P1-001", json={"title": "Совместное", "brief_text": "и описание"})
assert r.status_code == 200
data = r.json()
assert data["title"] == "Совместное"
assert data["brief"]["text"] == "и описание"
# ---------------------------------------------------------------------------
# KIN-049 — Deploy: миграция, PATCH deploy_command, POST /deploy
# ---------------------------------------------------------------------------
def test_deploy_command_column_exists_in_schema(client):
"""Миграция: PRAGMA table_info(projects) подтверждает наличие deploy_command (decision #74)."""
from core.db import init_db
conn = init_db(api_module.DB_PATH)
cols = {row[1] for row in conn.execute("PRAGMA table_info(projects)").fetchall()}
conn.close()
assert "deploy_command" in cols
def test_patch_project_deploy_command_persisted_via_sql(client):
"""PATCH с deploy_command сохраняется в БД — прямой SQL (decision #55)."""
client.patch("/api/projects/p1", json={"deploy_command": "echo hello"})
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT deploy_command FROM projects WHERE id = 'p1'").fetchone()
conn.close()
assert row is not None
assert row[0] == "echo hello"
def test_patch_project_deploy_command_returned_in_response(client):
"""После PATCH ответ содержит обновлённый deploy_command."""
r = client.patch("/api/projects/p1", json={"deploy_command": "git push origin main"})
assert r.status_code == 200
assert r.json()["deploy_command"] == "git push origin main"
def test_patch_project_deploy_command_empty_string_clears_to_null(client):
"""PATCH с пустой строкой очищает deploy_command → NULL (decision #68)."""
client.patch("/api/projects/p1", json={"deploy_command": "echo hello"})
client.patch("/api/projects/p1", json={"deploy_command": ""})
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT deploy_command FROM projects WHERE id = 'p1'").fetchone()
conn.close()
assert row[0] is None
def test_deploy_project_executes_command_returns_stdout(client):
"""POST /deploy — команда echo → stdout присутствует в ответе."""
from unittest.mock import patch, MagicMock
client.patch("/api/projects/p1", json={"deploy_command": "echo deployed"})
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = "deployed\n"
mock_result.stderr = ""
with patch("web.api.subprocess.run", return_value=mock_result):
r = client.post("/api/projects/p1/deploy")
assert r.status_code == 200
data = r.json()
assert data["success"] is True
assert data["exit_code"] == 0
assert "deployed" in data["stdout"]
assert "duration_seconds" in data
def test_deploy_project_without_deploy_command_returns_400(client):
"""POST /deploy для проекта без deploy_command → 400."""
r = client.post("/api/projects/p1/deploy")
assert r.status_code == 400
def test_deploy_project_not_found_returns_404(client):
"""POST /deploy для несуществующего проекта → 404."""
r = client.post("/api/projects/NOPE/deploy")
assert r.status_code == 404
def test_deploy_project_failed_command_returns_success_false(client):
"""POST /deploy — ненулевой exit_code → success=False (команда выполнилась, но упала)."""
from unittest.mock import patch, MagicMock
client.patch("/api/projects/p1", json={"deploy_command": "exit 1"})
mock_result = MagicMock()
mock_result.returncode = 1
mock_result.stdout = ""
mock_result.stderr = "error occurred"
with patch("web.api.subprocess.run", return_value=mock_result):
r = client.post("/api/projects/p1/deploy")
assert r.status_code == 200
data = r.json()
assert data["success"] is False
assert data["exit_code"] == 1
assert "error occurred" in data["stderr"]
def test_deploy_project_timeout_returns_504(client):
"""POST /deploy — timeout → 504."""
from unittest.mock import patch
import subprocess
client.patch("/api/projects/p1", json={"deploy_command": "sleep 100"})
with patch("web.api.subprocess.run", side_effect=subprocess.TimeoutExpired("sleep 100", 60)):
r = client.post("/api/projects/p1/deploy")
assert r.status_code == 504
def test_task_full_includes_project_deploy_command(client):
"""GET /api/tasks/{id}/full включает project_deploy_command из таблицы projects."""
client.patch("/api/projects/p1", json={"deploy_command": "git push"})
r = client.get("/api/tasks/P1-001/full")
assert r.status_code == 200
data = r.json()
assert "project_deploy_command" in data
assert data["project_deploy_command"] == "git push"
def test_task_full_project_deploy_command_none_when_not_set(client):
"""GET /api/tasks/{id}/full возвращает project_deploy_command=None когда не задана."""
r = client.get("/api/tasks/P1-001/full")
assert r.status_code == 200
data = r.json()
assert "project_deploy_command" in data
assert data["project_deploy_command"] is None
2026-03-17 18:34:56 +02:00
# ---------------------------------------------------------------------------
# KIN-INFRA-009 — project_deploy_runtime/host/path в TaskFull
# ---------------------------------------------------------------------------
def test_task_full_includes_project_deploy_runtime(client):
"""GET /api/tasks/{id}/full включает project_deploy_runtime из таблицы projects."""
client.patch("/api/projects/p1", json={"deploy_runtime": "docker"})
r = client.get("/api/tasks/P1-001/full")
assert r.status_code == 200
data = r.json()
assert "project_deploy_runtime" in data
assert data["project_deploy_runtime"] == "docker"
def test_task_full_includes_project_deploy_host_and_path(client):
"""GET /api/tasks/{id}/full включает project_deploy_host и project_deploy_path."""
client.patch("/api/projects/p1", json={"deploy_host": "10.0.0.5", "deploy_path": "/srv/app"})
r = client.get("/api/tasks/P1-001/full")
assert r.status_code == 200
data = r.json()
assert data["project_deploy_host"] == "10.0.0.5"
assert data["project_deploy_path"] == "/srv/app"
def test_task_full_deploy_runtime_fields_none_when_not_set(client):
"""GET /api/tasks/{id}/full возвращает None для всех deploy-полей если не заданы."""
r = client.get("/api/tasks/P1-001/full")
assert r.status_code == 200
data = r.json()
assert data["project_deploy_runtime"] is None
assert data["project_deploy_host"] is None
assert data["project_deploy_path"] is None
def test_task_full_deploy_runtime_and_command_coexist(client):
"""GET /api/tasks/{id}/full возвращает одновременно deploy_runtime и deploy_command."""
client.patch("/api/projects/p1", json={"deploy_runtime": "python", "deploy_command": "make deploy"})
r = client.get("/api/tasks/P1-001/full")
assert r.status_code == 200
data = r.json()
assert data["project_deploy_runtime"] == "python"
assert data["project_deploy_command"] == "make deploy"
# ---------------------------------------------------------------------------
# KIN-067 — PATCH obsidian_vault_path + sync/obsidian не возвращает 400
# ---------------------------------------------------------------------------
def test_patch_project_obsidian_vault_path_persisted_via_sql(client):
"""PATCH с obsidian_vault_path сохраняется в БД — прямой SQL."""
r = client.patch("/api/projects/p1", json={"obsidian_vault_path": "/tmp/vault"})
assert r.status_code == 200
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT obsidian_vault_path FROM projects WHERE id = 'p1'").fetchone()
conn.close()
assert row is not None
assert row[0] == "/tmp/vault"
def test_patch_project_obsidian_vault_path_returned_in_response(client):
"""PATCH возвращает обновлённый obsidian_vault_path в ответе."""
r = client.patch("/api/projects/p1", json={"obsidian_vault_path": "/my/vault"})
assert r.status_code == 200
assert r.json()["obsidian_vault_path"] == "/my/vault"
def test_sync_obsidian_without_vault_path_returns_400(client):
"""POST sync/obsidian без сохранённого vault_path → 400 Bad Request."""
r = client.post("/api/projects/p1/sync/obsidian")
assert r.status_code == 400
def test_sync_obsidian_after_patch_vault_path_not_400(client, tmp_path):
"""Сценарий бага KIN-067: сначала PATCH vault_path, затем sync → не 400.
Раньше runSync() вызывал sync/obsidian без предварительного сохранения пути,
что приводило к 400. После фикса PATCH вызывается первым.
"""
vault = tmp_path / "vault"
vault.mkdir()
# Шаг 1: сохранить vault_path через PATCH (как теперь делает runSync)
r = client.patch("/api/projects/p1", json={"obsidian_vault_path": str(vault)})
assert r.status_code == 200
# Шаг 2: запустить синхронизацию — не должно вернуть 400
r = client.post("/api/projects/p1/sync/obsidian")
assert r.status_code != 400, f"Ожидался не 400, получен {r.status_code}: {r.text}"
assert r.status_code == 200
def test_sync_obsidian_after_patch_returns_sync_result_fields(client, tmp_path):
"""После PATCH vault_path синхронизация возвращает поля exported_decisions и tasks_updated."""
vault = tmp_path / "vault"
vault.mkdir()
client.patch("/api/projects/p1", json={"obsidian_vault_path": str(vault)})
r = client.post("/api/projects/p1/sync/obsidian")
assert r.status_code == 200
data = r.json()
assert "exported_decisions" in data
# ---------------------------------------------------------------------------
# KIN-016 — GET /api/notifications — эскалации от заблокированных агентов
# ---------------------------------------------------------------------------
def test_kin016_notifications_empty_when_no_blocked_tasks(client):
"""KIN-016: GET /api/notifications возвращает [] когда нет заблокированных задач."""
r = client.get("/api/notifications")
assert r.status_code == 200
assert r.json() == []
def test_kin016_notifications_returns_blocked_task_as_escalation(client):
"""KIN-016: заблокированная задача появляется в /api/notifications с корректными полями."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(
conn, "P1-001",
status="blocked",
blocked_reason="cannot access external API",
blocked_at="2026-03-16T10:00:00",
blocked_agent_role="debugger",
blocked_pipeline_step="1",
)
conn.close()
r = client.get("/api/notifications")
assert r.status_code == 200
items = r.json()
assert len(items) == 1
item = items[0]
assert item["task_id"] == "P1-001"
assert item["agent_role"] == "debugger"
assert item["reason"] == "cannot access external API"
assert item["pipeline_step"] == "1"
assert item["blocked_at"] == "2026-03-16T10:00:00"
def test_kin016_notifications_contains_project_id_and_title(client):
"""KIN-016: уведомление содержит project_id и title задачи."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(conn, "P1-001", status="blocked",
blocked_reason="out of scope",
blocked_agent_role="architect")
conn.close()
r = client.get("/api/notifications")
assert r.status_code == 200
item = r.json()[0]
assert item["project_id"] == "p1"
assert item["title"] == "Fix bug"
def test_kin016_notifications_filters_by_project_id(client):
"""KIN-016: ?project_id= фильтрует уведомления по проекту."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
# Создаём второй проект с заблокированной задачей
models.create_project(conn, "p2", "P2", "/p2")
models.create_task(conn, "P2-001", "p2", "Another task")
models.update_task(conn, "P1-001", status="blocked",
blocked_reason="reason A", blocked_agent_role="debugger")
models.update_task(conn, "P2-001", status="blocked",
blocked_reason="reason B", blocked_agent_role="tester")
conn.close()
r = client.get("/api/notifications?project_id=p1")
assert r.status_code == 200
items = r.json()
assert all(i["project_id"] == "p1" for i in items)
assert len(items) == 1
assert items[0]["task_id"] == "P1-001"
def test_kin016_notifications_only_returns_blocked_status(client):
"""KIN-016: задачи в статусе pending/review/done НЕ попадают в уведомления."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
# Задача остаётся в pending (дефолт)
assert models.get_task(conn, "P1-001")["status"] == "pending"
conn.close()
r = client.get("/api/notifications")
assert r.status_code == 200
assert r.json() == []
def test_kin016_pipeline_blocked_agent_stops_next_steps_integration(client):
"""KIN-016: после blocked пайплайна задача блокируется, /api/notifications показывает её.
Интеграционный тест: pipeline blocked /api/notifications содержит task.
"""
import json
from unittest.mock import patch, MagicMock
blocked_output = json.dumps({
"result": json.dumps({"status": "blocked", "reason": "no repo access"}),
})
mock_proc = MagicMock()
mock_proc.pid = 123
with patch("web.api.subprocess.Popen") as mock_popen:
mock_popen.return_value = mock_proc
r = client.post("/api/tasks/P1-001/run")
assert r.status_code == 202
# Вручную помечаем задачу blocked (имитируем результат пайплайна)
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(
conn, "P1-001",
status="blocked",
blocked_reason="no repo access",
blocked_agent_role="debugger",
blocked_pipeline_step="1",
)
conn.close()
r = client.get("/api/notifications")
assert r.status_code == 200
items = r.json()
assert len(items) == 1
assert items[0]["task_id"] == "P1-001"
assert items[0]["reason"] == "no repo access"
assert items[0]["agent_role"] == "debugger"
# ---------------------------------------------------------------------------
# KIN-BIZ-001 — telegram_sent из БД (не заглушка)
# ---------------------------------------------------------------------------
def test_kin_biz_001_telegram_sent_not_stub(client):
"""Регрессия KIN-BIZ-001: /api/notifications возвращает реальный telegram_sent из БД, не False-заглушку."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(
conn, "P1-001",
status="blocked",
blocked_reason="cannot access repo",
blocked_agent_role="debugger",
)
models.mark_telegram_sent(conn, "P1-001")
conn.close()
r = client.get("/api/notifications")
assert r.status_code == 200
items = r.json()
assert len(items) == 1
# Ключевая проверка: telegram_sent должен быть True из БД, не False-заглушка
assert items[0]["telegram_sent"] is True
def test_kin_biz_001_notifications_telegram_sent_false_when_not_sent(client):
"""KIN-BIZ-001: telegram_sent=False для задачи, где уведомление не отправлялось."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.update_task(
conn, "P1-001",
status="blocked",
blocked_reason="no access",
blocked_agent_role="tester",
)
# Не вызываем mark_telegram_sent → telegram_sent остаётся 0
conn.close()
r = client.get("/api/notifications")
assert r.status_code == 200
items = r.json()
assert len(items) == 1
assert items[0]["telegram_sent"] is False
def test_kin_biz_001_telegram_sent_distinguishes_sent_and_not_sent(client):
"""KIN-BIZ-001: список уведомлений корректно различает sent/not-sent задачи."""
from core.db import init_db
from core import models
conn = init_db(api_module.DB_PATH)
models.create_task(conn, "P1-002", "p1", "Another task")
models.update_task(
conn, "P1-001",
status="blocked",
blocked_reason="reason 1",
blocked_agent_role="debugger",
)
models.update_task(
conn, "P1-002",
status="blocked",
blocked_reason="reason 2",
blocked_agent_role="tester",
)
# Telegram отправлен только для P1-001
models.mark_telegram_sent(conn, "P1-001")
conn.close()
r = client.get("/api/notifications")
assert r.status_code == 200
items = r.json()
assert len(items) == 2
by_id = {item["task_id"]: item for item in items}
assert by_id["P1-001"]["telegram_sent"] is True
assert by_id["P1-002"]["telegram_sent"] is False
# ---------------------------------------------------------------------------
# KIN-071: project_type и SSH-поля в API
# ---------------------------------------------------------------------------
def test_create_operations_project_with_ssh_fields(client):
"""KIN-071: POST /api/projects с project_type=operations и SSH-полями возвращает 200."""
r = client.post("/api/projects", json={
"id": "srv1",
"name": "My Server",
"project_type": "operations",
"ssh_host": "10.0.0.1",
"ssh_user": "root",
"ssh_key_path": "~/.ssh/id_rsa",
"ssh_proxy_jump": "jumpt",
})
assert r.status_code == 200
data = r.json()
assert data["project_type"] == "operations"
assert data["path"] is None
assert data["ssh_host"] == "10.0.0.1"
assert data["ssh_user"] == "root"
assert data["ssh_key_path"] == "~/.ssh/id_rsa"
assert data["ssh_proxy_jump"] == "jumpt"
def test_create_project_invalid_type_returns_400(client):
"""KIN-071: POST /api/projects с недопустимым project_type → 400."""
r = client.post("/api/projects", json={
"id": "bad",
"name": "Bad",
"path": "/bad",
"project_type": "legacy",
})
assert r.status_code == 400
def test_patch_project_invalid_type_returns_400(client):
"""KIN-071: PATCH /api/projects/{id} с недопустимым project_type → 400."""
r = client.patch("/api/projects/p1", json={"project_type": "invalid_type"})
assert r.status_code == 400
def test_create_operations_project_without_ssh_host_allowed(client):
"""Регрессионный тест KIN-ARCH-001: воспроизводит СЛОМАННОЕ поведение до фикса.
До фикса: POST operations-проекта без ssh_host возвращал 200.
После фикса: должен возвращать 422 (Pydantic model_validator).
Этот тест НАМЕРЕННО проверяет, что старое поведение больше не существует.
"""
r = client.post("/api/projects", json={
"id": "srv2",
"name": "Server No SSH",
"project_type": "operations",
})
# Фикс KIN-ARCH-001: был 200, стал 422
assert r.status_code == 422, (
"Регрессия KIN-ARCH-001: POST operations-проекта без ssh_host "
"должен возвращать 422, а не 200"
)
# ---------------------------------------------------------------------------
# KIN-ARCH-001 — серверная валидация ssh_host для operations-проектов
# ---------------------------------------------------------------------------
def test_kin_arch_001_operations_without_ssh_host_returns_422(client):
"""Регрессионный тест KIN-ARCH-001: POST /api/projects с project_type='operations'
и без ssh_host 422 Unprocessable Entity."""
r = client.post("/api/projects", json={
"id": "ops_no_ssh",
"name": "Ops Without SSH",
"project_type": "operations",
})
assert r.status_code == 422
def test_kin_arch_001_operations_with_empty_ssh_host_returns_422(client):
"""Регрессионный тест KIN-ARCH-001: пустая строка в ssh_host считается отсутствующим
значением 422."""
r = client.post("/api/projects", json={
"id": "ops_empty_ssh",
"name": "Ops Empty SSH",
"project_type": "operations",
"ssh_host": "",
})
assert r.status_code == 422
def test_kin_arch_001_operations_with_valid_ssh_host_returns_200(client):
"""Регрессионный тест KIN-ARCH-001: POST /api/projects с project_type='operations'
и корректным ssh_host 200, проект создаётся."""
r = client.post("/api/projects", json={
"id": "ops_with_ssh",
"name": "Ops With SSH",
"project_type": "operations",
"ssh_host": "10.0.0.42",
})
assert r.status_code == 200
data = r.json()
assert data["project_type"] == "operations"
assert data["path"] is None
assert data["ssh_host"] == "10.0.0.42"
def test_kin_arch_001_development_without_ssh_host_allowed(client):
"""Регрессионный тест KIN-ARCH-001: project_type='development' без ssh_host
должен создаваться без ошибок валидатор срабатывает только для operations."""
r = client.post("/api/projects", json={
"id": "dev_no_ssh",
"name": "Dev No SSH",
"path": "/dev",
"project_type": "development",
})
assert r.status_code == 200
assert r.json()["project_type"] == "development"
def test_kin_arch_001_research_without_ssh_host_allowed(client):
"""Регрессионный тест KIN-ARCH-001: project_type='research' без ssh_host
должен создаваться без ошибок."""
r = client.post("/api/projects", json={
"id": "res_no_ssh",
"name": "Research No SSH",
"path": "/research",
"project_type": "research",
})
assert r.status_code == 200
assert r.json()["project_type"] == "research"
def test_kin_arch_001_422_error_message_mentions_ssh_host(client):
"""Регрессионный тест KIN-ARCH-001: тело 422-ответа содержит сообщение об ошибке
с упоминанием ssh_host."""
r = client.post("/api/projects", json={
"id": "ops_err_msg",
"name": "Check Error Message",
"project_type": "operations",
})
assert r.status_code == 422
body = r.json()
# Pydantic возвращает detail со списком ошибок
detail_str = str(body)
assert "ssh_host" in detail_str
def test_create_research_project_type_accepted(client):
"""KIN-071: project_type=research принимается API."""
r = client.post("/api/projects", json={
"id": "res1",
"name": "Research Project",
"path": "/research",
"project_type": "research",
})
assert r.status_code == 200
assert r.json()["project_type"] == "research"
# ---------------------------------------------------------------------------
# KIN-ARCH-003 — path nullable для operations-проектов
# Исправляет баг: workaround с пустой строкой ("") для operations-проектов
# ---------------------------------------------------------------------------
def test_kin_arch_003_operations_project_without_path_returns_200(client):
"""KIN-ARCH-003: POST /api/projects с project_type='operations' без path → 200.
До фикса: path="" передавался как workaround для NOT NULL constraint.
После фикса: path не передаётся вовсе, сохраняется как NULL.
"""
r = client.post("/api/projects", json={
"id": "ops_null_path",
"name": "Ops Null Path",
"project_type": "operations",
"ssh_host": "10.0.0.1",
})
assert r.status_code == 200
data = r.json()
assert data["path"] is None, (
"KIN-ARCH-003 регрессия: path должен быть NULL, а не пустой строкой"
)
def test_kin_arch_003_development_project_without_path_returns_422(client):
"""KIN-ARCH-003: POST /api/projects с project_type='development' без path → 422.
Pydantic validate_fields: path обязателен для non-operations проектов.
"""
r = client.post("/api/projects", json={
"id": "dev_no_path",
"name": "Dev No Path",
"project_type": "development",
})
assert r.status_code == 422
def test_kin_arch_003_development_without_path_error_mentions_path(client):
"""KIN-ARCH-003: тело 422-ответа содержит упоминание об обязательности path."""
r = client.post("/api/projects", json={
"id": "dev_no_path_msg",
"name": "Dev No Path Msg",
"project_type": "development",
})
assert r.status_code == 422
detail_str = str(r.json())
assert "path" in detail_str
def test_kin_arch_003_deploy_operations_project_null_path_uses_cwd_none(client):
"""KIN-ARCH-003: deploy_project для operations-проекта с path=NULL
не вызывает Path.exists() передаёт cwd=None в subprocess.run."""
from unittest.mock import patch, MagicMock
client.post("/api/projects", json={
"id": "ops_deploy_null",
"name": "Ops Deploy Null Path",
"project_type": "operations",
"ssh_host": "10.0.0.1",
})
client.patch("/api/projects/ops_deploy_null", json={"deploy_command": "echo ok"})
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = "ok\n"
mock_result.stderr = ""
with patch("subprocess.run", return_value=mock_result) as mock_run:
r = client.post("/api/projects/ops_deploy_null/deploy")
assert r.status_code == 200
call_kwargs = mock_run.call_args.kwargs
assert call_kwargs.get("cwd") is None, (
"KIN-ARCH-003: для operations-проектов без path, cwd должен быть None"
)
# ---------------------------------------------------------------------------
# Bootstrap endpoint — KIN-081
# ---------------------------------------------------------------------------
@pytest.fixture
def bootstrap_client(tmp_path):
"""TestClient без seed-данных, с отдельным DB_PATH."""
db_path = tmp_path / "bs_test.db"
api_module.DB_PATH = db_path
from web.api import app
return TestClient(app), tmp_path
def test_bootstrap_endpoint_invalid_path_returns_400(bootstrap_client):
"""KIN-081: bootstrap возвращает 400 если путь не существует."""
client, _ = bootstrap_client
r = client.post("/api/bootstrap", json={
"id": "newproj", "name": "New Project", "path": "/nonexistent/path/that/does/not/exist"
})
assert r.status_code == 400
assert "not a directory" in r.json()["detail"].lower()
def test_bootstrap_endpoint_duplicate_id_returns_409(bootstrap_client, tmp_path):
"""KIN-081: bootstrap возвращает 409 если проект с таким ID уже существует."""
client, _ = bootstrap_client
proj_dir = tmp_path / "myproj"
proj_dir.mkdir()
# Create project first
client.post("/api/projects", json={"id": "existing", "name": "Existing", "path": str(proj_dir)})
# Try bootstrap with same ID
r = client.post("/api/bootstrap", json={
"id": "existing", "name": "Same ID", "path": str(proj_dir)
})
assert r.status_code == 409
assert "already exists" in r.json()["detail"]
def test_bootstrap_endpoint_rollback_on_save_error(bootstrap_client, tmp_path):
"""KIN-081: при ошибке в save_to_db проект удаляется (rollback), возвращается 500."""
client, _ = bootstrap_client
proj_dir = tmp_path / "rollbackproj"
proj_dir.mkdir()
from core.db import init_db
from core import models as _models
def _save_create_then_fail(conn, project_id, name, path, *args, **kwargs):
# Simulate partial write: project row created, then error
_models.create_project(conn, project_id, name, path)
raise RuntimeError("simulated DB error after project created")
with patch("web.api.save_to_db", side_effect=_save_create_then_fail):
r = client.post("/api/bootstrap", json={
"id": "rollbackproj", "name": "Rollback Test", "path": str(proj_dir)
})
assert r.status_code == 500
assert "Bootstrap failed" in r.json()["detail"]
# Project must NOT remain in DB (rollback was executed)
conn = init_db(api_module.DB_PATH)
assert _models.get_project(conn, "rollbackproj") is None
conn.close()
def test_bootstrap_endpoint_success(bootstrap_client, tmp_path):
"""KIN-081: успешный bootstrap возвращает 200 с project и counts."""
client, _ = bootstrap_client
proj_dir = tmp_path / "goodproj"
proj_dir.mkdir()
(proj_dir / "requirements.txt").write_text("fastapi\n")
with patch("web.api.find_vault_root", return_value=None):
r = client.post("/api/bootstrap", json={
"id": "goodproj", "name": "Good Project", "path": str(proj_dir)
})
assert r.status_code == 200
data = r.json()
assert data["project"]["id"] == "goodproj"
assert "modules_count" in data
assert "decisions_count" in data
def test_delete_project_ok(client):
# Create a separate project to delete
r = client.post("/api/projects", json={"id": "del1", "name": "Del1", "path": "/del1"})
assert r.status_code == 200
r = client.delete("/api/projects/del1")
assert r.status_code == 204
assert r.content == b""
# Verify project is gone
r = client.get("/api/projects/del1")
assert r.status_code == 404
def test_delete_project_not_found(client):
r = client.delete("/api/projects/99999")
assert r.status_code == 404
# ---------------------------------------------------------------------------
# Environments (KIN-087)
# ---------------------------------------------------------------------------
def test_create_environment(client):
r = client.post("/api/projects/p1/environments", json={
"name": "prod",
"host": "10.0.0.1",
"username": "pelmen",
"port": 22,
"auth_type": "password",
"auth_value": "s3cr3t",
"is_installed": False,
})
assert r.status_code == 201
data = r.json()
assert data["name"] == "prod"
assert data["host"] == "10.0.0.1"
assert data["username"] == "pelmen"
# auth_value must be hidden in responses
assert data.get("auth_value") is None
assert "scan_task_id" not in data
def test_create_environment_project_not_found(client):
r = client.post("/api/projects/nope/environments", json={
"name": "prod",
"host": "10.0.0.1",
"username": "root",
})
assert r.status_code == 404
def test_create_environment_invalid_auth_type(client):
r = client.post("/api/projects/p1/environments", json={
"name": "prod",
"host": "10.0.0.1",
"username": "root",
"auth_type": "oauth",
})
assert r.status_code == 422
def test_create_environment_invalid_port(client):
r = client.post("/api/projects/p1/environments", json={
"name": "prod",
"host": "10.0.0.1",
"username": "root",
"port": 99999,
})
assert r.status_code == 422
def test_create_environment_triggers_scan_when_installed(client):
"""is_installed=True на POST должен создать задачу sysadmin и вернуть scan_task_id."""
with patch("subprocess.Popen") as mock_popen:
mock_popen.return_value = MagicMock(pid=12345)
r = client.post("/api/projects/p1/environments", json={
"name": "prod",
"host": "10.0.0.2",
"username": "pelmen",
"is_installed": True,
})
assert r.status_code == 201
data = r.json()
assert "scan_task_id" in data
task_id = data["scan_task_id"]
# Verify the task exists with sysadmin role
from core.db import init_db
from core import models as m
conn = init_db(api_module.DB_PATH)
task = m.get_task(conn, task_id)
conn.close()
assert task is not None
assert task["assigned_role"] == "sysadmin"
assert task["category"] == "INFRA"
def test_list_environments(client):
client.post("/api/projects/p1/environments", json={
"name": "dev", "host": "10.0.0.10", "username": "dev",
})
client.post("/api/projects/p1/environments", json={
"name": "prod", "host": "10.0.0.11", "username": "prod",
})
r = client.get("/api/projects/p1/environments")
assert r.status_code == 200
data = r.json()
assert len(data) == 2
names = {e["name"] for e in data}
assert names == {"dev", "prod"}
def test_list_environments_project_not_found(client):
r = client.get("/api/projects/nope/environments")
assert r.status_code == 404
def test_patch_environment(client):
r = client.post("/api/projects/p1/environments", json={
"name": "dev", "host": "10.0.0.20", "username": "root",
})
env_id = r.json()["id"]
r = client.patch(f"/api/projects/p1/environments/{env_id}", json={
"host": "10.0.0.99",
})
assert r.status_code == 200
assert r.json()["host"] == "10.0.0.99"
def test_patch_environment_triggers_scan_on_false_to_true(client):
"""PATCH is_installed false→true должен запустить скан."""
r = client.post("/api/projects/p1/environments", json={
"name": "staging", "host": "10.0.0.30", "username": "root", "is_installed": False,
})
env_id = r.json()["id"]
with patch("subprocess.Popen") as mock_popen:
mock_popen.return_value = MagicMock(pid=22222)
r = client.patch(f"/api/projects/p1/environments/{env_id}", json={
"is_installed": True,
})
assert r.status_code == 200
assert "scan_task_id" in r.json()
def test_patch_environment_no_duplicate_scan(client):
"""Повторный PATCH is_installed=True (true→true) не создаёт новую задачу."""
with patch("subprocess.Popen") as mock_popen:
mock_popen.return_value = MagicMock(pid=33333)
r = client.post("/api/projects/p1/environments", json={
"name": "prod2", "host": "10.0.0.40", "username": "root", "is_installed": True,
})
first_task_id = r.json().get("scan_task_id")
assert first_task_id is not None
env_id = r.json()["id"]
# Second PATCH with host change — was_installed=True, so no scan triggered
with patch("subprocess.Popen") as mock_popen2:
mock_popen2.return_value = MagicMock(pid=44444)
r2 = client.patch(f"/api/projects/p1/environments/{env_id}", json={
"host": "10.0.0.41",
})
assert r2.status_code == 200
assert "scan_task_id" not in r2.json()
def test_patch_environment_nothing_to_update(client):
r = client.post("/api/projects/p1/environments", json={
"name": "dev", "host": "10.0.0.50", "username": "root",
})
env_id = r.json()["id"]
r = client.patch(f"/api/projects/p1/environments/{env_id}", json={})
assert r.status_code == 400
def test_patch_environment_not_found(client):
r = client.patch("/api/projects/p1/environments/99999", json={"host": "1.2.3.4"})
assert r.status_code == 404
def test_delete_environment(client):
r = client.post("/api/projects/p1/environments", json={
"name": "dev", "host": "10.0.0.60", "username": "root",
})
env_id = r.json()["id"]
r = client.delete(f"/api/projects/p1/environments/{env_id}")
assert r.status_code == 204
# Verify gone
r = client.get("/api/projects/p1/environments")
ids = [e["id"] for e in r.json()]
assert env_id not in ids
def test_delete_environment_not_found(client):
r = client.delete("/api/projects/p1/environments/99999")
assert r.status_code == 404
def test_scan_environment(client):
r = client.post("/api/projects/p1/environments", json={
"name": "prod", "host": "10.0.0.70", "username": "root",
})
env_id = r.json()["id"]
with patch("subprocess.Popen") as mock_popen:
mock_popen.return_value = MagicMock(pid=55555)
r = client.post(f"/api/projects/p1/environments/{env_id}/scan")
assert r.status_code == 202
data = r.json()
assert data["status"] == "started"
assert "task_id" in data
def test_scan_environment_not_found(client):
r = client.post("/api/projects/p1/environments/99999/scan")
assert r.status_code == 404
# ---------------------------------------------------------------------------
# Environments (KIN-087) — дополнительные тесты по acceptance criteria
# ---------------------------------------------------------------------------
def test_list_environments_auth_value_hidden():
"""GET /environments не должен возвращать auth_value (AC: маскировка)."""
import web.api as api_module2
from pathlib import Path
import tempfile
with tempfile.TemporaryDirectory() as tmp:
db_path = Path(tmp) / "t.db"
api_module2.DB_PATH = db_path
from web.api import app
from fastapi.testclient import TestClient
c = TestClient(app)
c.post("/api/projects", json={"id": "p2", "name": "P2", "path": "/p2"})
c.post("/api/projects/p2/environments", json={
"name": "prod", "host": "1.2.3.4", "username": "root",
"auth_type": "password", "auth_value": "supersecret",
})
r = c.get("/api/projects/p2/environments")
assert r.status_code == 200
for env in r.json():
assert env.get("auth_value") is None
def test_patch_environment_auth_value_hidden(client):
"""PATCH /environments/{id} не должен возвращать auth_value в ответе (AC: маскировка)."""
r = client.post("/api/projects/p1/environments", json={
"name": "masked", "host": "5.5.5.5", "username": "user",
"auth_value": "topsecret",
})
env_id = r.json()["id"]
r = client.patch(f"/api/projects/p1/environments/{env_id}", json={"host": "6.6.6.6"})
assert r.status_code == 200
assert r.json().get("auth_value") is None
def test_is_installed_flag_persisted(client):
"""is_installed=True сохраняется и возвращается в GET-списке (AC: чекбокс работает)."""
with patch("subprocess.Popen") as mock_popen:
mock_popen.return_value = MagicMock(pid=99001)
r = client.post("/api/projects/p1/environments", json={
"name": "installed_prod", "host": "7.7.7.7", "username": "admin",
"is_installed": True,
})
assert r.status_code == 201
env_id = r.json()["id"]
r = client.get("/api/projects/p1/environments")
envs = {e["id"]: e for e in r.json()}
assert bool(envs[env_id]["is_installed"]) is True
def test_is_installed_false_not_installed(client):
"""is_installed=False по умолчанию сохраняется корректно."""
r = client.post("/api/projects/p1/environments", json={
"name": "notinstalled", "host": "8.8.8.8", "username": "ops",
"is_installed": False,
})
assert r.status_code == 201
env_id = r.json()["id"]
r = client.get("/api/projects/p1/environments")
envs = {e["id"]: e for e in r.json()}
assert not bool(envs[env_id]["is_installed"])
def test_sysadmin_scan_task_has_escalation_in_brief(client):
"""Задача sysadmin должна содержать инструкцию об эскалации при нехватке данных (AC#4)."""
with patch("subprocess.Popen") as mock_popen:
mock_popen.return_value = MagicMock(pid=99002)
r = client.post("/api/projects/p1/environments", json={
"name": "esc_test", "host": "9.9.9.9", "username": "deploy",
"is_installed": True,
})
task_id = r.json()["scan_task_id"]
from core.db import init_db
from core import models as m
conn = init_db(api_module.DB_PATH)
task = m.get_task(conn, task_id)
conn.close()
brief = task["brief"]
assert isinstance(brief, dict), "brief must be a dict"
text = brief.get("text", "")
assert "эскалация" in text.lower(), (
"Sysadmin task brief must mention escalation to user when data is insufficient"
)
def test_create_environment_key_auth_type(client):
"""auth_type='key' должен быть принят и сохранён (AC: ключ SSH)."""
r = client.post("/api/projects/p1/environments", json={
"name": "ssh_key_env", "host": "10.10.10.10", "username": "git",
"auth_type": "key", "auth_value": "-----BEGIN OPENSSH PRIVATE KEY-----",
})
assert r.status_code == 201
data = r.json()
assert data["auth_type"] == "key"
assert data.get("auth_value") is None
def test_create_environment_duplicate_name_conflict(client):
"""Повторное создание среды с тем же именем в проекте → 409 Conflict."""
client.post("/api/projects/p1/environments", json={
"name": "unique_env", "host": "11.11.11.11", "username": "root",
})
r = client.post("/api/projects/p1/environments", json={
"name": "unique_env", "host": "22.22.22.22", "username": "root",
})
assert r.status_code == 409
def test_patch_environment_empty_auth_value_preserves_stored(client):
"""PATCH с пустым auth_value не стирает сохранённый credential (AC: безопасность)."""
r = client.post("/api/projects/p1/environments", json={
"name": "cred_safe", "host": "33.33.33.33", "username": "ops",
"auth_value": "original_password",
})
env_id = r.json()["id"]
# Patch без auth_value — credential должен сохраниться
r = client.patch(f"/api/projects/p1/environments/{env_id}", json={"host": "44.44.44.44"})
assert r.status_code == 200
# Читаем raw запись из БД (get_environment возвращает obfuscated auth_value)
from core.db import init_db
from core import models as m
conn = init_db(api_module.DB_PATH)
raw = m.get_environment(conn, env_id)
conn.close()
assert raw["auth_value"] is not None, "Stored credential must be preserved after PATCH without auth_value"
decrypted = m._decrypt_auth(raw["auth_value"])
assert decrypted == "original_password", "Stored credential must be decryptable and match original"
# ---------------------------------------------------------------------------
# KIN-088 — POST /run возвращает 409 если задача уже in_progress
# ---------------------------------------------------------------------------
def test_run_returns_409_when_task_already_in_progress(client):
"""KIN-088: повторный POST /run для задачи со статусом in_progress → 409 с task_already_running."""
from core.db import init_db
from core import models as m
conn = init_db(api_module.DB_PATH)
m.update_task(conn, "P1-001", status="in_progress")
conn.close()
r = client.post("/api/tasks/P1-001/run")
assert r.status_code == 409
assert r.json()["error"] == "task_already_running"
def test_run_409_error_key_is_task_already_running(client):
"""KIN-088: тело ответа 409 содержит ключ error='task_already_running'."""
from core.db import init_db
from core import models as m
conn = init_db(api_module.DB_PATH)
m.update_task(conn, "P1-001", status="in_progress")
conn.close()
r = client.post("/api/tasks/P1-001/run")
body = r.json()
assert "error" in body
assert body["error"] == "task_already_running"
def test_run_second_call_does_not_change_status(client):
"""KIN-088: при повторном /run задача остаётся in_progress, статус не сбрасывается."""
from core.db import init_db
from core import models as m
conn = init_db(api_module.DB_PATH)
m.update_task(conn, "P1-001", status="in_progress")
conn.close()
client.post("/api/tasks/P1-001/run") # второй вызов — должен вернуть 409
r = client.get("/api/tasks/P1-001")
assert r.json()["status"] == "in_progress"
def test_run_pending_task_still_returns_202(client):
"""KIN-088: задача со статусом pending запускается без ошибки — 202."""
r = client.post("/api/tasks/P1-001/run")
assert r.status_code == 202
def test_run_kin085_parallel_different_tasks_not_blocked(client):
"""KIN-085: /run для разных задач независимы — in_progress одной не блокирует другую."""
# Создаём вторую задачу
client.post("/api/tasks", json={"project_id": "p1", "title": "Second task"})
# Ставим первую задачу в in_progress
from core.db import init_db
from core import models as m
conn = init_db(api_module.DB_PATH)
m.update_task(conn, "P1-001", status="in_progress")
conn.close()
# Запуск второй задачи должен быть успешным
r = client.post("/api/tasks/P1-002/run")
assert r.status_code == 202
2026-03-17 15:40:31 +02:00
# ---------------------------------------------------------------------------
# KIN-ARCH-008: test_command на уровне проекта — API
# ---------------------------------------------------------------------------
def test_patch_project_test_command(client):
"""KIN-ARCH-008: PATCH /api/projects/{id} с test_command сохраняет значение."""
r = client.patch("/api/projects/p1", json={"test_command": "pytest -v"})
assert r.status_code == 200
assert r.json()["test_command"] == "pytest -v"
def test_create_project_with_test_command(client):
"""KIN-ARCH-008: POST /api/projects с test_command сохраняет значение в БД."""
r = client.post("/api/projects", json={
"id": "p_tc",
"name": "TC Project",
"path": "/tmp/tc",
"test_command": "npm test",
})
assert r.status_code == 200
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT test_command FROM projects WHERE id = 'p_tc'").fetchone()
conn.close()
assert row is not None
assert row[0] == "npm test"
2026-03-17 15:49:37 +02:00
2026-03-17 19:30:15 +02:00
def test_patch_project_test_command_empty_string_stores_null(client):
"""KIN-101: PATCH с пустой строкой сохраняет NULL — включает режим авто-определения фреймворка.
Пустая строка сентинел для сброса test_command (аналогично deploy_command и другим полям).
"""
2026-03-17 15:49:37 +02:00
client.patch("/api/projects/p1", json={"test_command": "pytest -v"})
client.patch("/api/projects/p1", json={"test_command": ""})
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT test_command FROM projects WHERE id = 'p1'").fetchone()
conn.close()
2026-03-17 19:30:15 +02:00
assert row[0] is None
2026-03-17 15:49:37 +02:00
def test_get_projects_includes_test_command(client):
"""KIN-ARCH-008: GET /api/projects возвращает test_command — нужно для инициализации фронтенда."""
client.patch("/api/projects/p1", json={"test_command": "cargo test"})
r = client.get("/api/projects")
assert r.status_code == 200
projects = r.json()
p1 = next((p for p in projects if p["id"] == "p1"), None)
assert p1 is not None
assert p1["test_command"] == "cargo test"
2026-03-17 15:59:43 +02:00
def test_patch_project_test_command_db_value_verified(client):
"""KIN-ARCH-008 decision #318: PATCH test_command сохраняет значение и в response, и в БД."""
r = client.patch("/api/projects/p1", json={"test_command": "pytest --tb=short"})
assert r.status_code == 200
assert r.json()["test_command"] == "pytest --tb=short"
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT test_command FROM projects WHERE id = 'p1'").fetchone()
conn.close()
assert row[0] == "pytest --tb=short"
def test_patch_project_test_command_null_returns_400(client):
"""KIN-ARCH-008: PATCH с test_command=null (и без других полей) → 400 'Nothing to update'.
null трактуется как «поле не передано»: has_any=False 400.
Это документирует текущее поведение: нет способа сбросить test_command через PATCH null.
"""
client.patch("/api/projects/p1", json={"test_command": "npm test"})
r = client.patch("/api/projects/p1", json={"test_command": None})
assert r.status_code == 400
assert "Nothing to update" in r.json()["detail"]
2026-03-17 17:26:31 +02:00
# ---------------------------------------------------------------------------
# KIN-084: GET /api/pipelines/{pipeline_id}/logs
# ---------------------------------------------------------------------------
def _seed_pipeline(db_path, project_id="p1", task_id="P1-001") -> int:
"""Создаёт pipeline в БД и возвращает его id."""
from core.db import init_db
from core import models
conn = init_db(db_path)
pipe = models.create_pipeline(conn, task_id, project_id, "feature", [{"role": "dev"}])
conn.close()
return pipe["id"]
def test_get_pipeline_logs_returns_200_empty(client):
"""KIN-084: GET /api/pipelines/{id}/logs → 200 с пустым списком если логов нет."""
pid = _seed_pipeline(api_module.DB_PATH)
r = client.get(f"/api/pipelines/{pid}/logs")
assert r.status_code == 200
assert r.json() == []
def test_get_pipeline_logs_returns_entries(client):
"""KIN-084: GET /api/pipelines/{id}/logs → список записей с нужными полями."""
from core.db import init_db
from core import models
pid = _seed_pipeline(api_module.DB_PATH)
conn = init_db(api_module.DB_PATH)
models.write_log(conn, pid, "Pipeline started", extra={"steps_count": 3})
models.write_log(conn, pid, "Step 1 done")
conn.close()
r = client.get(f"/api/pipelines/{pid}/logs")
assert r.status_code == 200
logs = r.json()
assert len(logs) == 2
assert logs[0]["message"] == "Pipeline started"
assert isinstance(logs[0]["extra_json"], dict)
assert logs[0]["extra_json"]["steps_count"] == 3
assert logs[1]["message"] == "Step 1 done"
def test_get_pipeline_logs_since_id_filters(client):
"""KIN-084: ?since_id=N возвращает только id > N."""
from core.db import init_db
from core import models
pid = _seed_pipeline(api_module.DB_PATH, task_id="P1-001")
conn = init_db(api_module.DB_PATH)
e1 = models.write_log(conn, pid, "Entry 1")
models.write_log(conn, pid, "Entry 2")
models.write_log(conn, pid, "Entry 3")
conn.close()
r = client.get(f"/api/pipelines/{pid}/logs?since_id={e1['id']}")
assert r.status_code == 200
logs = r.json()
assert len(logs) == 2
assert all(log["id"] > e1["id"] for log in logs)
def test_get_pipeline_logs_not_found(client):
"""KIN-OBS-023: GET /api/pipelines/9999/logs → 200 [] (log collections return empty, not 404)."""
2026-03-17 17:26:31 +02:00
r = client.get("/api/pipelines/9999/logs")
assert r.status_code == 200
assert r.json() == []
def test_get_pipeline_logs_since_id_returns_pm_entries(client):
"""KIN-OBS-025: poll since_id возвращает PM-записи с role='pm' в extra_json."""
import json as _json
from core.db import init_db
from core import models
pid = _seed_pipeline(api_module.DB_PATH, task_id="P1-001")
conn = init_db(api_module.DB_PATH)
# Вставляем не-PM запись, которая будет отфильтрована
e0 = models.write_log(conn, pid, "Pipeline start")
# Вставляем PM-записи с ретроспективными ts
models.write_log(conn, pid, "PM start: task planning",
ts="2026-03-17T10:00:00", extra={"role": "pm"})
models.write_log(conn, pid, "PM done: 2 steps planned, success=True, cost=$0.0100, tokens=1000",
ts="2026-03-17T10:00:05",
extra={"role": "pm", "steps_count": 2, "tokens_used": 1000, "cost_usd": 0.01})
conn.close()
r = client.get(f"/api/pipelines/{pid}/logs?since_id={e0['id']}")
assert r.status_code == 200
logs = r.json()
assert len(logs) == 2
pm_roles = [log["extra_json"]["role"] for log in logs if log.get("extra_json")]
assert all(role == "pm" for role in pm_roles)
# Убеждаемся что PM done содержит метрики
done_log = next(log for log in logs if "PM done" in log["message"])
assert done_log["extra_json"]["steps_count"] == 2
assert done_log["extra_json"]["tokens_used"] == 1000
2026-03-17 20:21:52 +02:00
# ---------------------------------------------------------------------------
# KIN-103 — PATCH /api/projects/{id} — worktrees_enabled toggle
# ---------------------------------------------------------------------------
def test_patch_project_worktrees_enabled_true(client):
"""PATCH с worktrees_enabled=true → 200, поле установлено в 1."""
r = client.patch("/api/projects/p1", json={"worktrees_enabled": True})
assert r.status_code == 200
assert r.json()["worktrees_enabled"] == 1
def test_patch_project_worktrees_enabled_false(client):
"""После включения PATCH с worktrees_enabled=false → 200, поле установлено в 0."""
client.patch("/api/projects/p1", json={"worktrees_enabled": True})
r = client.patch("/api/projects/p1", json={"worktrees_enabled": False})
assert r.status_code == 200
assert r.json()["worktrees_enabled"] == 0
def test_patch_project_worktrees_enabled_true_persisted_via_sql(client):
"""После PATCH worktrees_enabled=True прямой SQL подтверждает значение 1."""
client.patch("/api/projects/p1", json={"worktrees_enabled": True})
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT worktrees_enabled FROM projects WHERE id = 'p1'").fetchone()
conn.close()
assert row is not None
assert row[0] == 1
def test_patch_project_worktrees_enabled_false_persisted_via_sql(client):
"""После PATCH worktrees_enabled=False прямой SQL подтверждает значение 0."""
client.patch("/api/projects/p1", json={"worktrees_enabled": True})
client.patch("/api/projects/p1", json={"worktrees_enabled": False})
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT worktrees_enabled FROM projects WHERE id = 'p1'").fetchone()
conn.close()
assert row is not None
assert row[0] == 0
def test_patch_project_worktrees_enabled_null_before_first_update(client):
"""Новый проект имеет worktrees_enabled=0 (DEFAULT) до первого обновления."""
client.post("/api/projects", json={"id": "p_new", "name": "New", "path": "/new"})
from core.db import init_db
conn = init_db(api_module.DB_PATH)
row = conn.execute("SELECT worktrees_enabled FROM projects WHERE id = 'p_new'").fetchone()
conn.close()
assert row is not None
assert not row[0] # DEFAULT 0 или NULL — в любом случае falsy
def test_patch_project_worktrees_enabled_get_includes_field(client):
"""GET проекта включает worktrees_enabled в ответе."""
r = client.get("/api/projects/p1")
assert r.status_code == 200
data = r.json()
assert "worktrees_enabled" in data
def test_patch_project_worktrees_enabled_and_autocommit_together(client):
"""PATCH с worktrees_enabled и autocommit_enabled → оба поля обновлены."""
r = client.patch("/api/projects/p1", json={
"worktrees_enabled": True,
"autocommit_enabled": True,
})
assert r.status_code == 200
data = r.json()
assert data["worktrees_enabled"] == 1
assert data["autocommit_enabled"] == 1
def test_patch_project_worktrees_enabled_no_change_when_not_in_patch(client):
"""PATCH без worktrees_enabled → поле не меняется."""
# Сначала установим worktrees_enabled=1
client.patch("/api/projects/p1", json={"worktrees_enabled": True})
# Потом патч без worktrees_enabled не должен его менять
r = client.patch("/api/projects/p1", json={"autocommit_enabled": True})
assert r.status_code == 200
assert r.json()["worktrees_enabled"] == 1
def test_patch_project_worktrees_enabled_toggle_sequence(client):
"""Последовательные включение/выключение worktrees_enabled."""
# Включаем
r1 = client.patch("/api/projects/p1", json={"worktrees_enabled": True})
assert r1.json()["worktrees_enabled"] == 1
# Отключаем
r2 = client.patch("/api/projects/p1", json={"worktrees_enabled": False})
assert r2.json()["worktrees_enabled"] == 0
# Снова включаем
r3 = client.patch("/api/projects/p1", json={"worktrees_enabled": True})
assert r3.json()["worktrees_enabled"] == 1