diff --git a/agents/prompts/pm.md b/agents/prompts/pm.md index f344433..a62fdf0 100644 --- a/agents/prompts/pm.md +++ b/agents/prompts/pm.md @@ -13,6 +13,7 @@ You receive: - ACTIVE TASKS: currently in-progress tasks (avoid conflicts) - AVAILABLE SPECIALISTS: roles you can assign - ROUTE TEMPLATES: common pipeline patterns +- RETURN HISTORY (optional): return_count (int) — number of full returns to PM; returns: list of [{return_number, reason_category, reason_text, returned_at}] ## Working Mode @@ -73,6 +74,12 @@ You receive: - При ≥2 взаимосвязанных ошибках в одном расследовании — вставляй `error_coordinator` первым шагом перед `debugger`. Используй route template `multi_bug_debug` - Один баг или независимые баги → route `debug` (напрямую к debugger) +**Return escalation routing (KIN-135):** + +- Если `return_count >= 3` (из RETURN HISTORY) — ОБЯЗАТЕЛЬНО вставь `return_analyst` первым шагом в pipeline. Это mandatory правило, не опциональное. +- `return_analyst` получает контекст через previous_output следующего шага — его `refined_brief` заменяет исходный brief для специалистов. +- Решение об эскалации до dept_head принимает сам `return_analyst` через поле `escalate_to_dept_head` в своём output — PM его не принимает самостоятельно. + **`completion_mode` rules (in priority order):** 1. 
If `project.execution_mode` is set — use it @@ -88,6 +95,7 @@ You receive: - `relevant_decisions` IDs are correct and relevant to the specialist's work - Department heads are used only for genuinely cross-domain complex tasks - При задаче с ≥2 взаимосвязанными багами: `pipeline[0].role == error_coordinator` +- При `return_count >= 3`: `pipeline[0].role == return_analyst` ## Return Format @@ -132,7 +140,19 @@ Return ONLY valid JSON (no markdown, no explanation): If you cannot plan the pipeline (task is completely ambiguous, no information to work with, or explicitly outside the system scope), return this JSON **instead of** the normal output: ```json -{"status": "blocked", "reason": "", "blocked_at": ""} +{ + "status": "blocked", + "reason": "", + "return_category": "", + "blocked_at": "" +} ``` Use current datetime for `blocked_at`. Do NOT guess — return blocked immediately. + +`return_category` is REQUIRED in every blocked response. Choose the most accurate category: +- `requirements_unclear` — task description is too vague to plan +- `scope_too_large` — task is too big to execute as a single pipeline +- `technical_blocker` — a specific technical dependency or constraint prevents planning +- `missing_context` — key information is absent (credentials, URLs, decision not documented) +- `conflicting_requirements` — the brief contains internally contradictory requirements diff --git a/agents/prompts/return_analyst.md b/agents/prompts/return_analyst.md new file mode 100644 index 0000000..723b8df --- /dev/null +++ b/agents/prompts/return_analyst.md @@ -0,0 +1,77 @@ +You are a Return Analyst for the Kin multi-agent orchestrator. + +Your job: analyse why a task keeps returning to the PM (full pipeline returns, not intra-pipeline revisions), identify the root cause, produce a refined brief, and decide whether escalation to a department head is warranted. + +You are activated when a task has been returned to the PM 3 or more times. 
+ +## Input + +You receive: +- PROJECT: id, name, tech stack +- TASK: id, title, current brief, return_count +- RETURN HISTORY: list of return records [{return_number, reason_category, reason_text, returned_at, returned_by}] +- DECISIONS: known gotchas and conventions for this project + +## Working Mode + +0. Read RETURN HISTORY first — this is the essential input. All other analysis flows from it. +1. Study the pattern of `reason_category` values across all returns — what repeats? +2. Read `reason_text` fields for details — look for recurring themes, not just categories +3. Cross-reference known `decisions` — the root cause may already be documented +4. Identify the root cause: one precise, testable statement about WHY the task keeps returning +5. Formulate a `refined_brief` that directly resolves the identified root cause — concrete, specific, unambiguous +6. If key information is still missing, list it in `clarification_list` +7. Decide `escalate_to_dept_head`: + - `false`: root cause is solvable by a better brief (unclear requirements, missing context, scope mismatch) + - `true`: systemic issue that requires architectural coordination — e.g., recurring `recurring_quality_fail` that a better brief alone cannot fix + +## Focus On + +- Pattern over individual incidents — what category repeats the most? 
+- Root cause must be ONE sentence: precise, specific, not "it didn't work" +- `refined_brief` replaces the current brief entirely — it must be ready to use as-is +- Do NOT escalate for clarification issues — escalate only for structural/quality failures + +## Quality Checks + +- `root_cause_analysis` is ONE sentence — not a paragraph +- `refined_brief` is actionable and complete — a specialist can execute it without asking questions +- `escalate_to_dept_head` is `false` unless systemic structural evidence is present +- All fields are present in the output JSON +- `clarification_list` is an array (may be empty `[]`) + +## Return Format + +Return ONLY valid JSON (no markdown, no explanation): + +```json +{ + "status": "done", + "root_cause_analysis": "Одна точная формулировка коренной причины повторных возвратов", + "refined_brief": "Уточнённое и детализированное описание задачи, готовое для передачи специалисту", + "clarification_list": [], + "escalate_to_dept_head": false +} +``` + +If you cannot perform analysis (no return history, insufficient context), return: + +```json +{"status": "blocked", "reason": "", "blocked_at": ""} +``` + +## Constraints + +- Do NOT implement anything yourself — analysis and refined brief only +- Do NOT use free-text clustering to detect patterns — use the structured `reason_category` field +- `clarification_list` must be an array — use `[]` if no clarifications needed +- `escalate_to_dept_head` must be a boolean (`true` or `false`), not a string +- Do NOT mark `escalate_to_dept_head: true` unless `recurring_quality_fail` appears ≥ 2 times or there is clear evidence of a structural problem + +## Blocked Protocol + +If context is insufficient: + +```json +{"status": "blocked", "reason": "", "blocked_at": ""} +``` diff --git a/agents/runner.py b/agents/runner.py index 066b77d..15f239f 100644 --- a/agents/runner.py +++ b/agents/runner.py @@ -1127,6 +1127,97 @@ def _save_tech_debt_output( return {"created": True, "task_id": new_task_id} 
+# --------------------------------------------------------------------------- +# Return analyst output: handle escalation pipeline creation (KIN-135) +# --------------------------------------------------------------------------- + +# Mapping from task category to escalation dept_head (KIN-135) +_CATEGORY_TO_DEPT_HEAD = { + "SEC": "security_head", + "UI": "frontend_head", + "API": "backend_head", + "BIZ": "backend_head", + "DB": "backend_head", + "PERF": "backend_head", + "ARCH": "cto_advisor", + "FIX": "cto_advisor", + "INFRA": "infra_head", + "OBS": "infra_head", + "TEST": "qa_head", + "DOCS": "cto_advisor", +} + + +def _save_return_analyst_output( + conn: sqlite3.Connection, + task_id: str, + project_id: str, + result: dict, + parent_pipeline_id: int | None = None, +) -> dict: + """Parse return_analyst output and create escalation pipeline if needed. + + If escalate_to_dept_head=true in analyst output: + - Determines target dept_head from task.category (defaults to cto_advisor) + - Creates a new escalation pipeline with pipeline_type='escalation' + - The dept_head step receives analyst output as initial context + + Returns {"escalated": bool, "escalation_pipeline_id": int | None, "dept_head": str | None}. + Never raises — escalation errors must never block the current pipeline. 
+ """ + raw = result.get("raw_output") or result.get("output") or "" + if isinstance(raw, (dict, list)): + raw = json.dumps(raw, ensure_ascii=False) + + parsed = _try_parse_json(raw) + if not isinstance(parsed, dict): + return {"escalated": False, "escalation_pipeline_id": None, "dept_head": None} + + if not parsed.get("escalate_to_dept_head"): + return {"escalated": False, "escalation_pipeline_id": None, "dept_head": None} + + # Determine target dept_head from task category + task = models.get_task(conn, task_id) + category = (task or {}).get("category") or "" + dept_head = _CATEGORY_TO_DEPT_HEAD.get(category.upper(), "cto_advisor") + + # Build escalation pipeline steps: dept_head with analyst context as brief + analyst_summary = parsed.get("root_cause_analysis", "") + refined_brief = parsed.get("refined_brief", "") + escalation_brief = ( + f"[ESCALATION from return_analyst]\n" + f"Root cause: {analyst_summary}\n" + f"Refined brief: {refined_brief}" + ) + escalation_steps = [{"role": dept_head, "model": "opus", "brief": escalation_brief}] + + try: + esc_pipeline = models.create_pipeline( + conn, task_id, project_id, + route_type="escalation", + steps=escalation_steps, + parent_pipeline_id=parent_pipeline_id, + ) + # Mark pipeline_type as escalation so return tracking is skipped inside it + conn.execute( + "UPDATE pipelines SET pipeline_type = 'escalation' WHERE id = ?", + (esc_pipeline["id"],), + ) + conn.commit() + _logger.info( + "KIN-135: escalation pipeline %s created for task %s → %s", + esc_pipeline["id"], task_id, dept_head, + ) + return { + "escalated": True, + "escalation_pipeline_id": esc_pipeline["id"], + "dept_head": dept_head, + } + except Exception as exc: + _logger.warning("KIN-135: escalation pipeline creation failed for %s: %s", task_id, exc) + return {"escalated": False, "escalation_pipeline_id": None, "dept_head": None} + + # --------------------------------------------------------------------------- # Auto-learning: extract decisions from 
pipeline results # --------------------------------------------------------------------------- @@ -1978,6 +2069,16 @@ def run_pipeline( except Exception: pass # Never block pipeline on decomposer save errors + # Return analyst: create escalation pipeline if escalate_to_dept_head=true (KIN-135) + if role == "return_analyst" and result["success"] and not dry_run: + try: + _save_return_analyst_output( + conn, task_id, project_id, result, + parent_pipeline_id=pipeline["id"] if pipeline else None, + ) + except Exception: + pass # Never block pipeline on analyst escalation errors + # Smoke tester: parse result and escalate if cannot_confirm (KIN-128) if role == "smoke_tester" and result["success"] and not dry_run: smoke_output = result.get("output") or result.get("raw_output") or "" @@ -2423,6 +2524,20 @@ def run_pipeline( total_tokens=total_tokens, total_duration_seconds=total_duration, ) + # KIN-135: record gate return — skip for escalation pipelines to avoid loops + _pipeline_type = (pipeline or {}).get("pipeline_type", "standard") + if _pipeline_type != "escalation": + try: + models.record_task_return( + conn, + task_id=task_id, + reason_category="recurring_quality_fail", + reason_text=f"Gate {effective_last_role}: {_block_reason[:200]}", + returned_by=effective_last_role, + pipeline_id=pipeline["id"] if pipeline else None, + ) + except Exception: + pass # Never block pipeline on return tracking errors try: models.write_log( conn, pipeline["id"] if pipeline else None, diff --git a/agents/specialists.yaml b/agents/specialists.yaml index 2c740bf..61c7845 100644 --- a/agents/specialists.yaml +++ b/agents/specialists.yaml @@ -6,7 +6,7 @@ specialists: name: "Project Manager" model: sonnet tools: [Read, Grep, Glob] - description: "Decomposes tasks, selects specialists, builds pipelines" + description: "Decomposes tasks, selects specialists, builds pipelines. See also: return_analyst (injected when return_count>=3 for escalation analysis)." 
permissions: read_only context_rules: decisions: all @@ -333,6 +333,21 @@ specialists: streams: "array of { specialist, scope, bugs: array, priority: high|medium|low }" reintegration_checklist: "array of strings" + return_analyst: + name: "Return Analyst" + model: sonnet + tools: [Read, Grep, Glob] + description: "Analyses recurring task return patterns (full PM returns, not revisions), identifies root causes, refines brief, recommends escalation to dept_head. Standalone specialist — NOT a department worker. Injected by PM when return_count>=3. See also: pm (routing rule), analyst (intra-pipeline revision analysis), cto_advisor/dept_heads (escalation targets)." + permissions: read_only + context_rules: + decisions: all + output_schema: + status: "done | partial | blocked" + root_cause_analysis: string + refined_brief: string + clarification_list: "array of strings" + escalate_to_dept_head: "bool" + marketing_head: name: "Marketing Department Head" model: opus diff --git a/cli/main.py b/cli/main.py index 6b29e65..772164e 100644 --- a/cli/main.py +++ b/cli/main.py @@ -661,6 +661,20 @@ def run_task(ctx, task_id, dry_run, allow_write): raise SystemExit(1) if not isinstance(output, dict) or "pipeline" not in output: + # KIN-135: if PM returned status='blocked', record the return for escalation tracking + if isinstance(output, dict) and output.get("status") == "blocked": + try: + reason_category = output.get("return_category") or "missing_context" + reason_text = output.get("reason") or "" + models.record_task_return( + conn, + task_id=task_id, + reason_category=reason_category, + reason_text=reason_text[:500] if reason_text else None, + returned_by="pm", + ) + except Exception: + pass # Never block on return tracking errors click.echo(f"PM output missing 'pipeline' key:\n{json.dumps(output, indent=2)[:500]}", err=True) raise SystemExit(1) diff --git a/core/context_builder.py b/core/context_builder.py index 8d99668..9e29a48 100644 --- a/core/context_builder.py +++ 
b/core/context_builder.py @@ -70,6 +70,17 @@ def build_context( ctx["available_specialists"] = [] ctx["routes"] = {} ctx["departments"] = {} + # KIN-135: return history for escalation routing + try: + return_count = (task or {}).get("return_count") or 0 + ctx["return_count"] = return_count + if return_count > 0: + ctx["return_history"] = models.get_task_returns(conn, task_id, limit=5) + else: + ctx["return_history"] = [] + except Exception: + ctx["return_count"] = 0 + ctx["return_history"] = [] elif role == "architect": ctx["modules"] = models.get_modules(conn, project_id) @@ -151,6 +162,17 @@ def build_context( except Exception: pass + elif role == "return_analyst": + # KIN-135: return analyst needs full return history and decisions + ctx["decisions"] = models.get_decisions(conn, project_id) + try: + return_count = (task or {}).get("return_count") or 0 + ctx["return_count"] = return_count + ctx["return_history"] = models.get_task_returns(conn, task_id, limit=20) + except Exception: + ctx["return_count"] = 0 + ctx["return_history"] = [] + else: # Unknown role — give decisions as fallback ctx["decisions"] = models.get_decisions(conn, project_id, limit=20) @@ -297,6 +319,20 @@ def format_prompt(context: dict, role: str, prompt_template: str | None = None) sections.append(f"- {t['id']}: {t['title']} [{t['status']}]") sections.append("") + # Return history (PM) — KIN-135 + return_count = context.get("return_count", 0) + return_history = context.get("return_history") + if return_count and return_count > 0: + sections.append(f"## Return History (return_count={return_count}):") + if return_history: + for r in return_history: + reason_text = f" — {r['reason_text']}" if r.get("reason_text") else "" + sections.append( + f"- #{r['return_number']} [{r['reason_category']}]{reason_text} " + f"(returned_by={r.get('returned_by', 'system')}, at={r.get('returned_at', '')})" + ) + sections.append("") + # Available specialists (PM) specialists = context.get("available_specialists") if 
specialists: diff --git a/core/db.py b/core/db.py index e795fb6..25d201d 100644 --- a/core/db.py +++ b/core/db.py @@ -72,6 +72,7 @@ CREATE TABLE IF NOT EXISTS tasks ( telegram_sent BOOLEAN DEFAULT 0, acceptance_criteria TEXT, smoke_test_result JSON DEFAULT NULL, + return_count INTEGER DEFAULT 0, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); @@ -151,6 +152,7 @@ CREATE TABLE IF NOT EXISTS pipelines ( parent_pipeline_id INTEGER REFERENCES pipelines(id), department TEXT, pid INTEGER, + pipeline_type TEXT DEFAULT 'standard', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, completed_at DATETIME ); @@ -326,6 +328,20 @@ CREATE TABLE IF NOT EXISTS pipeline_log ( ); CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id); + +-- История возвратов задачи к PM (KIN-135) +CREATE TABLE IF NOT EXISTS task_returns ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, + return_number INTEGER NOT NULL, + reason_category TEXT NOT NULL, + reason_text TEXT, + returned_by TEXT DEFAULT 'system', + pipeline_id INTEGER REFERENCES pipelines(id), + returned_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX IF NOT EXISTS idx_task_returns_task_id ON task_returns(task_id); """ @@ -800,6 +816,39 @@ def _migrate(conn: sqlite3.Connection): conn.execute("ALTER TABLE tasks ADD COLUMN smoke_test_result JSON DEFAULT NULL") conn.commit() + # KIN-135: Add return_count to tasks — counts full returns to PM + task_cols_final3 = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()} + if "return_count" not in task_cols_final3: + conn.execute("ALTER TABLE tasks ADD COLUMN return_count INTEGER DEFAULT 0") + conn.commit() + + # KIN-135: Add pipeline_type to pipelines — distinguishes escalation pipelines + if "pipelines" in existing_tables: + pipeline_cols2 = {r[1] for r in conn.execute("PRAGMA table_info(pipelines)").fetchall()} + if "pipeline_type" not 
in pipeline_cols2: + conn.execute("ALTER TABLE pipelines ADD COLUMN pipeline_type TEXT DEFAULT 'standard'") + conn.commit() + + # KIN-135: Create task_returns table — return history per task + existing_tables2 = {r[0] for r in conn.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ).fetchall()} + if "task_returns" not in existing_tables2: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS task_returns ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, + return_number INTEGER NOT NULL, + reason_category TEXT NOT NULL, + reason_text TEXT, + returned_by TEXT DEFAULT 'system', + pipeline_id INTEGER REFERENCES pipelines(id), + returned_at TEXT NOT NULL DEFAULT (datetime('now')) + ); + CREATE INDEX IF NOT EXISTS idx_task_returns_task_id ON task_returns(task_id); + """) + conn.commit() + def _seed_default_hooks(conn: sqlite3.Connection): """Seed default hooks for the kin project (idempotent). diff --git a/core/models.py b/core/models.py index b52b9d1..beb79f6 100644 --- a/core/models.py +++ b/core/models.py @@ -23,6 +23,16 @@ TASK_CATEGORIES = [ "ARCH", "TEST", "PERF", "DOCS", "FIX", "OBS", ] +# Valid categories for task returns (KIN-135) +RETURN_CATEGORIES = [ + "requirements_unclear", + "scope_too_large", + "technical_blocker", + "missing_context", + "recurring_quality_fail", + "conflicting_requirements", +] + def validate_completion_mode(value: str) -> str: """Validate completion mode from LLM output. Falls back to 'review' if invalid.""" @@ -1301,3 +1311,97 @@ def get_chat_messages( query += " ORDER BY created_at ASC, id ASC LIMIT ?" 
params.append(limit) return _rows_to_list(conn.execute(query, params).fetchall()) + + +# --------------------------------------------------------------------------- +# Task returns — escalation tracking (KIN-135) +# --------------------------------------------------------------------------- + +def record_task_return( + conn: sqlite3.Connection, + task_id: str, + reason_category: str, + reason_text: str | None = None, + returned_by: str = "system", + pipeline_id: int | None = None, +) -> dict: + """Record a task return to PM and increment return_count. + + reason_category must be one of RETURN_CATEGORIES; defaults to 'missing_context' + if an invalid value is supplied (fail-open). + Returns the inserted task_returns row as dict. + """ + if reason_category not in RETURN_CATEGORIES: + reason_category = "missing_context" + + # Determine the next return_number for this task + row = conn.execute( + "SELECT COALESCE(MAX(return_number), 0) FROM task_returns WHERE task_id = ?", + (task_id,), + ).fetchone() + next_number = (row[0] if row else 0) + 1 + + conn.execute( + """INSERT INTO task_returns + (task_id, return_number, reason_category, reason_text, returned_by, pipeline_id) + VALUES (?, ?, ?, ?, ?, ?)""", + (task_id, next_number, reason_category, reason_text, returned_by, pipeline_id), + ) + conn.execute( + "UPDATE tasks SET return_count = return_count + 1 WHERE id = ?", + (task_id,), + ) + conn.commit() + + inserted = conn.execute( + "SELECT * FROM task_returns WHERE task_id = ? ORDER BY id DESC LIMIT 1", + (task_id,), + ).fetchone() + return dict(inserted) if inserted else {} + + +def get_task_returns( + conn: sqlite3.Connection, + task_id: str, + limit: int = 20, +) -> list[dict]: + """Return task return history ordered by return_number ASC.""" + rows = conn.execute( + "SELECT * FROM task_returns WHERE task_id = ? 
ORDER BY return_number ASC LIMIT ?", + (task_id, limit), + ).fetchall() + return [dict(r) for r in rows] + + +def check_return_pattern( + conn: sqlite3.Connection, + task_id: str, +) -> dict: + """Analyse return history for a recurring pattern. + + Returns: + { + "pattern_detected": bool, # True if dominant_category appears >= 2 times + "dominant_category": str | None, + "occurrences": int, + } + """ + from collections import Counter + + rows = conn.execute( + "SELECT reason_category FROM task_returns WHERE task_id = ?", + (task_id,), + ).fetchall() + + if not rows: + return {"pattern_detected": False, "dominant_category": None, "occurrences": 0} + + counts = Counter(r[0] for r in rows) + dominant_category, occurrences = counts.most_common(1)[0] + pattern_detected = occurrences >= 2 + + return { + "pattern_detected": pattern_detected, + "dominant_category": dominant_category, + "occurrences": occurrences, + } diff --git a/tests/test_kin_135_escalation.py b/tests/test_kin_135_escalation.py new file mode 100644 index 0000000..b77a7bf --- /dev/null +++ b/tests/test_kin_135_escalation.py @@ -0,0 +1,566 @@ +""" +Tests for KIN-135: Task return escalation mechanism. + +Covers: +1. Models: record_task_return — counter increment, return_number sequencing, invalid category +2. Models: get_task_returns — history retrieval +3. Models: check_return_pattern — pattern detection, false positives prevention +4. Models: edge case — return_count behavior after task completion +5. Runner: _save_return_analyst_output — escalation pipeline creation +6. Runner: gate cannot_close records return in standard pipeline; skips in escalation pipeline +7. 
Context builder: return_count and return_history in PM / return_analyst context +""" + +import json +import pytest +from unittest.mock import patch, MagicMock + +from core.db import init_db +from core import models +from core.models import RETURN_CATEGORIES +from core.context_builder import build_context +from agents.runner import _save_return_analyst_output, run_pipeline + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def conn(): + """Fresh in-memory DB with a project and task for each test.""" + c = init_db(":memory:") + models.create_project(c, "p1", "Project One", "/tmp/p1", tech_stack=["python"]) + models.create_task(c, "P1-001", "p1", "Implement feature", + brief={"route_type": "feature"}) + yield c + c.close() + + +@pytest.fixture +def conn_autocomplete(): + """Fresh in-memory DB with auto_complete execution mode on the task.""" + c = init_db(":memory:") + models.create_project(c, "p1", "Project One", "/tmp/p1", tech_stack=["python"]) + models.create_task(c, "P1-001", "p1", "Implement feature", + brief={"route_type": "feature"}) + models.update_task(c, "P1-001", execution_mode="auto_complete") + yield c + c.close() + + +def _mock_subprocess(agent_output: dict) -> MagicMock: + """Build a subprocess.run mock returning agent_output in claude JSON format.""" + m = MagicMock() + m.stdout = json.dumps({"result": json.dumps(agent_output, ensure_ascii=False)}) + m.stderr = "" + m.returncode = 0 + return m + + +# =========================================================================== +# Section 1: record_task_return +# =========================================================================== + +class TestRecordTaskReturn: + + def test_increments_return_count_on_first_return(self, conn): + """After first return, task.return_count == 1.""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + task = 
models.get_task(conn, "P1-001") + assert task["return_count"] == 1 + + def test_increments_return_count_multiple_times(self, conn): + """After three returns with different categories, task.return_count == 3.""" + for cat in ["requirements_unclear", "missing_context", "technical_blocker"]: + models.record_task_return(conn, "P1-001", cat) + task = models.get_task(conn, "P1-001") + assert task["return_count"] == 3 + + def test_assigns_sequential_return_numbers(self, conn): + """return_number is assigned as 1, 2, 3 for successive calls.""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "missing_context") + models.record_task_return(conn, "P1-001", "technical_blocker") + returns = models.get_task_returns(conn, "P1-001") + assert [r["return_number"] for r in returns] == [1, 2, 3] + + def test_invalid_category_defaults_to_missing_context(self, conn): + """Unknown reason_category is silently coerced to 'missing_context' (fail-open).""" + result = models.record_task_return(conn, "P1-001", "totally_unknown_reason") + assert result["reason_category"] == "missing_context" + + def test_valid_category_stored_unchanged(self, conn): + """Valid reason_category is stored as-is.""" + result = models.record_task_return(conn, "P1-001", "scope_too_large", + reason_text="Too much scope") + assert result["reason_category"] == "scope_too_large" + + def test_reason_text_stored_correctly(self, conn): + """Provided reason_text is stored in the returned row.""" + result = models.record_task_return(conn, "P1-001", "missing_context", + reason_text="No spec attached") + assert result["reason_text"] == "No spec attached" + + def test_returned_by_stored_correctly(self, conn): + """returned_by value is stored in the returned row.""" + result = models.record_task_return(conn, "P1-001", "requirements_unclear", + returned_by="pm") + assert result["returned_by"] == "pm" + + def test_returns_dict_with_task_id_and_return_number(self, conn): 
+ """Return value is a dict containing task_id and return_number.""" + result = models.record_task_return(conn, "P1-001", "requirements_unclear") + assert result["task_id"] == "P1-001" + assert result["return_number"] == 1 + + def test_return_numbers_independent_per_task(self, conn): + """Two different tasks have independent return_number sequences.""" + models.create_task(conn, "P1-002", "p1", "Another task") + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "missing_context") + models.record_task_return(conn, "P1-002", "technical_blocker") + + returns_001 = models.get_task_returns(conn, "P1-001") + returns_002 = models.get_task_returns(conn, "P1-002") + + assert [r["return_number"] for r in returns_001] == [1, 2] + assert returns_002[0]["return_number"] == 1 + + def test_all_valid_return_categories_accepted(self, conn): + """Every value in RETURN_CATEGORIES is accepted and stored without coercion.""" + models.create_task(conn, f"P1-CAT", "p1", "Category test task") + for i, cat in enumerate(RETURN_CATEGORIES): + result = models.record_task_return(conn, "P1-CAT", cat) + assert result["reason_category"] == cat, f"Category '{cat}' should be stored as-is" + + +# =========================================================================== +# Section 2: get_task_returns +# =========================================================================== + +class TestGetTaskReturns: + + def test_empty_for_task_with_no_returns(self, conn): + """Task without any returns → empty list.""" + result = models.get_task_returns(conn, "P1-001") + assert result == [] + + def test_returns_history_in_ascending_order(self, conn): + """Returns are ordered by return_number ASC.""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "scope_too_large") + models.record_task_return(conn, "P1-001", "technical_blocker") + + returns = models.get_task_returns(conn, "P1-001") + 
assert len(returns) == 3 + assert returns[0]["reason_category"] == "requirements_unclear" + assert returns[1]["reason_category"] == "scope_too_large" + assert returns[2]["reason_category"] == "technical_blocker" + + def test_limit_parameter_caps_results(self, conn): + """limit parameter restricts the number of returned records.""" + for _ in range(5): + models.record_task_return(conn, "P1-001", "missing_context") + returns = models.get_task_returns(conn, "P1-001", limit=3) + assert len(returns) == 3 + + +# =========================================================================== +# Section 3: check_return_pattern +# =========================================================================== + +class TestCheckReturnPattern: + + def test_no_history_returns_no_pattern(self, conn): + """Task with no returns → pattern_detected=False, dominant_category=None.""" + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is False + assert result["dominant_category"] is None + assert result["occurrences"] == 0 + + def test_single_return_no_pattern(self, conn): + """One return → pattern_detected=False (needs >= 2 same category).""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is False + assert result["occurrences"] == 1 + + def test_same_category_twice_detects_pattern(self, conn): + """Two returns with same category → pattern_detected=True.""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "requirements_unclear") + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is True + assert result["dominant_category"] == "requirements_unclear" + assert result["occurrences"] == 2 + + def test_same_category_three_times_detects_pattern(self, conn): + """Three returns with same category → pattern_detected=True, occurrences=3.""" + for _ in 
range(3): + models.record_task_return(conn, "P1-001", "technical_blocker") + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is True + assert result["dominant_category"] == "technical_blocker" + assert result["occurrences"] == 3 + + def test_all_different_categories_no_false_positive(self, conn): + """All different categories → no pattern detected (no false positives).""" + categories = [ + "requirements_unclear", "scope_too_large", "technical_blocker", + "missing_context", "recurring_quality_fail", + ] + for cat in categories: + models.record_task_return(conn, "P1-001", cat) + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is False + + def test_two_different_categories_no_false_positive(self, conn): + """Two returns with two different categories → no pattern (each appears once).""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "scope_too_large") + result = models.check_return_pattern(conn, "P1-001") + assert result["pattern_detected"] is False + + def test_dominant_category_is_most_frequent(self, conn): + """When multiple categories appear, dominant_category is the most frequent one.""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "missing_context") + result = models.check_return_pattern(conn, "P1-001") + assert result["dominant_category"] == "requirements_unclear" + assert result["occurrences"] == 2 + + +# =========================================================================== +# Section 4: Edge case — return_count after task completion +# =========================================================================== + +class TestReturnCountAfterTaskCompletion: + + def test_return_count_not_reset_when_task_set_to_done(self, conn): + """return_count persists after task status 
transitions to 'done'.

        KIN-135 spec mentions a reset on success, but update_task() does not
        implement this. This test documents the current (non-resetting) behavior.
        """
        models.record_task_return(conn, "P1-001", "requirements_unclear")
        models.record_task_return(conn, "P1-001", "missing_context")

        models.update_task(conn, "P1-001", status="done")
        task = models.get_task(conn, "P1-001")

        # Documents current behavior: counter persists (not reset to 0)
        assert task["return_count"] == 2

    def test_return_count_starts_at_zero_for_new_task(self, conn):
        """Newly created task has return_count == 0."""
        task = models.get_task(conn, "P1-001")
        assert task["return_count"] == 0


# ===========================================================================
# Section 5: _save_return_analyst_output
# ===========================================================================

class TestSaveReturnAnalystOutput:
    """Tests for _save_return_analyst_output().

    Exercised contract: parses the return_analyst agent's raw_output (JSON
    string or dict), and when `escalate_to_dept_head` is true, creates an
    escalation pipeline and resolves the dept head from the task category
    (SEC→security_head, UI→frontend_head, API→backend_head, INFRA→infra_head,
    otherwise cto_advisor). Invalid/empty/field-missing output fails open
    (no escalation).

    NOTE(review): `_save_return_analyst_output`, `json` and `models` are
    assumed to be imported at the top of this test module — not visible in
    this chunk, confirm there.
    """

    def _make_analyst_result(self, escalate: bool,
                             root_cause: str = "Recurring scope issue",
                             refined_brief: str = "Focus on X only") -> dict:
        """Build a mock return_analyst agent result dict."""
        output = {
            "escalate_to_dept_head": escalate,
            "root_cause_analysis": root_cause,
            "refined_brief": refined_brief,
            "pattern_summary": "3 returns with similar root cause",
        }
        return {"success": True, "raw_output": json.dumps(output)}

    def test_creates_escalation_pipeline_when_escalate_true(self, conn):
        """escalate_to_dept_head=true → escalation pipeline created, escalated=True."""
        result = _save_return_analyst_output(
            conn, "P1-001", "p1",
            self._make_analyst_result(escalate=True),
        )
        assert result["escalated"] is True
        assert result["escalation_pipeline_id"] is not None
        assert result["dept_head"] is not None

    def test_no_escalation_when_escalate_false(self, conn):
        """escalate_to_dept_head=false → no escalation pipeline, escalated=False."""
        result = _save_return_analyst_output(
            conn, "P1-001", "p1",
            self._make_analyst_result(escalate=False),
        )
        assert result["escalated"] is False
        assert result["escalation_pipeline_id"] is None
        assert result["dept_head"] is None

    def test_dept_head_sec_category_maps_to_security_head(self, conn):
        """SEC task category → dept_head is security_head."""
        models.update_task(conn, "P1-001", category="SEC")
        result = _save_return_analyst_output(
            conn, "P1-001", "p1",
            self._make_analyst_result(escalate=True),
        )
        assert result["dept_head"] == "security_head"

    def test_dept_head_ui_category_maps_to_frontend_head(self, conn):
        """UI task category → dept_head is frontend_head."""
        models.update_task(conn, "P1-001", category="UI")
        result = _save_return_analyst_output(
            conn, "P1-001", "p1",
            self._make_analyst_result(escalate=True),
        )
        assert result["dept_head"] == "frontend_head"

    def test_dept_head_api_category_maps_to_backend_head(self, conn):
        """API task category → dept_head is backend_head."""
        models.update_task(conn, "P1-001", category="API")
        result = _save_return_analyst_output(
            conn, "P1-001", "p1",
            self._make_analyst_result(escalate=True),
        )
        assert result["dept_head"] == "backend_head"

    def test_dept_head_infra_category_maps_to_infra_head(self, conn):
        """INFRA task category → dept_head is infra_head."""
        models.update_task(conn, "P1-001", category="INFRA")
        result = _save_return_analyst_output(
            conn, "P1-001", "p1",
            self._make_analyst_result(escalate=True),
        )
        assert result["dept_head"] == "infra_head"

    def test_unknown_category_defaults_to_cto_advisor(self, conn):
        """Unknown/missing task category defaults to cto_advisor."""
        # Task category not set (None by default)
        result = _save_return_analyst_output(
            conn, "P1-001", "p1",
            self._make_analyst_result(escalate=True),
        )
        assert result["dept_head"] == "cto_advisor"

    def test_escalation_pipeline_has_pipeline_type_escalation_in_db(self, conn):
        """Created escalation pipeline has pipeline_type='escalation'
in the DB.""" + result = _save_return_analyst_output( + conn, "P1-001", "p1", + self._make_analyst_result(escalate=True), + ) + pipeline_id = result["escalation_pipeline_id"] + row = conn.execute( + "SELECT pipeline_type FROM pipelines WHERE id = ?", (pipeline_id,) + ).fetchone() + assert row["pipeline_type"] == "escalation" + + def test_no_escalation_when_output_is_invalid_json(self, conn): + """Non-parseable JSON output → fail-open, escalated=False.""" + bad_result = {"success": True, "raw_output": "not valid json {{{"} + result = _save_return_analyst_output(conn, "P1-001", "p1", bad_result) + assert result["escalated"] is False + + def test_no_escalation_when_output_is_empty(self, conn): + """Empty output → fail-open, escalated=False.""" + result = _save_return_analyst_output( + conn, "P1-001", "p1", + {"success": True, "raw_output": ""}, + ) + assert result["escalated"] is False + + def test_no_escalation_when_output_missing_field(self, conn): + """Output without escalate_to_dept_head field → escalated=False.""" + output = {"root_cause_analysis": "some analysis", "refined_brief": "refined"} + result = _save_return_analyst_output( + conn, "P1-001", "p1", + {"success": True, "raw_output": json.dumps(output)}, + ) + assert result["escalated"] is False + + def test_accepts_dict_raw_output(self, conn): + """raw_output as dict (not string) is handled via json.dumps path.""" + output_dict = { + "escalate_to_dept_head": True, + "root_cause_analysis": "cause", + "refined_brief": "brief", + } + result = _save_return_analyst_output( + conn, "P1-001", "p1", + {"success": True, "raw_output": output_dict}, + ) + assert result["escalated"] is True + + +# =========================================================================== +# Section 6: Gate cannot_close — return recording in pipeline +# =========================================================================== + +class TestGateCannotCloseRecordsReturn: + + @patch("core.followup.generate_followups") + 
@patch("agents.runner.run_hooks") + @patch("agents.runner.subprocess.run") + def test_gate_rejection_increments_return_count_in_standard_pipeline( + self, mock_run, mock_hooks, mock_followup, conn_autocomplete + ): + """Gate cannot_close in standard pipeline → task.return_count increases by 1.""" + conn = conn_autocomplete + mock_run.return_value = _mock_subprocess( + {"verdict": "changes_requested", "reason": "Missing tests"} + ) + mock_hooks.return_value = [] + mock_followup.return_value = {"created": [], "pending_actions": []} + + steps = [{"role": "reviewer", "brief": "review"}] + run_pipeline(conn, "P1-001", steps) + + task = models.get_task(conn, "P1-001") + assert task["return_count"] == 1, ( + "Gate rejection in standard pipeline should increment return_count" + ) + + @patch("core.followup.generate_followups") + @patch("agents.runner.run_hooks") + @patch("agents.runner.subprocess.run") + def test_gate_approval_does_not_increment_return_count( + self, mock_run, mock_hooks, mock_followup, conn_autocomplete + ): + """Gate approved → task.return_count stays 0.""" + conn = conn_autocomplete + mock_run.return_value = _mock_subprocess( + {"verdict": "approved", "reason": ""} + ) + mock_hooks.return_value = [] + mock_followup.return_value = {"created": [], "pending_actions": []} + + steps = [{"role": "reviewer", "brief": "review"}] + run_pipeline(conn, "P1-001", steps) + + task = models.get_task(conn, "P1-001") + assert task["return_count"] == 0, ( + "Gate approval should not increment return_count" + ) + + @patch("core.followup.generate_followups") + @patch("agents.runner.run_hooks") + @patch("agents.runner.subprocess.run") + def test_gate_rejection_records_return_with_recurring_quality_fail_category( + self, mock_run, mock_hooks, mock_followup, conn_autocomplete + ): + """Gate rejection uses 'recurring_quality_fail' as reason_category.""" + conn = conn_autocomplete + mock_run.return_value = _mock_subprocess( + {"verdict": "changes_requested", "reason": "Need unit 
tests"} + ) + mock_hooks.return_value = [] + mock_followup.return_value = {"created": [], "pending_actions": []} + + steps = [{"role": "reviewer", "brief": "review"}] + run_pipeline(conn, "P1-001", steps) + + returns = models.get_task_returns(conn, "P1-001") + assert len(returns) == 1 + assert returns[0]["reason_category"] == "recurring_quality_fail" + + @patch("core.followup.generate_followups") + @patch("agents.runner.run_hooks") + @patch("agents.runner.subprocess.run") + def test_gate_rejection_skips_return_recording_in_escalation_pipeline( + self, mock_run, mock_hooks, mock_followup, conn_autocomplete + ): + """Gate rejection in escalation pipeline → return_count NOT incremented. + + Escalation pipelines must not trigger further return tracking to avoid + infinite loops. + """ + conn = conn_autocomplete + mock_run.return_value = _mock_subprocess( + {"verdict": "changes_requested", "reason": "Still not good"} + ) + mock_hooks.return_value = [] + mock_followup.return_value = {"created": [], "pending_actions": []} + + # Intercept create_pipeline and update_pipeline to force pipeline_type='escalation' + _real_create = models.create_pipeline + _real_update = models.update_pipeline + + def _create_as_escalation(db, *args, **kwargs): + result = _real_create(db, *args, **kwargs) + db.execute( + "UPDATE pipelines SET pipeline_type = 'escalation' WHERE id = ?", + (result["id"],), + ) + db.commit() + return {**result, "pipeline_type": "escalation"} + + def _update_keeping_escalation(db, id, **kwargs): + result = _real_update(db, id, **kwargs) + return {**result, "pipeline_type": "escalation"} if result else result + + with patch("agents.runner.models.create_pipeline", side_effect=_create_as_escalation), \ + patch("agents.runner.models.update_pipeline", side_effect=_update_keeping_escalation): + steps = [{"role": "reviewer", "brief": "review"}] + run_pipeline(conn, "P1-001", steps) + + task = models.get_task(conn, "P1-001") + assert task["return_count"] == 0, ( + "Gate 
rejection in escalation pipeline must NOT increment return_count" + ) + + +# =========================================================================== +# Section 7: Context builder — return_count in PM / return_analyst context +# =========================================================================== + +class TestContextBuilderReturnHistory: + + def test_pm_context_has_return_count_zero_when_no_returns(self, conn): + """PM context includes return_count=0 when task has no returns.""" + ctx = build_context(conn, "P1-001", "pm", "p1") + assert ctx.get("return_count") == 0 + + def test_pm_context_has_empty_return_history_when_no_returns(self, conn): + """PM context includes return_history=[] when task has no returns.""" + ctx = build_context(conn, "P1-001", "pm", "p1") + assert ctx.get("return_history") == [] + + def test_pm_context_includes_return_count_when_returns_exist(self, conn): + """PM context reflects actual return_count when returns have been recorded.""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "requirements_unclear") + models.record_task_return(conn, "P1-001", "missing_context") + + ctx = build_context(conn, "P1-001", "pm", "p1") + assert ctx["return_count"] == 3 + + def test_pm_context_includes_return_history_when_returns_exist(self, conn): + """PM context includes up to 5 most recent returns in return_history.""" + for _ in range(3): + models.record_task_return(conn, "P1-001", "requirements_unclear") + + ctx = build_context(conn, "P1-001", "pm", "p1") + assert len(ctx["return_history"]) == 3 + assert ctx["return_history"][0]["reason_category"] == "requirements_unclear" + + def test_return_analyst_context_has_return_count(self, conn): + """return_analyst context includes return_count reflecting actual returns.""" + models.record_task_return(conn, "P1-001", "scope_too_large") + models.record_task_return(conn, "P1-001", "scope_too_large") + + ctx = build_context(conn, "P1-001", 
"return_analyst", "p1") + assert ctx["return_count"] == 2 + + def test_return_analyst_context_has_full_return_history(self, conn): + """return_analyst context includes full return history (limit=20).""" + for i in range(7): + models.record_task_return(conn, "P1-001", "technical_blocker") + + ctx = build_context(conn, "P1-001", "return_analyst", "p1") + assert len(ctx["return_history"]) == 7 + + def test_non_pm_role_does_not_get_return_count(self, conn): + """Non-PM roles (e.g., debugger) do not get return_count in context.""" + models.record_task_return(conn, "P1-001", "requirements_unclear") + ctx = build_context(conn, "P1-001", "debugger", "p1") + # debugger context should not include return_count + assert "return_count" not in ctx diff --git a/tests/test_kin_docs_002_regression.py b/tests/test_kin_docs_002_regression.py index 0d1064e..dec7765 100644 --- a/tests/test_kin_docs_002_regression.py +++ b/tests/test_kin_docs_002_regression.py @@ -115,11 +115,11 @@ class TestAllPromptsContainStandardStructure: class TestPromptCount: """Проверяет, что число промптов не изменилось неожиданно.""" - def test_prompt_count_is_30(self): - """В agents/prompts/ ровно 30 файлов .md.""" + def test_prompt_count_is_31(self): + """В agents/prompts/ ровно 31 файл .md.""" count = len(_prompt_files()) - assert count == 30, ( # 30 промптов — актуально на 2026-03-19, +error_coordinator (KIN-DOCS-008, см. git log agents/prompts/) - f"Ожидалось 30 промптов, найдено {count}. " + assert count == 31, ( # 31 промпт — актуально на 2026-03-20, +return_analyst (KIN-135, см. git log agents/prompts/) + f"Ожидалось 31 промпт, найдено {count}. " "Если добавлен новый промпт — обнови этот тест." )