kin: auto-commit after pipeline

This commit is contained in:
Gros Frumos 2026-03-18 22:20:05 +02:00
parent 710b284e3f
commit f1c868e335
3 changed files with 490 additions and 2 deletions

View file

@ -484,11 +484,15 @@ def get_decisions(
if category: if category:
query += " AND d.category = ?" query += " AND d.category = ?"
params.append(category) params.append(category)
if types: if types is not None:
if not types:
return []
placeholders = ", ".join("?" for _ in types) placeholders = ", ".join("?" for _ in types)
query += f" AND d.type IN ({placeholders})" query += f" AND d.type IN ({placeholders})"
params.extend(types) params.extend(types)
if tags: if tags is not None:
if not tags:
return []
query += """ AND d.id IN ( query += """ AND d.id IN (
SELECT d2.id FROM decisions d2, json_each(d2.tags) AS t SELECT d2.id FROM decisions d2, json_each(d2.tags) AS t
WHERE t.value IN ({}) WHERE t.value IN ({})

View file

@ -0,0 +1,409 @@
"""Regression tests for KIN-128 — Quality over speed.
Covers 4 mechanisms:
1. Analyst auto-injection on 2nd+ revision (revise_count >= 2)
2. Smoke tester: cannot_confirm task blocked; confirmed pipeline continues
3. Agent prompts contain mandatory proof block (backend_dev, frontend_dev, debugger)
4. Tech debt output creates child task in DB
"""
import json
import subprocess
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from core.db import init_db
from core import models
from agents.runner import _save_tech_debt_output, _TECH_DEBT_ROLES, run_pipeline
PROMPTS_DIR = Path(__file__).parent.parent / "agents" / "prompts"
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
    """Yield a fresh in-memory DB seeded with project ``p1`` and task ``P1-001``.

    The connection is closed after the requesting test has finished.
    """
    db = init_db(":memory:")
    # Seed the rows that the tests in this module look up by id.
    models.create_project(db, "p1", "P1", "/p1", tech_stack=["python"])
    models.create_task(db, "P1-001", "p1", "Fix bug", brief={"text": "fix"})
    yield db
    db.close()
@pytest.fixture
def api_client(tmp_path, monkeypatch):
    """Return a FastAPI ``TestClient`` backed by an isolated, per-test DB file.

    ``web.api.DB_PATH`` is redirected to a file under ``tmp_path`` through
    ``monkeypatch``, so the previous value is restored on teardown and the
    override cannot leak into tests that run afterwards.  Project ``p1`` and
    a task titled "Fix bug" are created through the API before the client is
    handed to the test.
    """
    # Imported inside the fixture (as before) so the web layer is only loaded
    # by tests that actually request this fixture.
    import web.api as api_module
    from fastapi.testclient import TestClient

    monkeypatch.setattr(api_module, "DB_PATH", tmp_path / "test.db")
    client = TestClient(api_module.app)
    client.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"})
    client.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"})
    return client
def _mock_success(output_data):
m = MagicMock()
m.stdout = json.dumps(output_data) if isinstance(output_data, dict) else output_data
m.stderr = ""
m.returncode = 0
return m
# ===========================================================================
# 1. Analyst injection on 2nd+ revision
# ===========================================================================
class TestAnalystInjectionOnRevise:
    """Checks where ``analyst`` appears in ``pipeline_steps`` of the revise response.

    Every test posts to ``/api/tasks/P1-001/revise`` with the pipeline
    launcher patched out and inspects the JSON body of the last response.
    """

    def test_first_revise_does_not_inject_analyst(self, api_client):
        """revise_count=1: analyst must not be the first pipeline step."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "please fix", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 1
        roles = [s["role"] for s in (data.get("pipeline_steps") or [])]
        # Fail with a readable assertion instead of an IndexError on roles[0].
        assert roles, "pipeline_steps не должен быть пустым"
        assert roles[0] != "analyst", "analyst не должен быть первым шагом при revise_count=1"

    def test_second_revise_injects_analyst_as_first_step(self, api_client):
        """revise_count=2: analyst is the first pipeline step."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            # first revise
            api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "first attempt", "steps": steps},
            )
            # second revise
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "second attempt", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 2
        pipeline_steps = data.get("pipeline_steps") or []
        assert pipeline_steps, "pipeline_steps не должен быть пустым"
        assert pipeline_steps[0]["role"] == "analyst", (
            f"Первый шаг должен быть analyst, получили: {pipeline_steps[0]['role']}"
        )

    def test_second_revise_analyst_not_duplicated_if_already_first(self, api_client):
        """If analyst is already the first submitted step, no second one is added."""
        steps = [{"role": "analyst", "model": "sonnet"}, {"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            api_client.post("/api/tasks/P1-001/revise", json={"comment": "r1", "steps": steps})
            r = api_client.post("/api/tasks/P1-001/revise", json={"comment": "r2", "steps": steps})
        # Same status check as the sibling tests, so an HTTP error is reported
        # as such rather than as a confusing step-count mismatch.
        assert r.status_code == 200
        data = r.json()
        pipeline_steps = data.get("pipeline_steps") or []
        analyst_steps = [s for s in pipeline_steps if s["role"] == "analyst"]
        assert len(analyst_steps) == 1, "analyst не должен дублироваться"

    def test_third_revise_also_injects_analyst(self, api_client):
        """revise_count=3: analyst is still the first pipeline step."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            for comment in ("r1", "r2", "r3"):
                r = api_client.post(
                    "/api/tasks/P1-001/revise",
                    json={"comment": comment, "steps": steps},
                )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 3
        pipeline_steps = data.get("pipeline_steps") or []
        # Explicit emptiness check replaces the KeyError the old
        # ``[{}][0]["role"]`` fallback produced.
        assert pipeline_steps, "pipeline_steps не должен быть пустым"
        assert pipeline_steps[0]["role"] == "analyst"
# ===========================================================================
# 2. Smoke tester: cannot_confirm → blocked; confirmed → continues
# ===========================================================================
class TestSmokeTester:
    """Pipeline behaviour for the two ``smoke_tester`` verdicts.

    ``agents.runner.subprocess.run`` is patched in every test so the agent
    "returns" a prepared JSON payload.
    """

    def _make_pipeline(self, smoke_status: str):
        """Return ``(steps, agent_output)`` for a one-step smoke_tester pipeline.

        ``evidence`` is set only for ``confirmed`` and ``reason`` only for
        ``cannot_confirm``; otherwise the field is ``None``.
        """
        evidence = None
        reason = None
        if smoke_status == "confirmed":
            evidence = "HTTP 200"
        if smoke_status == "cannot_confirm":
            reason = "нет доступа к prod"
        agent_output = {
            "status": smoke_status,
            "commands_run": [],
            "evidence": evidence,
            "reason": reason,
        }
        return [{"role": "smoke_tester", "model": "sonnet"}], agent_output

    def test_smoke_tester_cannot_confirm_blocks_task(self, conn):
        """A ``cannot_confirm`` verdict fails the pipeline and blocks the task."""
        pipeline, agent_output = self._make_pipeline("cannot_confirm")
        with patch("agents.runner.subprocess.run") as fake_run:
            fake_run.return_value = _mock_success(agent_output)
            outcome = run_pipeline(conn, "P1-001", pipeline)
            assert outcome["success"] is False
            assert outcome.get("blocked_by") == "smoke_tester"
            stored = models.get_task(conn, "P1-001")
            assert stored["status"] == "blocked"
            assert stored["blocked_agent_role"] == "smoke_tester"
            assert "cannot_confirm" in (stored.get("blocked_reason") or "")

    def test_smoke_tester_confirmed_continues_pipeline(self, conn):
        """A ``confirmed`` verdict does not block the pipeline or the task."""
        pipeline, agent_output = self._make_pipeline("confirmed")
        with patch("agents.runner.subprocess.run") as fake_run:
            fake_run.return_value = _mock_success(agent_output)
            outcome = run_pipeline(conn, "P1-001", pipeline)
            # The only step is smoke_tester, and it passed.
            assert outcome.get("blocked_by") != "smoke_tester"
            stored = models.get_task(conn, "P1-001")
            assert stored["status"] != "blocked"

    def test_smoke_tester_confirmed_saves_result_to_db(self, conn):
        """A ``confirmed`` verdict is stored on the task as ``smoke_test_result``."""
        pipeline = [{"role": "smoke_tester", "model": "sonnet"}]
        agent_output = {
            "status": "confirmed",
            "commands_run": ["curl https://example.com/health"],
            "evidence": "HTTP 200 OK",
            "reason": None,
        }
        with patch("agents.runner.subprocess.run") as fake_run:
            fake_run.return_value = _mock_success(agent_output)
            run_pipeline(conn, "P1-001", pipeline)
            stored = models.get_task(conn, "P1-001")
            saved = stored.get("smoke_test_result")
            assert saved is not None
            assert saved.get("status") == "confirmed"

    def test_smoke_tester_cannot_confirm_saves_result_to_db(self, conn):
        """A ``cannot_confirm`` verdict is stored as ``smoke_test_result`` as well."""
        pipeline = [{"role": "smoke_tester", "model": "sonnet"}]
        agent_output = {
            "status": "cannot_confirm",
            "commands_run": [],
            "evidence": None,
            "reason": "prod недоступен",
        }
        with patch("agents.runner.subprocess.run") as fake_run:
            fake_run.return_value = _mock_success(agent_output)
            run_pipeline(conn, "P1-001", pipeline)
            stored = models.get_task(conn, "P1-001")
            saved = stored.get("smoke_test_result")
            assert saved is not None
            assert saved.get("status") == "cannot_confirm"
# ===========================================================================
# 3. Agent prompts contain mandatory proof block
# ===========================================================================
class TestAgentPromptsProofBlock:
    """Verify that agent prompts structurally require a proof block.

    All checks are plain substring searches over the prompt files under
    ``PROMPTS_DIR``; they do not parse the prompt structure.
    """

    # The former ``forbidden_keyword`` column was never read by the test body,
    # so it has been dropped from the parametrization.
    @pytest.mark.parametrize("prompt_file,proof_keyword", [
        ("backend_dev.md", "proof"),
        ("frontend_dev.md", "proof"),
        ("debugger.md", "proof"),
    ])
    def test_prompt_contains_proof_field(self, prompt_file, proof_keyword):
        """The prompt text mentions the ``proof`` field."""
        content = (PROMPTS_DIR / prompt_file).read_text(encoding="utf-8")
        assert proof_keyword in content, (
            f"{prompt_file}: отсутствует поле '{proof_keyword}' в Output format"
        )

    @pytest.mark.parametrize("prompt_file,rule_phrase", [
        ("backend_dev.md", "ЗАПРЕЩЕНО"),
        ("frontend_dev.md", "ЗАПРЕЩЕНО"),
        ("debugger.md", "ЗАПРЕЩЕНО"),
    ])
    def test_prompt_forbids_done_without_proof(self, prompt_file, rule_phrase):
        """The prompt contains the explicit prohibition marker phrase."""
        content = (PROMPTS_DIR / prompt_file).read_text(encoding="utf-8")
        assert rule_phrase in content, (
            f"{prompt_file}: не содержит правило ЗАПРЕЩЕНО о proof"
        )

    def test_smoke_tester_prompt_exists(self):
        """``smoke_tester.md`` exists and mentions ``cannot_confirm``."""
        path = PROMPTS_DIR / "smoke_tester.md"
        assert path.exists(), "smoke_tester.md не найден"
        content = path.read_text(encoding="utf-8")
        assert "cannot_confirm" in content

    def test_analyst_prompt_exists(self):
        """``analyst.md`` exists and mentions ``root_problem``."""
        path = PROMPTS_DIR / "analyst.md"
        assert path.exists(), "analyst.md не найден"
        content = path.read_text(encoding="utf-8")
        assert "root_problem" in content

    def test_backend_dev_proof_fields_complete(self):
        """``backend_dev.md`` names all three proof sub-fields."""
        content = (PROMPTS_DIR / "backend_dev.md").read_text(encoding="utf-8")
        for field in ("what_was_done", "how_verified", "verification_result"):
            assert field in content, f"backend_dev.md: отсутствует поле proof.{field}"

    def test_debugger_proof_fields_complete(self):
        """``debugger.md`` names all three proof sub-fields."""
        content = (PROMPTS_DIR / "debugger.md").read_text(encoding="utf-8")
        for field in ("what_was_fixed", "how_verified", "verification_result"):
            assert field in content, f"debugger.md: отсутствует поле proof.{field}"
# ===========================================================================
# 4. Tech debt output creates child task in DB
# ===========================================================================
class TestTechDebtTaskCreation:
    """A ``tech_debt`` entry in agent output becomes a child task of the source task."""

    @staticmethod
    def _agent_result(payload):
        """Wrap *payload* as a successful agent result with JSON ``raw_output``."""
        return {"success": True, "raw_output": json.dumps(payload)}

    def test_tech_debt_roles_contains_expected_roles(self):
        """``_TECH_DEBT_ROLES`` includes the dev roles and sysadmin."""
        for role in ("backend_dev", "frontend_dev", "debugger", "sysadmin"):
            assert role in _TECH_DEBT_ROLES

    def test_save_tech_debt_creates_child_task(self, conn):
        """tech_debt with a description creates a ``[TECH DEBT]`` child task."""
        agent_result = self._agent_result({
            "status": "done",
            "tech_debt": {
                "description": "Хардкод timeout в 30s нужно вынести в конфиг",
                "reason_temporary": "MVP без конфига",
                "proper_fix": "Добавить config.yaml с полем agent_timeout",
                "category": "FIX",
            },
        })
        saved = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)
        assert saved["created"] is True
        assert saved["task_id"] is not None
        child_task = models.get_task(conn, saved["task_id"])
        assert child_task is not None
        assert "[TECH DEBT]" in child_task["title"]
        assert "Хардкод timeout" in child_task["title"]
        assert child_task["parent_task_id"] == "P1-001"
        assert child_task["category"] == "FIX"

    def test_save_tech_debt_without_description_creates_nothing(self, conn):
        """tech_debt with an empty description creates no task."""
        agent_result = self._agent_result({
            "status": "done",
            "tech_debt": {"description": "", "reason_temporary": ""},
        })
        saved = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)
        assert saved["created"] is False

    def test_save_tech_debt_without_field_creates_nothing(self, conn):
        """Output without a ``tech_debt`` field creates no task."""
        agent_result = self._agent_result({"status": "done", "changes": []})
        saved = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)
        assert saved["created"] is False

    def test_save_tech_debt_idempotent(self, conn):
        """A repeated call with the same description creates no duplicate."""
        agent_result = self._agent_result({
            "status": "done",
            "tech_debt": {
                "description": "Duplicate tech debt check",
                "reason_temporary": "quick fix",
                "proper_fix": "refactor",
            },
        })
        first = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)
        second = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)
        assert first["created"] is True
        assert second["created"] is False
        assert first["task_id"] == second["task_id"]

    def test_save_tech_debt_invalid_category_defaults_to_fix(self, conn):
        """An unknown tech_debt category falls back to ``FIX``."""
        agent_result = self._agent_result({
            "status": "done",
            "tech_debt": {
                "description": "Временный костыль",
                "category": "UNKNOWN_GARBAGE",
            },
        })
        saved = _save_tech_debt_output(conn, "p1", "P1-001", agent_result)
        assert saved["created"] is True
        child_task = models.get_task(conn, saved["task_id"])
        assert child_task["category"] == "FIX"

    def test_pipeline_creates_tech_debt_task_for_backend_dev(self, conn):
        """run_pipeline for backend_dev with tech_debt creates a child task."""
        agent_output = {
            "status": "done",
            "changes": [],
            "tech_debt": {
                "description": "Pipeline integration: хардкод URL",
                "reason_temporary": "нет конфига",
                "proper_fix": "вынести в env",
                "category": "FIX",
            },
            "proof": {
                "what_was_done": "done",
                "how_verified": "tests",
                "verification_result": "ok",
            },
        }
        pipeline = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("agents.runner.subprocess.run") as fake_run:
            fake_run.return_value = _mock_success(agent_output)
            run_pipeline(conn, "P1-001", pipeline)
            # Look for a [TECH DEBT] task among the children of the source task.
            children = models.get_children(conn, "P1-001")
            has_tech_debt_child = any(
                "[TECH DEBT]" in (child.get("title") or "") for child in children
            )
            assert has_tech_debt_child, "Дочерняя tech debt задача должна быть создана"
# ===========================================================================
# 5. DB schema: smoke_test_result column exists
# ===========================================================================
def test_schema_tasks_has_smoke_test_result_column(conn):
    """KIN-128: the ``tasks`` table has a ``smoke_test_result`` column."""
    table_info = conn.execute("PRAGMA table_info(tasks)").fetchall()
    # Element 1 of each PRAGMA table_info row is the column name.
    column_names = {row[1] for row in table_info}
    assert "smoke_test_result" in column_names, "KIN-128: smoke_test_result должна быть в таблице tasks"
def test_update_task_smoke_test_result_roundtrip(conn):
    """``smoke_test_result`` written via ``update_task`` reads back as the same dict."""
    payload = {"status": "confirmed", "evidence": "HTTP 200"}
    models.update_task(conn, "P1-001", smoke_test_result=payload)
    reloaded = models.get_task(conn, "P1-001")
    assert reloaded["smoke_test_result"] == payload

View file

@ -1173,3 +1173,78 @@ def test_get_decisions_empty_tags_differs_from_none(conn):
assert len(result_empty) == 0, "tags=[] должен давать 0 результатов" assert len(result_empty) == 0, "tags=[] должен давать 0 результатов"
assert len(result_none) == 2, "tags=None должен давать все 2 результата" assert len(result_none) == 2, "tags=None должен давать все 2 результата"
# ---------------------------------------------------------------------------
# Углублённое исследование: дополнительные edge cases get_decisions с []
# Задача KIN-P1-001 revision — оба параметра пустые, пустой проект, decisions без тегов
# ---------------------------------------------------------------------------
def test_get_decisions_both_types_and_tags_empty_returns_empty(conn):
    """``types=[]`` together with ``tags=[]`` must yield no decisions.

    Both filters are empty at once; the expected result is ``[]`` even though
    the project holds two tagged decisions.
    """
    models.create_project(conn, "p1", "P1", "/p1")
    seeded = (
        ("gotcha", "Ловушка A", ["safari"]),
        ("decision", "Решение B", ["chrome"]),
    )
    for kind, title, tag_list in seeded:
        models.add_decision(conn, "p1", kind, title, "desc", tags=tag_list)
    found = models.get_decisions(conn, "p1", types=[], tags=[])
    assert found == [], (
        f"types=[] + tags=[] должен вернуть [], получено {len(found)} записей"
    )
def test_get_decisions_empty_types_with_tags_none_returns_empty(conn):
    """``types=[]`` with ``tags=None`` must yield no decisions."""
    models.create_project(conn, "p1", "P1", "/p1")
    models.add_decision(conn, "p1", "gotcha", "Ловушка 1", "desc", tags=["safari"])
    filters = {"types": [], "tags": None}
    found = models.get_decisions(conn, "p1", **filters)
    assert found == [], (
        f"types=[] должен дать ранний возврат [], даже когда tags=None, получено {len(found)} записей"
    )
def test_get_decisions_empty_tags_with_types_none_returns_empty(conn):
    """``tags=[]`` with ``types=None`` must yield no decisions."""
    models.create_project(conn, "p1", "P1", "/p1")
    models.add_decision(conn, "p1", "gotcha", "Ловушка 1", "desc", tags=["safari"])
    filters = {"types": None, "tags": []}
    found = models.get_decisions(conn, "p1", **filters)
    assert found == [], (
        f"tags=[] должен дать ранний возврат [], даже когда types=None, получено {len(found)} записей"
    )
def test_get_decisions_empty_types_on_empty_project_returns_empty(conn):
    """``types=[]`` on a project without any decisions must yield ``[]``."""
    models.create_project(conn, "p1", "P1", "/p1")
    # No decisions are added: the project stays empty on purpose.
    found = models.get_decisions(conn, "p1", types=[])
    assert found == [], (
        f"types=[] на пустом проекте должен вернуть [], получено {found!r}"
    )
def test_get_decisions_empty_tags_on_decisions_without_tags(conn):
    """``tags=[]`` must yield ``[]`` even when the stored decisions have no tags."""
    models.create_project(conn, "p1", "P1", "/p1")
    untagged = (("decision", "Без тегов 1"), ("gotcha", "Без тегов 2"))
    for kind, title in untagged:
        models.add_decision(conn, "p1", kind, title, "desc", tags=None)
    found = models.get_decisions(conn, "p1", tags=[])
    assert found == [], (
        f"tags=[] должен вернуть [], даже если decisions имеют tags=None, получено {len(found)} записей"
    )
def test_get_decisions_empty_types_with_category_ignores_category(conn):
    """``types=[]`` must yield ``[]`` even when ``category`` matches a decision."""
    models.create_project(conn, "p1", "P1", "/p1")
    models.add_decision(conn, "p1", "gotcha", "UI ловушка", "desc", category="ui")
    filters = {"types": [], "category": "ui"}
    found = models.get_decisions(conn, "p1", **filters)
    assert found == [], (
        f"types=[] должен давать ранний возврат [] независимо от category, получено {len(found)} записей"
    )