kin: KIN-135-backend_dev

This commit is contained in:
Gros Frumos 2026-03-20 21:56:46 +02:00
parent 4c01e0e4ee
commit 24be66d16c
9 changed files with 436 additions and 6 deletions

View file

@ -13,6 +13,7 @@ You receive:
- ACTIVE TASKS: currently in-progress tasks (avoid conflicts)
- AVAILABLE SPECIALISTS: roles you can assign
- ROUTE TEMPLATES: common pipeline patterns
- RETURN HISTORY (optional): return_count (int) — number of full returns to PM; returns: list of [{return_number, reason_category, reason_text, returned_at}]
## Working Mode
@ -73,6 +74,12 @@ You receive:
- With ≥2 interrelated bugs in one investigation, insert `error_coordinator` as the first step before `debugger`. Use the `multi_bug_debug` route template
- A single bug or independent bugs → route `debug` (straight to debugger)
**Return escalation routing (KIN-135):**
- If `return_count >= 3` (from RETURN HISTORY), you MUST insert `return_analyst` as the first step of the pipeline. This rule is mandatory, not optional.
- The next step receives the `return_analyst` output via previous_output; its `refined_brief` replaces the original brief for the specialists.
- The decision to escalate to a dept_head is made by `return_analyst` itself via the `escalate_to_dept_head` field in its output; the PM never makes it on its own.
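The mandatory-injection rule above can be sketched as a small guard. This is an illustrative helper, not part of the orchestrator; the function and step shapes are assumptions:

```python
def inject_return_analyst(pipeline: list[dict], return_count: int) -> list[dict]:
    """Prepend a return_analyst step when a task has fully returned to the PM 3+ times.

    Hypothetical sketch: the real PM plans the pipeline itself. This only
    demonstrates the invariant pipeline[0].role == "return_analyst" when
    return_count >= 3, without double-inserting the step.
    """
    if return_count >= 3 and (not pipeline or pipeline[0].get("role") != "return_analyst"):
        return [{"role": "return_analyst", "model": "sonnet"}] + pipeline
    return pipeline

steps = [{"role": "debugger", "model": "sonnet"}]
planned = inject_return_analyst(steps, return_count=3)
print(planned[0]["role"])  # return_analyst
```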
**`completion_mode` rules (in priority order):**
1. If `project.execution_mode` is set — use it
@ -88,6 +95,7 @@ You receive:
- `relevant_decisions` IDs are correct and relevant to the specialist's work
- Department heads are used only for genuinely cross-domain complex tasks
- For a task with ≥2 interrelated bugs: `pipeline[0].role == error_coordinator`
- When `return_count >= 3`: `pipeline[0].role == return_analyst`
## Return Format
@ -132,7 +140,19 @@ Return ONLY valid JSON (no markdown, no explanation):
If you cannot plan the pipeline (task is completely ambiguous, no information to work with, or explicitly outside the system scope), return this JSON **instead of** the normal output:
```json
{
"status": "blocked",
"reason": "<clear explanation>",
"return_category": "<one of: requirements_unclear | scope_too_large | technical_blocker | missing_context | conflicting_requirements>",
"blocked_at": "<ISO-8601 datetime>"
}
```
Use current datetime for `blocked_at`. Do NOT guess — return blocked immediately.
`return_category` is REQUIRED in every blocked response. Choose the most accurate category:
- `requirements_unclear` — task description is too vague to plan
- `scope_too_large` — task is too big to execute as a single pipeline
- `technical_blocker` — a specific technical dependency or constraint prevents planning
- `missing_context` — key information is absent (credentials, URLs, decision not documented)
- `conflicting_requirements` — the brief contains internally contradictory requirements
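A blocked response can be checked against these five PM categories with a small normaliser. This is a sketch, not project code; the fail-open default to `missing_context` mirrors the behaviour of `record_task_return` in models.py:

```python
# PM blocked categories from the prompt above (recurring_quality_fail is
# gate-generated and is deliberately not a valid PM self-report category here).
RETURN_CATEGORIES = {
    "requirements_unclear", "scope_too_large", "technical_blocker",
    "missing_context", "conflicting_requirements",
}

def normalize_return_category(blocked: dict) -> str:
    """Pick reason_category from a PM blocked response, failing open
    to 'missing_context' when the field is absent or invalid."""
    category = blocked.get("return_category")
    return category if category in RETURN_CATEGORIES else "missing_context"

blocked = {"status": "blocked", "reason": "no repo URL", "return_category": "technical_blocker"}
print(normalize_return_category(blocked))  # technical_blocker
print(normalize_return_category({"status": "blocked"}))  # missing_context
```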

View file

@ -0,0 +1,77 @@
You are a Return Analyst for the Kin multi-agent orchestrator.
Your job: analyse why a task keeps returning to the PM (full pipeline returns, not intra-pipeline revisions), identify the root cause, produce a refined brief, and decide whether escalation to a department head is warranted.
You are activated when a task has been returned to the PM 3 or more times.
## Input
You receive:
- PROJECT: id, name, tech stack
- TASK: id, title, current brief, return_count
- RETURN HISTORY: list of return records [{return_number, reason_category, reason_text, returned_at, returned_by}]
- DECISIONS: known gotchas and conventions for this project
## Working Mode
0. Read RETURN HISTORY first — this is the essential input. All other analysis flows from it.
1. Study the pattern of `reason_category` values across all returns — what repeats?
2. Read `reason_text` fields for details — look for recurring themes, not just categories
3. Cross-reference known `decisions` — the root cause may already be documented
4. Identify the root cause: one precise, testable statement about WHY the task keeps returning
5. Formulate a `refined_brief` that directly resolves the identified root cause — concrete, specific, unambiguous
6. If key information is still missing, list it in `clarification_list`
7. Decide `escalate_to_dept_head`:
- `false`: root cause is solvable by a better brief (unclear requirements, missing context, scope mismatch)
- `true`: systemic issue that requires architectural coordination, e.g. `recurring_quality_fail` appearing repeatedly, which a better brief alone cannot fix
## Focus On
- Pattern over individual incidents — what category repeats the most?
- Root cause must be ONE sentence: precise, specific, not "it didn't work"
- `refined_brief` replaces the current brief entirely — it must be ready to use as-is
- Do NOT escalate for clarification issues — escalate only for structural/quality failures
## Quality Checks
- `root_cause_analysis` is ONE sentence — not a paragraph
- `refined_brief` is actionable and complete — a specialist can execute it without asking questions
- `escalate_to_dept_head` is `false` unless systemic structural evidence is present
- All fields are present in the output JSON
- `clarification_list` is an array (may be empty `[]`)
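The quality checks above can be expressed as a small output validator. This is a hypothetical checker written for illustration; it is not part of the codebase:

```python
def validate_analyst_output(parsed: dict) -> list[str]:
    """Return a list of problems with a return_analyst JSON output (empty list = valid)."""
    problems = []
    # All fields must be present (quality check: "All fields are present in the output JSON")
    for field in ("status", "root_cause_analysis", "refined_brief",
                  "clarification_list", "escalate_to_dept_head"):
        if field not in parsed:
            problems.append(f"missing field: {field}")
    # clarification_list must be an array, escalate_to_dept_head a real boolean
    if not isinstance(parsed.get("clarification_list", []), list):
        problems.append("clarification_list must be an array")
    if not isinstance(parsed.get("escalate_to_dept_head", False), bool):
        problems.append("escalate_to_dept_head must be a boolean, not a string")
    return problems

good = {"status": "done", "root_cause_analysis": "one sentence", "refined_brief": "full brief",
        "clarification_list": [], "escalate_to_dept_head": False}
print(validate_analyst_output(good))  # []
```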
## Return Format
Return ONLY valid JSON (no markdown, no explanation):
```json
{
"status": "done",
"root_cause_analysis": "One precise statement of the root cause of the repeated returns",
"refined_brief": "A refined and detailed task description, ready to hand to a specialist",
"clarification_list": [],
"escalate_to_dept_head": false
}
```
## Constraints
- Do NOT implement anything yourself — analysis and refined brief only
- Do NOT use free-text clustering to detect patterns — use the structured `reason_category` field
- `clarification_list` must be an array — use `[]` if no clarifications needed
- `escalate_to_dept_head` must be a boolean (`true` or `false`), not a string
- Do NOT mark `escalate_to_dept_head: true` unless `recurring_quality_fail` appears ≥ 2 times or there is clear evidence of a structural problem
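The ≥2 threshold in the last constraint matches the pattern detection performed by `check_return_pattern` in models.py; a minimal version of the gate, shown here only as a sketch:

```python
from collections import Counter

def should_escalate(returns: list[dict]) -> bool:
    """True when recurring_quality_fail appears in 2+ return records.

    Sketch of the escalation gate described above; the analyst may also
    escalate on other structural evidence, which this helper ignores.
    """
    counts = Counter(r["reason_category"] for r in returns)
    return counts.get("recurring_quality_fail", 0) >= 2

history = [
    {"reason_category": "missing_context"},
    {"reason_category": "recurring_quality_fail"},
    {"reason_category": "recurring_quality_fail"},
]
print(should_escalate(history))  # True
```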
## Blocked Protocol
If you cannot perform analysis (no return history, insufficient context), return:
```json
{"status": "blocked", "reason": "<clear explanation>", "blocked_at": "<ISO-8601 datetime>"}
```

View file

@ -1127,6 +1127,97 @@ def _save_tech_debt_output(
return {"created": True, "task_id": new_task_id}
# ---------------------------------------------------------------------------
# Return analyst output: handle escalation pipeline creation (KIN-135)
# ---------------------------------------------------------------------------
# Mapping from task category to escalation dept_head (KIN-135)
_CATEGORY_TO_DEPT_HEAD = {
"SEC": "security_head",
"UI": "frontend_head",
"API": "backend_head",
"BIZ": "backend_head",
"DB": "backend_head",
"PERF": "backend_head",
"ARCH": "cto_advisor",
"FIX": "cto_advisor",
"INFRA": "infra_head",
"OBS": "infra_head",
"TEST": "qa_head",
"DOCS": "cto_advisor",
}
def _save_return_analyst_output(
conn: sqlite3.Connection,
task_id: str,
project_id: str,
result: dict,
parent_pipeline_id: int | None = None,
) -> dict:
"""Parse return_analyst output and create escalation pipeline if needed.
If escalate_to_dept_head=true in analyst output:
- Determines target dept_head from task.category (defaults to cto_advisor)
- Creates a new escalation pipeline with pipeline_type='escalation'
- The dept_head step receives analyst output as initial context
Returns {"escalated": bool, "escalation_pipeline_id": int | None, "dept_head": str | None}.
Never raises: escalation errors must never block the current pipeline.
"""
raw = result.get("raw_output") or result.get("output") or ""
if isinstance(raw, (dict, list)):
raw = json.dumps(raw, ensure_ascii=False)
parsed = _try_parse_json(raw)
if not isinstance(parsed, dict):
return {"escalated": False, "escalation_pipeline_id": None, "dept_head": None}
if not parsed.get("escalate_to_dept_head"):
return {"escalated": False, "escalation_pipeline_id": None, "dept_head": None}
# Determine target dept_head from task category
task = models.get_task(conn, task_id)
category = (task or {}).get("category") or ""
dept_head = _CATEGORY_TO_DEPT_HEAD.get(category.upper(), "cto_advisor")
# Build escalation pipeline steps: dept_head with analyst context as brief
analyst_summary = parsed.get("root_cause_analysis", "")
refined_brief = parsed.get("refined_brief", "")
escalation_brief = (
f"[ESCALATION from return_analyst]\n"
f"Root cause: {analyst_summary}\n"
f"Refined brief: {refined_brief}"
)
escalation_steps = [{"role": dept_head, "model": "opus", "brief": escalation_brief}]
try:
esc_pipeline = models.create_pipeline(
conn, task_id, project_id,
route_type="escalation",
steps=escalation_steps,
parent_pipeline_id=parent_pipeline_id,
)
# Mark pipeline_type as escalation so return tracking is skipped inside it
conn.execute(
"UPDATE pipelines SET pipeline_type = 'escalation' WHERE id = ?",
(esc_pipeline["id"],),
)
conn.commit()
_logger.info(
"KIN-135: escalation pipeline %s created for task %s (dept_head=%s)",
esc_pipeline["id"], task_id, dept_head,
)
return {
"escalated": True,
"escalation_pipeline_id": esc_pipeline["id"],
"dept_head": dept_head,
}
except Exception as exc:
_logger.warning("KIN-135: escalation pipeline creation failed for %s: %s", task_id, exc)
return {"escalated": False, "escalation_pipeline_id": None, "dept_head": None}
# ---------------------------------------------------------------------------
# Auto-learning: extract decisions from pipeline results
# ---------------------------------------------------------------------------
@ -1978,6 +2069,16 @@ def run_pipeline(
except Exception:
pass # Never block pipeline on decomposer save errors
# Return analyst: create escalation pipeline if escalate_to_dept_head=true (KIN-135)
if role == "return_analyst" and result["success"] and not dry_run:
try:
_save_return_analyst_output(
conn, task_id, project_id, result,
parent_pipeline_id=pipeline["id"] if pipeline else None,
)
except Exception:
pass # Never block pipeline on analyst escalation errors
# Smoke tester: parse result and escalate if cannot_confirm (KIN-128)
if role == "smoke_tester" and result["success"] and not dry_run:
smoke_output = result.get("output") or result.get("raw_output") or ""
@ -2423,6 +2524,20 @@ def run_pipeline(
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
# KIN-135: record gate return — skip for escalation pipelines to avoid loops
_pipeline_type = (pipeline or {}).get("pipeline_type", "standard")
if _pipeline_type != "escalation":
try:
models.record_task_return(
conn,
task_id=task_id,
reason_category="recurring_quality_fail",
reason_text=f"Gate {effective_last_role}: {_block_reason[:200]}",
returned_by=effective_last_role,
pipeline_id=pipeline["id"] if pipeline else None,
)
except Exception:
pass # Never block pipeline on return tracking errors
try:
models.write_log(
conn, pipeline["id"] if pipeline else None,

View file

@ -6,7 +6,7 @@ specialists:
name: "Project Manager"
model: sonnet
tools: [Read, Grep, Glob]
description: "Decomposes tasks, selects specialists, builds pipelines. See also: return_analyst (injected when return_count>=3 for escalation analysis)."
permissions: read_only
context_rules:
decisions: all
@ -333,6 +333,21 @@ specialists:
streams: "array of { specialist, scope, bugs: array, priority: high|medium|low }"
reintegration_checklist: "array of strings"
return_analyst:
name: "Return Analyst"
model: sonnet
tools: [Read, Grep, Glob]
description: "Analyses recurring task return patterns (full PM returns, not revisions), identifies root causes, refines brief, recommends escalation to dept_head. Standalone specialist — NOT a department worker. Injected by PM when return_count>=3. See also: pm (routing rule), analyst (intra-pipeline revision analysis), cto_advisor/dept_heads (escalation targets)."
permissions: read_only
context_rules:
decisions: all
output_schema:
status: "done | partial | blocked"
root_cause_analysis: string
refined_brief: string
clarification_list: "array of strings"
escalate_to_dept_head: "bool"
marketing_head:
name: "Marketing Department Head"
model: opus

View file

@ -661,6 +661,20 @@ def run_task(ctx, task_id, dry_run, allow_write):
raise SystemExit(1)
if not isinstance(output, dict) or "pipeline" not in output:
# KIN-135: if PM returned status='blocked', record the return for escalation tracking
if isinstance(output, dict) and output.get("status") == "blocked":
try:
reason_category = output.get("return_category") or "missing_context"
reason_text = output.get("reason") or ""
models.record_task_return(
conn,
task_id=task_id,
reason_category=reason_category,
reason_text=reason_text[:500] if reason_text else None,
returned_by="pm",
)
except Exception:
pass # Never block on return tracking errors
click.echo(f"PM output missing 'pipeline' key:\n{json.dumps(output, indent=2)[:500]}", err=True)
raise SystemExit(1)

View file

@ -70,6 +70,17 @@ def build_context(
ctx["available_specialists"] = []
ctx["routes"] = {}
ctx["departments"] = {}
# KIN-135: return history for escalation routing
try:
return_count = (task or {}).get("return_count") or 0
ctx["return_count"] = return_count
if return_count > 0:
ctx["return_history"] = models.get_task_returns(conn, task_id, limit=5)
else:
ctx["return_history"] = []
except Exception:
ctx["return_count"] = 0
ctx["return_history"] = []
elif role == "architect":
ctx["modules"] = models.get_modules(conn, project_id)
@ -151,6 +162,17 @@ def build_context(
except Exception:
pass
elif role == "return_analyst":
# KIN-135: return analyst needs full return history and decisions
ctx["decisions"] = models.get_decisions(conn, project_id)
try:
return_count = (task or {}).get("return_count") or 0
ctx["return_count"] = return_count
ctx["return_history"] = models.get_task_returns(conn, task_id, limit=20)
except Exception:
ctx["return_count"] = 0
ctx["return_history"] = []
else:
# Unknown role — give decisions as fallback
ctx["decisions"] = models.get_decisions(conn, project_id, limit=20)
@ -297,6 +319,20 @@ def format_prompt(context: dict, role: str, prompt_template: str | None = None)
sections.append(f"- {t['id']}: {t['title']} [{t['status']}]")
sections.append("")
# Return history (PM) — KIN-135
return_count = context.get("return_count", 0)
return_history = context.get("return_history")
if return_count > 0:
sections.append(f"## Return History (return_count={return_count}):")
if return_history:
for r in return_history:
reason_text = f" {r['reason_text']}" if r.get("reason_text") else ""
sections.append(
f"- #{r['return_number']} [{r['reason_category']}]{reason_text} "
f"(returned_by={r.get('returned_by', 'system')}, at={r.get('returned_at', '')})"
)
sections.append("")
# Available specialists (PM)
specialists = context.get("available_specialists")
if specialists:

View file

@ -72,6 +72,7 @@ CREATE TABLE IF NOT EXISTS tasks (
telegram_sent BOOLEAN DEFAULT 0,
acceptance_criteria TEXT,
smoke_test_result JSON DEFAULT NULL,
return_count INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
@ -151,6 +152,7 @@ CREATE TABLE IF NOT EXISTS pipelines (
parent_pipeline_id INTEGER REFERENCES pipelines(id),
department TEXT,
pid INTEGER,
pipeline_type TEXT DEFAULT 'standard',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
completed_at DATETIME
);
@ -326,6 +328,20 @@ CREATE TABLE IF NOT EXISTS pipeline_log (
);
CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id);
-- Task return history to the PM (KIN-135)
CREATE TABLE IF NOT EXISTS task_returns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
return_number INTEGER NOT NULL,
reason_category TEXT NOT NULL,
reason_text TEXT,
returned_by TEXT DEFAULT 'system',
pipeline_id INTEGER REFERENCES pipelines(id),
returned_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_task_returns_task_id ON task_returns(task_id);
"""
@ -800,6 +816,39 @@ def _migrate(conn: sqlite3.Connection):
conn.execute("ALTER TABLE tasks ADD COLUMN smoke_test_result JSON DEFAULT NULL")
conn.commit()
# KIN-135: Add return_count to tasks — counts full returns to PM
task_cols_final3 = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()}
if "return_count" not in task_cols_final3:
conn.execute("ALTER TABLE tasks ADD COLUMN return_count INTEGER DEFAULT 0")
conn.commit()
# KIN-135: Add pipeline_type to pipelines — distinguishes escalation pipelines
if "pipelines" in existing_tables:
pipeline_cols2 = {r[1] for r in conn.execute("PRAGMA table_info(pipelines)").fetchall()}
if "pipeline_type" not in pipeline_cols2:
conn.execute("ALTER TABLE pipelines ADD COLUMN pipeline_type TEXT DEFAULT 'standard'")
conn.commit()
# KIN-135: Create task_returns table — return history per task
existing_tables2 = {r[0] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()}
if "task_returns" not in existing_tables2:
conn.executescript("""
CREATE TABLE IF NOT EXISTS task_returns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
return_number INTEGER NOT NULL,
reason_category TEXT NOT NULL,
reason_text TEXT,
returned_by TEXT DEFAULT 'system',
pipeline_id INTEGER REFERENCES pipelines(id),
returned_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_task_returns_task_id ON task_returns(task_id);
""")
conn.commit()
def _seed_default_hooks(conn: sqlite3.Connection):
"""Seed default hooks for the kin project (idempotent).

View file

@ -23,6 +23,16 @@ TASK_CATEGORIES = [
"ARCH", "TEST", "PERF", "DOCS", "FIX", "OBS",
]
# Valid categories for task returns (KIN-135)
RETURN_CATEGORIES = [
"requirements_unclear",
"scope_too_large",
"technical_blocker",
"missing_context",
"recurring_quality_fail",
"conflicting_requirements",
]
def validate_completion_mode(value: str) -> str:
"""Validate completion mode from LLM output. Falls back to 'review' if invalid."""
@ -1301,3 +1311,97 @@ def get_chat_messages(
query += " ORDER BY created_at ASC, id ASC LIMIT ?"
params.append(limit)
return _rows_to_list(conn.execute(query, params).fetchall())
# ---------------------------------------------------------------------------
# Task returns — escalation tracking (KIN-135)
# ---------------------------------------------------------------------------
def record_task_return(
conn: sqlite3.Connection,
task_id: str,
reason_category: str,
reason_text: str | None = None,
returned_by: str = "system",
pipeline_id: int | None = None,
) -> dict:
"""Record a task return to PM and increment return_count.
reason_category must be one of RETURN_CATEGORIES; defaults to 'missing_context'
if an invalid value is supplied (fail-open).
Returns the inserted task_returns row as dict.
"""
if reason_category not in RETURN_CATEGORIES:
reason_category = "missing_context"
# Determine the next return_number for this task
row = conn.execute(
"SELECT COALESCE(MAX(return_number), 0) FROM task_returns WHERE task_id = ?",
(task_id,),
).fetchone()
next_number = (row[0] if row else 0) + 1
conn.execute(
"""INSERT INTO task_returns
(task_id, return_number, reason_category, reason_text, returned_by, pipeline_id)
VALUES (?, ?, ?, ?, ?, ?)""",
(task_id, next_number, reason_category, reason_text, returned_by, pipeline_id),
)
conn.execute(
"UPDATE tasks SET return_count = return_count + 1 WHERE id = ?",
(task_id,),
)
conn.commit()
inserted = conn.execute(
"SELECT * FROM task_returns WHERE task_id = ? ORDER BY id DESC LIMIT 1",
(task_id,),
).fetchone()
return dict(inserted) if inserted else {}
def get_task_returns(
conn: sqlite3.Connection,
task_id: str,
limit: int = 20,
) -> list[dict]:
"""Return task return history ordered by return_number ASC."""
rows = conn.execute(
"SELECT * FROM task_returns WHERE task_id = ? ORDER BY return_number ASC LIMIT ?",
(task_id, limit),
).fetchall()
return [dict(r) for r in rows]
def check_return_pattern(
conn: sqlite3.Connection,
task_id: str,
) -> dict:
"""Analyse return history for a recurring pattern.
Returns:
{
"pattern_detected": bool, # True if dominant_category appears >= 2 times
"dominant_category": str | None,
"occurrences": int,
}
"""
from collections import Counter
rows = conn.execute(
"SELECT reason_category FROM task_returns WHERE task_id = ?",
(task_id,),
).fetchall()
if not rows:
return {"pattern_detected": False, "dominant_category": None, "occurrences": 0}
counts = Counter(r[0] for r in rows)
dominant_category, occurrences = counts.most_common(1)[0]
pattern_detected = occurrences >= 2
return {
"pattern_detected": pattern_detected,
"dominant_category": dominant_category,
"occurrences": occurrences,
}
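The return-tracking helpers above can be exercised against an in-memory SQLite database. The following is a trimmed standalone sketch (schema reduced to the columns the pattern check needs), not the production code path:

```python
import sqlite3
from collections import Counter

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE task_returns (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        task_id TEXT NOT NULL,
        return_number INTEGER NOT NULL,
        reason_category TEXT NOT NULL
    )
""")

def record_return(task_id: str, reason_category: str) -> int:
    """Insert the next numbered return record for a task (trimmed version)."""
    row = conn.execute(
        "SELECT COALESCE(MAX(return_number), 0) FROM task_returns WHERE task_id = ?",
        (task_id,),
    ).fetchone()
    next_number = row[0] + 1
    conn.execute(
        "INSERT INTO task_returns (task_id, return_number, reason_category) VALUES (?, ?, ?)",
        (task_id, next_number, reason_category),
    )
    return next_number

for cat in ("missing_context", "recurring_quality_fail", "recurring_quality_fail"):
    record_return("KIN-135-demo", cat)

# Dominant-category detection, as in check_return_pattern
rows = conn.execute(
    "SELECT reason_category FROM task_returns WHERE task_id = ?", ("KIN-135-demo",)
).fetchall()
dominant, occurrences = Counter(r[0] for r in rows).most_common(1)[0]
print(dominant, occurrences)  # recurring_quality_fail 2
```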

View file

@ -115,11 +115,11 @@ class TestAllPromptsContainStandardStructure:
class TestPromptCount:
"""Checks that the number of prompts has not changed unexpectedly."""
def test_prompt_count_is_31(self):
"""agents/prompts/ contains exactly 31 .md files."""
count = len(_prompt_files())
assert count == 31, ( # 31 prompts as of 2026-03-20, +return_analyst (KIN-135, see git log agents/prompts/)
f"Expected 31 prompts, found {count}. "
"If a new prompt was added, update this test."
)