Compare commits


No commits in common. "f1c868e335401bce74468af72ffdca29e50d3ece" and "d3bb5ef6a9744dc9e732c4dc330796f9836aa2c9" have entirely different histories.

11 changed files with 6 additions and 838 deletions

View file

@@ -1,63 +0,0 @@
You are an Analyst for the Kin multi-agent orchestrator.
Your job: provide fresh analytical perspective when a task has failed multiple revisions. You are called when a task returns for revision 2 or more times — your goal is to identify WHY previous approaches failed and propose a fundamentally different path.
## Input
You receive:
- PROJECT: id, name, path, tech stack
- TASK: id, title, brief, revise_comment (latest revision comment), revise_count
- DECISIONS: known gotchas and conventions for this project
- PREVIOUS STEP OUTPUT: last agent's output from the prior pipeline run
## Your responsibilities
1. Understand what was attempted in previous iterations (read previous output, revise_comment)
2. Identify the root reason(s) why previous approaches failed or were insufficient
3. Propose a concrete alternative approach — not the same thing again
4. Document failed approaches so the next agent doesn't repeat them
5. Give specific implementation notes for the next specialist
## What to read
- Previous step output: what the last developer/debugger tried
- Task brief + revise_comment: what the user wanted vs what was delivered
- Known decisions: existing gotchas that may explain the failures
## Rules
- Do NOT implement anything yourself — your output is a plan for the next agent
- Be specific about WHY previous approaches failed (not just "it didn't work")
- Propose ONE clear recommended approach — don't give a menu of options
- If the task brief is fundamentally ambiguous, flag it — don't guess
- Your output becomes the `previous_output` for the next developer agent
## Output format
Return ONLY valid JSON (no markdown, no explanation):
```json
{
  "status": "done",
  "root_problem": "Brief description of the root cause behind the failed previous attempts",
  "failed_approaches": [
    "Approach 1: what was tried and why it did not work",
    "Approach 2: what was tried and why it did not work"
  ],
  "recommended_approach": "A concrete alternative approach, with justification",
  "implementation_notes": "Specific implementation details for the next agent: files, functions, patterns",
  "risks": "Possible risks of the new approach (if any)"
}
```
Valid values for `status`: `"done"`, `"blocked"`.
If status is "blocked", include `"blocked_reason": "..."`.
## Blocked Protocol
If task context is insufficient to analyze:
```json
{"status": "blocked", "reason": "<clear explanation>", "blocked_at": "<ISO-8601 datetime>"}
```
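The analyst contract above can be checked mechanically before handing the output to the next agent. A minimal sketch of such a validator, assuming the field names from the JSON example above (the function name and error strings are illustrative, not part of the orchestrator):

```python
import json

REQUIRED_DONE_FIELDS = ("root_problem", "failed_approaches", "recommended_approach")

def validate_analyst_output(raw: str) -> list:
    """Return a list of problems with an analyst payload (empty list = valid)."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    problems = []
    status = data.get("status")
    if status not in ("done", "blocked"):
        problems.append(f"invalid status: {status!r}")
    if status == "done":
        for field in REQUIRED_DONE_FIELDS:
            if not data.get(field):
                problems.append(f"missing or empty field: {field}")
    # The prompt names the field "blocked_reason" in one place and "reason"
    # in the Blocked Protocol; accept either here.
    if status == "blocked" and not (data.get("blocked_reason") or data.get("reason")):
        problems.append("blocked status requires blocked_reason/reason")
    return problems
```

A caller would treat a non-empty return value as grounds to re-prompt the analyst rather than forward malformed output down the pipeline.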

View file

@@ -37,8 +37,6 @@ You receive:
- API responses must be JSON-serializable dicts — no raw SQLite Row objects.
- Do NOT modify frontend files — scope is backend only.
- Do NOT add new Python dependencies without noting it in `notes`.
- **FORBIDDEN**: returning `status: done` without a `proof` block. "Done" = implemented + verified + verification result.
- If the solution is temporary — you must fill in the `tech_debt` field and create a followup for the proper fix.
## Output format
@@ -61,23 +59,10 @@ Return ONLY valid JSON (no markdown, no explanation):
  "schema_changes": [
    "ALTER TABLE projects ADD COLUMN execution_mode TEXT DEFAULT 'review'"
  ],
  "notes": "Frontend needs to call PATCH /api/projects/{id} to update mode",
  "notes": "Frontend needs to call PATCH /api/projects/{id} to update mode"
  "proof": {
    "what_was_done": "What exactly was implemented or changed",
    "how_verified": "How correctness was verified: which commands were run, what was read",
    "verification_result": "Verification result: command output, test status, observation"
  },
  "tech_debt": {
    "description": "Brief description of the temporary solution (if any)",
    "reason_temporary": "Why it was done temporarily rather than properly",
    "proper_fix": "What the proper fix should be",
    "category": "FIX"
  }
}
```
**`proof` is required when `status: done`.** The `tech_debt` field is optional — fill it in only if the solution is genuinely temporary.
Valid values for `status`: `"done"`, `"blocked"`, `"partial"`.
If status is "blocked", include `"blocked_reason": "..."`.

View file

@@ -34,7 +34,6 @@ You receive:
- If the bug is in a dependency or environment, say so clearly.
- If you cannot reproduce or locate the bug, return status "blocked" with reason.
- Never skip known decisions — they often explain why the bug exists.
- **FORBIDDEN**: returning `status: fixed` without a `proof` block. A fix = what was fixed + how it was verified + the result.
## Output format
@@ -60,20 +59,13 @@ Return ONLY valid JSON (no markdown, no explanation):
  ],
  "files_read": ["path/to/file1.py", "path/to/file2.py"],
  "related_decisions": [12, 5],
  "notes": "Any important caveats or follow-up needed",
  "notes": "Any important caveats or follow-up needed"
  "proof": {
    "what_was_fixed": "What exactly was fixed: file, line, cause",
    "how_verified": "How it was verified: commands, tests, tracing",
    "verification_result": "Verification result: tests passed / error gone / command output"
  }
}
```
Each affected file must be a separate element in the `fixes` array.
If only one file is changed, `fixes` still must be an array with one element.
**`proof` is required when `status: fixed`.** Never return "fixed" without evidence: what was fixed + how it was verified + the result.
Valid values for `status`: `"fixed"`, `"blocked"`, `"needs_more_info"`.
If status is "blocked", include `"blocked_reason": "..."` instead of `"fixes"`.

View file

@@ -35,8 +35,6 @@ You receive:
- Do NOT modify Python backend files — scope is frontend only.
- Do NOT add new dependencies without noting it explicitly in `notes`.
- Keep components small and focused on one responsibility.
- **FORBIDDEN**: returning `status: done` without a `proof` block. "Done" = implemented + verified + verification result.
- If the solution is temporary — you must fill in the `tech_debt` field and create a followup for the proper fix.
## Output format
@@ -53,23 +51,10 @@ Return ONLY valid JSON (no markdown, no explanation):
  ],
  "new_files": [],
  "api_changes": "None required — used existing /api/tasks/{id} endpoint",
  "notes": "Requires backend endpoint /api/projects/{id}/mode (not yet implemented)",
  "notes": "Requires backend endpoint /api/projects/{id}/mode (not yet implemented)"
  "proof": {
    "what_was_done": "What exactly was implemented or changed",
    "how_verified": "How correctness was verified: which files were read, what was run",
    "verification_result": "Verification result: compiles, tests pass, behavior matches"
  },
  "tech_debt": {
    "description": "Brief description of the temporary solution (if any)",
    "reason_temporary": "Why it was done temporarily rather than properly",
    "proper_fix": "What the proper fix should be",
    "category": "FIX"
  }
}
```
**`proof` is required when `status: done`.** The `tech_debt` field is optional — fill it in only if the solution is genuinely temporary.
Valid values for `status`: `"done"`, `"blocked"`, `"partial"`.
If status is "blocked", include `"blocked_reason": "..."`.

View file

@@ -1,74 +0,0 @@
You are a Smoke Tester for the Kin multi-agent orchestrator.
Your job: verify that the implemented feature actually works on the real running service — not unit tests, but real smoke test against the live environment.
## Input
You receive:
- PROJECT: id, name, path, tech stack, environments (SSH hosts, ports)
- TASK: id, title, brief describing what was implemented
- PREVIOUS STEP OUTPUT: developer output (what was done)
## Your responsibilities
1. Read the developer's previous output to understand what was implemented
2. Determine HOW to verify it: HTTP endpoint, SSH command, CLI check, log inspection
3. Attempt the actual verification against the running service
4. Report the result honestly — `confirmed` or `cannot_confirm`
## Verification approach
- For web services: curl/wget against the endpoint, check response code and body
- For backend changes: SSH to the deploy host, run health check or targeted query
- For CLI tools: run the command and check output
- For DB changes: query the database directly and verify schema/data
If you have no access to the running environment (no SSH key, no host in project environments, service not deployed), return `cannot_confirm` — this is honest escalation, NOT a failure.
## Rules
- Do NOT just run unit tests. Smoke test = real environment check.
- Do NOT fake results. If you cannot verify — say so.
- If the service is unreachable: `cannot_confirm` with clear reason.
- Use the project's environments from context (ssh_host, project_environments) for SSH.
- Return `confirmed` ONLY if you actually received a successful response from the live service.
- **FORBIDDEN**: returning `confirmed` without real evidence (command output, HTTP response, etc.).
## Output format
Return ONLY valid JSON (no markdown, no explanation):
```json
{
  "status": "confirmed",
  "commands_run": [
    "curl -s https://example.com/api/health",
    "ssh pelmen@prod-host 'systemctl status myservice'"
  ],
  "evidence": "HTTP 200 OK: {\"status\": \"healthy\"}\nService: active (running)",
  "reason": null
}
```
When cannot verify:
```json
{
  "status": "cannot_confirm",
  "commands_run": [],
  "evidence": null,
  "reason": "No access to the prod environment: project_environments contains no host with the service installed. Manual verification required."
}
```
Valid values for `status`: `"confirmed"`, `"cannot_confirm"`.
`cannot_confirm` = honest escalation. The task goes to blocked, with a reason attached for manual triage.
## Blocked Protocol
If task context is missing or request is fundamentally unclear:
```json
{"status": "blocked", "reason": "<clear explanation>", "blocked_at": "<ISO-8601 datetime>"}
```
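The verification approach above boils down to: run a real probe against the live service and capture its output as evidence. For the web-service case, a minimal sketch that produces the payload shape shown in the examples above (the `check_http` helper and its URL are illustrative, not part of the orchestrator):

```python
import json
import urllib.request
import urllib.error

def check_http(url: str, timeout: float = 5.0) -> dict:
    """Probe a live endpoint and build a smoke_tester-style result payload."""
    cmd = f"GET {url}"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            body = resp.read(200).decode("utf-8", errors="replace")
            return {
                "status": "confirmed",
                "commands_run": [cmd],
                "evidence": f"HTTP {resp.status}: {body}",
                "reason": None,
            }
    except (urllib.error.URLError, OSError) as exc:
        # Unreachable service (or a non-2xx HTTPError) is honest
        # escalation, not a test failure.
        return {
            "status": "cannot_confirm",
            "commands_run": [cmd],
            "evidence": None,
            "reason": f"service unreachable: {exc}",
        }

print(json.dumps(check_http("http://127.0.0.1:1/health", timeout=1.0), indent=2))
```

Note that `confirmed` is only returned on an actual successful response, matching the rule that evidence must come from the live service.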

View file

@@ -986,85 +986,6 @@ def _save_decomposer_output(
    return {"created": created, "skipped": skipped}
# ---------------------------------------------------------------------------
# Tech debt: create followup child task from dev agent output
# ---------------------------------------------------------------------------
# Roles whose output is parsed for tech_debt (KIN-128)
_TECH_DEBT_ROLES = {"backend_dev", "frontend_dev", "debugger", "sysadmin"}
def _save_tech_debt_output(
    conn: sqlite3.Connection,
    project_id: str,
    task_id: str,
    result: dict,
) -> dict:
    """Parse dev agent JSON output for tech_debt field and create a child task.
    If the agent output contains a non-empty 'tech_debt' object with a 'description',
    creates one child task with title='[TECH DEBT] {description}'.
    At most 1 tech_debt task per call (prevents runaway task creation).
    Returns {created: bool, task_id: str | None}.
    """
    raw = result.get("raw_output") or result.get("output") or ""
    if isinstance(raw, (dict, list)):
        raw = json.dumps(raw, ensure_ascii=False)
    try:
        parsed = _try_parse_json(raw)
    except Exception:
        return {"created": False, "task_id": None}
    if not isinstance(parsed, dict):
        return {"created": False, "task_id": None}
    tech_debt = parsed.get("tech_debt")
    if not isinstance(tech_debt, dict):
        return {"created": False, "task_id": None}
    description = (tech_debt.get("description") or "").strip()
    if not description:
        return {"created": False, "task_id": None}
    reason_temporary = (tech_debt.get("reason_temporary") or "").strip()
    proper_fix = (tech_debt.get("proper_fix") or "").strip()
    # Idempotency: skip if a [TECH DEBT] child with same description already exists
    title = f"[TECH DEBT] {description}"
    existing = conn.execute(
        """SELECT id FROM tasks
           WHERE parent_task_id = ? AND lower(trim(title)) = lower(trim(?))""",
        (task_id, title),
    ).fetchone()
    if existing:
        return {"created": False, "task_id": existing[0]}
    category = (tech_debt.get("category") or "").strip().upper()
    if category not in models.TASK_CATEGORIES:
        category = "FIX"
    brief_text = f"Tech debt from task {task_id}."
    if reason_temporary:
        brief_text += f"\n\nWhy the solution is temporary: {reason_temporary}"
    if proper_fix:
        brief_text += f"\n\nProper fix: {proper_fix}"
    new_task_id = models.next_task_id(conn, project_id, category=category)
    models.create_task(
        conn,
        new_task_id,
        project_id,
        title,
        priority=7,
        brief={"text": brief_text, "source": f"tech_debt:{task_id}"},
        category=category,
        parent_task_id=task_id,
    )
    _logger.info("tech_debt: created task %s for parent %s", new_task_id, task_id)
    return {"created": True, "task_id": new_task_id}
# ---------------------------------------------------------------------------
# Auto-learning: extract decisions from pipeline results
# ---------------------------------------------------------------------------
@@ -1899,74 +1820,6 @@ def run_pipeline(
        except Exception:
            pass  # Never block pipeline on decomposer save errors
        # Smoke tester: parse result and escalate if cannot_confirm (KIN-128)
        if role == "smoke_tester" and result["success"] and not dry_run:
            smoke_output = result.get("output") or result.get("raw_output") or ""
            smoke_parsed = None
            try:
                if isinstance(smoke_output, dict):
                    smoke_parsed = smoke_output
                elif isinstance(smoke_output, str):
                    smoke_parsed = _try_parse_json(smoke_output)
            except Exception:
                pass
            if isinstance(smoke_parsed, dict):
                # Save smoke_test_result regardless of outcome
                try:
                    models.update_task(conn, task_id, smoke_test_result=smoke_parsed)
                except Exception:
                    pass
                smoke_status = smoke_parsed.get("status", "")
                if smoke_status == "cannot_confirm":
                    reason = smoke_parsed.get("reason") or "smoke_tester: cannot confirm — no proof of working service"
                    blocked_reason = f"smoke_test: cannot_confirm — {reason}"
                    models.update_task(
                        conn, task_id,
                        status="blocked",
                        blocked_reason=blocked_reason,
                        blocked_agent_role="smoke_tester",
                        blocked_pipeline_step=str(i + 1),
                    )
                    if pipeline:
                        models.update_pipeline(
                            conn, pipeline["id"],
                            status="failed",
                            total_cost_usd=total_cost,
                            total_tokens=total_tokens,
                            total_duration_seconds=total_duration,
                        )
                    try:
                        models.write_log(
                            conn, pipeline["id"],
                            f"Smoke test cannot_confirm: {reason}",
                            level="WARN",
                            extra={"role": "smoke_tester", "reason": reason},
                        )
                    except Exception:
                        pass
                    return {
                        "success": False,
                        "error": blocked_reason,
                        "blocked_by": "smoke_tester",
                        "blocked_reason": blocked_reason,
                        "steps_completed": i + 1,
                        "results": results,
                        "total_cost_usd": total_cost,
                        "total_tokens": total_tokens,
                        "total_duration_seconds": total_duration,
                        "pipeline_id": pipeline["id"] if pipeline else None,
                    }
                # status == 'confirmed': smoke test passed, continue pipeline
        # Tech debt: create followup child task from dev agent output (KIN-128)
        if role in _TECH_DEBT_ROLES and result["success"] and not dry_run:
            try:
                _save_tech_debt_output(conn, project_id, task_id, result)
            except Exception:
                pass  # Never block pipeline on tech_debt save errors
        # Department head: execute sub-pipeline planned by the dept head
        if _is_department_head(role) and result["success"] and not dry_run:
            # Determine next department for handoff routing
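Both the smoke-tester and tech-debt handlers above lean on `_try_parse_json` to recover a JSON object from raw agent output. Its implementation is outside this diff; a tolerant parser along these lines is what the callers appear to assume (this sketch is illustrative, not the project's actual code):

```python
import json
import re

def try_parse_json(raw: str):
    """Best-effort JSON extraction from agent output, which may wrap the
    payload in markdown fences or surrounding prose. Returns None on failure."""
    raw = raw.strip()
    # 1. The happy path: the whole string is JSON.
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        pass
    # 2. Strip ```json ... ``` fences if present.
    fenced = re.search(r"```(?:json)?\s*(\{.*\})\s*```", raw, re.DOTALL)
    if fenced:
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError:
            pass
    # 3. Last resort: the widest brace-delimited span.
    start, end = raw.find("{"), raw.rfind("}")
    if 0 <= start < end:
        try:
            return json.loads(raw[start:end + 1])
        except json.JSONDecodeError:
            pass
    return None
```

Note that both call sites wrap the parse in `try/except` and treat a non-dict result as "nothing to do", so a parser that returns `None` on failure fits the usage.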

View file

@@ -69,7 +69,6 @@ CREATE TABLE IF NOT EXISTS tasks (
    category TEXT DEFAULT NULL,
    telegram_sent BOOLEAN DEFAULT 0,
    acceptance_criteria TEXT,
    smoke_test_result JSON DEFAULT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
@@ -780,12 +779,6 @@ def _migrate(conn: sqlite3.Connection):
    conn.execute("ALTER TABLE tasks ADD COLUMN completed_at DATETIME DEFAULT NULL")
    conn.commit()
    # KIN-128: Add smoke_test_result to tasks — stores smoke_tester agent output
    task_cols_final2 = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()}
    if "smoke_test_result" not in task_cols_final2:
        conn.execute("ALTER TABLE tasks ADD COLUMN smoke_test_result JSON DEFAULT NULL")
        conn.commit()
def _seed_default_hooks(conn: sqlite3.Connection):
    """Seed default hooks for the kin project (idempotent.

View file

@@ -36,7 +36,6 @@ def validate_completion_mode(value: str) -> str:
_JSON_COLUMNS: frozenset[str] = frozenset({
    "tech_stack",
    "brief", "spec", "review", "test_result", "security_result", "labels",
    "smoke_test_result",
    "tags",
    "dependencies",
    "steps",
@@ -380,7 +379,7 @@ def update_task(conn: sqlite3.Connection, id: str, **fields) -> dict:
    """
    if not fields:
        return get_task(conn, id)
    json_cols = ("brief", "spec", "review", "test_result", "security_result", "labels", "smoke_test_result")
    json_cols = ("brief", "spec", "review", "test_result", "security_result", "labels")
    for key in json_cols:
        if key in fields:
            fields[key] = _json_encode(fields[key])
@@ -484,15 +483,11 @@ def get_decisions(
    if category:
        query += " AND d.category = ?"
        params.append(category)
    if types is not None:
        if not types:
            return []
    if types:
        placeholders = ", ".join("?" for _ in types)
        query += f" AND d.type IN ({placeholders})"
        params.extend(types)
    if tags is not None:
        if not tags:
            return []
    if tags:
        query += """ AND d.id IN (
            SELECT d2.id FROM decisions d2, json_each(d2.tags) AS t
            WHERE t.value IN ({})
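The `get_decisions` change above is behavioral, not cosmetic: under `is not None`, an explicit empty list meant "match nothing" and returned `[]` early, while under the truthiness check an empty list is indistinguishable from `None` and simply disables the filter. A standalone toy illustration of the two guards (not the project's code):

```python
def filter_old(items, types=None):
    # Old guard: an explicit empty list means "match nothing".
    if types is not None:
        if not types:
            return []
        items = [i for i in items if i["type"] in types]
    return items

def filter_new(items, types=None):
    # New guard: an empty list behaves like None (no filtering).
    if types:
        items = [i for i in items if i["type"] in types]
    return items

rows = [{"type": "gotcha"}, {"type": "convention"}]
assert filter_old(rows, types=[]) == []       # match nothing
assert filter_new(rows, types=[]) == rows     # filter disabled
assert filter_old(rows, types=["gotcha"]) == filter_new(rows, types=["gotcha"])
```

Non-empty filter lists behave identically either way; only callers that deliberately pass `[]` see the difference.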

View file

@@ -1,409 +0,0 @@
"""Regression tests for KIN-128 — Quality over speed.
Covers 4 mechanisms:
1. Analyst auto-injection on 2nd+ revision (revise_count >= 2)
2. Smoke tester: cannot_confirm task blocked; confirmed pipeline continues
3. Agent prompts contain mandatory proof block (backend_dev, frontend_dev, debugger)
4. Tech debt output creates child task in DB
"""
import json
import subprocess
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from core.db import init_db
from core import models
from agents.runner import _save_tech_debt_output, _TECH_DEBT_ROLES, run_pipeline
PROMPTS_DIR = Path(__file__).parent.parent / "agents" / "prompts"
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
    """Fresh in-memory DB with a seeded project and task."""
    c = init_db(":memory:")
    models.create_project(c, "p1", "P1", "/p1", tech_stack=["python"])
    models.create_task(c, "P1-001", "p1", "Fix bug", brief={"text": "fix"})
    yield c
    c.close()

@pytest.fixture
def api_client(tmp_path):
    """FastAPI TestClient with isolated DB."""
    import web.api as api_module
    api_module.DB_PATH = tmp_path / "test.db"
    from web.api import app
    from fastapi.testclient import TestClient
    client = TestClient(app)
    client.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"})
    client.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"})
    return client

def _mock_success(output_data):
    m = MagicMock()
    m.stdout = json.dumps(output_data) if isinstance(output_data, dict) else output_data
    m.stderr = ""
    m.returncode = 0
    return m
# ===========================================================================
# 1. Analyst injection on 2nd+ revision
# ===========================================================================
class TestAnalystInjectionOnRevise:
    def test_first_revise_does_not_inject_analyst(self, api_client):
        """revise_count=1 → analyst is NOT added to the pipeline steps."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "please fix", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 1
        roles = [s["role"] for s in (data.get("pipeline_steps") or [])]
        assert roles[0] != "analyst", "analyst must not be the first step when revise_count=1"

    def test_second_revise_injects_analyst_as_first_step(self, api_client):
        """revise_count=2 → analyst automatically becomes the first pipeline step."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            # first revise
            api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "first attempt", "steps": steps},
            )
            # second revise
            r = api_client.post(
                "/api/tasks/P1-001/revise",
                json={"comment": "second attempt", "steps": steps},
            )
        assert r.status_code == 200
        data = r.json()
        assert data["revise_count"] == 2
        pipeline_steps = data.get("pipeline_steps") or []
        assert pipeline_steps, "pipeline_steps must not be empty"
        assert pipeline_steps[0]["role"] == "analyst", (
            f"The first step must be analyst, got: {pipeline_steps[0]['role']}"
        )

    def test_second_revise_analyst_not_duplicated_if_already_first(self, api_client):
        """If analyst is already the first step — do not add a second one."""
        steps = [{"role": "analyst", "model": "sonnet"}, {"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            api_client.post("/api/tasks/P1-001/revise", json={"comment": "r1", "steps": steps})
            r = api_client.post("/api/tasks/P1-001/revise", json={"comment": "r2", "steps": steps})
        data = r.json()
        pipeline_steps = data.get("pipeline_steps") or []
        analyst_steps = [s for s in pipeline_steps if s["role"] == "analyst"]
        assert len(analyst_steps) == 1, "analyst must not be duplicated"

    def test_third_revise_also_injects_analyst(self, api_client):
        """revise_count=3 also injects analyst."""
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        with patch("web.api._launch_pipeline_subprocess"):
            for comment in ("r1", "r2", "r3"):
                r = api_client.post(
                    "/api/tasks/P1-001/revise",
                    json={"comment": comment, "steps": steps},
                )
        data = r.json()
        assert data["revise_count"] == 3
        assert (data.get("pipeline_steps") or [{}])[0]["role"] == "analyst"
# ===========================================================================
# 2. Smoke tester: cannot_confirm → blocked; confirmed → continues
# ===========================================================================
class TestSmokeTester:
    def _make_pipeline(self, smoke_status: str):
        """Build single-step pipeline with smoke_tester returning given status."""
        return [{"role": "smoke_tester", "model": "sonnet"}], {
            "status": smoke_status,
            "commands_run": [],
            "evidence": "HTTP 200" if smoke_status == "confirmed" else None,
            "reason": "no access to prod" if smoke_status == "cannot_confirm" else None,
        }

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_cannot_confirm_blocks_task(self, mock_run, conn):
        """smoke_tester with cannot_confirm → the task transitions to blocked."""
        steps, smoke_output = self._make_pipeline("cannot_confirm")
        mock_run.return_value = _mock_success(smoke_output)
        result = run_pipeline(conn, "P1-001", steps)
        assert result["success"] is False
        assert result.get("blocked_by") == "smoke_tester"
        task = models.get_task(conn, "P1-001")
        assert task["status"] == "blocked"
        assert task["blocked_agent_role"] == "smoke_tester"
        assert "cannot_confirm" in (task.get("blocked_reason") or "")

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_confirmed_continues_pipeline(self, mock_run, conn):
        """smoke_tester with confirmed → the pipeline continues, the task is not blocked."""
        steps = [
            {"role": "smoke_tester", "model": "sonnet"},
        ]
        _, smoke_output = self._make_pipeline("confirmed")
        mock_run.return_value = _mock_success(smoke_output)
        result = run_pipeline(conn, "P1-001", steps)
        # Pipeline completes (only one step — smoke_tester — which passed)
        assert result.get("blocked_by") != "smoke_tester"
        task = models.get_task(conn, "P1-001")
        assert task["status"] != "blocked"

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_confirmed_saves_result_to_db(self, mock_run, conn):
        """smoke_tester confirmed → the result is saved to smoke_test_result."""
        steps = [{"role": "smoke_tester", "model": "sonnet"}]
        smoke_output = {
            "status": "confirmed",
            "commands_run": ["curl https://example.com/health"],
            "evidence": "HTTP 200 OK",
            "reason": None,
        }
        mock_run.return_value = _mock_success(smoke_output)
        run_pipeline(conn, "P1-001", steps)
        task = models.get_task(conn, "P1-001")
        result = task.get("smoke_test_result")
        assert result is not None
        assert result.get("status") == "confirmed"

    @patch("agents.runner.subprocess.run")
    def test_smoke_tester_cannot_confirm_saves_result_to_db(self, mock_run, conn):
        """smoke_tester cannot_confirm → the result is also saved to smoke_test_result."""
        steps = [{"role": "smoke_tester", "model": "sonnet"}]
        smoke_output = {
            "status": "cannot_confirm",
            "commands_run": [],
            "evidence": None,
            "reason": "prod unreachable",
        }
        mock_run.return_value = _mock_success(smoke_output)
        run_pipeline(conn, "P1-001", steps)
        task = models.get_task(conn, "P1-001")
        result = task.get("smoke_test_result")
        assert result is not None
        assert result.get("status") == "cannot_confirm"
# ===========================================================================
# 3. Agent prompts contain mandatory proof block
# ===========================================================================
class TestAgentPromptsProofBlock:
    """Verify that agent prompts structurally require a proof block."""

    @pytest.mark.parametrize("prompt_file,forbidden_keyword,proof_keyword", [
        ("backend_dev.md", "status: done", "proof"),
        ("frontend_dev.md", "status: done", "proof"),
        ("debugger.md", "status: fixed", "proof"),
    ])
    def test_prompt_contains_proof_field(self, prompt_file, forbidden_keyword, proof_keyword):
        """The prompt contains a proof field in its Output format section."""
        content = (PROMPTS_DIR / prompt_file).read_text(encoding="utf-8")
        assert proof_keyword in content, (
            f"{prompt_file}: field '{proof_keyword}' missing from Output format"
        )

    @pytest.mark.parametrize("prompt_file,rule_phrase", [
        ("backend_dev.md", "FORBIDDEN"),
        ("frontend_dev.md", "FORBIDDEN"),
        ("debugger.md", "FORBIDDEN"),
    ])
    def test_prompt_forbids_done_without_proof(self, prompt_file, rule_phrase):
        """The prompt explicitly forbids returning "done" without evidence."""
        content = (PROMPTS_DIR / prompt_file).read_text(encoding="utf-8")
        assert rule_phrase in content, (
            f"{prompt_file}: missing the FORBIDDEN rule about proof"
        )

    def test_smoke_tester_prompt_exists(self):
        """The smoke_tester.md prompt exists and contains cannot_confirm."""
        path = PROMPTS_DIR / "smoke_tester.md"
        assert path.exists(), "smoke_tester.md not found"
        content = path.read_text(encoding="utf-8")
        assert "cannot_confirm" in content

    def test_analyst_prompt_exists(self):
        """The analyst.md prompt exists and contains root_problem."""
        path = PROMPTS_DIR / "analyst.md"
        assert path.exists(), "analyst.md not found"
        content = path.read_text(encoding="utf-8")
        assert "root_problem" in content

    def test_backend_dev_proof_fields_complete(self):
        """backend_dev.md contains all three fields inside proof."""
        content = (PROMPTS_DIR / "backend_dev.md").read_text(encoding="utf-8")
        for field in ("what_was_done", "how_verified", "verification_result"):
            assert field in content, f"backend_dev.md: missing proof.{field} field"

    def test_debugger_proof_fields_complete(self):
        """debugger.md contains all three fields inside proof."""
        content = (PROMPTS_DIR / "debugger.md").read_text(encoding="utf-8")
        for field in ("what_was_fixed", "how_verified", "verification_result"):
            assert field in content, f"debugger.md: missing proof.{field} field"
# ===========================================================================
# 4. Tech debt output creates child task in DB
# ===========================================================================
class TestTechDebtTaskCreation:
def test_tech_debt_roles_contains_expected_roles(self):
"""_TECH_DEBT_ROLES содержит все dev-роли + sysadmin."""
assert "backend_dev" in _TECH_DEBT_ROLES
assert "frontend_dev" in _TECH_DEBT_ROLES
assert "debugger" in _TECH_DEBT_ROLES
assert "sysadmin" in _TECH_DEBT_ROLES
def test_save_tech_debt_creates_child_task(self, conn):
"""tech_debt с description → создаётся дочерняя задача [TECH DEBT]."""
result = {
"success": True,
"raw_output": json.dumps({
"status": "done",
"tech_debt": {
"description": "Хардкод timeout в 30s нужно вынести в конфиг",
"reason_temporary": "MVP без конфига",
"proper_fix": "Добавить config.yaml с полем agent_timeout",
"category": "FIX",
},
}),
}
out = _save_tech_debt_output(conn, "p1", "P1-001", result)
assert out["created"] is True
assert out["task_id"] is not None
child = models.get_task(conn, out["task_id"])
assert child is not None
assert "[TECH DEBT]" in child["title"]
assert "Хардкод timeout" in child["title"]
assert child["parent_task_id"] == "P1-001"
assert child["category"] == "FIX"
def test_save_tech_debt_without_description_creates_nothing(self, conn):
"""tech_debt без description → задача НЕ создаётся."""
result = {
"success": True,
"raw_output": json.dumps({
"status": "done",
"tech_debt": {"description": "", "reason_temporary": ""},
}),
}
out = _save_tech_debt_output(conn, "p1", "P1-001", result)
assert out["created"] is False
def test_save_tech_debt_without_field_creates_nothing(self, conn):
"""Вывод без поля tech_debt → задача НЕ создаётся."""
result = {
"success": True,
"raw_output": json.dumps({"status": "done", "changes": []}),
}
out = _save_tech_debt_output(conn, "p1", "P1-001", result)
assert out["created"] is False
def test_save_tech_debt_idempotent(self, conn):
"""Повторный вызов с тем же описанием — дубликат НЕ создаётся."""
raw = json.dumps({
"status": "done",
"tech_debt": {
"description": "Duplicate tech debt check",
"reason_temporary": "quick fix",
"proper_fix": "refactor",
},
})
result = {"success": True, "raw_output": raw}
out1 = _save_tech_debt_output(conn, "p1", "P1-001", result)
out2 = _save_tech_debt_output(conn, "p1", "P1-001", result)
assert out1["created"] is True
assert out2["created"] is False
assert out1["task_id"] == out2["task_id"]

    def test_save_tech_debt_invalid_category_defaults_to_fix(self, conn):
        """An unknown category in tech_debt → falls back to the default category FIX."""
        result = {
            "success": True,
            "raw_output": json.dumps({
                "status": "done",
                "tech_debt": {
                    "description": "Temporary workaround",
                    "category": "UNKNOWN_GARBAGE",
                },
            }),
        }
        out = _save_tech_debt_output(conn, "p1", "P1-001", result)
        assert out["created"] is True
        child = models.get_task(conn, out["task_id"])
        assert child["category"] == "FIX"

    @patch("agents.runner.subprocess.run")
    def test_pipeline_creates_tech_debt_task_for_backend_dev(self, mock_run, conn):
        """run_pipeline for backend_dev with tech_debt → a child task is created."""
        agent_output = {
            "status": "done",
            "changes": [],
            "tech_debt": {
                "description": "Pipeline integration: hardcoded URL",
                "reason_temporary": "no config yet",
                "proper_fix": "move it to env",
                "category": "FIX",
            },
            "proof": {
                "what_was_done": "done",
                "how_verified": "tests",
                "verification_result": "ok",
            },
        }
        mock_run.return_value = _mock_success(agent_output)
        steps = [{"role": "backend_dev", "model": "sonnet"}]
        run_pipeline(conn, "P1-001", steps)
        # Check that a [TECH DEBT] child task was created
        children = models.get_children(conn, "P1-001")
        tech_debt_tasks = [t for t in children if "[TECH DEBT]" in (t.get("title") or "")]
        assert len(tech_debt_tasks) >= 1, "A child tech debt task must be created"
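The helper these tests exercise, `_save_tech_debt_output`, is not shown in this diff. A minimal in-memory sketch consistent with the assertions above (the store layout, ID scheme, and valid-category set are assumptions, not the real implementation) might look like:

```python
import json

# Hypothetical valid-category set; only FIX actually appears in the tests.
_VALID_CATEGORIES = {"FIX", "FEATURE", "REFACTOR"}


def save_tech_debt_output(tasks, parent_task_id, result):
    """Create a [TECH DEBT] child task from an agent's raw JSON output.

    `tasks` is a plain dict standing in for the DB:
    {task_id: {"title", "parent_task_id", "category"}}.
    Returns {"created": bool, "task_id": str | None}.
    """
    try:
        payload = json.loads(result.get("raw_output") or "{}")
    except json.JSONDecodeError:
        return {"created": False, "task_id": None}

    debt = payload.get("tech_debt") or {}
    description = (debt.get("description") or "").strip()
    if not description:
        # No tech_debt field, or an empty description -> nothing to create.
        return {"created": False, "task_id": None}

    title = f"[TECH DEBT] {description}"
    # Idempotency: same title under the same parent counts as a duplicate.
    for task_id, task in tasks.items():
        if task["parent_task_id"] == parent_task_id and task["title"] == title:
            return {"created": False, "task_id": task_id}

    category = debt.get("category")
    if category not in _VALID_CATEGORIES:
        category = "FIX"  # unknown categories fall back to FIX

    task_id = f"{parent_task_id}-TD{len(tasks) + 1}"
    tasks[task_id] = {
        "title": title,
        "parent_task_id": parent_task_id,
        "category": category,
    }
    return {"created": True, "task_id": task_id}
```

This keeps all four behaviors the tests pin down: child creation, no-op on empty/missing tech_debt, idempotency, and the FIX fallback.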

# ===========================================================================
# 5. DB schema: smoke_test_result column exists
# ===========================================================================

def test_schema_tasks_has_smoke_test_result_column(conn):
    """KIN-128: the tasks table contains a smoke_test_result column."""
    cols = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()}
    assert "smoke_test_result" in cols, "KIN-128: smoke_test_result must be in the tasks table"


def test_update_task_smoke_test_result_roundtrip(conn):
    """smoke_test_result is saved and read back as a dict."""
    data = {"status": "confirmed", "evidence": "HTTP 200"}
    models.update_task(conn, "P1-001", smoke_test_result=data)
    task = models.get_task(conn, "P1-001")
    assert task["smoke_test_result"] == data
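The roundtrip test implies the column is serialized to JSON on write and parsed on read. A self-contained sqlite3 sketch of that pattern (helper names here are assumptions; the table and column names come from the tests):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, smoke_test_result TEXT)")
conn.execute("INSERT INTO tasks (id) VALUES ('P1-001')")


def update_smoke_test_result(conn, task_id, data):
    # Store the dict as a JSON string in the TEXT column.
    conn.execute(
        "UPDATE tasks SET smoke_test_result = ? WHERE id = ?",
        (json.dumps(data), task_id),
    )


def get_smoke_test_result(conn, task_id):
    row = conn.execute(
        "SELECT smoke_test_result FROM tasks WHERE id = ?", (task_id,)
    ).fetchone()
    # Parse back to a dict; NULL stays None for tasks never smoke-tested.
    return json.loads(row[0]) if row and row[0] is not None else None


update_smoke_test_result(conn, "P1-001", {"status": "confirmed", "evidence": "HTTP 200"})
```

A schema check like `test_schema_tasks_has_smoke_test_result_column` then reduces to `PRAGMA table_info(tasks)`, whose second tuple element is the column name.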
@@ -1173,78 +1173,3 @@ def test_get_decisions_empty_tags_differs_from_none(conn):
    assert len(result_empty) == 0, "tags=[] must yield 0 results"
    assert len(result_none) == 2, "tags=None must yield all 2 results"
# ---------------------------------------------------------------------------
# Deeper investigation: additional get_decisions edge cases with []
# Task KIN-P1-001 revision — both params empty, empty project, decisions without tags
# ---------------------------------------------------------------------------

def test_get_decisions_both_types_and_tags_empty_returns_empty(conn):
    """types=[] + tags=[] at the same time — must return 0 results.

    The early return on types=[] must fire before the tags=[] check; result is [].
    """
    models.create_project(conn, "p1", "P1", "/p1")
    models.add_decision(conn, "p1", "gotcha", "Gotcha A", "desc", tags=["safari"])
    models.add_decision(conn, "p1", "decision", "Decision B", "desc", tags=["chrome"])
    result = models.get_decisions(conn, "p1", types=[], tags=[])
    assert result == [], (
        f"types=[] + tags=[] must return [], got {len(result)} rows"
    )

def test_get_decisions_empty_types_with_tags_none_returns_empty(conn):
    """types=[] with tags=None — early return on types, tags are never checked → 0 results."""
    models.create_project(conn, "p1", "P1", "/p1")
    models.add_decision(conn, "p1", "gotcha", "Gotcha 1", "desc", tags=["safari"])
    result = models.get_decisions(conn, "p1", types=[], tags=None)
    assert result == [], (
        f"types=[] must early-return [] even when tags=None, got {len(result)} rows"
    )

def test_get_decisions_empty_tags_with_types_none_returns_empty(conn):
    """tags=[] with types=None — the tag filter early-returns → 0 results."""
    models.create_project(conn, "p1", "P1", "/p1")
    models.add_decision(conn, "p1", "gotcha", "Gotcha 1", "desc", tags=["safari"])
    result = models.get_decisions(conn, "p1", types=None, tags=[])
    assert result == [], (
        f"tags=[] must early-return [] even when types=None, got {len(result)} rows"
    )

def test_get_decisions_empty_types_on_empty_project_returns_empty(conn):
    """types=[] on a project with no decisions — must return [] (early return, no query against the empty table)."""
    models.create_project(conn, "p1", "P1", "/p1")
    result = models.get_decisions(conn, "p1", types=[])
    assert result == [], (
        f"types=[] on an empty project must return [], got {result!r}"
    )

def test_get_decisions_empty_tags_on_decisions_without_tags(conn):
    """tags=[] while decisions exist with tags=None — must return [] (untagged decisions are NOT included)."""
    models.create_project(conn, "p1", "P1", "/p1")
    models.add_decision(conn, "p1", "decision", "Untagged 1", "desc", tags=None)
    models.add_decision(conn, "p1", "gotcha", "Untagged 2", "desc", tags=None)
    result = models.get_decisions(conn, "p1", tags=[])
    assert result == [], (
        f"tags=[] must return [] even when decisions have tags=None, got {len(result)} rows"
    )

def test_get_decisions_empty_types_with_category_ignores_category(conn):
    """types=[] + category='ui' — early return on types=[], category has no effect → 0 results."""
    models.create_project(conn, "p1", "P1", "/p1")
    models.add_decision(conn, "p1", "gotcha", "UI gotcha", "desc", category="ui")
    result = models.get_decisions(conn, "p1", types=[], category="ui")
    assert result == [], (
        f"types=[] must early-return [] regardless of category, got {len(result)} rows"
    )
@@ -1051,20 +1051,6 @@ def revise_task(task_id: str, body: TaskRevise):
    raw = row["steps"]
    steps = _json.loads(raw) if isinstance(raw, str) else raw
    # KIN-128: On 2nd+ revision, inject analyst as first step for fresh perspective.
    # Guard: skip if steps is empty/None, or if analyst is already the first step (idempotent).
    if revise_count >= 2 and steps and steps[0].get("role") != "analyst":
        analyst_step = {
            "role": "analyst",
            "model": "sonnet",
            "brief": (
                f"The task is back for revision #{revise_count}. "
                "Run a fresh analysis of why the previous attempts failed "
                "and propose a different approach."
            ),
        }
        steps = [analyst_step] + list(steps)
    conn.close()
    # Launch pipeline in background subprocess
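The injection guard removed by this diff can be factored into a small pure helper, which makes the idempotency and the empty-steps edge case easy to unit-test in isolation (the helper name is an assumption; the step fields mirror the route code above):

```python
def inject_analyst_step(steps, revise_count):
    """Prepend an analyst step on the 2nd+ revision.

    No-op when steps is None/empty, when revise_count < 2, or when an
    analyst step is already first (idempotent across repeated calls).
    """
    if revise_count < 2 or not steps:
        return steps
    if steps[0].get("role") == "analyst":
        return steps  # idempotent: never stack analyst steps
    analyst_step = {
        "role": "analyst",
        "model": "sonnet",
        "brief": (
            f"The task is back for revision #{revise_count}. "
            "Run a fresh analysis of why the previous attempts failed "
            "and propose a different approach."
        ),
    }
    # Return a new list rather than mutating the caller's steps in place.
    return [analyst_step] + list(steps)
```

Because the function returns a new list instead of mutating its argument, the route can call it unconditionally before serializing `steps` back for the pipeline launch.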