# kin/agents/runner.py

"""
Kin agent runner launches Claude Code as subprocess with role-specific context.
Each agent = separate process with isolated context.
"""
import errno as _errno
import json
import logging
import os
import re
import shlex
import shutil
import sqlite3
import subprocess
import sys
import time
from pathlib import Path
from typing import Any
# Module-level logger; handlers/levels are configured by the host application.
_logger = logging.getLogger("kin.runner")

# Extra PATH entries to inject when searching for claude CLI.
# launchctl daemons start with a stripped PATH that may omit these.
_EXTRA_PATH_DIRS = [
    "/opt/homebrew/bin",
    "/opt/homebrew/sbin",
    "/usr/local/bin",
    "/usr/local/sbin",
]
2026-03-17 14:03:53 +02:00
# Default timeouts per model (seconds). Override globally with KIN_AGENT_TIMEOUT
# or per role via timeout_seconds in specialists.yaml.
_MODEL_TIMEOUTS = {
2026-03-17 22:09:29 +02:00
"opus": 2400, # 40 min
"sonnet": 2400, # 40 min
"haiku": 1200, # 20 min
2026-03-17 14:03:53 +02:00
}
2026-03-21 08:18:11 +02:00
# KIN-136: auto-return — max times a task can be auto-returned before escalating to human.
# Override via KIN_AUTO_RETURN_MAX env var.
_AUTO_RETURN_MAX: int = int(os.environ.get("KIN_AUTO_RETURN_MAX") or 3)
# KIN-136: valid exit_condition values that force human escalation instead of auto-return.
_EXIT_CONDITIONS = frozenset({"login_required", "missing_data", "strategic_decision"})
def _build_claude_env() -> dict:
    """Return an env dict with an extended PATH covering common CLI tool locations.

    Merges _EXTRA_PATH_DIRS with the current process PATH (deduplicated,
    extras first) and expands ~/.nvm/versions/node/*/bin directories that
    launchctl-launched daemons would not have resolved.
    """
    env = os.environ.copy()

    # Propagate canonical DB path to agent subprocesses so they don't fall
    # back to a module-relative path (which breaks in worktrees — KIN-129).
    env.setdefault("KIN_DB_PATH", str(Path.home() / ".kin" / "kin.db"))

    # Candidate dirs: known extras, then nvm node bins, then the current PATH.
    candidates = list(_EXTRA_PATH_DIRS)
    nvm_node_root = Path.home() / ".nvm" / "versions" / "node"
    if nvm_node_root.is_dir():
        for version_dir in sorted(nvm_node_root.iterdir(), reverse=True):
            version_bin = version_dir / "bin"
            if version_bin.is_dir():
                candidates.append(str(version_bin))
    candidates.extend(env.get("PATH", "").split(":"))

    # Deduplicate while preserving first-seen order (dict keys are ordered).
    ordered_unique: dict[str, None] = {}
    for entry in candidates:
        if entry:
            ordered_unique.setdefault(entry, None)
    env["PATH"] = ":".join(ordered_unique)

    # Ensure an SSH agent socket is available for agents that connect via SSH.
    # Under launchd, SSH_AUTH_SOCK is not inherited — detect the macOS system socket.
    if "SSH_AUTH_SOCK" not in env:
        import glob
        listeners = glob.glob("/private/tmp/com.apple.launchd.*/Listeners")
        if listeners:
            env["SSH_AUTH_SOCK"] = listeners[0]
    return env
def _resolve_claude_cmd() -> str:
    """Return the full path to the claude CLI, or the bare name 'claude' as fallback."""
    search_path = _build_claude_env()["PATH"]
    located = shutil.which("claude", path=search_path)
    if located:
        return located
    return "claude"
from core import models
from core.context_builder import build_context, format_prompt
from core.hooks import run_hooks
class ClaudeAuthError(Exception):
    """Raised when Claude CLI is not authenticated or not available."""
def check_claude_auth(timeout: int = 10) -> None:
    """Check that claude CLI is authenticated before running a pipeline.

    Runs: claude -p 'ok' --output-format json with the given timeout.
    Returns None if auth is confirmed.
    Raises ClaudeAuthError if:
    - claude CLI not found in PATH (FileNotFoundError)
    - stdout/stderr contains 'not logged in' (case-insensitive)
    - returncode != 0
    - is_error=true in parsed JSON output
    Returns silently on TimeoutExpired (ambiguous — don't block pipeline).
    """
    cli = _resolve_claude_cmd()
    try:
        completed = subprocess.run(
            [cli, "-p", "ok", "--output-format", "json"],
            capture_output=True,
            text=True,
            timeout=timeout,
            env=_build_claude_env(),
            stdin=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        raise ClaudeAuthError("claude CLI not found in PATH. Install it or add to PATH.")
    except subprocess.TimeoutExpired:
        # Ambiguous — don't block the pipeline on a probe timeout.
        return

    login_msg = "Claude CLI requires login. Run: claude login"
    combined = (completed.stdout or "") + (completed.stderr or "")
    if "not logged in" in combined.lower():
        raise ClaudeAuthError(login_msg)
    if completed.returncode != 0:
        raise ClaudeAuthError(login_msg)
    payload = _try_parse_json(completed.stdout or "")
    if isinstance(payload, dict) and payload.get("is_error"):
        raise ClaudeAuthError(login_msg)
def run_agent(
    conn: sqlite3.Connection,
    role: str,
    task_id: str,
    project_id: str,
    model: str = "sonnet",
    previous_output: str | None = None,
    brief_override: str | None = None,
    dry_run: bool = False,
    allow_write: bool = False,
    noninteractive: bool = False,
    working_dir_override: str | None = None,
) -> dict:
    """Run a single Claude Code agent as a subprocess.

    1. Build context from DB
    2. Format prompt with role template
    3. Run: claude -p "{prompt}" --output-format json
    4. Log result to agent_logs
    5. Return {success, output, tokens_used, duration_seconds, cost_usd}

    With dry_run=True, only the formatted prompt is returned (nothing runs).
    """
    # Build the role-specific prompt from DB context.
    ctx = build_context(conn, task_id, role, project_id)
    if previous_output:
        ctx["previous_output"] = previous_output
    if brief_override and ctx.get("task"):
        ctx["task"]["brief"] = brief_override
    prompt = format_prompt(ctx, role)

    if dry_run:
        return {
            "success": True,
            "output": None,
            "prompt": prompt,
            "role": role,
            "model": model,
            "dry_run": True,
        }

    # Determine working directory. Operations projects have no local path —
    # sysadmin works via SSH, so those never get a cwd.
    project = models.get_project(conn, project_id)
    working_dir = None
    is_operations = project and project.get("project_type") == "operations"
    code_roles = ("debugger", "frontend_dev", "backend_dev", "tester", "security",
                  "constitution", "spec", "task_decomposer")
    if working_dir_override:
        working_dir = working_dir_override
    elif not is_operations and project and role in code_roles:
        candidate = Path(project["path"]).expanduser()
        if candidate.is_dir():
            working_dir = str(candidate)

    # Timeout precedence: role-specific (specialists.yaml) > model-based > default.
    role_timeout = None
    try:
        from core.context_builder import _load_specialists
        role_spec = _load_specialists().get("specialists", {}).get(role, {})
        if role_spec.get("timeout_seconds"):
            role_timeout = int(role_spec["timeout_seconds"])
    except Exception:
        role_timeout = None  # best-effort: fall back to model-based timeout

    # Run the claude subprocess and measure wall-clock duration.
    started = time.monotonic()
    result = _run_claude(
        prompt,
        model=model,
        working_dir=working_dir,
        allow_write=allow_write,
        noninteractive=noninteractive,
        timeout=role_timeout,
    )
    duration = int(time.monotonic() - started)

    # Normalise output to a string for DB storage.
    raw_output = result.get("output", "")
    if isinstance(raw_output, str):
        output_text = raw_output
    else:
        output_text = json.dumps(raw_output, ensure_ascii=False)
    success = result["returncode"] == 0
    parsed_output = _try_parse_json(output_text)

    # Log FULL output to DB (no truncation).
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=task_id,
        agent_role=role,
        action="execute",
        input_summary=f"task={task_id}, model={model}",
        output_summary=output_text or None,
        tokens_used=result.get("tokens_used"),
        model=model,
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )
    return {
        "success": success,
        "error": result.get("error") if not success else None,
        "output": output_text if parsed_output is None else parsed_output,
        "raw_output": output_text,
        "role": role,
        "model": model,
        "duration_seconds": duration,
        "tokens_used": result.get("tokens_used"),
        "cost_usd": result.get("cost_usd"),
    }
def _run_claude(
    prompt: str,
    model: str = "sonnet",
    working_dir: str | None = None,
    allow_write: bool = False,
    noninteractive: bool = False,
    timeout: int | None = None,
) -> dict:
    """Execute the claude CLI as a subprocess.

    Returns a dict with output, error, empty_output, returncode and, when
    the CLI's JSON wrapper parses, tokens_used / cost_usd. Never raises for
    missing-binary or timeout — those map to returncodes 127 / 124.
    """
    cmd = [
        _resolve_claude_cmd(),
        "-p", prompt,
        "--output-format", "json",
        "--model", model,
    ]
    is_noninteractive = noninteractive or os.environ.get("KIN_NONINTERACTIVE") == "1"
    if allow_write or is_noninteractive:
        cmd.append("--dangerously-skip-permissions")

    # Timeout precedence: explicit arg > KIN_AGENT_TIMEOUT env > per-model default.
    if timeout is None:
        env_timeout = os.environ.get("KIN_AGENT_TIMEOUT")
        if env_timeout:
            timeout = int(env_timeout)
        else:
            timeout = _MODEL_TIMEOUTS.get(model, _MODEL_TIMEOUTS["sonnet"])

    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=working_dir,
            env=_build_claude_env(),
            stdin=subprocess.DEVNULL if is_noninteractive else None,
        )
    except FileNotFoundError:
        return {"output": "", "error": "claude CLI not found in PATH", "returncode": 127}
    except subprocess.TimeoutExpired:
        return {"output": "", "error": f"Agent timed out after {timeout}s", "returncode": 124}

    # Always preserve the full raw stdout.
    raw_stdout = proc.stdout or ""
    result: dict[str, Any] = {
        "output": raw_stdout,
        "error": proc.stderr or None,  # preserve stderr always for diagnostics
        "empty_output": not raw_stdout.strip(),
        "returncode": proc.returncode,
    }

    # Parse the JSON wrapper from `claude --output-format json`: extract
    # metadata (tokens, cost) but keep output as the full content string.
    wrapper = _try_parse_json(raw_stdout)
    if isinstance(wrapper, dict):
        result["tokens_used"] = wrapper.get("usage", {}).get("total_tokens")
        result["cost_usd"] = wrapper.get("cost_usd")
        # The agent's actual response lives in 'result' (or legacy 'content').
        content = wrapper.get("result")
        if content is None:
            content = wrapper.get("content")
        if content is not None:
            if isinstance(content, str):
                result["output"] = content
            else:
                result["output"] = json.dumps(content, ensure_ascii=False)
    return result
def _try_parse_json(text: str) -> Any:
"""Try to parse JSON from text. Returns parsed obj or None."""
text = text.strip()
if not text:
return None
# Direct parse
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Try to find JSON block in markdown code fences
import re
m = re.search(r"```(?:json)?\s*\n(.*?)\n```", text, re.DOTALL)
if m:
try:
return json.loads(m.group(1))
except json.JSONDecodeError:
pass
# Try to find first { ... } or [ ... ]
for start_char, end_char in [("{", "}"), ("[", "]")]:
start = text.find(start_char)
if start >= 0:
# Find matching close
depth = 0
for i in range(start, len(text)):
if text[i] == start_char:
depth += 1
elif text[i] == end_char:
depth -= 1
if depth == 0:
try:
return json.loads(text[start:i + 1])
except json.JSONDecodeError:
break
return None
# ---------------------------------------------------------------------------
# Backlog audit
# ---------------------------------------------------------------------------
# Directory of role prompt templates shipped alongside this module.
PROMPTS_DIR = Path(__file__).parent / "prompts"
# Project language code -> English language name used in prompt instructions.
_LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish",
               "de": "German", "fr": "French"}
def run_audit(
    conn: sqlite3.Connection,
    project_id: str,
    noninteractive: bool = False,
    auto_apply: bool = False,
) -> dict:
    """Audit pending tasks against the actual codebase.

    auto_apply=True: marks already_done tasks as done in DB.
    auto_apply=False: returns results only (for API/GUI).
    Returns {success, already_done, still_pending, unclear, duration_seconds, ...}
    """
    project = models.get_project(conn, project_id)
    if not project:
        return {"success": False, "error": f"Project '{project_id}' not found"}
    pending = models.list_tasks(conn, project_id=project_id, status="pending")
    if not pending:
        return {
            "success": True,
            "already_done": [],
            "still_pending": [],
            "unclear": [],
            "message": "No pending tasks to audit",
        }
    # Build prompt from template (inline fallback if the template file is missing)
    prompt_path = PROMPTS_DIR / "backlog_audit.md"
    template = prompt_path.read_text() if prompt_path.exists() else (
        "You are a QA analyst. Check if pending tasks are already done in the code."
    )
    task_list = [
        {"id": t["id"], "title": t["title"], "brief": t.get("brief")}
        for t in pending
    ]
    sections = [
        template,
        "",
        # Fix: id and name were concatenated with no separator, producing
        # a garbled prompt header like "## Project: kinKin".
        f"## Project: {project['id']} — {project['name']}",
    ]
    if project.get("tech_stack"):
        sections.append(f"Tech stack: {', '.join(project['tech_stack'])}")
    sections.append(f"Path: {project['path']}")
    sections.append("")
    sections.append(f"## Pending tasks ({len(task_list)}):")
    sections.append(json.dumps(task_list, ensure_ascii=False, indent=2))
    sections.append("")
    language = project.get("language", "ru")
    lang_name = _LANG_NAMES.get(language, language)
    sections.append("## Language")
    sections.append(f"ALWAYS respond in {lang_name}.")
    sections.append("")
    prompt = "\n".join(sections)
    # Determine working dir
    working_dir = None
    project_path = Path(project["path"]).expanduser()
    if project_path.is_dir():
        working_dir = str(project_path)
    # Run agent — allow_write=True so claude can use Read/Bash tools
    # without interactive permission prompts (critical for noninteractive mode)
    start = time.monotonic()
    result = _run_claude(prompt, model="sonnet", working_dir=working_dir,
                         allow_write=True, noninteractive=noninteractive)
    duration = int(time.monotonic() - start)
    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, ensure_ascii=False)
    success = result["returncode"] == 0
    # Log to agent_logs
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=None,
        agent_role="backlog_audit",
        action="audit",
        input_summary=f"project={project_id}, pending_tasks={len(pending)}",
        output_summary=raw_output or None,
        tokens_used=result.get("tokens_used"),
        model="sonnet",
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )
    if not success:
        return {
            "success": False,
            "error": result.get("error", "Agent failed"),
            "raw_output": raw_output,
            "duration_seconds": duration,
        }
    # Parse structured output
    parsed = _try_parse_json(raw_output)
    if not isinstance(parsed, dict):
        return {
            "success": False,
            "error": "Agent returned non-JSON output",
            "raw_output": raw_output,
            "duration_seconds": duration,
        }
    already_done = parsed.get("already_done", [])
    # Auto-apply: mark already_done tasks as done in DB (only tasks that
    # belong to this project and are still pending — guards stale IDs)
    applied = []
    if auto_apply and already_done:
        for item in already_done:
            tid = item.get("id")
            if tid:
                t = models.get_task(conn, tid)
                if t and t["project_id"] == project_id and t["status"] == "pending":
                    models.update_task(conn, tid, status="done")
                    applied.append(tid)
    return {
        "success": True,
        "already_done": already_done,
        "still_pending": parsed.get("still_pending", []),
        "unclear": parsed.get("unclear", []),
        "applied": applied,
        "duration_seconds": duration,
        "tokens_used": result.get("tokens_used"),
        "cost_usd": result.get("cost_usd"),
    }
# ---------------------------------------------------------------------------
# Blocked protocol detection
# ---------------------------------------------------------------------------
def _parse_agent_blocked(result: dict) -> dict | None:
"""Detect semantic blocked status from a successful agent result.
Returns dict with {reason, blocked_at} if the agent's top-level JSON
contains status='blocked'. Returns None otherwise.
Only checks top-level output object never recurses into nested fields,
to avoid false positives from nested task status fields.
"""
from datetime import datetime
if not result.get("success"):
return None
output = result.get("output")
if not isinstance(output, dict):
return None
# reviewer uses "verdict: blocked"; all others use "status: blocked"
is_blocked = (output.get("status") == "blocked" or output.get("verdict") == "blocked")
if not is_blocked:
return None
return {
"reason": output.get("reason") or output.get("blocked_reason") or "",
"blocked_at": output.get("blocked_at") or datetime.now().isoformat(),
}
2026-03-19 15:50:52 +02:00
# ---------------------------------------------------------------------------
# Gate cannot-close detection (KIN-133)
# ---------------------------------------------------------------------------
def _find_gate_result(results: list[dict], role: str) -> dict | None:
"""Return the last successful result for the given gate role.
Iterates results in reverse order to handle auto_fix retry loops where
tester may appear multiple times; returns the latest successful attempt.
"""
for r in reversed(results):
if r.get("role") == role and r.get("success"):
return r
return None
def _parse_gate_cannot_close(result: dict, role: str) -> dict | None:
"""Detect gate rejection from a final gate agent (reviewer or tester).
Returns dict with {reason} if the gate agent signals the task must NOT
be closed. Returns None if the gate approves or if output format is
unrecognised (fail-open: never block on unknown formats).
Reviewer: verdict must be 'approved'; anything else ('changes_requested',
'revise', 'blocked') is a rejection. Note: 'blocked' is handled earlier
by _parse_agent_blocked(), but if it somehow reaches here we treat it as
cannot_close too.
Tester: status must be 'passed'; 'failed' or 'blocked' are rejections.
"""
from datetime import datetime
output = result.get("output")
if not isinstance(output, dict):
return None # fail-open: non-dict output → don't block
if role == "reviewer":
verdict = output.get("verdict")
if verdict is None:
return None # fail-open: no verdict field
if verdict == "approved":
return None # approved → close
reason = output.get("reason") or output.get("summary") or f"Reviewer verdict: {verdict}"
return {"reason": reason}
if role == "tester":
status = output.get("status")
if status is None:
return None # fail-open: no status field
if status == "passed":
return None # passed → close
reason = output.get("reason") or output.get("blocked_reason") or f"Tester status: {status}"
return {"reason": reason}
return None # unknown gate role → fail-open
# ---------------------------------------------------------------------------
# Auto-return helpers (KIN-136)
# ---------------------------------------------------------------------------
def _parse_exit_condition(gate_result: dict, role: str) -> str | None:
    """Extract exit_condition from gate agent output.

    Returns one of the _EXIT_CONDITIONS values, or None when the field is
    absent, invalid, or the role does not support it. Fail-open: unknown
    values are logged and treated as None, which triggers auto-return.
    Only reviewer output supports exit_condition; tester always yields None.
    """
    if role != "reviewer":
        return None
    payload = gate_result.get("output")
    if not isinstance(payload, dict):
        return None
    value = payload.get("exit_condition")
    if value in _EXIT_CONDITIONS:
        return value
    if value is not None:
        _logger.warning(
            "KIN-136: unknown exit_condition %r from reviewer — treating as None (auto-return)",
            value,
        )
    return None
def _trigger_auto_return(
    conn: "sqlite3.Connection",
    task_id: str,
    project_id: str,
    pipeline: dict | None,
    original_steps: list[dict],
    gate_role: str,
    gate_reason: str,
    allow_write: bool,
    noninteractive: bool,
    gate_output_json: str | None = None,
) -> dict:
    """Attempt auto-return: re-run the pipeline with return_analyst prepended.

    Steps:
    (a) Check return_count against _AUTO_RETURN_MAX threshold — escalate if exceeded.
    (b) Record the task return.
    (c) Mark current pipeline failed; set task status to 'revising'.
    (d) Spawn new pipeline: [return_analyst] + original_steps.

    Returns:
    {"should_escalate": True, "reason": "auto_return_threshold_exceeded"} if threshold hit.
    {"should_escalate": False, "auto_return_result": {...}} otherwise.
    """
    # (a) Check threshold — fetch current return_count before recording a new
    # return, so the comparison uses the pre-increment value.
    current_task = models.get_task(conn, task_id)
    current_return_count = (current_task or {}).get("return_count") or 0
    if current_return_count >= _AUTO_RETURN_MAX:
        _logger.warning(
            "KIN-136: auto-return threshold reached for task %s "
            "(return_count=%d >= max=%d) — escalating to human",
            task_id, current_return_count, _AUTO_RETURN_MAX,
        )
        return {"should_escalate": True, "reason": "auto_return_threshold_exceeded"}
    pipeline_id = pipeline["id"] if pipeline else None
    # (b) Record the return (gate reason truncated to 200 chars for storage)
    try:
        models.record_task_return(
            conn,
            task_id=task_id,
            reason_category="recurring_quality_fail",
            reason_text=f"Gate {gate_role}: {gate_reason[:200]}",
            returned_by=gate_role,
            pipeline_id=pipeline_id,
        )
    except Exception:
        pass  # Never block auto-return on tracking errors
    # (c) Mark the current pipeline failed, set the task to revising
    if pipeline:
        try:
            models.update_pipeline(conn, pipeline_id, status="failed")
        except Exception:
            pass
    models.update_task(conn, task_id, status="revising")
    # Log the trigger; best-effort — logging failures must not stop the return.
    try:
        models.write_log(
            conn, pipeline_id,
            f"KIN-136: auto-return triggered by {gate_role} "
            f"(return_count now {current_return_count + 1}) — spawning return_analyst pipeline",
            level="INFO",
            extra={"gate_role": gate_role, "return_count": current_return_count + 1},
        )
    except Exception:
        pass
    # (d) Build new steps and spawn the replacement pipeline; the gate's JSON
    # output is fed to return_analyst as its previous_output.
    new_steps = [{"role": "return_analyst", "model": "opus"}] + list(original_steps)
    auto_return_result = run_pipeline(
        conn,
        task_id,
        new_steps,
        allow_write=allow_write,
        noninteractive=noninteractive,
        initial_previous_output=gate_output_json,
        parent_pipeline_id=pipeline_id,
    )
    return {"should_escalate": False, "auto_return_result": auto_return_result}
# ---------------------------------------------------------------------------
# Destructive operation detection (KIN-116)
# ---------------------------------------------------------------------------
# Patterns that indicate destructive operations in agent output.
# Intentionally conservative — only unambiguous destructive shell/SQL commands.
_DESTRUCTIVE_PATTERNS = [
# Shell: rm with recursive or force flags (alone or combined)
r"\brm\s+(-[a-zA-Z]*[rf][a-zA-Z]*\s+|--recursive\s+|--force\s+)",
# Shell: unlink (removes a file)
r"\bunlink\s+\S",
# SQL: DROP TABLE / DATABASE / INDEX / VIEW / SCHEMA
r"\bDROP\s+(TABLE|DATABASE|INDEX|VIEW|SCHEMA)\b",
# SQL: DELETE FROM without WHERE — full table delete is the risky form.
# DELETE FROM ... WHERE ... is a targeted operation and is NOT flagged.
r"\bDELETE\s+FROM\b(?![^;]*\bWHERE\b)",
# Python: shutil.rmtree
r"\bshutil\.rmtree\s*\(",
# Python: os.remove / os.unlink
r"\bos\.(remove|unlink)\s*\(",
]
_DESTRUCTIVE_RE = [re.compile(p, re.DOTALL | re.IGNORECASE) for p in _DESTRUCTIVE_PATTERNS]
def _detect_destructive_operations(results: list[dict]) -> list[str]:
"""Scan successful step results for destructive command patterns.
Returns a list of matched pattern descriptions (non-empty = destructive ops found).
Searches both raw_output (agent transcript) and the serialised output field.
"""
found: list[str] = []
for r in results:
if not r.get("success"):
continue
raw = r.get("raw_output") or ""
out = r.get("output") or ""
if not isinstance(raw, str):
raw = json.dumps(raw, ensure_ascii=False)
if not isinstance(out, str):
out = json.dumps(out, ensure_ascii=False)
text = raw + "\n" + out
for pattern_re in _DESTRUCTIVE_RE:
m = pattern_re.search(text)
if m:
found.append(m.group(0).strip())
return found
# ---------------------------------------------------------------------------
# Permission error detection
# ---------------------------------------------------------------------------
def _is_permission_error(result: dict) -> bool:
    """Return True if an agent result indicates a permission/write failure."""
    from core.followup import PERMISSION_PATTERNS

    body = result.get("raw_output") or result.get("output") or ""
    if not isinstance(body, str):
        body = json.dumps(body, ensure_ascii=False)
    # Search output and error together so a pattern in either is enough.
    haystack = body + " " + (result.get("error") or "")
    for pattern in PERMISSION_PATTERNS:
        if re.search(pattern, haystack):
            return True
    return False
# ---------------------------------------------------------------------------
# Autocommit: git add -A && git commit after successful pipeline
# ---------------------------------------------------------------------------
def _get_changed_files(project_path: str) -> list[str]:
    """Return files changed in the current pipeline run.

    Combines unstaged changes, staged changes, and the last commit diff to
    cover both autocommit-on and autocommit-off scenarios.
    Returns [] on any git error (e.g. no git repo, first commit).
    """
    env = _build_claude_env()
    git_cmd = shutil.which("git", path=env["PATH"]) or "git"
    changed: set[str] = set()
    diff_variants = (
        ["diff", "--name-only"],                    # unstaged tracked changes
        ["diff", "--cached", "--name-only"],        # staged changes
        ["diff", "HEAD~1", "HEAD", "--name-only"],  # last commit (post-autocommit)
    )
    for args in diff_variants:
        try:
            completed = subprocess.run(
                [git_cmd, *args],
                cwd=project_path,
                capture_output=True,
                text=True,
                timeout=10,
                env=env,
            )
            if completed.returncode == 0:
                changed.update(
                    line.strip() for line in completed.stdout.splitlines() if line.strip()
                )
        except Exception:
            # Best-effort: any git failure just means this variant adds nothing.
            pass
    return list(changed)
def _run_autocommit(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
) -> None:
    """Auto-commit changes after successful pipeline completion.

    Runs: git add -A && git commit -m 'kin: {task_id} {title}'.
    Silently skips if the project has autocommit disabled, the task/project
    is missing, the project path is not a directory, or there is nothing to
    commit (non-zero commit exit). Never raises — autocommit errors must
    never block the pipeline. Uses stderr=subprocess.DEVNULL per decision #30.
    """
    task = models.get_task(conn, task_id)
    project = models.get_project(conn, project_id)
    if not task or not project:
        return
    if not project.get("autocommit_enabled"):
        return
    project_path = Path(project["path"]).expanduser()
    if not project_path.is_dir():
        return
    working_dir = str(project_path)
    env = _build_claude_env()
    git_cmd = shutil.which("git", path=env["PATH"]) or "git"
    # Keep the commit message single-line; argv is passed without a shell,
    # so this is cosmetic sanitisation, not escaping.
    title = (task.get("title") or "").replace('"', "'").replace("\n", " ").replace("\r", "")
    commit_msg = f"kin: {task_id} {title}"
    try:
        # Fix: added timeouts so a hung git (lock contention, hook waiting
        # for input) can never stall the pipeline; TimeoutExpired falls into
        # the except below and is only logged.
        subprocess.run(
            [git_cmd, "add", "-A"],
            cwd=working_dir,
            env=env,
            stderr=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            timeout=120,
        )
        result = subprocess.run(
            [git_cmd, "commit", "-m", commit_msg],
            cwd=working_dir,
            env=env,
            stderr=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            timeout=120,
        )
        if result.returncode == 0:
            _logger.info("Autocommit: %s", commit_msg)
        else:
            _logger.debug("Autocommit: nothing to commit for %s", task_id)
    except Exception as exc:
        _logger.warning("Autocommit failed for %s: %s", task_id, exc)
# ---------------------------------------------------------------------------
# Sysadmin output: save server map to decisions and modules
# ---------------------------------------------------------------------------
def _save_sysadmin_output(
    conn: sqlite3.Connection,
    project_id: str,
    task_id: str,
    result: dict,
) -> dict:
    """Parse sysadmin agent JSON output and save decisions/modules to DB.

    Idempotent: add_decision_if_new deduplicates; add_module relies on
    UNIQUE(project_id, name) and wraps IntegrityError silently.
    Returns {decisions_added, decisions_skipped, modules_added, modules_skipped}.

    NOTE(review): VALID_DECISION_TYPES is not defined or imported in the
    visible part of this module — presumably it is in scope elsewhere in
    the file; confirm.
    """
    empty_stats = {
        "decisions_added": 0,
        "decisions_skipped": 0,
        "modules_added": 0,
        "modules_skipped": 0,
    }
    raw = result.get("raw_output") or result.get("output") or ""
    if isinstance(raw, (dict, list)):
        raw = json.dumps(raw, ensure_ascii=False)
    parsed = _try_parse_json(raw)
    if not isinstance(parsed, dict):
        return empty_stats

    decisions_added = decisions_skipped = 0
    for entry in (parsed.get("decisions") or []):
        if not isinstance(entry, dict):
            continue
        decision_type = entry.get("type", "decision")
        if decision_type not in VALID_DECISION_TYPES:
            decision_type = "decision"  # coerce unknown types to the default
        title = (entry.get("title") or "").strip()
        description = (entry.get("description") or "").strip()
        if not (title and description):
            continue
        saved = models.add_decision_if_new(
            conn,
            project_id=project_id,
            type=decision_type,
            title=title,
            description=description,
            tags=entry.get("tags") or ["server"],
            task_id=task_id,
        )
        if saved:
            decisions_added += 1
        else:
            decisions_skipped += 1

    modules_added = modules_skipped = 0
    for entry in (parsed.get("modules") or []):
        if not isinstance(entry, dict):
            continue
        name = (entry.get("name") or "").strip()
        module_type = (entry.get("type") or "service").strip()
        module_path = (entry.get("path") or "").strip()
        if not name:
            continue
        try:
            created = models.add_module(
                conn,
                project_id=project_id,
                name=name,
                type=module_type,
                path=module_path or name,
                description=entry.get("description"),
                owner_role="sysadmin",
            )
            if created.get("_created", True):
                modules_added += 1
            else:
                modules_skipped += 1
        except Exception:
            modules_skipped += 1

    return {
        "decisions_added": decisions_added,
        "decisions_skipped": decisions_skipped,
        "modules_added": modules_added,
        "modules_skipped": modules_skipped,
    }
# ---------------------------------------------------------------------------
# Auto-test: detect test failure in agent output
# ---------------------------------------------------------------------------
_TEST_FAILURE_PATTERNS = [
r"\bFAILED\b",
r"\bFAIL\b",
r"\d+\s+failed",
r"test(?:s)?\s+failed",
r"assert(?:ion)?\s*(error|failed)",
r"exception(?:s)?\s+occurred",
r"returncode\s*[!=]=\s*0",
r"Error:\s",
r"ERRORS?\b",
]
_TEST_SUCCESS_PATTERNS = [
r"no\s+failures",
r"all\s+tests?\s+pass",
r"0\s+failed",
r"passed.*no\s+errors",
]
def _is_test_failure(result: dict) -> bool:
"""Return True if agent output indicates test failures.
Checks for failure keywords, guards against false positives from
explicit success phrases (e.g. 'no failures').
"""
output = result.get("raw_output") or result.get("output") or ""
if not isinstance(output, str):
output = json.dumps(output, ensure_ascii=False)
for p in _TEST_SUCCESS_PATTERNS:
if re.search(p, output, re.IGNORECASE):
return False
for p in _TEST_FAILURE_PATTERNS:
if re.search(p, output, re.IGNORECASE):
return True
return False
# ---------------------------------------------------------------------------
# Auto-test runner: run project tests via `make test`
# ---------------------------------------------------------------------------
# Roles that trigger auto-test when project.auto_test_enabled is set
_AUTO_TEST_ROLES = {"backend_dev", "frontend_dev"}
2026-03-17 20:54:20 +02:00
_WORKTREE_ROLES = {"backend_dev", "frontend_dev", "debugger"}
2026-03-17 23:31:24 +02:00
_DEV_GUARD_ROLES = {"backend_dev", "frontend_dev", "debugger"}
2026-03-18 14:06:23 +02:00
def _detect_test_command(project_path: str, role: str | None = None) -> str | None:
2026-03-17 19:30:15 +02:00
"""Auto-detect test command by inspecting project files.
Candidates (in priority order):
1. make test Makefile exists and has a 'test' target
2. npm test package.json exists and has scripts.test
3. pytest pyproject.toml or setup.py exists
4. npx tsc --noEmit tsconfig.json exists
2026-03-18 14:06:23 +02:00
When role='backend_dev' and a Python project marker (pyproject.toml / setup.py)
is present, pytest is returned directly bypassing make test. This prevents
false-positive failures in mixed projects whose Makefile test target also runs
frontend (e.g. vitest) commands that may be unrelated to backend changes.
2026-03-17 19:30:15 +02:00
Returns the first matching command, or None if no framework is detected.
"""
path = Path(project_path)
2026-03-18 14:06:23 +02:00
# For backend_dev: Python project marker takes precedence over Makefile.
# Rationale: make test in mixed projects often runs frontend tests too;
# backend changes should only be validated by the Python test runner.
if role == "backend_dev":
if (path / "pyproject.toml").is_file() or (path / "setup.py").is_file():
return f"{sys.executable} -m pytest"
2026-03-17 19:30:15 +02:00
# 1. make test
makefile = path / "Makefile"
if makefile.is_file():
try:
content = makefile.read_text(errors="ignore")
if re.search(r"^test\s*:", content, re.MULTILINE):
return "make test"
except OSError:
pass
# 2. npm test
pkg_json = path / "package.json"
if pkg_json.is_file():
try:
pkg = json.loads(pkg_json.read_text())
if pkg.get("scripts", {}).get("test"):
return "npm test"
except (json.JSONDecodeError, OSError):
pass
# 3. pytest
if (path / "pyproject.toml").is_file() or (path / "setup.py").is_file():
2026-03-17 21:25:12 +02:00
return f"{sys.executable} -m pytest"
2026-03-17 19:30:15 +02:00
# 4. npx tsc --noEmit
if (path / "tsconfig.json").is_file():
return "npx tsc --noEmit"
return None
def _run_project_tests(project_path: str, test_command: str = "make test", timeout: int | None = None) -> dict:
    """Run test_command in project_path. Returns {success, output, returncode}.

    Never raises — all errors are captured and returned in output.
    timeout defaults to KIN_AUTO_TEST_TIMEOUT (env) or 600 seconds.
    Timeout → returncode 124; missing binary → 127; other errors → -1.
    """
    if timeout is None:
        timeout = int(os.environ.get("KIN_AUTO_TEST_TIMEOUT") or 600)
    env = _build_claude_env()
    parts = shlex.split(test_command)
    if not parts:
        return {"success": False, "output": "Empty test_command", "returncode": -1}
    # Resolve the executable against the extended PATH (launchd-safe).
    resolved = shutil.which(parts[0], path=env["PATH"]) or parts[0]
    cmd = [resolved] + parts[1:]
    try:
        result = subprocess.run(
            cmd,
            cwd=project_path,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
        )
        output = (result.stdout or "") + (result.stderr or "")
        return {"success": result.returncode == 0, "output": output, "returncode": result.returncode}
    except subprocess.TimeoutExpired:
        return {"success": False, "output": f"{test_command} timed out after {timeout}s", "returncode": 124}
    except FileNotFoundError:
        return {"success": False, "output": f"{parts[0]} not found in PATH", "returncode": 127}
    except Exception as exc:
        return {"success": False, "output": f"Test run error: {exc}", "returncode": -1}
# ---------------------------------------------------------------------------
# Decomposer output: create child tasks from task_decomposer JSON
# ---------------------------------------------------------------------------
def _save_decomposer_output(
    conn: sqlite3.Connection,
    project_id: str,
    parent_task_id: str,
    result: dict,
) -> dict:
    """Parse task_decomposer output and create child tasks in DB.

    Expected output format: {tasks: [{title, brief, priority, category, acceptance_criteria}]}
    Idempotent: a child with the same parent_task_id + title (case-insensitive)
    is counted as skipped rather than re-created.

    Returns {created: int, skipped: int} (plus an 'error' key on unusable input).
    """
    payload = result.get("raw_output") or result.get("output") or ""
    if isinstance(payload, (dict, list)):
        payload = json.dumps(payload, ensure_ascii=False)
    decoded = _try_parse_json(payload)
    if not isinstance(decoded, dict):
        return {"created": 0, "skipped": 0, "error": "non-JSON decomposer output"}
    entries = decoded.get("tasks", [])
    if not isinstance(entries, list):
        return {"created": 0, "skipped": 0, "error": "invalid tasks format"}
    n_created = 0
    n_skipped = 0
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        child_title = (entry.get("title") or "").strip()
        if not child_title:
            continue
        # Idempotency: skip if same parent + title already exists
        duplicate = conn.execute(
            """SELECT id FROM tasks
            WHERE parent_task_id = ? AND lower(trim(title)) = lower(trim(?))""",
            (parent_task_id, child_title),
        ).fetchone()
        if duplicate:
            n_skipped += 1
            _logger.info(
                "task_decomposer: skip duplicate child task '%s' (parent=%s, existing=%s)",
                child_title,
                parent_task_id,
                duplicate[0],
            )
            continue
        # Unknown categories fall back to None (models decides the default).
        child_category = (entry.get("category") or "").strip().upper()
        if child_category not in models.TASK_CATEGORIES:
            child_category = None
        child_id = models.next_task_id(conn, project_id, category=child_category)
        models.create_task(
            conn,
            child_id,
            project_id,
            child_title,
            priority=entry.get("priority", 5),
            brief={"text": entry.get("brief") or "", "source": f"decomposer:{parent_task_id}"},
            category=child_category,
            acceptance_criteria=entry.get("acceptance_criteria"),
            parent_task_id=parent_task_id,
        )
        n_created += 1
    return {"created": n_created, "skipped": n_skipped}
2026-03-18 22:11:14 +02:00
# ---------------------------------------------------------------------------
# Tech debt: create followup child task from dev agent output
# ---------------------------------------------------------------------------
# Roles whose output is parsed for tech_debt (KIN-128)
_TECH_DEBT_ROLES = {"backend_dev", "frontend_dev", "debugger", "sysadmin"}


def _save_tech_debt_output(
    conn: sqlite3.Connection,
    project_id: str,
    task_id: str,
    result: dict,
) -> dict:
    """Parse dev agent JSON output for tech_debt field and create a child task.

    If the agent output contains a non-empty 'tech_debt' object with a
    'description', creates one child task titled '[TECH DEBT] {description}'.
    At most 1 tech_debt task per call (prevents runaway task creation); an
    existing child with the same title makes the call a no-op.

    Returns {created: bool, task_id: str | None}.
    """
    nothing = {"created": False, "task_id": None}
    payload = result.get("raw_output") or result.get("output") or ""
    if isinstance(payload, (dict, list)):
        payload = json.dumps(payload, ensure_ascii=False)
    try:
        decoded = _try_parse_json(payload)
    except Exception:
        return dict(nothing)
    if not isinstance(decoded, dict):
        return dict(nothing)
    debt = decoded.get("tech_debt")
    if not isinstance(debt, dict):
        return dict(nothing)
    description = (debt.get("description") or "").strip()
    if not description:
        return dict(nothing)
    # Idempotency: skip if a [TECH DEBT] child with same description already exists
    title = f"[TECH DEBT] {description}"
    existing = conn.execute(
        """SELECT id FROM tasks
        WHERE parent_task_id = ? AND lower(trim(title)) = lower(trim(?))""",
        (task_id, title),
    ).fetchone()
    if existing:
        return {"created": False, "task_id": existing[0]}
    # Unknown categories are coerced to FIX — tech debt is fix-shaped by default.
    category = (debt.get("category") or "").strip().upper()
    if category not in models.TASK_CATEGORIES:
        category = "FIX"
    reason_temporary = (debt.get("reason_temporary") or "").strip()
    proper_fix = (debt.get("proper_fix") or "").strip()
    brief_text = f"Технический долг из задачи {task_id}."
    if reason_temporary:
        brief_text += f"\n\nПричина временного решения: {reason_temporary}"
    if proper_fix:
        brief_text += f"\n\nПравильный фикс: {proper_fix}"
    new_task_id = models.next_task_id(conn, project_id, category=category)
    models.create_task(
        conn,
        new_task_id,
        project_id,
        title,
        priority=7,
        brief={"text": brief_text, "source": f"tech_debt:{task_id}"},
        category=category,
        parent_task_id=task_id,
    )
    _logger.info("tech_debt: created task %s for parent %s", new_task_id, task_id)
    return {"created": True, "task_id": new_task_id}
2026-03-20 21:56:46 +02:00
# ---------------------------------------------------------------------------
# Return analyst output: handle escalation pipeline creation (KIN-135)
# ---------------------------------------------------------------------------
# Mapping from task category to escalation dept_head (KIN-135)
# Categories absent from this map — and tasks with no category at all — fall
# back to 'cto_advisor' (see the .get(..., "cto_advisor") lookup in
# _save_return_analyst_output).
_CATEGORY_TO_DEPT_HEAD = {
    "SEC": "security_head",
    "UI": "frontend_head",
    "API": "backend_head",
    "BIZ": "backend_head",
    "DB": "backend_head",
    "PERF": "backend_head",
    "ARCH": "cto_advisor",
    "FIX": "cto_advisor",
    "INFRA": "infra_head",
    "OBS": "infra_head",
    "TEST": "qa_head",
    "DOCS": "cto_advisor",
}
def _save_return_analyst_output(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
    result: dict,
    parent_pipeline_id: int | None = None,
) -> dict:
    """Parse return_analyst output and create escalation pipeline if needed.

    If escalate_to_dept_head=true in analyst output:
    - Determines target dept_head from task.category (defaults to cto_advisor)
    - Creates a new escalation pipeline with pipeline_type='escalation'
    - The dept_head step receives analyst output as initial context

    Returns {"escalated": bool, "escalation_pipeline_id": int | None, "dept_head": str | None}.
    Never raises — escalation errors must never block the current pipeline.
    """
    no_escalation = {"escalated": False, "escalation_pipeline_id": None, "dept_head": None}
    raw = result.get("raw_output") or result.get("output") or ""
    if isinstance(raw, (dict, list)):
        raw = json.dumps(raw, ensure_ascii=False)
    parsed = _try_parse_json(raw)
    if not isinstance(parsed, dict):
        return dict(no_escalation)
    if not parsed.get("escalate_to_dept_head"):
        return dict(no_escalation)
    # Determine target dept_head from task category
    task = models.get_task(conn, task_id)
    category = (task or {}).get("category") or ""
    dept_head = _CATEGORY_TO_DEPT_HEAD.get(category.upper(), "cto_advisor")
    # Build escalation pipeline steps: dept_head with analyst context as brief
    analyst_summary = parsed.get("root_cause_analysis", "")
    refined_brief = parsed.get("refined_brief", "")
    escalation_brief = (
        f"[ESCALATION from return_analyst]\n"
        f"Root cause: {analyst_summary}\n"
        f"Refined brief: {refined_brief}"
    )
    escalation_steps = [{"role": dept_head, "model": "opus", "brief": escalation_brief}]
    try:
        esc_pipeline = models.create_pipeline(
            conn, task_id, project_id,
            route_type="escalation",
            steps=escalation_steps,
            parent_pipeline_id=parent_pipeline_id,
        )
        # Mark pipeline_type as escalation so return tracking is skipped inside it
        conn.execute(
            "UPDATE pipelines SET pipeline_type = 'escalation' WHERE id = ?",
            (esc_pipeline["id"],),
        )
        conn.commit()
        # Fix: format string was "task %s%s", fusing task_id and dept_head into
        # one unreadable token in the log line.
        _logger.info(
            "KIN-135: escalation pipeline %s created for task %s -> dept_head %s",
            esc_pipeline["id"], task_id, dept_head,
        )
        return {
            "escalated": True,
            "escalation_pipeline_id": esc_pipeline["id"],
            "dept_head": dept_head,
        }
    except Exception as exc:
        _logger.warning("KIN-135: escalation pipeline creation failed for %s: %s", task_id, exc)
        return dict(no_escalation)
# ---------------------------------------------------------------------------
# Auto-learning: extract decisions from pipeline results
# ---------------------------------------------------------------------------
# Decision types accepted from the learner; anything else is coerced to 'decision'.
VALID_DECISION_TYPES = {"decision", "gotcha", "convention"}
def _run_learning_extraction(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
    step_results: list[dict],
) -> dict:
    """Extract and save decisions from completed pipeline results.

    Calls the learner agent (claude CLI, sonnet model) with truncated step
    outputs plus the project's existing decisions as a dedup hint, parses the
    JSON response, and saves new decisions via models.add_decision_if_new.
    The run is logged to agent_logs regardless of outcome.

    Returns a summary dict with added/skipped counts (plus an 'error' key when
    the learner prompt file is missing or the learner output is unusable).
    """
    learner_prompt_path = PROMPTS_DIR / "learner.md"
    if not learner_prompt_path.exists():
        return {"added": 0, "skipped": 0, "error": "learner.md not found"}
    template = learner_prompt_path.read_text()
    # Summarize step outputs (first 2000 chars each) — keeps prompt size bounded.
    step_summaries = {}
    for r in step_results:
        role = r.get("role", "unknown")
        output = r.get("raw_output") or r.get("output") or ""
        if isinstance(output, (dict, list)):
            output = json.dumps(output, ensure_ascii=False)
        step_summaries[role] = output[:2000]
    # Fetch existing decisions for dedup hint
    existing = models.get_decisions(conn, project_id)
    existing_hints = [
        {"title": d["title"], "type": d["type"]}
        for d in existing
    ]
    prompt_parts = [
        template,
        "",
        "## PIPELINE_OUTPUTS",
        json.dumps(step_summaries, ensure_ascii=False, indent=2),
        "",
        "## EXISTING_DECISIONS",
        json.dumps(existing_hints, ensure_ascii=False, indent=2),
    ]
    prompt = "\n".join(prompt_parts)
    # Learner is advisory, not on the critical path — short default timeout.
    learner_timeout = int(os.environ.get("KIN_LEARNER_TIMEOUT") or 120)
    start = time.monotonic()
    result = _run_claude(prompt, model="sonnet", noninteractive=True, timeout=learner_timeout)
    duration = int(time.monotonic() - start)
    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, ensure_ascii=False)
    success = result["returncode"] == 0
    # Log to agent_logs
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=task_id,
        agent_role="learner",
        action="learn",
        input_summary=f"project={project_id}, task={task_id}, steps={len(step_results)}",
        output_summary=raw_output or None,
        tokens_used=result.get("tokens_used"),
        model="sonnet",
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )
    parsed = _try_parse_json(raw_output)
    if not isinstance(parsed, dict):
        return {"added": 0, "skipped": 0, "error": "non-JSON learner output"}
    decisions = parsed.get("decisions", [])
    if not isinstance(decisions, list):
        return {"added": 0, "skipped": 0, "error": "invalid decisions format"}
    added = 0
    skipped = 0
    # Cap at 5 decisions per run to bound DB growth from a single pipeline.
    for item in decisions[:5]:
        if not isinstance(item, dict):
            continue
        d_type = item.get("type", "decision")
        if d_type not in VALID_DECISION_TYPES:
            d_type = "decision"
        d_title = (item.get("title") or "").strip()
        d_desc = (item.get("description") or "").strip()
        # Both title and description are required for a usable decision record.
        if not d_title or not d_desc:
            continue
        saved = models.add_decision_if_new(
            conn,
            project_id=project_id,
            type=d_type,
            title=d_title,
            description=d_desc,
            tags=item.get("tags") or [],
            task_id=task_id,
        )
        if saved:
            added += 1
        else:
            skipped += 1
    return {"added": added, "skipped": skipped}
2026-03-17 14:03:53 +02:00
# ---------------------------------------------------------------------------
# Department head detection
# ---------------------------------------------------------------------------
# Cache of roles with execution_type=department_head from specialists.yaml
_DEPT_HEAD_ROLES: set[str] | None = None
def _is_department_head(role: str) -> bool:
"""Check if a role is a department head.
Uses execution_type from specialists.yaml as primary check,
falls back to role.endswith('_head') convention.
"""
global _DEPT_HEAD_ROLES
if _DEPT_HEAD_ROLES is None:
try:
from core.context_builder import _load_specialists
specs = _load_specialists()
all_specs = specs.get("specialists", {})
_DEPT_HEAD_ROLES = {
name for name, spec in all_specs.items()
if spec.get("execution_type") == "department_head"
}
except Exception:
_DEPT_HEAD_ROLES = set()
return role in _DEPT_HEAD_ROLES or role.endswith("_head")
# ---------------------------------------------------------------------------
# Department head sub-pipeline execution
# ---------------------------------------------------------------------------
def _execute_department_head_step(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
    parent_pipeline_id: int | None,
    step: dict,
    dept_head_result: dict,
    allow_write: bool = False,
    noninteractive: bool = False,
    next_department: str | None = None,
) -> dict:
    """Execute the sub-pipeline planned by a department head.

    Parses the dept head's JSON output, validates the sub_pipeline, runs it as
    a child pipeline via run_pipeline, and saves a handoff record for the next
    department.

    Returns dict with success, output, cost_usd, tokens_used, duration_seconds
    (plus blocked/blocked_reason/error keys on failure paths).
    """

    def _failure(message: str) -> dict:
        # Uniform zero-cost failure result for validation errors.
        return {
            "success": False,
            "output": message,
            "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0,
        }

    def _log_safe(message: str, level: str = "INFO", extra: dict | None = None) -> None:
        # Observability only — logging failures must never affect execution.
        if not parent_pipeline_id:
            return
        try:
            models.write_log(conn, parent_pipeline_id, message, level=level, extra=extra)
        except Exception:
            pass

    payload = dept_head_result.get("raw_output") or dept_head_result.get("output") or ""
    if isinstance(payload, (dict, list)):
        payload = json.dumps(payload, ensure_ascii=False)
    plan = _try_parse_json(payload)
    if not isinstance(plan, dict):
        return _failure("Department head returned non-JSON output")

    # Dept head may declare itself blocked instead of planning a sub-pipeline.
    if plan.get("status") == "blocked":
        reason = plan.get("blocked_reason", "Department head reported blocked")
        # KIN-084: log dept head blocked
        _log_safe(
            f"Dept {step['role']} blocked: {reason}",
            level="WARN",
            extra={"role": step["role"], "blocked_reason": reason},
        )
        blocked = _failure(json.dumps(plan, ensure_ascii=False))
        blocked["blocked"] = True
        blocked["blocked_reason"] = reason
        return blocked

    worker_steps = plan.get("sub_pipeline", [])
    if not isinstance(worker_steps, list) or not worker_steps:
        return _failure("Department head returned empty or invalid sub_pipeline")

    # Recursion guard: no department head roles allowed in sub_pipeline
    for worker in worker_steps:
        if isinstance(worker, dict) and _is_department_head(str(worker.get("role", ""))):
            return _failure(
                f"Recursion blocked: sub_pipeline contains _head role '{worker['role']}'"
            )

    dept_name = step["role"].replace("_head", "")

    # Extract context_packet (KIN-DOCS-004): fail-open if missing
    context_packet = plan.get("context_packet")
    if context_packet is None:
        _log_safe(
            f"Dept {step['role']}: context_packet missing from output — handoff quality degraded",
            level="WARN",
            extra={"role": step["role"]},
        )

    # Build initial context for workers: context_packet first, then dept head's plan
    worker_context: dict = {}
    if context_packet is not None:
        worker_context["context_packet"] = context_packet
    worker_context["department_head_plan"] = {
        "department": dept_name,
        "artifacts": plan.get("artifacts", {}),
        "handoff_notes": plan.get("handoff_notes", ""),
    }

    # KIN-084: log sub-pipeline start (kept in an explicit try: a non-dict
    # worker entry makes s.get raise, and that must only suppress the log).
    if parent_pipeline_id:
        try:
            sub_roles = [s.get("role", "") for s in worker_steps]
            models.write_log(
                conn, parent_pipeline_id,
                f"Sub-pipeline start: dept={dept_name}, steps={len(worker_steps)}",
                extra={"dept_name": dept_name, "sub_steps": len(worker_steps), "sub_roles": sub_roles},
            )
        except Exception:
            pass

    # Run the sub-pipeline (noninteractive=True — Opus already reviewed the plan).
    # parent_pipeline_id and department are forwarded so run_pipeline creates the
    # child pipeline with route_type='dept_sub' — no double create.
    sub_result = run_pipeline(
        conn, task_id, worker_steps,
        dry_run=False,
        allow_write=allow_write,
        noninteractive=True,
        initial_previous_output=json.dumps(worker_context, ensure_ascii=False),
        parent_pipeline_id=parent_pipeline_id,
        department=dept_name,
    )

    # KIN-084: log sub-pipeline done
    sub_success = sub_result.get("success", False)
    _log_safe(
        f"Sub-pipeline done: dept={dept_name}, success={sub_success}, steps={sub_result.get('steps_completed', 0)}",
        level="INFO" if sub_success else "ERROR",
        extra={"dept_name": dept_name, "success": sub_success, "steps_completed": sub_result.get("steps_completed", 0)},
    )

    # Collect decisions/findings surfaced by workers for the handoff record.
    decisions_made: list = []
    for worker_result in sub_result.get("results", []):
        out = worker_result.get("output") or worker_result.get("raw_output") or ""
        if isinstance(out, str):
            try:
                out = json.loads(out)
            except (json.JSONDecodeError, ValueError):
                pass
        if isinstance(out, dict):
            # Reviewer/tester may include decisions or findings
            for key in ("decisions", "findings", "recommendations"):
                val = out.get(key)
                if isinstance(val, list):
                    decisions_made.extend(val)
                elif isinstance(val, str) and val:
                    decisions_made.append(val)

    # Determine last worker role for auto_complete tracking
    last_sub_role = worker_steps[-1].get("role", "") if worker_steps else ""

    # Save handoff for inter-department context
    try:
        models.create_handoff(
            conn,
            pipeline_id=sub_result.get("pipeline_id") or parent_pipeline_id,
            task_id=task_id,
            from_department=dept_name,
            to_department=next_department,
            artifacts=plan.get("artifacts", {}),
            decisions_made=decisions_made,
            blockers=[],
            status="done" if sub_result.get("success") else "partial",
            context_packet=context_packet,
        )
    except Exception:
        pass  # Handoff save errors must never block pipeline

    # Build summary output for the next pipeline step
    summary = {
        "from_department": dept_name,
        "handoff_notes": plan.get("handoff_notes", ""),
        "artifacts": plan.get("artifacts", {}),
        "context_packet": context_packet,
        "sub_pipeline_summary": {
            "steps_completed": sub_result.get("steps_completed", 0),
            "success": sub_result.get("success", False),
        },
    }
    ret = {
        "success": sub_result.get("success", False),
        "output": json.dumps(summary, ensure_ascii=False),
        "cost_usd": sub_result.get("total_cost_usd", 0),
        "tokens_used": sub_result.get("total_tokens", 0),
        "duration_seconds": sub_result.get("total_duration_seconds", 0),
        "last_sub_role": last_sub_role,
    }
    if not sub_result.get("success"):
        ret["blocked_reason"] = sub_result.get("blocked_reason") or sub_result.get("error")
        ret["error"] = sub_result.get("error")
    return ret
2026-03-17 14:03:53 +02:00
2026-03-17 15:59:43 +02:00
# ---------------------------------------------------------------------------
# Watchdog helpers
# ---------------------------------------------------------------------------
def _check_parent_alive(
conn: sqlite3.Connection,
pipeline: dict,
task_id: str,
project_id: str,
) -> bool:
"""Check if parent process is alive. Returns True if pipeline should abort.
Only treats ESRCH (no such process) as dead parent.
PermissionError (pid 1 / init) and ValueError are ignored pipeline continues.
"""
ppid = os.getppid()
try:
os.kill(ppid, 0)
except OSError as exc:
if exc.errno == _errno.ESRCH:
reason = f"Parent process died unexpectedly (PID {ppid})"
_logger.warning("Pipeline %s: %s — aborting", pipeline["id"], reason)
models.update_pipeline(conn, pipeline["id"], status="failed")
models.update_task(conn, task_id, status="blocked", blocked_reason=reason)
return True
# PermissionError (EPERM) — process exists but we can't signal it: continue
return False
# ---------------------------------------------------------------------------
# Pipeline executor
# ---------------------------------------------------------------------------
def run_pipeline(
conn: sqlite3.Connection,
task_id: str,
steps: list[dict],
dry_run: bool = False,
allow_write: bool = False,
noninteractive: bool = False,
2026-03-17 14:03:53 +02:00
initial_previous_output: str | None = None,
parent_pipeline_id: int | None = None,
department: str | None = None,
2026-03-17 18:29:32 +02:00
pm_result: dict | None = None,
pm_started_at: str | None = None,
pm_ended_at: str | None = None,
) -> dict:
"""Execute a multi-step pipeline of agents.
steps = [
{"role": "debugger", "model": "opus", "brief": "..."},
{"role": "tester", "depends_on": "debugger", "brief": "..."},
]
2026-03-17 14:03:53 +02:00
initial_previous_output: context injected as previous_output for the first step
(used by dept head sub-pipelines to pass artifacts/plan to workers).
Returns {success, steps_completed, total_cost, total_tokens, total_duration, results}
"""
2026-03-17 21:25:12 +02:00
# Guard: empty pipeline — return immediately without touching DB or task state
if not steps:
return {"success": False, "error": "empty_pipeline"}
# Auth check — skip for dry_run (dry_run never calls claude CLI)
if not dry_run:
try:
check_claude_auth()
except ClaudeAuthError as exc:
return {
"success": False,
"error": "claude_auth_required",
"message": str(exc),
"instructions": "Run: claude login",
}
task = models.get_task(conn, task_id)
if not task:
return {"success": False, "error": f"Task '{task_id}' not found"}
project_id = task["project_id"]
# Determine route type from steps or task brief
route_type = "custom"
if task.get("brief") and isinstance(task["brief"], dict):
route_type = task["brief"].get("route_type", "custom") or "custom"
# Determine execution mode (auto vs review)
mode = models.get_effective_mode(conn, project_id, task_id)
# Create pipeline in DB
pipeline = None
if not dry_run:
effective_route_type = "dept_sub" if parent_pipeline_id else route_type
pipeline = models.create_pipeline(
conn, task_id, project_id, effective_route_type, steps,
parent_pipeline_id=parent_pipeline_id,
department=department,
)
2026-03-17 15:59:43 +02:00
# Save PID so watchdog can detect dead subprocesses (KIN-099)
2026-03-17 16:36:52 +02:00
pipeline = models.update_pipeline(conn, pipeline["id"], pid=os.getpid())
models.update_task(conn, task_id, status="in_progress")
2026-03-17 17:26:31 +02:00
# KIN-084: log pipeline start
try:
models.write_log(
conn, pipeline["id"],
f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}",
extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode},
)
except Exception:
pass
2026-03-17 18:29:32 +02:00
# KIN-OBS-025: log PM step retrospectively (after create_pipeline FK is satisfied)
# Only for top-level pipelines — sub-pipelines have no PM step.
if pm_result is not None and not parent_pipeline_id:
try:
models.write_log(
conn, pipeline["id"],
"PM start: task planning",
level="INFO",
ts=pm_started_at,
extra={"role": "pm"},
)
except Exception:
pass
try:
_n_steps = len(steps)
_success = pm_result.get("success")
_cost = pm_result.get("cost_usd") or 0.0
_tokens = pm_result.get("tokens_used") or 0
models.write_log(
conn, pipeline["id"],
f"PM done: {_n_steps} steps planned, success={_success}, cost=${_cost:.4f}, tokens={_tokens}",
level="INFO",
ts=pm_ended_at,
extra={
"role": "pm",
"steps_count": _n_steps,
"success": _success,
"cost_usd": _cost,
"tokens_used": _tokens,
},
)
except Exception:
pass
results = []
total_cost = 0.0
total_tokens = 0
total_duration = 0
2026-03-17 14:03:53 +02:00
previous_output = initial_previous_output
_last_sub_role = None # Track last worker role from dept sub-pipelines (for auto_complete)
for i, step in enumerate(steps):
role = step["role"]
model = step.get("model", "sonnet")
brief = step.get("brief")
2026-03-17 17:26:31 +02:00
# KIN-084: log step start
try:
if pipeline:
brief_preview = (step.get("brief") or "")[:100]
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} start: role={role}, model={model}",
level="INFO",
extra={"role": role, "model": model, "brief_preview": brief_preview},
)
except Exception:
pass
2026-03-17 15:59:43 +02:00
# Check parent process is still alive (KIN-099 watchdog)
if not dry_run and pipeline:
if _check_parent_alive(conn, pipeline, task_id, project_id):
return {
"success": False,
"error": "parent_process_died",
"steps_completed": i,
"total_cost": total_cost,
"total_tokens": total_tokens,
"total_duration": total_duration,
"results": results,
}
# Worktree isolation: opt-in per project, for write-capable roles
worktree_path = None
project_for_wt = models.get_project(conn, task["project_id"]) if not dry_run else None
use_worktree = (
not dry_run
and role in _WORKTREE_ROLES
and project_for_wt
and project_for_wt.get("worktrees_enabled")
and project_for_wt.get("path")
)
if use_worktree:
try:
from core.worktree import create_worktree, ensure_gitignore
p_path = str(Path(project_for_wt["path"]).expanduser())
ensure_gitignore(p_path)
worktree_path = create_worktree(p_path, task_id, role)
except Exception:
worktree_path = None # Fall back to normal execution
try:
result = run_agent(
conn, role, task_id, project_id,
model=model,
previous_output=previous_output,
brief_override=brief,
dry_run=dry_run,
allow_write=allow_write,
noninteractive=noninteractive,
working_dir_override=worktree_path,
)
except Exception as exc:
exc_msg = f"Step {i+1}/{len(steps)} ({role}) raised exception: {exc}"
if pipeline:
models.update_pipeline(
conn, pipeline["id"],
status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
2026-03-17 17:26:31 +02:00
# KIN-084: log step exception
try:
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} exception: {exc}",
level="ERROR",
extra={"role": role, "exc_type": type(exc).__name__},
)
except Exception:
pass
models.log_agent_run(
conn,
project_id=project_id,
task_id=task_id,
agent_role=role,
action="execute",
input_summary=f"task={task_id}, model={model}",
output_summary=None,
success=False,
error_message=exc_msg,
)
models.update_task(conn, task_id, status="blocked", blocked_reason=exc_msg)
try:
from core.telegram import send_telegram_escalation
project = models.get_project(conn, project_id)
project_name = project["name"] if project else project_id
sent = send_telegram_escalation(
task_id=task_id,
project_name=project_name,
agent_role=role,
reason=exc_msg,
pipeline_step=str(i + 1),
)
if sent:
models.mark_telegram_sent(conn, task_id)
except Exception:
pass # Telegram errors must never block pipeline
return {
"success": False,
"error": exc_msg,
"steps_completed": i,
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
if dry_run:
results.append(result)
continue
# Accumulate stats
total_cost += result.get("cost_usd") or 0
total_tokens += result.get("tokens_used") or 0
total_duration += result.get("duration_seconds") or 0
2026-03-17 17:26:31 +02:00
# KIN-084: log step done
try:
if pipeline:
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} done: role={role}, success={result.get('success')}",
level="INFO",
extra={
"role": role,
"tokens_used": result.get("tokens_used") or 0,
"cost_usd": result.get("cost_usd") or 0,
"duration_seconds": result.get("duration_seconds") or 0,
},
)
except Exception:
pass
if not result["success"]:
# Auto mode: retry once with allow_write on permission error
if mode == "auto_complete" and not allow_write and _is_permission_error(result):
task_modules = models.get_modules(conn, project_id)
try:
run_hooks(conn, project_id, task_id,
event="task_permission_retry",
task_modules=task_modules)
except Exception:
pass
# Audit log: record dangerous skip before retry
try:
models.log_audit_event(
conn,
event_type="dangerous_skip",
task_id=task_id,
step_id=role,
reason=f"auto mode permission retry: step {i+1}/{len(steps)} ({role})",
project_id=project_id,
)
models.update_task(conn, task_id, dangerously_skipped=1)
except Exception:
pass
retry = run_agent(
conn, role, task_id, project_id,
model=model,
previous_output=previous_output,
brief_override=brief,
dry_run=False,
allow_write=True,
noninteractive=noninteractive,
2026-03-17 23:31:24 +02:00
working_dir_override=worktree_path,
)
allow_write = True # subsequent steps also with allow_write
total_cost += retry.get("cost_usd") or 0
total_tokens += retry.get("tokens_used") or 0
total_duration += retry.get("duration_seconds") or 0
if retry["success"]:
result = retry
if not result["success"]:
# Still failed — block regardless of mode
results.append(result)
if pipeline:
models.update_pipeline(
conn, pipeline["id"],
status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
agent_error = result.get("error") or ""
2026-03-17 17:26:31 +02:00
# KIN-084: log step failed
try:
if pipeline:
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} failed: {agent_error}",
level="ERROR",
extra={"role": role, "error": agent_error},
)
except Exception:
pass
error_msg = f"Step {i+1}/{len(steps)} ({role}) failed"
if agent_error:
error_msg += f": {agent_error}"
models.update_task(conn, task_id, status="blocked", blocked_reason=error_msg)
2026-03-17 20:44:44 +02:00
# Worktree cleanup on step failure (KIN-103)
if worktree_path and not dry_run:
try:
from core.worktree import cleanup_worktree
p_path = str(Path(project_for_wt["path"]).expanduser())
cleanup_worktree(worktree_path, p_path)
except Exception:
pass
return {
"success": False,
"error": error_msg,
"steps_completed": i,
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
# Worktree merge/cleanup after successful step
if worktree_path and result["success"] and not dry_run:
try:
from core.worktree import merge_worktree, cleanup_worktree
p_path = str(Path(project_for_wt["path"]).expanduser())
merge_result = merge_worktree(worktree_path, p_path)
if not merge_result["success"]:
conflicts = merge_result.get("conflicts", [])
conflict_msg = f"Worktree merge conflict in files: {', '.join(conflicts)}" if conflicts else "Worktree merge failed"
models.update_task(conn, task_id, status="blocked", blocked_reason=conflict_msg)
cleanup_worktree(worktree_path, p_path)
if pipeline:
models.update_pipeline(conn, pipeline["id"], status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration)
return {
"success": False,
"error": conflict_msg,
"steps_completed": i,
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
# Fix D: suspicious guard — empty merged_files means agent made no changes
if not merge_result.get("merged_files"):
result["suspicious"] = True
result["suspicious_reason"] = "agent reported success but no file changes detected"
_logger.warning("KIN-117: suspicious step %s — no merged files after worktree merge", role)
try:
if pipeline:
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} suspicious: {role} reported success but merged no files",
level="WARN",
extra={"role": role},
)
except Exception:
pass
cleanup_worktree(worktree_path, p_path)
except Exception:
pass # Worktree errors must never block pipeline
results.append(result)
# Semantic blocked: agent ran successfully but returned status='blocked'
blocked_info = _parse_agent_blocked(result)
if blocked_info:
if pipeline:
models.update_pipeline(
conn, pipeline["id"],
status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
# KIN-084: log step blocked
try:
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} blocked: {blocked_info['reason']}",
level="WARN",
extra={"role": role, "reason": blocked_info["reason"]},
)
except Exception:
pass
models.update_task(
conn, task_id,
status="blocked",
blocked_reason=blocked_info["reason"],
blocked_at=blocked_info["blocked_at"],
blocked_agent_role=role,
blocked_pipeline_step=str(i + 1),
)
try:
from core.telegram import send_telegram_escalation
project = models.get_project(conn, project_id)
project_name = project["name"] if project else project_id
sent = send_telegram_escalation(
task_id=task_id,
project_name=project_name,
agent_role=role,
reason=blocked_info["reason"],
pipeline_step=str(i + 1),
)
if sent:
models.mark_telegram_sent(conn, task_id)
except Exception:
pass # Telegram errors must never block pipeline
error_msg = f"Step {i+1}/{len(steps)} ({role}) blocked: {blocked_info['reason']}"
return {
"success": False,
"error": error_msg,
"blocked_by": role,
"blocked_reason": blocked_info["reason"],
"steps_completed": i,
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
# Save sysadmin scan results immediately after a successful sysadmin step
if role == "sysadmin" and result["success"] and not dry_run:
try:
_save_sysadmin_output(conn, project_id, task_id, result)
except Exception:
pass # Never block pipeline on sysadmin save errors
# Save decomposer output: create child tasks from task_decomposer JSON
if role == "task_decomposer" and result["success"] and not dry_run:
try:
_save_decomposer_output(conn, project_id, task_id, result)
except Exception:
pass # Never block pipeline on decomposer save errors
# Return analyst: create escalation pipeline if escalate_to_dept_head=true (KIN-135)
if role == "return_analyst" and result["success"] and not dry_run:
try:
_save_return_analyst_output(
conn, task_id, project_id, result,
parent_pipeline_id=pipeline["id"] if pipeline else None,
)
except Exception:
pass # Never block pipeline on analyst escalation errors
# Smoke tester: parse result and escalate if cannot_confirm (KIN-128)
if role == "smoke_tester" and result["success"] and not dry_run:
smoke_output = result.get("output") or result.get("raw_output") or ""
smoke_parsed = None
try:
if isinstance(smoke_output, dict):
smoke_parsed = smoke_output
elif isinstance(smoke_output, str):
smoke_parsed = _try_parse_json(smoke_output)
except Exception:
pass
if isinstance(smoke_parsed, dict):
# Save smoke_test_result regardless of outcome
try:
models.update_task(conn, task_id, smoke_test_result=smoke_parsed)
except Exception:
pass
smoke_status = smoke_parsed.get("status", "")
if smoke_status == "cannot_confirm":
reason = smoke_parsed.get("reason") or "smoke_tester: cannot confirm — no proof of working service"
blocked_reason = f"smoke_test: cannot_confirm — {reason}"
models.update_task(
conn, task_id,
status="blocked",
blocked_reason=blocked_reason,
blocked_agent_role="smoke_tester",
blocked_pipeline_step=str(i + 1),
)
if pipeline:
models.update_pipeline(
conn, pipeline["id"],
status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
try:
models.write_log(
conn, pipeline["id"],
f"Smoke test cannot_confirm: {reason}",
level="WARN",
extra={"role": "smoke_tester", "reason": reason},
)
except Exception:
pass
return {
"success": False,
"error": blocked_reason,
"blocked_by": "smoke_tester",
"blocked_reason": blocked_reason,
"steps_completed": i + 1,
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
# status == 'confirmed': smoke test passed, continue pipeline
# Constitutional validator: gate before implementation (KIN-DOCS-001)
if role == "constitutional_validator" and result["success"] and not dry_run:
cv_output = result.get("output") or result.get("raw_output") or ""
cv_parsed = None
try:
if isinstance(cv_output, dict):
cv_parsed = cv_output
elif isinstance(cv_output, str):
cv_parsed = _try_parse_json(cv_output)
except Exception:
pass
if isinstance(cv_parsed, dict):
cv_verdict = cv_parsed.get("verdict", "")
if cv_verdict in ("changes_required", "escalated"):
if cv_verdict == "escalated":
reason = cv_parsed.get("escalation_reason") or "constitutional_validator: принципы конфликтуют — требуется решение директора"
blocked_reason = f"constitutional_validator: escalated — {reason}"
else:
violations = cv_parsed.get("violations") or []
if violations:
violations_summary = "; ".join(
f"{v.get('principle', '?')} ({v.get('severity', '?')}): {v.get('description', '')}"
for v in violations[:3]
)
else:
violations_summary = cv_parsed.get("summary") or "changes required"
blocked_reason = f"constitutional_validator: changes_required — {violations_summary}"
models.update_task(
conn, task_id,
status="blocked",
blocked_reason=blocked_reason,
blocked_agent_role="constitutional_validator",
blocked_pipeline_step=str(i + 1),
)
if pipeline:
models.update_pipeline(
conn, pipeline["id"],
status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
try:
models.write_log(
conn, pipeline["id"],
f"Constitutional validator blocked pipeline: {blocked_reason}",
level="WARN",
extra={"role": "constitutional_validator", "verdict": cv_verdict, "reason": blocked_reason},
)
except Exception:
pass
return {
"success": False,
"error": blocked_reason,
"blocked_by": "constitutional_validator",
"blocked_reason": blocked_reason,
"steps_completed": i + 1,
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
# verdict == 'approved': constitutional check passed, continue pipeline
# Tech debt: create followup child task from dev agent output (KIN-128)
if role in _TECH_DEBT_ROLES and result["success"] and not dry_run:
try:
_save_tech_debt_output(conn, project_id, task_id, result)
except Exception:
pass # Never block pipeline on tech_debt save errors
# Department head: execute sub-pipeline planned by the dept head
if _is_department_head(role) and result["success"] and not dry_run:
# Determine next department for handoff routing
_next_dept = None
if i + 1 < len(steps):
_next_role = steps[i + 1].get("role", "")
if _is_department_head(_next_role):
_next_dept = _next_role.replace("_head", "")
dept_result = _execute_department_head_step(
conn, task_id, project_id,
parent_pipeline_id=pipeline["id"] if pipeline else None,
step=step,
dept_head_result=result,
allow_write=allow_write,
noninteractive=noninteractive,
next_department=_next_dept,
)
# Accumulate sub-pipeline costs
total_cost += dept_result.get("cost_usd") or 0
total_tokens += dept_result.get("tokens_used") or 0
total_duration += dept_result.get("duration_seconds") or 0
if not dept_result.get("success"):
# Sub-pipeline failed — handle as blocked
results.append({"role": role, "_dept_sub": True, **dept_result})
if pipeline:
models.update_pipeline(
conn, pipeline["id"],
status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
blocked_reason = dept_result.get("blocked_reason") or dept_result.get("error") or f"Department {role} sub-pipeline failed"
error_msg = f"Department {role} sub-pipeline failed: {dept_result.get('output', '')[:200]}"
models.update_task(conn, task_id, status="blocked", blocked_reason=blocked_reason)
return {
"success": False,
"error": error_msg,
"steps_completed": i,
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
# Track last worker role from sub-pipeline for auto_complete eligibility
if dept_result.get("last_sub_role"):
_last_sub_role = dept_result["last_sub_role"]
# Override previous_output with dept handoff summary (not raw dept head JSON)
previous_output = dept_result.get("output")
if isinstance(previous_output, (dict, list)):
previous_output = json.dumps(previous_output, ensure_ascii=False)
continue
# Project-level auto-test: run tests after backend_dev/frontend_dev steps.
# Enabled per project via auto_test_enabled flag (opt-in).
# test_command priority: project.test_command (explicit) → auto-detect → skip.
# On failure, loop fixer up to KIN_AUTO_TEST_MAX_ATTEMPTS times, then block.
if (
not dry_run
and role in _AUTO_TEST_ROLES
and result["success"]
and project_for_wt
and project_for_wt.get("auto_test_enabled")
and project_for_wt.get("path")
):
p_path_str = str(Path(project_for_wt["path"]).expanduser())
p_test_cmd_override = project_for_wt.get("test_command")
if p_test_cmd_override:
p_test_cmd = p_test_cmd_override
else:
p_test_cmd = _detect_test_command(p_path_str, role=role)
if p_test_cmd is None:
# No test framework detected — skip without blocking pipeline
_logger.info("auto-test: no test framework detected in %s, skipping", p_path_str)
results.append({
"role": "_auto_test",
"success": True,
"output": "no test framework detected",
"_project_test": True,
"_skipped": True,
})
else:
max_auto_test_attempts = int(os.environ.get("KIN_AUTO_TEST_MAX_ATTEMPTS") or 3)
auto_test_timeout = int(os.environ.get("KIN_AUTO_TEST_TIMEOUT") or 600)
test_run = _run_project_tests(p_path_str, p_test_cmd, timeout=auto_test_timeout)
results.append({"role": "_auto_test", "success": test_run["success"],
"output": test_run["output"], "_project_test": True})
auto_test_attempt = 0
while not test_run["success"] and auto_test_attempt < max_auto_test_attempts:
auto_test_attempt += 1
fix_context = (
f"Automated project test run ({p_test_cmd}) failed after your changes.\n"
f"Test output:\n{test_run['output'][:4000]}\n"
f"Fix the failing tests. Do NOT modify test files."
)
fix_result = run_agent(
conn, role, task_id, project_id,
model=model,
previous_output=fix_context,
dry_run=False,
allow_write=allow_write,
noninteractive=noninteractive,
)
total_cost += fix_result.get("cost_usd") or 0
total_tokens += fix_result.get("tokens_used") or 0
total_duration += fix_result.get("duration_seconds") or 0
results.append({**fix_result, "_auto_test_fix_attempt": auto_test_attempt})
test_run = _run_project_tests(p_path_str, p_test_cmd, timeout=auto_test_timeout)
results.append({"role": "_auto_test", "success": test_run["success"],
"output": test_run["output"], "_project_test": True,
"_attempt": auto_test_attempt})
if not test_run["success"]:
block_reason = (
f"Auto-test ({p_test_cmd}) failed after {auto_test_attempt} fix attempt(s). "
f"Last output: {test_run['output'][:500]}"
)
models.update_task(conn, task_id, status="blocked", blocked_reason=block_reason)
if pipeline:
models.update_pipeline(conn, pipeline["id"], status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration)
return {
"success": False,
"error": block_reason,
"steps_completed": i,
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
# Auto-test loop: if tester step has auto_fix=true and tests failed,
# call fix_role agent and re-run tester up to max_attempts times.
if (
not dry_run
and step.get("auto_fix")
and role == "tester"
and result["success"]
and _is_test_failure(result)
):
max_attempts = int(step.get("max_attempts", 3))
fix_role = step.get("fix_role", "backend_dev")
fix_model = step.get("fix_model", model)
attempt = 0
while attempt < max_attempts and _is_test_failure(result):
attempt += 1
tester_output = result.get("raw_output") or result.get("output") or ""
if isinstance(tester_output, (dict, list)):
tester_output = json.dumps(tester_output, ensure_ascii=False)
# Run fixer
fix_result = run_agent(
conn, fix_role, task_id, project_id,
model=fix_model,
previous_output=tester_output,
dry_run=False,
allow_write=allow_write,
noninteractive=noninteractive,
)
total_cost += fix_result.get("cost_usd") or 0
total_tokens += fix_result.get("tokens_used") or 0
total_duration += fix_result.get("duration_seconds") or 0
results.append({**fix_result, "_auto_fix_attempt": attempt})
# Re-run tester
fix_output = fix_result.get("raw_output") or fix_result.get("output") or ""
if isinstance(fix_output, (dict, list)):
fix_output = json.dumps(fix_output, ensure_ascii=False)
retest = run_agent(
conn, role, task_id, project_id,
model=model,
previous_output=fix_output,
dry_run=False,
allow_write=allow_write,
noninteractive=noninteractive,
)
total_cost += retest.get("cost_usd") or 0
total_tokens += retest.get("tokens_used") or 0
total_duration += retest.get("duration_seconds") or 0
result = retest
results.append({**result, "_auto_retest_attempt": attempt})
# Save final test result regardless of outcome
try:
final_output = result.get("raw_output") or result.get("output") or ""
models.update_task(conn, task_id, test_result={
"output": final_output if isinstance(final_output, str) else str(final_output),
"auto_fix_attempts": attempt,
"passed": not _is_test_failure(result),
})
except Exception:
pass
# Chain output to next step
previous_output = result.get("raw_output") or result.get("output")
if isinstance(previous_output, (dict, list)):
previous_output = json.dumps(previous_output, ensure_ascii=False)
# Pipeline completed
if pipeline and not dry_run:
models.update_pipeline(
conn, pipeline["id"],
status="completed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
# KIN-084: log pipeline completed
try:
models.write_log(
conn, pipeline["id"],
f"Pipeline completed: {len(steps)} steps, cost=${total_cost:.4f}, tokens={total_tokens}, duration={total_duration}s",
extra={
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"steps_count": len(steps),
},
)
except Exception:
pass
task_modules = models.get_modules(conn, project_id)
# Compute changed files for hook filtering (frontend build trigger)
changed_files: list[str] | None = None
project = models.get_project(conn, project_id)
if project and project.get("path"):
p_path = Path(project["path"]).expanduser()
if p_path.is_dir():
changed_files = _get_changed_files(str(p_path))
# Fix D: retroactive suspicious guard — if no files changed after pipeline,
# mark successful dev-agent steps as suspicious (agent may have not written files)
if changed_files is not None and not changed_files:
for _r in results:
if (_r.get("role") in _DEV_GUARD_ROLES
and _r.get("success")
and not _r.get("suspicious")):
_r["suspicious"] = True
_r["suspicious_reason"] = "agent reported success but no file changes detected"
_logger.warning(
"KIN-117: suspicious step %s — no file changes after pipeline",
_r.get("role"),
)
last_role = steps[-1].get("role", "") if steps else ""
# For dept pipelines: if last step is a _head, check the last worker in its sub-pipeline
effective_last_role = _last_sub_role if (_is_department_head(last_role) and _last_sub_role) else last_role
auto_eligible = effective_last_role in {"tester", "reviewer"}
# Guard: re-fetch current status — user may have manually changed it while pipeline ran
current_task = models.get_task(conn, task_id)
current_status = current_task.get("status") if current_task else None
# KIN-116: detect destructive ops — force review even in auto mode
destructive_ops = _detect_destructive_operations(results) if results else []
if destructive_ops and mode == "auto_complete":
mode = "review" # Downgrade to review for this pipeline run
_logger.warning(
"KIN-116: destructive operations detected in pipeline output — "
"forcing task %s to review. Patterns: %s",
task_id,
destructive_ops[:5],
)
try:
models.log_audit_event(
conn,
event_type="destructive_ops_detected",
task_id=task_id,
step_id="runner",
reason=f"Destructive operations detected: {destructive_ops[:5]}",
project_id=project_id,
)
except Exception:
pass
if current_status in ("done", "cancelled"):
pass # User finished manually — don't overwrite
elif mode == "auto_complete" and auto_eligible:
# KIN-133: gate check — if final gate agent rejects, block instead of closing
_gate_result = _find_gate_result(results, effective_last_role)
_cannot_close = (
_parse_gate_cannot_close(_gate_result, effective_last_role)
if _gate_result else None
)
if _cannot_close is not None:
_block_reason = _cannot_close["reason"]
_pipeline_type = (pipeline or {}).get("pipeline_type", "standard")
# KIN-136: auto-return — attempt re-run instead of blocking when:
# - reviewer did not set an exit_condition requiring human intervention
# - not inside an escalation pipeline (guard against recursive loops)
# - not a dry_run
_exit_cond = _parse_exit_condition(_gate_result or {}, effective_last_role)
if _exit_cond is None and _pipeline_type != "escalation" and not dry_run:
_gate_out = (_gate_result or {}).get("output")
_gate_out_json = (
json.dumps(_gate_out, ensure_ascii=False)
if isinstance(_gate_out, dict) else str(_gate_out or "")
)
_ar = _trigger_auto_return(
conn, task_id, project_id, pipeline,
original_steps=steps,
gate_role=effective_last_role,
gate_reason=_block_reason,
allow_write=allow_write,
noninteractive=noninteractive,
gate_output_json=_gate_out_json,
)
if not _ar["should_escalate"]:
_ar_result = _ar["auto_return_result"]
_ar_result["auto_returned"] = True
return _ar_result
# Threshold exceeded — fall through to human escalation
_block_reason = f"{_block_reason} [auto_return_limit_reached]"
# Human escalation path: exit_condition set, escalation pipeline,
# dry_run, or auto-return threshold exceeded
models.update_task(
conn, task_id,
status="blocked",
blocked_reason=_block_reason,
blocked_agent_role=effective_last_role,
blocked_pipeline_step=str(len(steps)),
)
if pipeline:
models.update_pipeline(
conn, pipeline["id"],
status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
# KIN-135: record gate return — skip for escalation pipelines to avoid loops
if _pipeline_type != "escalation":
try:
models.record_task_return(
conn,
task_id=task_id,
reason_category="recurring_quality_fail",
reason_text=f"Gate {effective_last_role}: {_block_reason[:200]}",
returned_by=effective_last_role,
pipeline_id=pipeline["id"] if pipeline else None,
)
except Exception:
pass # Never block pipeline on return tracking errors
try:
models.write_log(
conn, pipeline["id"] if pipeline else None,
f"Gate cannot_close: {effective_last_role} refused to approve — {_block_reason}",
level="WARN",
extra={"role": effective_last_role, "reason": _block_reason},
)
except Exception:
pass
try:
from core.telegram import send_telegram_escalation
_project = models.get_project(conn, project_id)
_project_name = _project["name"] if _project else project_id
_sent = send_telegram_escalation(
task_id=task_id,
project_name=_project_name,
agent_role=effective_last_role,
reason=_block_reason,
pipeline_step=str(len(steps)),
)
if _sent:
models.mark_telegram_sent(conn, task_id)
except Exception:
pass
return {
"success": False,
"error": f"Gate {effective_last_role} refused to close task: {_block_reason}",
"blocked_by": effective_last_role,
"blocked_reason": _block_reason,
"steps_completed": len(steps),
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
# Auto-complete mode: gate approved — close task immediately
models.update_task(conn, task_id, status="done")
# KIN-084: log task status
try:
models.write_log(
conn, pipeline["id"],
"Task status: done",
extra={"task_status": "done", "mode": mode},
)
except Exception:
pass
try:
run_hooks(conn, project_id, task_id,
event="task_auto_approved", task_modules=task_modules)
except Exception:
pass
try:
run_hooks(conn, project_id, task_id,
event="task_done", task_modules=task_modules)
except Exception:
pass
# Auto followup: generate tasks, auto-resolve permission issues.
# Guard: skip for followup-sourced tasks to prevent infinite recursion.
task_brief = task.get("brief") or {}
is_followup_task = (
isinstance(task_brief, dict)
and str(task_brief.get("source", "")).startswith("followup:")
)
if not is_followup_task:
try:
from core.followup import generate_followups, auto_resolve_pending_actions
fu_result = generate_followups(conn, task_id)
if fu_result.get("pending_actions"):
auto_resolve_pending_actions(conn, task_id, fu_result["pending_actions"])
except Exception:
pass
else:
# Review mode: wait for manual approval
models.update_task(conn, task_id, status="review", execution_mode="review")
# KIN-084: log task status
try:
models.write_log(
conn, pipeline["id"],
"Task status: review",
extra={"task_status": "review", "mode": mode},
)
except Exception:
pass
# Run post-pipeline hooks (failures don't affect pipeline status)
try:
run_hooks(conn, project_id, task_id,
event="pipeline_completed", task_modules=task_modules,
changed_files=changed_files)
except Exception:
pass # Hook errors must never block pipeline completion
# Auto-learning: extract decisions from pipeline results
if results:
try:
_run_learning_extraction(conn, task_id, project_id, results)
except Exception:
pass # Learning errors must never block pipeline completion
# Auto-commit changes after successful pipeline
try:
_run_autocommit(conn, task_id, project_id)
except Exception:
pass # Autocommit errors must never block pipeline completion
return {
"success": True,
"steps_completed": len(steps),
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
"dry_run": dry_run,
"mode": mode,
}