"""
Kin agent runner — launches Claude Code as subprocess with role-specific context.
Each agent = separate process with isolated context.
"""

import json
import logging
import os
import re
import shutil
import sqlite3
import subprocess
import time
from pathlib import Path
from typing import Any

_logger = logging.getLogger("kin.runner")

# Extra PATH entries to inject when searching for claude CLI.
# launchctl daemons start with a stripped PATH that may omit these.
_EXTRA_PATH_DIRS = [
    "/opt/homebrew/bin",
    "/opt/homebrew/sbin",
    "/usr/local/bin",
    "/usr/local/sbin",
]


def _node_version_key(node_dir: Path) -> tuple[tuple[int, ...], str]:
    """Sort key for an nvm version directory such as ``v20.11.1``.

    Compares the numeric components (20, 11, 1) so ``v20`` ranks above ``v9``;
    a plain string sort gets that backwards. The directory name breaks ties so
    the ordering is deterministic.
    """
    numbers = tuple(int(part) for part in re.findall(r"\d+", node_dir.name))
    return numbers, node_dir.name


def _build_claude_env() -> dict:
    """Return a copy of os.environ whose PATH also covers common CLI locations.

    PATH order: _EXTRA_PATH_DIRS, then every ~/.nvm/versions/node/*/bin directory
    (newest Node version first — launchctl does not expand that glob), then the
    entries of the current PATH. Empty and duplicate entries are dropped, keeping
    the first occurrence.
    """
    env = os.environ.copy()
    existing = env.get("PATH", "").split(os.pathsep)
    extra = list(_EXTRA_PATH_DIRS)

    try:
        nvm_root = Path.home() / ".nvm" / "versions" / "node"
        if nvm_root.is_dir():
            for node_ver in sorted(nvm_root.iterdir(), key=_node_version_key, reverse=True):
                bin_dir = node_ver / "bin"
                if bin_dir.is_dir():
                    extra.append(str(bin_dir))
    except (OSError, RuntimeError):
        # No resolvable home dir or unreadable nvm tree: keep the static dirs only.
        pass

    # dict.fromkeys preserves insertion order -> order-preserving de-duplication.
    env["PATH"] = os.pathsep.join(dict.fromkeys(d for d in extra + existing if d))
    return env


def _resolve_claude_cmd(env: dict | None = None) -> str:
    """Return the full path to the claude CLI, or 'claude' as fallback.

    env: an environment from _build_claude_env() whose PATH is searched; it is
    built on demand when omitted.
    """
    search_env = env if env is not None else _build_claude_env()
    found = shutil.which("claude", path=search_env.get("PATH"))
    return found or "claude"


# Project imports stay below the PATH helpers, as in the original layout.
# NOTE(review): this placement may be deliberate (import-cycle avoidance) — confirm before hoisting.
from core import models  # noqa: E402
from core.context_builder import build_context, format_prompt  # noqa: E402
from core.hooks import run_hooks  # noqa: E402


class ClaudeAuthError(Exception):
    """Raised when Claude CLI is not authenticated or not available."""


_LOGIN_REQUIRED_MSG = "Claude CLI requires login. Run: claude login"


def check_claude_auth(timeout: int = 10) -> None:
    """Check that claude CLI is authenticated before running a pipeline.

    Runs: claude -p 'ok' --output-format json with timeout.
    Returns None if auth is confirmed.

    Raises ClaudeAuthError if:
    - claude CLI not found in PATH (FileNotFoundError)
    - stdout/stderr contains 'not logged in' (case-insensitive)
    - returncode != 0
    - is_error=true in parsed JSON output

    Returns silently on TimeoutExpired (ambiguous — don't block pipeline).
    """
    env = _build_claude_env()
    claude_cmd = _resolve_claude_cmd(env)
    try:
        proc = subprocess.run(
            [claude_cmd, "-p", "ok", "--output-format", "json"],
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
            stdin=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        raise ClaudeAuthError(
            "claude CLI not found in PATH. Install it or add to PATH."
        ) from None
    except subprocess.TimeoutExpired:
        return  # Ambiguous — don't block pipeline on timeout

    stdout = proc.stdout or ""
    combined = stdout + (proc.stderr or "")
    if "not logged in" in combined.lower() or proc.returncode != 0:
        raise ClaudeAuthError(_LOGIN_REQUIRED_MSG)

    parsed = _try_parse_json(stdout)
    if isinstance(parsed, dict) and parsed.get("is_error"):
        raise ClaudeAuthError(_LOGIN_REQUIRED_MSG)


# Roles whose agents run with the project's local checkout as working directory.
_WORKDIR_ROLES = ("debugger", "frontend_dev", "backend_dev", "tester", "security")


def run_agent(
    conn: sqlite3.Connection,
    role: str,
    task_id: str,
    project_id: str,
    model: str = "sonnet",
    previous_output: str | None = None,
    brief_override: str | None = None,
    dry_run: bool = False,
    allow_write: bool = False,
    noninteractive: bool = False,
) -> dict:
    """Run a single Claude Code agent as a subprocess.

    1. Build context from DB
    2. Format prompt with role template
    3. Run: claude -p "{prompt}" --output-format json
    4. Log result to agent_logs
    5. Return {success, error, output, raw_output, role, model,
       duration_seconds, tokens_used, cost_usd}

    ``output`` is the parsed JSON value when the agent's text parses as JSON,
    otherwise the raw text. With dry_run=True nothing is executed and
    {success, output=None, prompt, role, model, dry_run=True} is returned.
    """
    ctx = build_context(conn, task_id, role, project_id)
    if previous_output:
        ctx["previous_output"] = previous_output
    if brief_override and ctx.get("task"):
        ctx["task"]["brief"] = brief_override

    prompt = format_prompt(ctx, role)

    if dry_run:
        return {
            "success": True,
            "output": None,
            "prompt": prompt,
            "role": role,
            "model": model,
            "dry_run": True,
        }

    # Determine working directory.
    # Operations projects have no local path — sysadmin works via SSH.
    project = models.get_project(conn, project_id)
    working_dir = None
    is_operations = bool(project) and project.get("project_type") == "operations"
    if project and not is_operations and role in _WORKDIR_ROLES and project.get("path"):
        project_path = Path(project["path"]).expanduser()
        if project_path.is_dir():
            working_dir = str(project_path)

    start = time.monotonic()
    result = _run_claude(prompt, model=model, working_dir=working_dir,
                         allow_write=allow_write, noninteractive=noninteractive)
    duration = int(time.monotonic() - start)

    # Ensure output_text is always a string for DB storage.
    output_text = result.get("output", "")
    if not isinstance(output_text, str):
        output_text = json.dumps(output_text, ensure_ascii=False)
    success = result["returncode"] == 0
    error = None if success else result.get("error")
    parsed_output = _try_parse_json(output_text)

    # Log FULL output to DB (no truncation)
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=task_id,
        agent_role=role,
        action="execute",
        input_summary=f"task={task_id}, model={model}",
        output_summary=output_text or None,
        tokens_used=result.get("tokens_used"),
        model=model,
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=error,
        duration_seconds=duration,
    )

    return {
        "success": success,
        "error": error,
        # `is not None` so valid-but-falsy JSON ({} / [] / 0) is still returned parsed.
        "output": parsed_output if parsed_output is not None else output_text,
        "raw_output": output_text,
        "role": role,
        "model": model,
        "duration_seconds": duration,
        "tokens_used": result.get("tokens_used"),
        "cost_usd": result.get("cost_usd"),
    }


def _usage_total_tokens(usage: Any) -> int | None:
    """Total token count from a claude JSON ``usage`` object.

    Prefers an explicit ``total_tokens``; otherwise sums the integer
    ``input_tokens``/``output_tokens`` fields. Returns None when usage is not a
    dict or carries no numeric counts.
    """
    if not isinstance(usage, dict):
        return None
    total = usage.get("total_tokens")
    if total is not None:
        return total
    counts = [usage.get(key) for key in ("input_tokens", "output_tokens")]
    numeric = [c for c in counts if isinstance(c, int)]
    return sum(numeric) if numeric else None


def _run_claude(
    prompt: str,
    model: str = "sonnet",
    working_dir: str | None = None,
    allow_write: bool = False,
    noninteractive: bool = False,
    timeout: int | None = None,
) -> dict:
    """Execute claude CLI as subprocess.

    Returns {output, error, empty_output, returncode} and, when stdout is the
    claude JSON wrapper, also {tokens_used, cost_usd}; ``output`` is then the
    agent's ``result``/``content`` as a string. Missing CLI -> returncode 127,
    timeout (KIN_AGENT_TIMEOUT, default 600s) -> returncode 124.
    """
    env = _build_claude_env()
    cmd = [
        _resolve_claude_cmd(env),
        "-p", prompt,
        "--output-format", "json",
        "--model", model,
    ]
    if allow_write:
        cmd.append("--dangerously-skip-permissions")

    is_noninteractive = noninteractive or os.environ.get("KIN_NONINTERACTIVE") == "1"
    if timeout is None:
        timeout = int(os.environ.get("KIN_AGENT_TIMEOUT") or 600)

    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=working_dir,
            env=env,
            stdin=subprocess.DEVNULL if is_noninteractive else None,
        )
    except FileNotFoundError:
        return {
            "output": "",
            "error": "claude CLI not found in PATH",
            "returncode": 127,
        }
    except subprocess.TimeoutExpired:
        return {
            "output": "",
            "error": f"Agent timed out after {timeout}s",
            "returncode": 124,
        }

    # Always preserve the full raw stdout
    raw_stdout = proc.stdout or ""
    result: dict[str, Any] = {
        "output": raw_stdout,
        "error": proc.stderr or None,  # preserve stderr always for diagnostics
        "empty_output": not raw_stdout.strip(),
        "returncode": proc.returncode,
    }

    # Parse the JSON wrapper from `claude --output-format json`: extract metadata
    # (tokens, cost) but keep output as the full content string.
    parsed = _try_parse_json(raw_stdout)
    if isinstance(parsed, dict):
        result["tokens_used"] = _usage_total_tokens(parsed.get("usage"))
        # Older CLI builds report `cost_usd`; current ones report `total_cost_usd`.
        cost = parsed.get("cost_usd")
        if cost is None:
            cost = parsed.get("total_cost_usd")
        result["cost_usd"] = cost
        content = parsed.get("result") or parsed.get("content")
        if content is not None:
            result["output"] = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False)

    return result


_JSON_FENCE_RE = re.compile(r"```(?:json)?\s*\n(.*?)\n```", re.DOTALL)


def _try_parse_json(text: str) -> Any:
    """Try to parse JSON from text. Returns parsed obj or None.

    Strategies, in order: the whole (stripped) text; the first markdown code
    fence; the first embedded ``{...}`` value, then the first embedded ``[...]``.
    """
    if not text:
        return None
    text = text.strip()
    if not text:
        return None

    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    fence = _JSON_FENCE_RE.search(text)
    if fence:
        try:
            return json.loads(fence.group(1))
        except json.JSONDecodeError:
            pass

    # raw_decode parses exactly one JSON value starting at the given index and
    # ignores trailing text; unlike naive bracket counting it is not confused by
    # braces/brackets that appear inside string literals.
    decoder = json.JSONDecoder()
    for start_char in ("{", "["):
        start = text.find(start_char)
        if start < 0:
            continue
        try:
            value, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            continue
        return value
    return None


# ---------------------------------------------------------------------------
# Backlog audit
# ---------------------------------------------------------------------------

PROMPTS_DIR = Path(__file__).parent / "prompts"

_LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish",
               "de": "German", "fr": "French"}


def run_audit(
    conn: sqlite3.Connection,
    project_id: str,
    noninteractive: bool = False,
    auto_apply: bool = False,
) -> dict:
    """Audit pending tasks against the actual codebase.

    auto_apply=True: marks already_done tasks as done in DB.
    auto_apply=False: returns results only (for API/GUI).

    Returns {success, already_done, still_pending, unclear, applied,
    duration_seconds, tokens_used, cost_usd} on success, or
    {success: False, error, ...} when the project is missing or the agent fails.
    """
    project = models.get_project(conn, project_id)
    if not project:
        return {"success": False, "error": f"Project '{project_id}' not found"}

    pending = models.list_tasks(conn, project_id=project_id, status="pending")
    if not pending:
        return {
            "success": True,
            "already_done": [],
            "still_pending": [],
            "unclear": [],
            "message": "No pending tasks to audit",
        }

    # Build prompt
    prompt_path = PROMPTS_DIR / "backlog_audit.md"
    if prompt_path.exists():
        template = prompt_path.read_text(encoding="utf-8")
    else:
        template = "You are a QA analyst. Check if pending tasks are already done in the code."

    task_list = [
        {"id": t["id"], "title": t["title"], "brief": t.get("brief")}
        for t in pending
    ]

    # Operations projects may have no local path; never pass None to Path().
    project_path_value = project.get("path")

    sections = [
        template,
        "",
        f"## Project: {project['id']} — {project['name']}",
    ]
    tech_stack = project.get("tech_stack")
    if tech_stack:
        # A plain string would otherwise be joined character by character.
        stack_text = tech_stack if isinstance(tech_stack, str) else ", ".join(map(str, tech_stack))
        sections.append(f"Tech stack: {stack_text}")
    if project_path_value:
        sections.append(f"Path: {project_path_value}")
    sections += [
        "",
        f"## Pending tasks ({len(task_list)}):",
        json.dumps(task_list, ensure_ascii=False, indent=2),
        "",
    ]

    # `or` (not a .get default) so a stored NULL language also falls back to "ru".
    language = project.get("language") or "ru"
    lang_name = _LANG_NAMES.get(language, language)
    sections += ["## Language", f"ALWAYS respond in {lang_name}.", ""]

    prompt = "\n".join(sections)

    working_dir = None
    if project_path_value:
        candidate = Path(project_path_value).expanduser()
        if candidate.is_dir():
            working_dir = str(candidate)

    # Run agent — allow_write=True so claude can use Read/Bash tools
    # without interactive permission prompts (critical for noninteractive mode)
    start = time.monotonic()
    result = _run_claude(prompt, model="sonnet", working_dir=working_dir,
                         allow_write=True, noninteractive=noninteractive)
    duration = int(time.monotonic() - start)

    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, ensure_ascii=False)
    success = result["returncode"] == 0

    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=None,
        agent_role="backlog_audit",
        action="audit",
        input_summary=f"project={project_id}, pending_tasks={len(pending)}",
        output_summary=raw_output or None,
        tokens_used=result.get("tokens_used"),
        model="sonnet",
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )

    if not success:
        return {
            "success": False,
            # The "error" key is always present (possibly None), so a .get default never applies.
            "error": result.get("error") or "Agent failed",
            "raw_output": raw_output,
            "duration_seconds": duration,
        }

    parsed = _try_parse_json(raw_output)
    if not isinstance(parsed, dict):
        return {
            "success": False,
            "error": "Agent returned non-JSON output",
            "raw_output": raw_output,
            "duration_seconds": duration,
        }

    already_done = parsed.get("already_done", [])

    # Auto-apply: mark already_done tasks as done in DB (only still-pending
    # tasks that belong to this project).
    applied = []
    if auto_apply and isinstance(already_done, list):
        for item in already_done:
            # Agents normally return {"id": ...} objects; accept bare ids too.
            tid = item.get("id") if isinstance(item, dict) else item
            if not tid or not isinstance(tid, (str, int)):
                continue
            t = models.get_task(conn, tid)
            if t and t["project_id"] == project_id and t["status"] == "pending":
                models.update_task(conn, tid, status="done")
                applied.append(tid)

    return {
        "success": True,
        "already_done": already_done,
        "still_pending": parsed.get("still_pending", []),
        "unclear": parsed.get("unclear", []),
        "applied": applied,
        "duration_seconds": duration,
        "tokens_used": result.get("tokens_used"),
        "cost_usd": result.get("cost_usd"),
    }


# ---------------------------------------------------------------------------
# Blocked protocol detection
# ---------------------------------------------------------------------------

def _parse_agent_blocked(result: dict) -> dict | None:
    """Detect semantic blocked status from a successful agent result.

    Returns {reason, blocked_at} if the agent's top-level JSON object has
    status='blocked' (or verdict='blocked', used by the reviewer); None otherwise.
    Only the top-level output object is checked — never nested fields — to
    avoid false positives from nested task status fields.
    """
    from datetime import datetime

    if not result.get("success"):
        return None
    output = result.get("output")
    if not isinstance(output, dict):
        return None
    if output.get("status") != "blocked" and output.get("verdict") != "blocked":
        return None
    return {
        "reason": output.get("reason") or output.get("blocked_reason") or "",
        "blocked_at": output.get("blocked_at") or datetime.now().isoformat(),
    }


# ---------------------------------------------------------------------------
# Permission error detection
# ---------------------------------------------------------------------------

def _is_permission_error(result: dict) -> bool:
    """Return True if agent result indicates a permission/write failure.

    Matches core.followup.PERMISSION_PATTERNS (regexes) against the raw output
    (or the output, JSON-encoded when not a string) plus the error text.
    """
    from core.followup import PERMISSION_PATTERNS

    output = result.get("raw_output") or result.get("output") or ""
    if not isinstance(output, str):
        output = json.dumps(output, ensure_ascii=False)
    text = output + " " + (result.get("error") or "")
    return any(re.search(pattern, text) for pattern in PERMISSION_PATTERNS)


# ---------------------------------------------------------------------------
# Autocommit: git add -A && git commit after successful pipeline
# ---------------------------------------------------------------------------

# Upper bound for each git invocation made by the autocommit step.
_GIT_TIMEOUT_SECONDS = 60


def _get_changed_files(project_path: str) -> list[str]:
    """Return files changed in the current pipeline run (sorted, de-duplicated).

    Combines unstaged changes, staged changes, and the last commit diff to cover
    both autocommit-on and autocommit-off scenarios. Each git query is
    best-effort: a failing one (no repo, no HEAD~1 on the first commit, timeout)
    contributes nothing, so the result is [] when every query fails.
    """
    env = _build_claude_env()
    git_cmd = shutil.which("git", path=env["PATH"]) or "git"
    files: set[str] = set()
    for git_args in (
        ["diff", "--name-only"],                    # unstaged tracked changes
        ["diff", "--cached", "--name-only"],        # staged changes
        ["diff", "HEAD~1", "HEAD", "--name-only"],  # last commit (post-autocommit)
    ):
        try:
            proc = subprocess.run(
                [git_cmd, *git_args],
                cwd=project_path,
                capture_output=True,
                text=True,
                timeout=10,
                env=env,
            )
        except Exception:
            _logger.debug("git %s failed in %s", " ".join(git_args), project_path, exc_info=True)
            continue
        if proc.returncode == 0:
            files.update(line.strip() for line in proc.stdout.splitlines() if line.strip())
    return sorted(files)


def _run_autocommit(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
) -> None:
    """Auto-commit changes after successful pipeline completion.

    Runs: git add -A && git commit -m 'kin: {task_id} {title}' in the project dir.
    Skips silently when the task/project is missing, autocommit is disabled, the
    project has no existing local directory, or there is nothing to commit.
    Never raises — autocommit errors must never block the pipeline. Git output is
    discarded (stderr=subprocess.DEVNULL per decision #30) and every git call is
    bounded by _GIT_TIMEOUT_SECONDS so a hanging hook cannot stall the pipeline.
    """
    task = models.get_task(conn, task_id)
    project = models.get_project(conn, project_id)
    if not task or not project or not project.get("autocommit_enabled"):
        return

    raw_path = project.get("path")
    if not raw_path:
        return  # e.g. operations projects have no local path
    project_path = Path(raw_path).expanduser()
    if not project_path.is_dir():
        return

    env = _build_claude_env()
    git_cmd = shutil.which("git", path=env["PATH"]) or "git"
    title = (task.get("title") or "").replace('"', "'").replace("\n", " ").replace("\r", "")
    commit_msg = f"kin: {task_id} {title}"
    quiet = {
        "cwd": str(project_path),
        "env": env,
        "stdin": subprocess.DEVNULL,
        "stdout": subprocess.DEVNULL,
        "stderr": subprocess.DEVNULL,
        "timeout": _GIT_TIMEOUT_SECONDS,
    }

    try:
        added = subprocess.run([git_cmd, "add", "-A"], **quiet)
        if added.returncode != 0:
            # Not a repo / index locked: committing would be meaningless.
            _logger.warning("Autocommit: git add failed for %s (exit %d)", task_id, added.returncode)
            return
        result = subprocess.run([git_cmd, "commit", "-m", commit_msg], **quiet)
        if result.returncode == 0:
            _logger.info("Autocommit: %s", commit_msg)
        else:
            _logger.debug("Autocommit: nothing to commit for %s", task_id)
    except Exception as exc:
        _logger.warning("Autocommit failed for %s: %s", task_id, exc)
# ---------------------------------------------------------------------------
# Sysadmin output: save server map to decisions and modules
# ---------------------------------------------------------------------------

# Decision types agents may propose; anything else is stored as "decision".
VALID_DECISION_TYPES = {"decision", "gotcha", "convention"}


def _clean_str(value: Any, default: str = "") -> str:
    """Strip a string field taken from agent JSON.

    Behaves like ``(value or default).strip()`` but never raises: a missing,
    empty or non-string value yields ``default``.
    """
    if isinstance(value, str) and value:
        return value.strip()
    return default


def _normalize_decision(item: Any) -> tuple[str, str, str] | None:
    """Validate one agent-proposed decision.

    Returns (type, title, description) — unknown or non-string types coerced to
    'decision', title/description stripped — or None when item is not a dict or
    lacks a non-empty title/description.
    """
    if not isinstance(item, dict):
        return None
    d_type = item.get("type", "decision")
    # isinstance first: an unhashable value (e.g. a list) would make `in` raise.
    if not isinstance(d_type, str) or d_type not in VALID_DECISION_TYPES:
        d_type = "decision"
    d_title = _clean_str(item.get("title"))
    d_desc = _clean_str(item.get("description"))
    if not d_title or not d_desc:
        return None
    return d_type, d_title, d_desc


def _save_sysadmin_output(
    conn: sqlite3.Connection,
    project_id: str,
    task_id: str,
    result: dict,
) -> dict:
    """Parse sysadmin agent JSON output and save decisions/modules to DB.

    Decisions are saved via models.add_decision_if_new (default tag "server");
    a falsy return counts as skipped. Modules are saved via models.add_module
    with owner_role="sysadmin"; a returned ``_created`` of False, or any
    exception from add_module (presumably a duplicate module — confirm in
    models), counts as skipped.

    Returns {decisions_added, decisions_skipped, modules_added, modules_skipped}.
    """
    raw = result.get("raw_output") or result.get("output") or ""
    if isinstance(raw, (dict, list)):
        raw = json.dumps(raw, ensure_ascii=False)
    parsed = _try_parse_json(raw)

    counts = {"decisions_added": 0, "decisions_skipped": 0,
              "modules_added": 0, "modules_skipped": 0}
    if not isinstance(parsed, dict):
        return counts

    decisions = parsed.get("decisions")
    for item in decisions if isinstance(decisions, list) else []:
        normalized = _normalize_decision(item)
        if normalized is None:
            continue
        d_type, d_title, d_desc = normalized
        saved = models.add_decision_if_new(
            conn,
            project_id=project_id,
            type=d_type,
            title=d_title,
            description=d_desc,
            tags=item.get("tags") or ["server"],
            task_id=task_id,
        )
        counts["decisions_added" if saved else "decisions_skipped"] += 1

    modules = parsed.get("modules")
    for item in modules if isinstance(modules, list) else []:
        if not isinstance(item, dict):
            continue
        m_name = _clean_str(item.get("name"))
        if not m_name:
            continue
        m_type = _clean_str(item.get("type"), "service")
        m_path = _clean_str(item.get("path"))
        try:
            m = models.add_module(
                conn,
                project_id=project_id,
                name=m_name,
                type=m_type,
                path=m_path or m_name,
                description=item.get("description"),
                owner_role="sysadmin",
            )
            created = m.get("_created", True)
        except Exception:
            _logger.debug("Sysadmin module %r not saved", m_name, exc_info=True)
            created = False
        counts["modules_added" if created else "modules_skipped"] += 1

    return counts


# ---------------------------------------------------------------------------
# Auto-learning: extract decisions from pipeline results
# ---------------------------------------------------------------------------

# Per-step output budget in the learner prompt, and cap on decisions saved per run.
_LEARNER_OUTPUT_CHARS = 2000
_MAX_LEARNED_DECISIONS = 5


def _run_learning_extraction(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
    step_results: list[dict],
) -> dict:
    """Extract and save decisions from completed pipeline results.

    Calls the learner agent (prompts/learner.md) with the step outputs (first
    _LEARNER_OUTPUT_CHARS chars each) plus existing decision titles as a dedup
    hint, parses its JSON response, and saves at most _MAX_LEARNED_DECISIONS
    decisions via add_decision_if_new.

    Returns {added, skipped} plus ``error`` when nothing could be processed.
    """
    learner_prompt_path = PROMPTS_DIR / "learner.md"
    if not learner_prompt_path.exists():
        return {"added": 0, "skipped": 0, "error": "learner.md not found"}
    template = learner_prompt_path.read_text(encoding="utf-8")

    step_summaries: dict[str, str] = {}
    for step_result in step_results:
        role = step_result.get("role", "unknown")
        output = step_result.get("raw_output") or step_result.get("output") or ""
        if not isinstance(output, str):
            output = json.dumps(output, ensure_ascii=False)
        # Suffix repeated roles (debugger, debugger#2, ...) so a role that ran
        # twice does not overwrite its earlier output.
        key, n = role, 2
        while key in step_summaries:
            key = f"{role}#{n}"
            n += 1
        step_summaries[key] = output[:_LEARNER_OUTPUT_CHARS]

    # Fetch existing decisions for dedup hint
    existing_hints = [
        {"title": d["title"], "type": d["type"]}
        for d in models.get_decisions(conn, project_id)
    ]

    prompt = "\n".join([
        template,
        "",
        "## PIPELINE_OUTPUTS",
        json.dumps(step_summaries, ensure_ascii=False, indent=2),
        "",
        "## EXISTING_DECISIONS",
        json.dumps(existing_hints, ensure_ascii=False, indent=2),
    ])

    learner_timeout = int(os.environ.get("KIN_LEARNER_TIMEOUT") or 120)
    start = time.monotonic()
    result = _run_claude(prompt, model="sonnet", noninteractive=True, timeout=learner_timeout)
    duration = int(time.monotonic() - start)

    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, ensure_ascii=False)
    success = result["returncode"] == 0

    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=task_id,
        agent_role="learner",
        action="learn",
        input_summary=f"project={project_id}, task={task_id}, steps={len(step_results)}",
        output_summary=raw_output or None,
        tokens_used=result.get("tokens_used"),
        model="sonnet",
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )

    if not success:
        # Never persist "decisions" taken from a failed learner run.
        return {"added": 0, "skipped": 0, "error": result.get("error") or "learner agent failed"}

    parsed = _try_parse_json(raw_output)
    if not isinstance(parsed, dict):
        return {"added": 0, "skipped": 0, "error": "non-JSON learner output"}

    decisions = parsed.get("decisions", [])
    if not isinstance(decisions, list):
        return {"added": 0, "skipped": 0, "error": "invalid decisions format"}

    added = skipped = 0
    for item in decisions[:_MAX_LEARNED_DECISIONS]:
        normalized = _normalize_decision(item)
        if normalized is None:
            continue
        d_type, d_title, d_desc = normalized
        saved = models.add_decision_if_new(
            conn,
            project_id=project_id,
            type=d_type,
            title=d_title,
            description=d_desc,
            tags=item.get("tags") or [],
            task_id=task_id,
        )
        if saved:
            added += 1
        else:
            skipped += 1

    return {"added": added, "skipped": skipped}


# ---------------------------------------------------------------------------
# Pipeline executor
# ---------------------------------------------------------------------------

def _escalate_to_telegram(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
    role: str,
    reason: str,
    pipeline_step: str,
) -> None:
    """Best-effort Telegram escalation for a blocked task.

    Marks the task as notified when send_telegram_escalation reports success.
    Never raises — Telegram errors must never block the pipeline.
    """
    try:
        from core.telegram import send_telegram_escalation

        project = models.get_project(conn, project_id)
        project_name = project["name"] if project else project_id
        sent = send_telegram_escalation(
            task_id=task_id,
            project_name=project_name,
            agent_role=role,
            reason=reason,
            pipeline_step=pipeline_step,
        )
        if sent:
            models.mark_telegram_sent(conn, task_id)
    except Exception:
        _logger.warning("Telegram escalation failed for %s", task_id, exc_info=True)


def _run_hooks_safely(
    conn: sqlite3.Connection,
    project_id: str,
    task_id: str,
    event: str,
    **kwargs: Any,
) -> None:
    """Run hooks for ``event``; log and swallow errors (hooks never affect pipeline status)."""
    try:
        run_hooks(conn, project_id, task_id, event=event, **kwargs)
    except Exception:
        _logger.warning("Hooks for event %r failed for %s", event, task_id, exc_info=True)


def _maybe_generate_followups(conn: sqlite3.Connection, task: dict, task_id: str) -> None:
    """Generate follow-up tasks and auto-resolve their pending actions (best-effort).

    Skipped for tasks whose brief.source starts with 'followup:' so follow-ups
    never spawn further follow-ups (infinite recursion guard).
    """
    task_brief = task.get("brief") or {}
    is_followup_task = (
        isinstance(task_brief, dict)
        and str(task_brief.get("source", "")).startswith("followup:")
    )
    if is_followup_task:
        return
    try:
        from core.followup import generate_followups, auto_resolve_pending_actions

        fu_result = generate_followups(conn, task_id)
        if fu_result.get("pending_actions"):
            auto_resolve_pending_actions(conn, task_id, fu_result["pending_actions"])
    except Exception:
        _logger.warning("Follow-up generation failed for %s", task_id, exc_info=True)


def _finalize_completed_pipeline(
    conn: sqlite3.Connection,
    task: dict,
    task_id: str,
    project_id: str,
    mode: str,
    steps: list[dict],
    results: list[dict],
) -> None:
    """Post-success work: task status, hooks, learning extraction, autocommit.

    Status: left alone if the user already set done/cancelled; 'done' (plus
    task_auto_approved/task_done hooks and follow-ups) in auto_complete mode when
    the last step is tester/reviewer; otherwise 'review'. Every post-step is
    best-effort and never fails the pipeline.
    """
    task_modules = models.get_modules(conn, project_id)

    # Compute changed files for hook filtering (frontend build trigger)
    changed_files: list[str] | None = None
    project = models.get_project(conn, project_id)
    if project and project.get("path"):
        p_path = Path(project["path"]).expanduser()
        if p_path.is_dir():
            changed_files = _get_changed_files(str(p_path))

    last_role = steps[-1].get("role", "") if steps else ""
    auto_eligible = last_role in {"tester", "reviewer"}

    # Re-fetch current status — user may have manually changed it while pipeline ran
    current_task = models.get_task(conn, task_id)
    current_status = current_task.get("status") if current_task else None

    if current_status in ("done", "cancelled"):
        pass  # User finished manually — don't overwrite
    elif mode == "auto_complete" and auto_eligible:
        # Last step is tester/reviewer — skip review, approve immediately
        models.update_task(conn, task_id, status="done")
        _run_hooks_safely(conn, project_id, task_id, "task_auto_approved", task_modules=task_modules)
        _run_hooks_safely(conn, project_id, task_id, "task_done", task_modules=task_modules)
        _maybe_generate_followups(conn, task, task_id)
    else:
        # Review mode: wait for manual approval (don't overwrite execution_mode)
        models.update_task(conn, task_id, status="review")

    _run_hooks_safely(conn, project_id, task_id, "pipeline_completed",
                      task_modules=task_modules, changed_files=changed_files)

    if results:
        try:
            _run_learning_extraction(conn, task_id, project_id, results)
        except Exception:
            _logger.warning("Learning extraction failed for %s", task_id, exc_info=True)

    try:
        _run_autocommit(conn, task_id, project_id)
    except Exception:
        _logger.warning("Autocommit step failed for %s", task_id, exc_info=True)


def run_pipeline(
    conn: sqlite3.Connection,
    task_id: str,
    steps: list[dict],
    dry_run: bool = False,
    allow_write: bool = False,
    noninteractive: bool = False,
) -> dict:
    """Execute a multi-step pipeline of agents.

    steps = [
        {"role": "debugger", "model": "opus", "brief": "..."},
        {"role": "tester", "depends_on": "debugger", "brief": "..."},
    ]

    Each step's output is chained to the next step as previous_output. A step
    that fails, raises, or reports status/verdict 'blocked' stops the pipeline,
    marks the pipeline 'failed' and the task 'blocked'. In auto_complete mode a
    permission failure is retried once with allow_write (and allow_write stays on
    for later steps).

    dry_run=True only builds each step's prompt: no auth check, no claude calls,
    and no pipeline/task/hook/git side effects.

    Returns {success, steps_completed, results, total_cost_usd, total_tokens,
    total_duration_seconds, pipeline_id} plus {dry_run, mode} on success, or
    plus {error} (and {blocked_by, blocked_reason} for a semantic block) on failure.
    """
    # Auth check — skip for dry_run (dry_run never calls claude CLI)
    if not dry_run:
        try:
            check_claude_auth()
        except ClaudeAuthError as exc:
            return {
                "success": False,
                "error": "claude_auth_required",
                "message": str(exc),
                "instructions": "Run: claude login",
            }

    task = models.get_task(conn, task_id)
    if not task:
        return {"success": False, "error": f"Task '{task_id}' not found"}

    project_id = task["project_id"]

    # Determine route type from task brief
    route_type = "custom"
    if task.get("brief") and isinstance(task["brief"], dict):
        route_type = task["brief"].get("route_type", "custom") or "custom"

    # Determine execution mode (auto vs review)
    mode = models.get_effective_mode(conn, project_id, task_id)

    pipeline = None
    if not dry_run:
        pipeline = models.create_pipeline(conn, task_id, project_id, route_type, steps)
        models.update_task(conn, task_id, status="in_progress")

    results: list[dict] = []
    total_cost = 0.0
    total_tokens = 0
    total_duration = 0
    previous_output = None

    def summary(steps_completed: int, **head: Any) -> dict:
        """Result dict: ``head`` keys first, then progress and the running totals."""
        return {
            **head,
            "steps_completed": steps_completed,
            "results": results,
            "total_cost_usd": total_cost,
            "total_tokens": total_tokens,
            "total_duration_seconds": total_duration,
            "pipeline_id": pipeline["id"] if pipeline else None,
        }

    def mark_pipeline_failed() -> None:
        """Persist the failed pipeline with the totals accumulated so far."""
        if pipeline:
            models.update_pipeline(
                conn, pipeline["id"],
                status="failed",
                total_cost_usd=total_cost,
                total_tokens=total_tokens,
                total_duration_seconds=total_duration,
            )

    for i, step in enumerate(steps):
        role = step["role"]
        model = step.get("model", "sonnet")
        brief = step.get("brief")
        step_no = str(i + 1)
        step_label = f"Step {i+1}/{len(steps)} ({role})"

        try:
            result = run_agent(
                conn, role, task_id, project_id,
                model=model,
                previous_output=previous_output,
                brief_override=brief,
                dry_run=dry_run,
                allow_write=allow_write,
                noninteractive=noninteractive,
            )
        except Exception as exc:
            exc_msg = f"{step_label} raised exception: {exc}"
            if not dry_run:  # a failed preview must not block the real task
                mark_pipeline_failed()
                models.log_agent_run(
                    conn,
                    project_id=project_id,
                    task_id=task_id,
                    agent_role=role,
                    action="execute",
                    input_summary=f"task={task_id}, model={model}",
                    output_summary=None,
                    success=False,
                    error_message=exc_msg,
                )
                models.update_task(conn, task_id, status="blocked", blocked_reason=exc_msg)
                _escalate_to_telegram(conn, task_id, project_id, role, exc_msg, step_no)
            return summary(i, success=False, error=exc_msg)

        if dry_run:
            results.append(result)
            continue

        # Accumulate stats
        total_cost += result.get("cost_usd") or 0
        total_tokens += result.get("tokens_used") or 0
        total_duration += result.get("duration_seconds") or 0

        # Auto mode: retry once with allow_write on permission error
        if (not result["success"] and mode == "auto_complete"
                and not allow_write and _is_permission_error(result)):
            task_modules = models.get_modules(conn, project_id)
            _run_hooks_safely(conn, project_id, task_id, "task_permission_retry",
                              task_modules=task_modules)
            # Audit log: record dangerous skip before retry
            try:
                models.log_audit_event(
                    conn,
                    event_type="dangerous_skip",
                    task_id=task_id,
                    step_id=role,
                    reason=f"auto mode permission retry: step {i+1}/{len(steps)} ({role})",
                    project_id=project_id,
                )
                models.update_task(conn, task_id, dangerously_skipped=1)
            except Exception:
                _logger.warning("Audit logging of permission retry failed for %s", task_id, exc_info=True)

            try:
                retry = run_agent(
                    conn, role, task_id, project_id,
                    model=model,
                    previous_output=previous_output,
                    brief_override=brief,
                    dry_run=False,
                    allow_write=True,
                    noninteractive=noninteractive,
                )
            except Exception as exc:
                # Previously this escaped run_pipeline and left the task in_progress.
                retry_msg = f"permission retry raised exception: {exc}"
                models.log_agent_run(
                    conn,
                    project_id=project_id,
                    task_id=task_id,
                    agent_role=role,
                    action="execute",
                    input_summary=f"task={task_id}, model={model}",
                    output_summary=None,
                    success=False,
                    error_message=retry_msg,
                )
                retry = {"success": False, "error": retry_msg, "role": role, "model": model}

            allow_write = True  # subsequent steps also with allow_write
            total_cost += retry.get("cost_usd") or 0
            total_tokens += retry.get("tokens_used") or 0
            total_duration += retry.get("duration_seconds") or 0
            # Report the final attempt: on failure its error reflects the real cause.
            result = retry

        if not result["success"]:
            # Still failed — block regardless of mode
            results.append(result)
            mark_pipeline_failed()
            agent_error = result.get("error") or ""
            error_msg = f"{step_label} failed"
            if agent_error:
                error_msg += f": {agent_error}"
            models.update_task(conn, task_id, status="blocked", blocked_reason=error_msg)
            return summary(i, success=False, error=error_msg)

        results.append(result)

        # Semantic blocked: agent ran successfully but returned status='blocked'
        blocked_info = _parse_agent_blocked(result)
        if blocked_info:
            mark_pipeline_failed()
            models.update_task(
                conn, task_id,
                status="blocked",
                blocked_reason=blocked_info["reason"],
                blocked_at=blocked_info["blocked_at"],
                blocked_agent_role=role,
                blocked_pipeline_step=step_no,
            )
            _escalate_to_telegram(conn, task_id, project_id, role, blocked_info["reason"], step_no)
            error_msg = f"{step_label} blocked: {blocked_info['reason']}"
            return summary(
                i,
                success=False,
                error=error_msg,
                blocked_by=role,
                blocked_reason=blocked_info["reason"],
            )

        # Save sysadmin scan results immediately after a successful sysadmin step
        if role == "sysadmin":
            try:
                _save_sysadmin_output(conn, project_id, task_id, result)
            except Exception:
                _logger.warning("Saving sysadmin output failed for %s", task_id, exc_info=True)

        # Chain output to next step
        previous_output = result.get("raw_output") or result.get("output")
        if isinstance(previous_output, (dict, list)):
            previous_output = json.dumps(previous_output, ensure_ascii=False)

    # Pipeline completed. Post-completion side effects (status, hooks, learner
    # agent, autocommit) only happen for real runs — never for a dry run.
    if not dry_run:
        if pipeline:
            models.update_pipeline(
                conn, pipeline["id"],
                status="completed",
                total_cost_usd=total_cost,
                total_tokens=total_tokens,
                total_duration_seconds=total_duration,
            )
        _finalize_completed_pipeline(conn, task, task_id, project_id, mode, steps, results)

    return {**summary(len(steps), success=True), "dry_run": dry_run, "mode": mode}