kin/tests/test_kin_128_regression.py
2026-03-18 22:20:05 +02:00

409 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Regression tests for KIN-128 — Quality over speed.
Covers 4 mechanisms:
1. Analyst auto-injection on 2nd+ revision (revise_count >= 2)
2. Smoke tester: cannot_confirm → task blocked; confirmed → pipeline continues
3. Agent prompts contain mandatory proof block (backend_dev, frontend_dev, debugger)
4. Tech debt output creates child task in DB
"""
import json
import subprocess
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from core.db import init_db
from core import models
from agents.runner import _save_tech_debt_output, _TECH_DEBT_ROLES, run_pipeline
PROMPTS_DIR = Path(__file__).parent.parent / "agents" / "prompts"
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
    """Yield a database connection seeded with one project and one task.

    The database is opened with the ``":memory:"`` path, project ``p1`` and
    task ``P1-001`` are inserted, and the connection is closed once the
    requesting test is done with it.
    """
    db = init_db(":memory:")

    # Seed data that every test in this module relies on.
    project_id = "p1"
    models.create_project(db, project_id, "P1", "/p1", tech_stack=["python"])
    models.create_task(db, "P1-001", project_id, "Fix bug", brief={"text": "fix"})

    yield db

    db.close()
@pytest.fixture
def api_client(tmp_path):
    """Yield a FastAPI ``TestClient`` that talks to an isolated database file.

    ``web.api.DB_PATH`` is redirected to ``tmp_path / "test.db"`` while the
    test runs and is put back to its previous value afterwards, so the
    override does not leak into tests that run later in the same session.
    Before yielding, project ``p1`` and one task titled "Fix bug" are created
    through the HTTP API.
    """
    # Imported lazily so the web stack is only loaded by tests that need it.
    import web.api as api_module
    from fastapi.testclient import TestClient

    previous_db_path = api_module.DB_PATH
    api_module.DB_PATH = tmp_path / "test.db"
    try:
        client = TestClient(api_module.app)
        client.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"})
        client.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"})
        yield client
    finally:
        # Undo the module-global override even if seeding or the test failed.
        api_module.DB_PATH = previous_db_path
def _mock_success(output_data):
m = MagicMock()
m.stdout = json.dumps(output_data) if isinstance(output_data, dict) else output_data
m.stderr = ""
m.returncode = 0
return m
# ===========================================================================
# 1. Analyst injection on 2nd+ revision
# ===========================================================================
class TestAnalystInjectionOnRevise:
    """Checks on the ``/revise`` endpoint's handling of the analyst step.

    Every test posts to ``/api/tasks/P1-001/revise`` with
    ``web.api._launch_pipeline_subprocess`` patched out and inspects the
    ``revise_count`` and ``pipeline_steps`` fields of the JSON response.
    """

    def test_first_revise_does_not_inject_analyst(self, api_client):
        """revise_count=1 -> analyst is NOT added to the pipeline steps."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "please fix", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 1
        roles = [s["role"] for s in (data.get("pipeline_steps") or [])]
        # Fail with a readable assertion instead of an IndexError on roles[0].
        assert roles, "pipeline_steps не должен быть пустым"
        assert roles[0] != "analyst", "analyst не должен быть первым шагом при revise_count=1"

    def test_second_revise_injects_analyst_as_first_step(self, api_client):
        """revise_count=2 -> analyst is automatically the first pipeline step."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            # first revise
            api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "first attempt", "steps": steps},
            )
            # second revise
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "second attempt", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 2
        pipeline_steps = data.get("pipeline_steps") or []
        assert pipeline_steps, "pipeline_steps не должен быть пустым"
        assert pipeline_steps[0]["role"] == "analyst", (
            f"Первый шаг должен быть analyst, получили: {pipeline_steps[0]['role']}"
        )

    def test_second_revise_analyst_not_duplicated_if_already_first(self, api_client):
        """If analyst is already the first step, a second one is not added."""
        steps = [{"role": "analyst", "model": "sonnet"}, {"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            api_client.post("/api/tasks/P1-001/revise", json={"comment": "r1", "steps": steps})
            r = api_client.post("/api/tasks/P1-001/revise", json={"comment": "r2", "steps": steps})
        # Without this check a failed request would be reported as a
        # "duplicated analyst" failure, which points at the wrong problem.
        assert r.status_code == 200
        data = r.json()
        pipeline_steps = data.get("pipeline_steps") or []
        analyst_steps = [s for s in pipeline_steps if s["role"] == "analyst"]
        assert len(analyst_steps) == 1, "analyst не должен дублироваться"

    def test_third_revise_also_injects_analyst(self, api_client):
        """revise_count=3 injects analyst as well."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            for comment in ("r1", "r2", "r3"):
                r = api_client.post(
                    "/api/tasks/P1-001/revise",
                    json={"comment": comment, "steps": steps},
                )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 3
        pipeline_steps = data.get("pipeline_steps") or []
        # Explicit emptiness check instead of a KeyError from a `[{}]` fallback.
        assert pipeline_steps, "pipeline_steps не должен быть пустым"
        assert pipeline_steps[0]["role"] == "analyst"
# ===========================================================================
# 2. Smoke tester: cannot_confirm → blocked; confirmed → continues
# ===========================================================================
class TestSmokeTester:
    """Pipeline runs whose only step is ``smoke_tester``.

    ``agents.runner.subprocess.run`` is patched in every test so the agent's
    output is whatever the test feeds in through ``_mock_success``.
    """

    def _make_pipeline(self, smoke_status: str):
        """Return ``(steps, agent_output)`` for a single smoke_tester step.

        ``evidence`` is filled in only for the ``"confirmed"`` status and
        ``reason`` only for ``"cannot_confirm"``; otherwise both are ``None``.
        """
        evidence = None
        reason = None
        if smoke_status == "confirmed":
            evidence = "HTTP 200"
        elif smoke_status == "cannot_confirm":
            reason = "нет доступа к prod"

        pipeline = [{"role": "smoke_tester", "model": "sonnet"}]
        agent_output = {
            "status": smoke_status,
            "commands_run": [],
            "evidence": evidence,
            "reason": reason,
        }
        return pipeline, agent_output

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_cannot_confirm_blocks_task(self, mock_run, conn):
        """smoke_tester with cannot_confirm -> the task becomes blocked."""
        pipeline, agent_output = self._make_pipeline("cannot_confirm")
        mock_run.return_value = _mock_success(agent_output)

        outcome = run_pipeline(conn, "P1-001", pipeline)

        assert outcome["success"] is False
        assert outcome.get("blocked_by") == "smoke_tester"

        stored = models.get_task(conn, "P1-001")
        assert stored["status"] == "blocked"
        assert stored["blocked_agent_role"] == "smoke_tester"
        blocked_reason = stored.get("blocked_reason") or ""
        assert "cannot_confirm" in blocked_reason

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_confirmed_continues_pipeline(self, mock_run, conn):
        """smoke_tester with confirmed -> pipeline goes on, task is not blocked."""
        pipeline, agent_output = self._make_pipeline("confirmed")
        mock_run.return_value = _mock_success(agent_output)

        outcome = run_pipeline(conn, "P1-001", pipeline)

        # Pipeline completes (only one step, smoke_tester, which passed).
        assert outcome.get("blocked_by") != "smoke_tester"
        stored = models.get_task(conn, "P1-001")
        assert stored["status"] != "blocked"

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_confirmed_saves_result_to_db(self, mock_run, conn):
        """smoke_tester confirmed -> the result is stored in smoke_test_result."""
        agent_output = {
            "status": "confirmed",
            "commands_run": ["curl https://example.com/health"],
            "evidence": "HTTP 200 OK",
            "reason": None,
        }
        mock_run.return_value = _mock_success(agent_output)
        pipeline = [{"role": "smoke_tester", "model": "sonnet"}]

        run_pipeline(conn, "P1-001", pipeline)

        saved = models.get_task(conn, "P1-001").get("smoke_test_result")
        assert saved is not None
        assert saved.get("status") == "confirmed"

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_cannot_confirm_saves_result_to_db(self, mock_run, conn):
        """smoke_tester cannot_confirm -> the result is stored in smoke_test_result too."""
        agent_output = {
            "status": "cannot_confirm",
            "commands_run": [],
            "evidence": None,
            "reason": "prod недоступен",
        }
        mock_run.return_value = _mock_success(agent_output)
        pipeline = [{"role": "smoke_tester", "model": "sonnet"}]

        run_pipeline(conn, "P1-001", pipeline)

        saved = models.get_task(conn, "P1-001").get("smoke_test_result")
        assert saved is not None
        assert saved.get("status") == "cannot_confirm"
# ===========================================================================
# 3. Agent prompts contain mandatory proof block
# ===========================================================================
class TestAgentPromptsProofBlock:
    """Substring checks on the agent prompt files under ``PROMPTS_DIR``."""

    @staticmethod
    def _read_prompt(prompt_file):
        """Return the UTF-8 text of ``prompt_file`` inside ``PROMPTS_DIR``."""
        prompt_path = PROMPTS_DIR / prompt_file
        return prompt_path.read_text(encoding="utf-8")

    @pytest.mark.parametrize("prompt_file,forbidden_keyword,proof_keyword", [
        ("backend_dev.md", "status: done", "proof"),
        ("frontend_dev.md", "status: done", "proof"),
        ("debugger.md", "status: fixed", "proof"),
    ])
    def test_prompt_contains_proof_field(self, prompt_file, forbidden_keyword, proof_keyword):
        """The prompt text mentions the proof keyword.

        ``forbidden_keyword`` is part of the parametrization but is not used
        by this check.
        """
        text = self._read_prompt(prompt_file)
        assert proof_keyword in text, (
            f"{prompt_file}: отсутствует поле '{proof_keyword}' в Output format"
        )

    @pytest.mark.parametrize("prompt_file,rule_phrase", [
        ("backend_dev.md", "ЗАПРЕЩЕНО"),
        ("frontend_dev.md", "ЗАПРЕЩЕНО"),
        ("debugger.md", "ЗАПРЕЩЕНО"),
    ])
    def test_prompt_forbids_done_without_proof(self, prompt_file, rule_phrase):
        """The prompt text contains the prohibition phrase."""
        text = self._read_prompt(prompt_file)
        assert rule_phrase in text, (
            f"{prompt_file}: не содержит правило ЗАПРЕЩЕНО о proof"
        )

    def test_smoke_tester_prompt_exists(self):
        """smoke_tester.md exists and mentions cannot_confirm."""
        prompt_path = PROMPTS_DIR / "smoke_tester.md"
        assert prompt_path.exists(), "smoke_tester.md не найден"
        assert "cannot_confirm" in prompt_path.read_text(encoding="utf-8")

    def test_analyst_prompt_exists(self):
        """analyst.md exists and mentions root_problem."""
        prompt_path = PROMPTS_DIR / "analyst.md"
        assert prompt_path.exists(), "analyst.md не найден"
        assert "root_problem" in prompt_path.read_text(encoding="utf-8")

    def test_backend_dev_proof_fields_complete(self):
        """backend_dev.md mentions all three proof field names."""
        text = self._read_prompt("backend_dev.md")
        expected_fields = ("what_was_done", "how_verified", "verification_result")
        for field in expected_fields:
            assert field in text, f"backend_dev.md: отсутствует поле proof.{field}"

    def test_debugger_proof_fields_complete(self):
        """debugger.md mentions all three proof field names."""
        text = self._read_prompt("debugger.md")
        expected_fields = ("what_was_fixed", "how_verified", "verification_result")
        for field in expected_fields:
            assert field in text, f"debugger.md: отсутствует поле proof.{field}"
# ===========================================================================
# 4. Tech debt output creates child task in DB
# ===========================================================================
class TestTechDebtTaskCreation:
    """Checks that agent output with a ``tech_debt`` section yields a child task."""

    @staticmethod
    def _agent_result(payload):
        """Wrap ``payload`` as a successful agent result with JSON ``raw_output``."""
        return {"success": True, "raw_output": json.dumps(payload)}

    def test_tech_debt_roles_contains_expected_roles(self):
        """_TECH_DEBT_ROLES includes the dev roles and sysadmin."""
        for role in ("backend_dev", "frontend_dev", "debugger", "sysadmin"):
            assert role in _TECH_DEBT_ROLES

    def test_save_tech_debt_creates_child_task(self, conn):
        """tech_debt with a description -> a [TECH DEBT] child task is created."""
        payload = {
            "status": "done",
            "tech_debt": {
                "description": "Хардкод timeout в 30s нужно вынести в конфиг",
                "reason_temporary": "MVP без конфига",
                "proper_fix": "Добавить config.yaml с полем agent_timeout",
                "category": "FIX",
            },
        }

        saved = _save_tech_debt_output(conn, "p1", "P1-001", self._agent_result(payload))

        assert saved["created"] is True
        assert saved["task_id"] is not None

        child_task = models.get_task(conn, saved["task_id"])
        assert child_task is not None
        title = child_task["title"]
        assert "[TECH DEBT]" in title
        assert "Хардкод timeout" in title
        assert child_task["parent_task_id"] == "P1-001"
        assert child_task["category"] == "FIX"

    def test_save_tech_debt_without_description_creates_nothing(self, conn):
        """tech_debt with an empty description -> no task is created."""
        payload = {
            "status": "done",
            "tech_debt": {"description": "", "reason_temporary": ""},
        }
        saved = _save_tech_debt_output(conn, "p1", "P1-001", self._agent_result(payload))
        assert saved["created"] is False

    def test_save_tech_debt_without_field_creates_nothing(self, conn):
        """Output without a tech_debt field -> no task is created."""
        payload = {"status": "done", "changes": []}
        saved = _save_tech_debt_output(conn, "p1", "P1-001", self._agent_result(payload))
        assert saved["created"] is False

    def test_save_tech_debt_idempotent(self, conn):
        """A repeated call with the same description does not create a duplicate."""
        agent_result = self._agent_result({
            "status": "done",
            "tech_debt": {
                "description": "Duplicate tech debt check",
                "reason_temporary": "quick fix",
                "proper_fix": "refactor",
            },
        })

        first = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)
        second = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)

        assert first["created"] is True
        assert second["created"] is False
        assert first["task_id"] == second["task_id"]

    def test_save_tech_debt_invalid_category_defaults_to_fix(self, conn):
        """An unknown tech_debt category -> the child task gets category FIX."""
        payload = {
            "status": "done",
            "tech_debt": {
                "description": "Временный костыль",
                "category": "UNKNOWN_GARBAGE",
            },
        }

        saved = _save_tech_debt_output(conn, "p1", "P1-001", self._agent_result(payload))

        assert saved["created"] is True
        child_task = models.get_task(conn, saved["task_id"])
        assert child_task["category"] == "FIX"

    @patch("agents.runner.subprocess.run")
    def test_pipeline_creates_tech_debt_task_for_backend_dev(self, mock_run, conn):
        """run_pipeline for backend_dev with tech_debt -> a child task is created."""
        tech_debt = {
            "description": "Pipeline integration: хардкод URL",
            "reason_temporary": "нет конфига",
            "proper_fix": "вынести в env",
            "category": "FIX",
        }
        proof = {
            "what_was_done": "done",
            "how_verified": "tests",
            "verification_result": "ok",
        }
        agent_output = {
            "status": "done",
            "changes": [],
            "tech_debt": tech_debt,
            "proof": proof,
        }
        mock_run.return_value = _mock_success(agent_output)

        run_pipeline(conn, "P1-001", [{"role": "backend_dev", "model": "sonnet"}])

        # Look for a [TECH DEBT] task among the children of the original task.
        tech_debt_children = [
            child
            for child in models.get_children(conn, "P1-001")
            if "[TECH DEBT]" in (child.get("title") or "")
        ]
        assert len(tech_debt_children) >= 1, "Дочерняя tech debt задача должна быть создана"
# ===========================================================================
# 5. DB schema: smoke_test_result column exists
# ===========================================================================
def test_schema_tasks_has_smoke_test_result_column(conn):
    """KIN-128: the tasks table has a smoke_test_result column."""
    table_info_rows = conn.execute("PRAGMA table_info(tasks)").fetchall()
    column_names = set()
    for row in table_info_rows:
        # The second field of each row is collected as the column name.
        column_names.add(row[1])
    assert "smoke_test_result" in column_names, "KIN-128: smoke_test_result должна быть в таблице tasks"
def test_update_task_smoke_test_result_roundtrip(conn):
    """A dict written to smoke_test_result is read back equal to the original."""
    expected = {"status": "confirmed", "evidence": "HTTP 200"}

    models.update_task(conn, "P1-001", smoke_test_result=expected)

    stored = models.get_task(conn, "P1-001")
    assert stored["smoke_test_result"] == expected