# kin/agents/runner.py

"""
Kin agent runner launches Claude Code as subprocess with role-specific context.
Each agent = separate process with isolated context.
"""
import json
import logging
import os
import shutil
import sqlite3
import subprocess
import time
from pathlib import Path
from typing import Any
import re
# Module-level logger for runner diagnostics (autocommit results, warnings).
_logger = logging.getLogger("kin.runner")
# Extra PATH entries to inject when searching for claude CLI.
# launchctl daemons start with a stripped PATH that may omit these.
_EXTRA_PATH_DIRS = [
"/opt/homebrew/bin",
"/opt/homebrew/sbin",
"/usr/local/bin",
"/usr/local/sbin",
]
def _build_claude_env() -> dict:
"""Return an env dict with an extended PATH that includes common CLI tool locations.
Merges _EXTRA_PATH_DIRS with the current process PATH, deduplicating entries.
Also resolves ~/.nvm/versions/node/*/bin globs that launchctl may not expand.
"""
env = os.environ.copy()
existing = env.get("PATH", "").split(":")
extra = list(_EXTRA_PATH_DIRS)
# Expand nvm node bin dirs dynamically
nvm_root = Path.home() / ".nvm" / "versions" / "node"
if nvm_root.is_dir():
for node_ver in sorted(nvm_root.iterdir(), reverse=True):
bin_dir = node_ver / "bin"
if bin_dir.is_dir():
extra.append(str(bin_dir))
seen: set[str] = set()
deduped: list[str] = []
for d in extra + existing:
if d and d not in seen:
seen.add(d)
deduped.append(d)
env["PATH"] = ":".join(deduped)
return env
def _resolve_claude_cmd() -> str:
    """Locate the claude CLI on the extended PATH; fall back to bare 'claude'."""
    search_path = _build_claude_env()["PATH"]
    resolved = shutil.which("claude", path=search_path)
    if resolved:
        return resolved
    return "claude"
from core import models
from core.context_builder import build_context, format_prompt
from core.hooks import run_hooks
class ClaudeAuthError(Exception):
    """Raised when Claude CLI is not authenticated or not available.

    Raised by check_claude_auth(); run_pipeline() catches it and converts
    it into an {"error": "claude_auth_required"} result.
    """
    pass
def check_claude_auth(timeout: int = 10) -> None:
    """Verify the claude CLI is installed and logged in before a pipeline run.

    Executes ``claude -p 'ok' --output-format json`` and inspects the result.
    Returns None when auth looks fine, and also on TimeoutExpired — a slow
    probe is ambiguous and must not block the pipeline.

    Raises:
        ClaudeAuthError: when the binary is missing (FileNotFoundError), the
            combined output mentions 'not logged in' (case-insensitive), the
            exit code is non-zero, or the parsed JSON carries is_error=true.
    """
    cli = _resolve_claude_cmd()
    probe_env = _build_claude_env()
    try:
        completed = subprocess.run(
            [cli, "-p", "ok", "--output-format", "json"],
            capture_output=True,
            text=True,
            timeout=timeout,
            env=probe_env,
            stdin=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        raise ClaudeAuthError("claude CLI not found in PATH. Install it or add to PATH.")
    except subprocess.TimeoutExpired:
        # Ambiguous — don't block pipeline on timeout
        return
    out = completed.stdout or ""
    err = completed.stderr or ""
    login_msg = "Claude CLI requires login. Run: claude login"
    if "not logged in" in (out + err).lower():
        raise ClaudeAuthError(login_msg)
    if completed.returncode != 0:
        raise ClaudeAuthError(login_msg)
    reply = _try_parse_json(out)
    if isinstance(reply, dict) and reply.get("is_error"):
        raise ClaudeAuthError(login_msg)
def run_agent(
    conn: sqlite3.Connection,
    role: str,
    task_id: str,
    project_id: str,
    model: str = "sonnet",
    previous_output: str | None = None,
    brief_override: str | None = None,
    dry_run: bool = False,
    allow_write: bool = False,
    noninteractive: bool = False,
) -> dict:
    """Run one Claude Code agent as a subprocess and log the result.

    Flow: build context from the DB, render the role prompt template,
    invoke the claude CLI, persist the full run to agent_logs, and return
    {success, output, tokens_used, duration_seconds, cost_usd, ...}.
    """
    context = build_context(conn, task_id, role, project_id)
    if previous_output:
        context["previous_output"] = previous_output
    if brief_override and context.get("task"):
        context["task"]["brief"] = brief_override
    prompt = format_prompt(context, role)

    # Dry run renders the prompt but never touches the CLI or the log table.
    if dry_run:
        return {
            "success": True,
            "output": None,
            "prompt": prompt,
            "role": role,
            "model": model,
            "dry_run": True,
        }

    # Code-facing roles run inside the project checkout when it exists.
    # Operations projects have no local path — sysadmin works via SSH.
    project = models.get_project(conn, project_id)
    working_dir = None
    code_roles = ("debugger", "frontend_dev", "backend_dev", "tester", "security")
    if project and project.get("project_type") != "operations" and role in code_roles:
        candidate = Path(project["path"]).expanduser()
        if candidate.is_dir():
            working_dir = str(candidate)

    started = time.monotonic()
    cli_result = _run_claude(
        prompt,
        model=model,
        working_dir=working_dir,
        allow_write=allow_write,
        noninteractive=noninteractive,
    )
    duration = int(time.monotonic() - started)

    # The DB column stores text, so non-string payloads are serialized to JSON.
    output_text = cli_result.get("output", "")
    if not isinstance(output_text, str):
        output_text = json.dumps(output_text, ensure_ascii=False)
    success = cli_result["returncode"] == 0
    parsed_output = _try_parse_json(output_text)
    error_message = cli_result.get("error") if not success else None

    # Log FULL output to DB (no truncation).
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=task_id,
        agent_role=role,
        action="execute",
        input_summary=f"task={task_id}, model={model}",
        output_summary=output_text or None,
        tokens_used=cli_result.get("tokens_used"),
        model=model,
        cost_usd=cli_result.get("cost_usd"),
        success=success,
        error_message=error_message,
        duration_seconds=duration,
    )
    return {
        "success": success,
        "error": error_message,
        "output": parsed_output if parsed_output else output_text,
        "raw_output": output_text,
        "role": role,
        "model": model,
        "duration_seconds": duration,
        "tokens_used": cli_result.get("tokens_used"),
        "cost_usd": cli_result.get("cost_usd"),
    }
def _run_claude(
    prompt: str,
    model: str = "sonnet",
    working_dir: str | None = None,
    allow_write: bool = False,
    noninteractive: bool = False,
    timeout: int | None = None,
) -> dict:
    """Execute the claude CLI as a subprocess.

    Args:
        prompt: full prompt passed via ``-p``.
        model: model alias forwarded to ``--model``.
        working_dir: cwd for the subprocess (None = inherit).
        allow_write: add --dangerously-skip-permissions so tools run unprompted.
        noninteractive: detach stdin; also forced by KIN_NONINTERACTIVE=1.
        timeout: seconds before kill; defaults to KIN_AGENT_TIMEOUT or 600.

    Returns a dict with output, error, empty_output, returncode and, when the
    CLI emitted its JSON wrapper, tokens_used / cost_usd. A missing binary
    maps to returncode 127, a timeout to 124 (shell conventions).
    """
    claude_cmd = _resolve_claude_cmd()
    cmd = [
        claude_cmd,
        "-p", prompt,
        "--output-format", "json",
        "--model", model,
    ]
    if allow_write:
        cmd.append("--dangerously-skip-permissions")
    is_noninteractive = noninteractive or os.environ.get("KIN_NONINTERACTIVE") == "1"
    if timeout is None:
        timeout = int(os.environ.get("KIN_AGENT_TIMEOUT") or 600)
    env = _build_claude_env()
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=working_dir,
            env=env,
            stdin=subprocess.DEVNULL if is_noninteractive else None,
        )
    except FileNotFoundError:
        return {
            "output": "",
            "error": "claude CLI not found in PATH",
            "returncode": 127,
        }
    except subprocess.TimeoutExpired:
        return {
            "output": "",
            "error": f"Agent timed out after {timeout}s",
            "returncode": 124,
        }
    # Always preserve the full raw stdout
    raw_stdout = proc.stdout or ""
    result: dict[str, Any] = {
        "output": raw_stdout,
        "error": proc.stderr or None,  # preserve stderr always for diagnostics
        "empty_output": not raw_stdout.strip(),
        "returncode": proc.returncode,
    }
    # Parse the JSON wrapper from claude --output-format json: lift metadata
    # (tokens, cost) and replace "output" with the agent's actual response.
    parsed = _try_parse_json(raw_stdout)
    if isinstance(parsed, dict):
        # FIX: "usage" may be present but null in the wrapper — the previous
        # parsed.get("usage", {}) returned None in that case and .get() raised
        # AttributeError. The `or {}` guard covers missing AND null.
        result["tokens_used"] = (parsed.get("usage") or {}).get("total_tokens")
        result["cost_usd"] = parsed.get("cost_usd")
        # Extract the agent's actual response, converting to string if needed
        content = parsed.get("result") or parsed.get("content")
        if content is not None:
            result["output"] = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False)
    return result
def _try_parse_json(text: str) -> Any:
"""Try to parse JSON from text. Returns parsed obj or None."""
text = text.strip()
if not text:
return None
# Direct parse
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Try to find JSON block in markdown code fences
import re
m = re.search(r"```(?:json)?\s*\n(.*?)\n```", text, re.DOTALL)
if m:
try:
return json.loads(m.group(1))
except json.JSONDecodeError:
pass
# Try to find first { ... } or [ ... ]
for start_char, end_char in [("{", "}"), ("[", "]")]:
start = text.find(start_char)
if start >= 0:
# Find matching close
depth = 0
for i in range(start, len(text)):
if text[i] == start_char:
depth += 1
elif text[i] == end_char:
depth -= 1
if depth == 0:
try:
return json.loads(text[start:i + 1])
except json.JSONDecodeError:
break
return None
# ---------------------------------------------------------------------------
# Backlog audit
# ---------------------------------------------------------------------------
# Directory holding prompt templates (backlog_audit.md, learner.md, ...).
PROMPTS_DIR = Path(__file__).parent / "prompts"

# ISO 639-1 code -> English language name, used to tell the audit agent
# which language to respond in (see run_audit).
_LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish",
               "de": "German", "fr": "French"}
def run_audit(
    conn: sqlite3.Connection,
    project_id: str,
    noninteractive: bool = False,
    auto_apply: bool = False,
) -> dict:
    """Audit pending tasks against the actual codebase.

    Builds a prompt from prompts/backlog_audit.md plus the project's pending
    tasks, runs a sonnet agent inside the project directory, and parses its
    JSON verdict.

    auto_apply=True: marks already_done tasks as done in DB.
    auto_apply=False: returns results only (for API/GUI).
    Returns {success, already_done, still_pending, unclear, duration_seconds, ...}
    """
    project = models.get_project(conn, project_id)
    if not project:
        return {"success": False, "error": f"Project '{project_id}' not found"}
    pending = models.list_tasks(conn, project_id=project_id, status="pending")
    if not pending:
        return {
            "success": True,
            "already_done": [],
            "still_pending": [],
            "unclear": [],
            "message": "No pending tasks to audit",
        }
    # Build prompt
    prompt_path = PROMPTS_DIR / "backlog_audit.md"
    template = prompt_path.read_text() if prompt_path.exists() else (
        "You are a QA analyst. Check if pending tasks are already done in the code."
    )
    task_list = [
        {"id": t["id"], "title": t["title"], "brief": t.get("brief")}
        for t in pending
    ]
    sections = [
        template,
        "",
        # FIX: id and name were concatenated with no separator in the header
        f"## Project: {project['id']} — {project['name']}",
    ]
    if project.get("tech_stack"):
        sections.append(f"Tech stack: {', '.join(project['tech_stack'])}")
    sections.append(f"Path: {project['path']}")
    sections.append("")
    sections.append(f"## Pending tasks ({len(task_list)}):")
    sections.append(json.dumps(task_list, ensure_ascii=False, indent=2))
    sections.append("")
    language = project.get("language", "ru")
    lang_name = _LANG_NAMES.get(language, language)
    sections.append("## Language")
    sections.append(f"ALWAYS respond in {lang_name}.")
    sections.append("")
    prompt = "\n".join(sections)
    # Determine working dir
    working_dir = None
    project_path = Path(project["path"]).expanduser()
    if project_path.is_dir():
        working_dir = str(project_path)
    # Run agent — allow_write=True so claude can use Read/Bash tools
    # without interactive permission prompts (critical for noninteractive mode)
    start = time.monotonic()
    result = _run_claude(prompt, model="sonnet", working_dir=working_dir,
                         allow_write=True, noninteractive=noninteractive)
    duration = int(time.monotonic() - start)
    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, ensure_ascii=False)
    success = result["returncode"] == 0
    # Log to agent_logs
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=None,
        agent_role="backlog_audit",
        action="audit",
        input_summary=f"project={project_id}, pending_tasks={len(pending)}",
        output_summary=raw_output or None,
        tokens_used=result.get("tokens_used"),
        model="sonnet",
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )
    if not success:
        return {
            "success": False,
            "error": result.get("error", "Agent failed"),
            "raw_output": raw_output,
            "duration_seconds": duration,
        }
    # Parse structured output
    parsed = _try_parse_json(raw_output)
    if not isinstance(parsed, dict):
        return {
            "success": False,
            "error": "Agent returned non-JSON output",
            "raw_output": raw_output,
            "duration_seconds": duration,
        }
    already_done = parsed.get("already_done", [])
    # Auto-apply: mark already_done tasks as done in DB
    applied = []
    if auto_apply and already_done:
        for item in already_done:
            # FIX: skip non-dict entries instead of crashing on item.get()
            # (consistent with the guards in _save_sysadmin_output).
            if not isinstance(item, dict):
                continue
            tid = item.get("id")
            if tid:
                t = models.get_task(conn, tid)
                if t and t["project_id"] == project_id and t["status"] == "pending":
                    models.update_task(conn, tid, status="done")
                    applied.append(tid)
    return {
        "success": True,
        "already_done": already_done,
        "still_pending": parsed.get("still_pending", []),
        "unclear": parsed.get("unclear", []),
        "applied": applied,
        "duration_seconds": duration,
        "tokens_used": result.get("tokens_used"),
        "cost_usd": result.get("cost_usd"),
    }
# ---------------------------------------------------------------------------
# Blocked protocol detection
# ---------------------------------------------------------------------------
def _parse_agent_blocked(result: dict) -> dict | None:
"""Detect semantic blocked status from a successful agent result.
Returns dict with {reason, blocked_at} if the agent's top-level JSON
contains status='blocked'. Returns None otherwise.
Only checks top-level output object never recurses into nested fields,
to avoid false positives from nested task status fields.
"""
from datetime import datetime
if not result.get("success"):
return None
output = result.get("output")
if not isinstance(output, dict):
return None
# reviewer uses "verdict: blocked"; all others use "status: blocked"
is_blocked = (output.get("status") == "blocked" or output.get("verdict") == "blocked")
if not is_blocked:
return None
return {
"reason": output.get("reason") or output.get("blocked_reason") or "",
"blocked_at": output.get("blocked_at") or datetime.now().isoformat(),
}
# ---------------------------------------------------------------------------
# Permission error detection
# ---------------------------------------------------------------------------
def _is_permission_error(result: dict) -> bool:
    """Return True if agent result indicates a permission/write failure.

    Matches the combined output + error text against the PERMISSION_PATTERNS
    regexes defined in core.followup.
    """
    from core.followup import PERMISSION_PATTERNS
    raw = result.get("raw_output") or result.get("output") or ""
    if not isinstance(raw, str):
        raw = json.dumps(raw, ensure_ascii=False)
    haystack = raw + " " + (result.get("error") or "")
    for pattern in PERMISSION_PATTERNS:
        if re.search(pattern, haystack):
            return True
    return False
# ---------------------------------------------------------------------------
# Autocommit: git add -A && git commit after successful pipeline
# ---------------------------------------------------------------------------
def _get_changed_files(project_path: str) -> list[str]:
    """Return files changed in the current pipeline run.

    Unions three diffs — unstaged, staged, and HEAD~1..HEAD — so the result
    covers both autocommit-on and autocommit-off scenarios. Any git failure
    (no repo, first commit, timeout) contributes nothing and never raises.
    """
    env = _build_claude_env()
    git_cmd = shutil.which("git", path=env["PATH"]) or "git"
    diff_variants = (
        ["diff", "--name-only"],                    # unstaged tracked changes
        ["diff", "--cached", "--name-only"],        # staged changes
        ["diff", "HEAD~1", "HEAD", "--name-only"],  # last commit (post-autocommit)
    )
    changed: set[str] = set()
    for args in diff_variants:
        try:
            completed = subprocess.run(
                [git_cmd, *args],
                cwd=project_path,
                capture_output=True,
                text=True,
                timeout=10,
                env=env,
            )
        except Exception:
            continue
        if completed.returncode == 0:
            for line in completed.stdout.splitlines():
                name = line.strip()
                if name:
                    changed.add(name)
    return list(changed)
def _run_autocommit(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
) -> None:
    """Auto-commit changes after a successful pipeline completion.

    Runs ``git add -A`` then ``git commit -m 'kin: {task_id} {title}'`` in
    the project directory. Skips silently when the task/project is missing,
    autocommit is disabled, the path does not exist, or there is nothing to
    commit (non-zero commit exit). Never raises — autocommit errors must
    never block the pipeline. stderr goes to DEVNULL per decision #30.
    """
    task = models.get_task(conn, task_id)
    project = models.get_project(conn, project_id)
    if not (task and project and project.get("autocommit_enabled")):
        return
    repo_dir = Path(project["path"]).expanduser()
    if not repo_dir.is_dir():
        return
    env = _build_claude_env()
    git_cmd = shutil.which("git", path=env["PATH"]) or "git"
    # Sanitize the title: no double quotes or line breaks in the message.
    safe_title = (task.get("title") or "").replace('"', "'").replace("\n", " ").replace("\r", "")
    commit_msg = f"kin: {task_id} {safe_title}"
    quiet = {
        "cwd": str(repo_dir),
        "env": env,
        "stdout": subprocess.DEVNULL,
        "stderr": subprocess.DEVNULL,
    }
    try:
        subprocess.run([git_cmd, "add", "-A"], **quiet)
        committed = subprocess.run([git_cmd, "commit", "-m", commit_msg], **quiet)
        if committed.returncode == 0:
            _logger.info("Autocommit: %s", commit_msg)
        else:
            _logger.debug("Autocommit: nothing to commit for %s", task_id)
    except Exception as exc:
        _logger.warning("Autocommit failed for %s: %s", task_id, exc)
# ---------------------------------------------------------------------------
# Sysadmin output: save server map to decisions and modules
# ---------------------------------------------------------------------------
def _save_sysadmin_output(
    conn: sqlite3.Connection,
    project_id: str,
    task_id: str,
    result: dict,
) -> dict:
    """Persist a sysadmin agent's server map (decisions + modules) to the DB.

    Idempotent: decisions go through add_decision_if_new (deduplicates),
    modules through add_module, which enforces UNIQUE(project_id, name) and
    whose IntegrityError we swallow as a skip.

    Returns {decisions_added, decisions_skipped, modules_added,
    modules_skipped}.
    """
    raw = result.get("raw_output") or result.get("output") or ""
    if isinstance(raw, (dict, list)):
        raw = json.dumps(raw, ensure_ascii=False)
    parsed = _try_parse_json(raw)
    counters = {
        "decisions_added": 0,
        "decisions_skipped": 0,
        "modules_added": 0,
        "modules_skipped": 0,
    }
    if not isinstance(parsed, dict):
        return counters

    for entry in (parsed.get("decisions") or []):
        if not isinstance(entry, dict):
            continue
        dec_type = entry.get("type", "decision")
        if dec_type not in VALID_DECISION_TYPES:
            dec_type = "decision"  # coerce unknown types
        title = (entry.get("title") or "").strip()
        description = (entry.get("description") or "").strip()
        if not (title and description):
            continue
        created = models.add_decision_if_new(
            conn,
            project_id=project_id,
            type=dec_type,
            title=title,
            description=description,
            tags=entry.get("tags") or ["server"],
            task_id=task_id,
        )
        counters["decisions_added" if created else "decisions_skipped"] += 1

    for entry in (parsed.get("modules") or []):
        if not isinstance(entry, dict):
            continue
        name = (entry.get("name") or "").strip()
        if not name:
            continue
        mod_type = (entry.get("type") or "service").strip()
        mod_path = (entry.get("path") or "").strip()
        try:
            module = models.add_module(
                conn,
                project_id=project_id,
                name=name,
                type=mod_type,
                path=mod_path or name,  # fall back to the name as path
                description=entry.get("description"),
                owner_role="sysadmin",
            )
            if module.get("_created", True):
                counters["modules_added"] += 1
            else:
                counters["modules_skipped"] += 1
        except Exception:
            counters["modules_skipped"] += 1
    return counters
# ---------------------------------------------------------------------------
# Auto-learning: extract decisions from pipeline results
# ---------------------------------------------------------------------------
# Decision types accepted from agent JSON output; any other value is coerced
# to "decision" before saving (see _save_sysadmin_output and
# _run_learning_extraction).
VALID_DECISION_TYPES = {"decision", "gotcha", "convention"}
def _run_learning_extraction(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
    step_results: list[dict],
) -> dict:
    """Extract and persist decisions learned from a completed pipeline.

    Feeds truncated step outputs plus existing decision titles to the
    learner agent, parses its JSON reply, and stores at most five new
    decisions via add_decision_if_new.

    Returns {added, skipped}, with an "error" key on failure.
    """
    learner_prompt_path = PROMPTS_DIR / "learner.md"
    if not learner_prompt_path.exists():
        return {"added": 0, "skipped": 0, "error": "learner.md not found"}
    template = learner_prompt_path.read_text()

    # Cap each step's output at 2000 chars to keep the prompt bounded.
    step_summaries: dict[str, str] = {}
    for step in step_results:
        text = step.get("raw_output") or step.get("output") or ""
        if isinstance(text, (dict, list)):
            text = json.dumps(text, ensure_ascii=False)
        step_summaries[step.get("role", "unknown")] = text[:2000]

    # Give the learner a dedup hint: titles/types it already recorded.
    existing_hints = [
        {"title": d["title"], "type": d["type"]}
        for d in models.get_decisions(conn, project_id)
    ]
    prompt = "\n".join([
        template,
        "",
        "## PIPELINE_OUTPUTS",
        json.dumps(step_summaries, ensure_ascii=False, indent=2),
        "",
        "## EXISTING_DECISIONS",
        json.dumps(existing_hints, ensure_ascii=False, indent=2),
    ])

    learner_timeout = int(os.environ.get("KIN_LEARNER_TIMEOUT") or 120)
    started = time.monotonic()
    result = _run_claude(prompt, model="sonnet", noninteractive=True, timeout=learner_timeout)
    duration = int(time.monotonic() - started)

    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, ensure_ascii=False)
    success = result["returncode"] == 0

    # Log to agent_logs
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=task_id,
        agent_role="learner",
        action="learn",
        input_summary=f"project={project_id}, task={task_id}, steps={len(step_results)}",
        output_summary=raw_output or None,
        tokens_used=result.get("tokens_used"),
        model="sonnet",
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )

    parsed = _try_parse_json(raw_output)
    if not isinstance(parsed, dict):
        return {"added": 0, "skipped": 0, "error": "non-JSON learner output"}
    decisions = parsed.get("decisions", [])
    if not isinstance(decisions, list):
        return {"added": 0, "skipped": 0, "error": "invalid decisions format"}

    added = 0
    skipped = 0
    # Hard cap: at most 5 decisions per pipeline run.
    for candidate in decisions[:5]:
        if not isinstance(candidate, dict):
            continue
        dec_type = candidate.get("type", "decision")
        if dec_type not in VALID_DECISION_TYPES:
            dec_type = "decision"
        title = (candidate.get("title") or "").strip()
        description = (candidate.get("description") or "").strip()
        if not title or not description:
            continue
        if models.add_decision_if_new(
            conn,
            project_id=project_id,
            type=dec_type,
            title=title,
            description=description,
            tags=candidate.get("tags") or [],
            task_id=task_id,
        ):
            added += 1
        else:
            skipped += 1
    return {"added": added, "skipped": skipped}
# ---------------------------------------------------------------------------
# Pipeline executor
# ---------------------------------------------------------------------------
def run_pipeline(
    conn: sqlite3.Connection,
    task_id: str,
    steps: list[dict],
    dry_run: bool = False,
    allow_write: bool = False,
    noninteractive: bool = False,
) -> dict:
    """Execute a multi-step pipeline of agents.

    steps = [
        {"role": "debugger", "model": "opus", "brief": "..."},
        {"role": "tester", "depends_on": "debugger", "brief": "..."},
    ]

    Each step's output is chained into the next step's context. On failure
    the task is marked blocked (with best-effort Telegram escalation); on
    success the task is auto-completed or moved to review depending on the
    effective execution mode.

    Returns {success, steps_completed, total_cost_usd, total_tokens,
    total_duration_seconds, results, pipeline_id, ...}.
    """
    # Auth check — skip for dry_run (dry_run never calls claude CLI)
    if not dry_run:
        try:
            check_claude_auth()
        except ClaudeAuthError as exc:
            return {
                "success": False,
                "error": "claude_auth_required",
                "message": str(exc),
                "instructions": "Run: claude login",
            }
    task = models.get_task(conn, task_id)
    if not task:
        return {"success": False, "error": f"Task '{task_id}' not found"}
    project_id = task["project_id"]
    # Determine route type from steps or task brief
    route_type = "custom"
    if task.get("brief") and isinstance(task["brief"], dict):
        route_type = task["brief"].get("route_type", "custom") or "custom"
    # Determine execution mode (auto vs review)
    mode = models.get_effective_mode(conn, project_id, task_id)
    # Create pipeline in DB — real runs only; dry runs stay side-effect-free
    pipeline = None
    if not dry_run:
        pipeline = models.create_pipeline(
            conn, task_id, project_id, route_type, steps,
        )
        models.update_task(conn, task_id, status="in_progress")
    results = []
    total_cost = 0.0
    total_tokens = 0
    total_duration = 0
    previous_output = None  # chained into the next step's context
    for i, step in enumerate(steps):
        role = step["role"]
        model = step.get("model", "sonnet")
        brief = step.get("brief")
        try:
            result = run_agent(
                conn, role, task_id, project_id,
                model=model,
                previous_output=previous_output,
                brief_override=brief,
                dry_run=dry_run,
                allow_write=allow_write,
                noninteractive=noninteractive,
            )
        except Exception as exc:
            # Unexpected crash in a step: fail the pipeline, log the run,
            # block the task, escalate via Telegram (best-effort), return.
            exc_msg = f"Step {i+1}/{len(steps)} ({role}) raised exception: {exc}"
            if pipeline:
                models.update_pipeline(
                    conn, pipeline["id"],
                    status="failed",
                    total_cost_usd=total_cost,
                    total_tokens=total_tokens,
                    total_duration_seconds=total_duration,
                )
            models.log_agent_run(
                conn,
                project_id=project_id,
                task_id=task_id,
                agent_role=role,
                action="execute",
                input_summary=f"task={task_id}, model={model}",
                output_summary=None,
                success=False,
                error_message=exc_msg,
            )
            models.update_task(conn, task_id, status="blocked", blocked_reason=exc_msg)
            try:
                from core.telegram import send_telegram_escalation
                project = models.get_project(conn, project_id)
                project_name = project["name"] if project else project_id
                sent = send_telegram_escalation(
                    task_id=task_id,
                    project_name=project_name,
                    agent_role=role,
                    reason=exc_msg,
                    pipeline_step=str(i + 1),
                )
                if sent:
                    models.mark_telegram_sent(conn, task_id)
            except Exception:
                pass  # Telegram errors must never block pipeline
            return {
                "success": False,
                "error": exc_msg,
                "steps_completed": i,
                "results": results,
                "total_cost_usd": total_cost,
                "total_tokens": total_tokens,
                "total_duration_seconds": total_duration,
                "pipeline_id": pipeline["id"] if pipeline else None,
            }
        if dry_run:
            results.append(result)
            continue
        # Accumulate stats
        total_cost += result.get("cost_usd") or 0
        total_tokens += result.get("tokens_used") or 0
        total_duration += result.get("duration_seconds") or 0
        if not result["success"]:
            # Auto mode: retry once with allow_write on permission error
            if mode == "auto_complete" and not allow_write and _is_permission_error(result):
                task_modules = models.get_modules(conn, project_id)
                try:
                    run_hooks(conn, project_id, task_id,
                              event="task_permission_retry",
                              task_modules=task_modules)
                except Exception:
                    pass
                # Audit log: record dangerous skip before retry
                try:
                    models.log_audit_event(
                        conn,
                        event_type="dangerous_skip",
                        task_id=task_id,
                        step_id=role,
                        reason=f"auto mode permission retry: step {i+1}/{len(steps)} ({role})",
                        project_id=project_id,
                    )
                    models.update_task(conn, task_id, dangerously_skipped=1)
                except Exception:
                    pass
                retry = run_agent(
                    conn, role, task_id, project_id,
                    model=model,
                    previous_output=previous_output,
                    brief_override=brief,
                    dry_run=False,
                    allow_write=True,
                    noninteractive=noninteractive,
                )
                allow_write = True  # subsequent steps also with allow_write
                total_cost += retry.get("cost_usd") or 0
                total_tokens += retry.get("tokens_used") or 0
                total_duration += retry.get("duration_seconds") or 0
                if retry["success"]:
                    result = retry
            if not result["success"]:
                # Still failed — block regardless of mode
                results.append(result)
                if pipeline:
                    models.update_pipeline(
                        conn, pipeline["id"],
                        status="failed",
                        total_cost_usd=total_cost,
                        total_tokens=total_tokens,
                        total_duration_seconds=total_duration,
                    )
                agent_error = result.get("error") or ""
                error_msg = f"Step {i+1}/{len(steps)} ({role}) failed"
                if agent_error:
                    error_msg += f": {agent_error}"
                models.update_task(conn, task_id, status="blocked", blocked_reason=error_msg)
                return {
                    "success": False,
                    "error": error_msg,
                    "steps_completed": i,
                    "results": results,
                    "total_cost_usd": total_cost,
                    "total_tokens": total_tokens,
                    "total_duration_seconds": total_duration,
                    "pipeline_id": pipeline["id"] if pipeline else None,
                }
        results.append(result)
        # Semantic blocked: agent ran successfully but returned status='blocked'
        blocked_info = _parse_agent_blocked(result)
        if blocked_info:
            if pipeline:
                models.update_pipeline(
                    conn, pipeline["id"],
                    status="failed",
                    total_cost_usd=total_cost,
                    total_tokens=total_tokens,
                    total_duration_seconds=total_duration,
                )
            models.update_task(
                conn, task_id,
                status="blocked",
                blocked_reason=blocked_info["reason"],
                blocked_at=blocked_info["blocked_at"],
                blocked_agent_role=role,
                blocked_pipeline_step=str(i + 1),
            )
            try:
                from core.telegram import send_telegram_escalation
                project = models.get_project(conn, project_id)
                project_name = project["name"] if project else project_id
                sent = send_telegram_escalation(
                    task_id=task_id,
                    project_name=project_name,
                    agent_role=role,
                    reason=blocked_info["reason"],
                    pipeline_step=str(i + 1),
                )
                if sent:
                    models.mark_telegram_sent(conn, task_id)
            except Exception:
                pass  # Telegram errors must never block pipeline
            error_msg = f"Step {i+1}/{len(steps)} ({role}) blocked: {blocked_info['reason']}"
            return {
                "success": False,
                "error": error_msg,
                "blocked_by": role,
                "blocked_reason": blocked_info["reason"],
                "steps_completed": i,
                "results": results,
                "total_cost_usd": total_cost,
                "total_tokens": total_tokens,
                "total_duration_seconds": total_duration,
                "pipeline_id": pipeline["id"] if pipeline else None,
            }
        # Save sysadmin scan results immediately after a successful sysadmin step
        if role == "sysadmin" and result["success"] and not dry_run:
            try:
                _save_sysadmin_output(conn, project_id, task_id, result)
            except Exception:
                pass  # Never block pipeline on sysadmin save errors
        # Chain output to next step (always as a string)
        previous_output = result.get("raw_output") or result.get("output")
        if isinstance(previous_output, (dict, list)):
            previous_output = json.dumps(previous_output, ensure_ascii=False)
    # Pipeline completed — finalization only for real runs (pipeline is
    # created exactly when not dry_run, so this also excludes dry runs)
    if pipeline and not dry_run:
        models.update_pipeline(
            conn, pipeline["id"],
            status="completed",
            total_cost_usd=total_cost,
            total_tokens=total_tokens,
            total_duration_seconds=total_duration,
        )
        task_modules = models.get_modules(conn, project_id)
        # Compute changed files for hook filtering (frontend build trigger)
        changed_files: list[str] | None = None
        project = models.get_project(conn, project_id)
        if project and project.get("path"):
            p_path = Path(project["path"]).expanduser()
            if p_path.is_dir():
                changed_files = _get_changed_files(str(p_path))
        last_role = steps[-1].get("role", "") if steps else ""
        auto_eligible = last_role in {"tester", "reviewer"}
        # Guard: re-fetch current status — user may have manually changed it while pipeline ran
        current_task = models.get_task(conn, task_id)
        current_status = current_task.get("status") if current_task else None
        if current_status in ("done", "cancelled"):
            pass  # User finished manually — don't overwrite
        elif mode == "auto_complete" and auto_eligible:
            # Auto-complete mode: last step is tester/reviewer — skip review, approve immediately
            models.update_task(conn, task_id, status="done")
            try:
                run_hooks(conn, project_id, task_id,
                          event="task_auto_approved", task_modules=task_modules)
            except Exception:
                pass
            try:
                run_hooks(conn, project_id, task_id,
                          event="task_done", task_modules=task_modules)
            except Exception:
                pass
            # Auto followup: generate tasks, auto-resolve permission issues.
            # Guard: skip for followup-sourced tasks to prevent infinite recursion.
            task_brief = task.get("brief") or {}
            is_followup_task = (
                isinstance(task_brief, dict)
                and str(task_brief.get("source", "")).startswith("followup:")
            )
            if not is_followup_task:
                try:
                    from core.followup import generate_followups, auto_resolve_pending_actions
                    fu_result = generate_followups(conn, task_id)
                    if fu_result.get("pending_actions"):
                        auto_resolve_pending_actions(conn, task_id, fu_result["pending_actions"])
                except Exception:
                    pass
        else:
            # Review mode: wait for manual approval
            models.update_task(conn, task_id, status="review", execution_mode="review")
        # Run post-pipeline hooks (failures don't affect pipeline status)
        try:
            run_hooks(conn, project_id, task_id,
                      event="pipeline_completed", task_modules=task_modules,
                      changed_files=changed_files)
        except Exception:
            pass  # Hook errors must never block pipeline completion
        # Auto-learning: extract decisions from pipeline results
        if results:
            try:
                _run_learning_extraction(conn, task_id, project_id, results)
            except Exception:
                pass  # Learning errors must never block pipeline completion
        # Auto-commit changes after successful pipeline
        try:
            _run_autocommit(conn, task_id, project_id)
        except Exception:
            pass  # Autocommit errors must never block pipeline completion
    return {
        "success": True,
        "steps_completed": len(steps),
        "results": results,
        "total_cost_usd": total_cost,
        "total_tokens": total_tokens,
        "total_duration_seconds": total_duration,
        "pipeline_id": pipeline["id"] if pipeline else None,
        "dry_run": dry_run,
        "mode": mode,
    }