2026-03-18 22:20:05 +02:00
|
|
|
|
"""Regression tests for KIN-128 — Quality over speed.
|
|
|
|
|
|
|
|
|
|
|
|
Covers 4 mechanisms:
|
|
|
|
|
|
1. Analyst auto-injection on 2nd+ revision (revise_count >= 2)
|
|
|
|
|
|
2. Smoke tester: cannot_confirm → task blocked; confirmed → pipeline continues
|
|
|
|
|
|
3. Agent prompts contain mandatory proof block (backend_dev, frontend_dev, debugger)
|
|
|
|
|
|
4. Tech debt output creates child task in DB
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
|
import subprocess
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
|
|
from core.db import init_db
|
|
|
|
|
|
from core import models
|
|
|
|
|
|
from agents.runner import _save_tech_debt_output, _TECH_DEBT_ROLES, run_pipeline
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Directory holding the agent prompt templates: <repo>/agents/prompts.
PROMPTS_DIR = Path(__file__).parents[1] / "agents" / "prompts"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Fixtures
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def conn():
    """Yield a fresh in-memory DB seeded with one project (p1) and one task (P1-001)."""
    db = init_db(":memory:")
    # The project must exist before the task (foreign-key relationship).
    models.create_project(db, "p1", "P1", "/p1", tech_stack=["python"])
    models.create_task(db, "P1-001", "p1", "Fix bug", brief={"text": "fix"})
    yield db
    db.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def api_client(tmp_path):
    """Yield a FastAPI TestClient backed by an isolated on-disk DB, pre-seeded via the API."""
    import web.api as api_module

    # Point the app at a throwaway DB file so tests never touch real data.
    api_module.DB_PATH = tmp_path / "test.db"

    from web.api import app
    from fastapi.testclient import TestClient

    tc = TestClient(app)
    # Seed the same project/task pair the `conn` fixture creates, but through the API.
    tc.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"})
    tc.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"})
    return tc
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _mock_success(output_data):
|
|
|
|
|
|
m = MagicMock()
|
|
|
|
|
|
m.stdout = json.dumps(output_data) if isinstance(output_data, dict) else output_data
|
|
|
|
|
|
m.stderr = ""
|
|
|
|
|
|
m.returncode = 0
|
|
|
|
|
|
return m
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ===========================================================================
|
|
|
|
|
|
# 1. Analyst injection on 2nd+ revision
|
|
|
|
|
|
# ===========================================================================
|
|
|
|
|
|
|
|
|
|
|
|
class TestAnalystInjectionOnRevise:
    """Mechanism 1: the analyst role is auto-injected as the first pipeline
    step once a task returns for its second (or later) revision
    (revise_count >= 2)."""

    def test_first_revise_does_not_inject_analyst(self, api_client):
        """revise_count=1 -> analyst is NOT added to the pipeline steps."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        # Only the API-side step mutation matters here; the subprocess launch is mocked out.
        with patch("web.api._launch_pipeline_subprocess"):
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "please fix", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 1
        roles = [s["role"] for s in (data.get("pipeline_steps") or [])]
        assert roles[0] != "analyst", "analyst не должен быть первым шагом при revise_count=1"

    def test_second_revise_injects_analyst_as_first_step(self, api_client):
        """revise_count=2 -> analyst automatically becomes the first pipeline step."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            # first revise
            api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "first attempt", "steps": steps},
            )
            # second revise
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "second attempt", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 2
        pipeline_steps = data.get("pipeline_steps") or []
        assert pipeline_steps, "pipeline_steps не должен быть пустым"
        assert pipeline_steps[0]["role"] == "analyst", (
            f"Первый шаг должен быть analyst, получили: {pipeline_steps[0]['role']}"
        )

    def test_second_revise_analyst_not_duplicated_if_already_first(self, api_client):
        """If analyst is already the first step, a second one must not be added."""
        steps = [{"role": "analyst", "model": "sonnet"}, {"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            api_client.post("/api/tasks/P1-001/revise", json={"comment": "r1", "steps": steps})
            r = api_client.post("/api/tasks/P1-001/revise", json={"comment": "r2", "steps": steps})
        data = r.json()
        pipeline_steps = data.get("pipeline_steps") or []
        analyst_steps = [s for s in pipeline_steps if s["role"] == "analyst"]
        assert len(analyst_steps) == 1, "analyst не должен дублироваться"

    def test_third_revise_also_injects_analyst(self, api_client):
        """revise_count=3 injects the analyst as well."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            # Revise three times; only the last response is inspected below.
            for comment in ("r1", "r2", "r3"):
                r = api_client.post(
                    "/api/tasks/P1-001/revise",
                    json={"comment": comment, "steps": steps},
                )
        data = r.json()
        assert data["revise_count"] == 3
        assert (data.get("pipeline_steps") or [{}])[0]["role"] == "analyst"

    @patch("agents.runner.subprocess.run")
    def test_pipeline_with_analyst_first_executes_analyst_as_first_agent(self, mock_run, conn):
        """Integration test: run_pipeline with analyst first actually runs analyst first.

        Exercises REAL pipeline execution by calling run_pipeline() directly,
        without mocking _launch_pipeline_subprocess — unlike the other tests in
        this class, which only inspect the API JSON response.

        subprocess.run call layout:
            call[0] — check_claude_auth (cmd: [claude, "-p", "ok", ...])
            call[1] — first pipeline agent (must be analyst)
        """
        analyst_output = {
            "status": "done",
            "root_problem": "Предыдущий подход не учитывал корневую причину X",
            "suggested_approach": "Переработать алгоритм с учётом Y",
            "confidence": "high",
        }
        mock_run.return_value = _mock_success(analyst_output)

        # The steps revise_task() builds on revision 2+ (after analyst injection)
        analyst_step = {
            "role": "analyst",
            "model": "sonnet",
            "brief": "Задача вернулась на ревизию №2. Проведи анализ.",
        }
        steps = [analyst_step]

        result = run_pipeline(conn, "P1-001", steps)

        # conftest._mock_check_claude_auth auto-mocks check_claude_auth for all tests,
        # so the auth check does NOT occupy a slot in call_args_list.
        # Expected subprocess.run call order:
        # call[0] = analyst (first pipeline step)
        # call[1] = learning extractor (auto-learning after the pipeline completes)
        assert mock_run.call_count >= 1, (
            f"Ожидалось >= 1 вызова subprocess.run (analyst), "
            f"получили {mock_run.call_count}"
        )

        # First call (index 0): cmd = [claude, "-p", prompt, ...]
        first_agent_cmd = list(mock_run.call_args_list[0].args[0])
        assert "-p" in first_agent_cmd, "Claude CLI cmd должна содержать -p"
        prompt_idx = first_agent_cmd.index("-p") + 1
        first_prompt = first_agent_cmd[prompt_idx]

        # agents/prompts/analyst.md starts with "You are an Analyst for the Kin..."
        assert "Analyst" in first_prompt, (
            f"Первый выполненный агент должен быть analyst. "
            f"Промпт начинается с: {first_prompt[:200]}"
        )
|
|
|
|
|
|
|
2026-03-18 22:20:05 +02:00
|
|
|
|
|
|
|
|
|
|
# ===========================================================================
|
|
|
|
|
|
# 2. Smoke tester: cannot_confirm → blocked; confirmed → continues
|
|
|
|
|
|
# ===========================================================================
|
|
|
|
|
|
|
|
|
|
|
|
class TestSmokeTester:
    """Mechanism 2: a smoke_tester verdict either blocks the task
    (cannot_confirm) or lets the pipeline continue (confirmed)."""

    def _make_pipeline(self, smoke_status: str):
        """Return a one-step smoke_tester pipeline plus a canned output for *smoke_status*."""
        pipeline = [{"role": "smoke_tester", "model": "sonnet"}]
        canned = {
            "status": smoke_status,
            "commands_run": [],
            "evidence": "HTTP 200" if smoke_status == "confirmed" else None,
            "reason": "нет доступа к prod" if smoke_status == "cannot_confirm" else None,
        }
        return pipeline, canned

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_cannot_confirm_blocks_task(self, mock_run, conn):
        """A cannot_confirm verdict moves the task into the blocked state."""
        pipeline, canned = self._make_pipeline("cannot_confirm")
        mock_run.return_value = _mock_success(canned)

        outcome = run_pipeline(conn, "P1-001", pipeline)

        assert outcome["success"] is False
        assert outcome.get("blocked_by") == "smoke_tester"

        blocked = models.get_task(conn, "P1-001")
        assert blocked["status"] == "blocked"
        assert blocked["blocked_agent_role"] == "smoke_tester"
        assert "cannot_confirm" in (blocked.get("blocked_reason") or "")

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_confirmed_continues_pipeline(self, mock_run, conn):
        """A confirmed verdict lets the pipeline continue; the task is not blocked."""
        pipeline, canned = self._make_pipeline("confirmed")
        mock_run.return_value = _mock_success(canned)

        outcome = run_pipeline(conn, "P1-001", pipeline)

        # Pipeline completes (its only step — smoke_tester — passed).
        assert outcome.get("blocked_by") != "smoke_tester"
        refreshed = models.get_task(conn, "P1-001")
        assert refreshed["status"] != "blocked"

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_confirmed_saves_result_to_db(self, mock_run, conn):
        """A confirmed verdict is persisted into the task's smoke_test_result."""
        pipeline = [{"role": "smoke_tester", "model": "sonnet"}]
        canned = {
            "status": "confirmed",
            "commands_run": ["curl https://example.com/health"],
            "evidence": "HTTP 200 OK",
            "reason": None,
        }
        mock_run.return_value = _mock_success(canned)

        run_pipeline(conn, "P1-001", pipeline)

        stored = models.get_task(conn, "P1-001").get("smoke_test_result")
        assert stored is not None
        assert stored.get("status") == "confirmed"

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_cannot_confirm_saves_result_to_db(self, mock_run, conn):
        """A cannot_confirm verdict is also persisted into smoke_test_result."""
        pipeline = [{"role": "smoke_tester", "model": "sonnet"}]
        canned = {
            "status": "cannot_confirm",
            "commands_run": [],
            "evidence": None,
            "reason": "prod недоступен",
        }
        mock_run.return_value = _mock_success(canned)

        run_pipeline(conn, "P1-001", pipeline)

        stored = models.get_task(conn, "P1-001").get("smoke_test_result")
        assert stored is not None
        assert stored.get("status") == "cannot_confirm"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ===========================================================================
|
|
|
|
|
|
# 3. Agent prompts contain mandatory proof block
|
|
|
|
|
|
# ===========================================================================
|
|
|
|
|
|
|
|
|
|
|
|
class TestAgentPromptsProofBlock:
    """Verify that agent prompts structurally require a proof block.

    Fix: the original parametrization of ``test_prompt_contains_proof_field``
    passed a ``forbidden_keyword`` argument that the test never used; the dead
    parameter has been removed from both the parametrize list and the signature.
    """

    @pytest.mark.parametrize("prompt_file,proof_keyword", [
        ("backend_dev.md", "proof"),
        ("frontend_dev.md", "proof"),
        ("debugger.md", "proof"),
    ])
    def test_prompt_contains_proof_field(self, prompt_file, proof_keyword):
        """The prompt declares a proof field in its Output format section."""
        content = (PROMPTS_DIR / prompt_file).read_text(encoding="utf-8")
        assert proof_keyword in content, (
            f"{prompt_file}: отсутствует поле '{proof_keyword}' в Output format"
        )

    @pytest.mark.parametrize("prompt_file,rule_phrase", [
        ("backend_dev.md", "ЗАПРЕЩЕНО"),
        ("frontend_dev.md", "ЗАПРЕЩЕНО"),
        ("debugger.md", "ЗАПРЕЩЕНО"),
    ])
    def test_prompt_forbids_done_without_proof(self, prompt_file, rule_phrase):
        """The prompt explicitly forbids reporting "done" without proof."""
        content = (PROMPTS_DIR / prompt_file).read_text(encoding="utf-8")
        assert rule_phrase in content, (
            f"{prompt_file}: не содержит правило ЗАПРЕЩЕНО о proof"
        )

    def test_smoke_tester_prompt_exists(self):
        """smoke_tester.md exists and mentions cannot_confirm."""
        path = PROMPTS_DIR / "smoke_tester.md"
        assert path.exists(), "smoke_tester.md не найден"
        content = path.read_text(encoding="utf-8")
        assert "cannot_confirm" in content

    def test_analyst_prompt_exists(self):
        """analyst.md exists and mentions root_problem."""
        path = PROMPTS_DIR / "analyst.md"
        assert path.exists(), "analyst.md не найден"
        content = path.read_text(encoding="utf-8")
        assert "root_problem" in content

    def test_backend_dev_proof_fields_complete(self):
        """backend_dev.md contains all three fields inside proof."""
        content = (PROMPTS_DIR / "backend_dev.md").read_text(encoding="utf-8")
        for field in ("what_was_done", "how_verified", "verification_result"):
            assert field in content, f"backend_dev.md: отсутствует поле proof.{field}"

    def test_debugger_proof_fields_complete(self):
        """debugger.md contains all three fields inside proof."""
        content = (PROMPTS_DIR / "debugger.md").read_text(encoding="utf-8")
        for field in ("what_was_fixed", "how_verified", "verification_result"):
            assert field in content, f"debugger.md: отсутствует поле proof.{field}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ===========================================================================
|
|
|
|
|
|
# 4. Tech debt output creates child task in DB
|
|
|
|
|
|
# ===========================================================================
|
|
|
|
|
|
|
|
|
|
|
|
class TestTechDebtTaskCreation:
    """Mechanism 4: a tech_debt block in agent output creates a child task in the DB."""

    def test_tech_debt_roles_contains_expected_roles(self):
        """_TECH_DEBT_ROLES contains every dev role plus sysadmin."""
        assert "backend_dev" in _TECH_DEBT_ROLES
        assert "frontend_dev" in _TECH_DEBT_ROLES
        assert "debugger" in _TECH_DEBT_ROLES
        assert "sysadmin" in _TECH_DEBT_ROLES

    def test_save_tech_debt_creates_child_task(self, conn):
        """tech_debt with a description -> a [TECH DEBT] child task is created."""
        result = {
            "success": True,
            "raw_output": json.dumps({
                "status": "done",
                "tech_debt": {
                    "description": "Хардкод timeout в 30s нужно вынести в конфиг",
                    "reason_temporary": "MVP без конфига",
                    "proper_fix": "Добавить config.yaml с полем agent_timeout",
                    "category": "FIX",
                },
            }),
        }
        out = _save_tech_debt_output(conn, "p1", "P1-001", result)

        assert out["created"] is True
        assert out["task_id"] is not None

        # The child task carries the marker, a description excerpt, the parent link
        # and the category from the tech_debt payload.
        child = models.get_task(conn, out["task_id"])
        assert child is not None
        assert "[TECH DEBT]" in child["title"]
        assert "Хардкод timeout" in child["title"]
        assert child["parent_task_id"] == "P1-001"
        assert child["category"] == "FIX"

    def test_save_tech_debt_without_description_creates_nothing(self, conn):
        """tech_debt with an empty description -> no task is created."""
        result = {
            "success": True,
            "raw_output": json.dumps({
                "status": "done",
                "tech_debt": {"description": "", "reason_temporary": ""},
            }),
        }
        out = _save_tech_debt_output(conn, "p1", "P1-001", result)
        assert out["created"] is False

    def test_save_tech_debt_without_field_creates_nothing(self, conn):
        """Agent output without a tech_debt field -> no task is created."""
        result = {
            "success": True,
            "raw_output": json.dumps({"status": "done", "changes": []}),
        }
        out = _save_tech_debt_output(conn, "p1", "P1-001", result)
        assert out["created"] is False

    def test_save_tech_debt_idempotent(self, conn):
        """A second call with the same description -> no duplicate task is created."""
        raw = json.dumps({
            "status": "done",
            "tech_debt": {
                "description": "Duplicate tech debt check",
                "reason_temporary": "quick fix",
                "proper_fix": "refactor",
            },
        })
        result = {"success": True, "raw_output": raw}

        out1 = _save_tech_debt_output(conn, "p1", "P1-001", result)
        out2 = _save_tech_debt_output(conn, "p1", "P1-001", result)

        # Second call reports nothing created and points at the existing task.
        assert out1["created"] is True
        assert out2["created"] is False
        assert out1["task_id"] == out2["task_id"]

    def test_save_tech_debt_invalid_category_defaults_to_fix(self, conn):
        """Unknown category in tech_debt -> falls back to the default FIX category."""
        result = {
            "success": True,
            "raw_output": json.dumps({
                "status": "done",
                "tech_debt": {
                    "description": "Временный костыль",
                    "category": "UNKNOWN_GARBAGE",
                },
            }),
        }
        out = _save_tech_debt_output(conn, "p1", "P1-001", result)
        assert out["created"] is True
        child = models.get_task(conn, out["task_id"])
        assert child["category"] == "FIX"

    @patch("agents.runner.subprocess.run")
    def test_pipeline_creates_tech_debt_task_for_backend_dev(self, mock_run, conn):
        """run_pipeline for backend_dev with tech_debt -> a child task is created."""
        agent_output = {
            "status": "done",
            "changes": [],
            "tech_debt": {
                "description": "Pipeline integration: хардкод URL",
                "reason_temporary": "нет конфига",
                "proper_fix": "вынести в env",
                "category": "FIX",
            },
            # proof is included so the backend_dev step itself passes validation.
            "proof": {
                "what_was_done": "done",
                "how_verified": "tests",
                "verification_result": "ok",
            },
        }
        mock_run.return_value = _mock_success(agent_output)

        steps = [{"role": "backend_dev", "model": "sonnet"}]
        run_pipeline(conn, "P1-001", steps)

        # Check that a [TECH DEBT] child task was created
        children = models.get_children(conn, "P1-001")
        tech_debt_tasks = [t for t in children if "[TECH DEBT]" in (t.get("title") or "")]
        assert len(tech_debt_tasks) >= 1, "Дочерняя tech debt задача должна быть создана"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ===========================================================================
|
|
|
|
|
|
# 5. DB schema: smoke_test_result column exists
|
|
|
|
|
|
# ===========================================================================
|
|
|
|
|
|
|
|
|
|
|
|
def test_schema_tasks_has_smoke_test_result_column(conn):
    """KIN-128: the tasks table contains a smoke_test_result column."""
    # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk); index 1 is the name.
    column_names = {row[1] for row in conn.execute("PRAGMA table_info(tasks)")}
    assert "smoke_test_result" in column_names, "KIN-128: smoke_test_result должна быть в таблице tasks"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_update_task_smoke_test_result_roundtrip(conn):
    """smoke_test_result survives a write/read round trip as a dict."""
    payload = {"status": "confirmed", "evidence": "HTTP 200"}
    models.update_task(conn, "P1-001", smoke_test_result=payload)
    stored = models.get_task(conn, "P1-001")
    assert stored["smoke_test_result"] == payload
|