""" Kin agent runner — launches Claude Code as subprocess with role-specific context. Each agent = separate process with isolated context. """ import json import logging import os import shlex import shutil import sqlite3 import subprocess import time from pathlib import Path from typing import Any import re _logger = logging.getLogger("kin.runner") # Extra PATH entries to inject when searching for claude CLI. # launchctl daemons start with a stripped PATH that may omit these. _EXTRA_PATH_DIRS = [ "/opt/homebrew/bin", "/opt/homebrew/sbin", "/usr/local/bin", "/usr/local/sbin", ] # Default timeouts per model (seconds). Override globally with KIN_AGENT_TIMEOUT # or per role via timeout_seconds in specialists.yaml. _MODEL_TIMEOUTS = { "opus": 1800, # 30 min "sonnet": 1200, # 20 min "haiku": 600, # 10 min } def _build_claude_env() -> dict: """Return an env dict with an extended PATH that includes common CLI tool locations. Merges _EXTRA_PATH_DIRS with the current process PATH, deduplicating entries. Also resolves ~/.nvm/versions/node/*/bin globs that launchctl may not expand. """ env = os.environ.copy() existing = env.get("PATH", "").split(":") extra = list(_EXTRA_PATH_DIRS) # Expand nvm node bin dirs dynamically nvm_root = Path.home() / ".nvm" / "versions" / "node" if nvm_root.is_dir(): for node_ver in sorted(nvm_root.iterdir(), reverse=True): bin_dir = node_ver / "bin" if bin_dir.is_dir(): extra.append(str(bin_dir)) seen: set[str] = set() deduped: list[str] = [] for d in extra + existing: if d and d not in seen: seen.add(d) deduped.append(d) env["PATH"] = ":".join(deduped) # Ensure SSH agent is available for agents that connect via SSH. # Under launchd, SSH_AUTH_SOCK is not inherited — detect macOS system socket. 
if "SSH_AUTH_SOCK" not in env: import glob socks = glob.glob("/private/tmp/com.apple.launchd.*/Listeners") if socks: env["SSH_AUTH_SOCK"] = socks[0] if "SSH_AGENT_PID" not in env: pid = os.environ.get("SSH_AGENT_PID") if pid: env["SSH_AGENT_PID"] = pid return env def _resolve_claude_cmd() -> str: """Return the full path to the claude CLI, or 'claude' as fallback.""" extended_env = _build_claude_env() found = shutil.which("claude", path=extended_env["PATH"]) return found or "claude" from core import models from core.context_builder import build_context, format_prompt from core.hooks import run_hooks class ClaudeAuthError(Exception): """Raised when Claude CLI is not authenticated or not available.""" pass def check_claude_auth(timeout: int = 10) -> None: """Check that claude CLI is authenticated before running a pipeline. Runs: claude -p 'ok' --output-format json with timeout. Returns None if auth is confirmed. Raises ClaudeAuthError if: - claude CLI not found in PATH (FileNotFoundError) - stdout/stderr contains 'not logged in' (case-insensitive) - returncode != 0 - is_error=true in parsed JSON output Returns silently on TimeoutExpired (ambiguous — don't block pipeline). """ claude_cmd = _resolve_claude_cmd() env = _build_claude_env() try: proc = subprocess.run( [claude_cmd, "-p", "ok", "--output-format", "json"], capture_output=True, text=True, timeout=timeout, env=env, stdin=subprocess.DEVNULL, ) except FileNotFoundError: raise ClaudeAuthError("claude CLI not found in PATH. Install it or add to PATH.") except subprocess.TimeoutExpired: return # Ambiguous — don't block pipeline on timeout stdout = proc.stdout or "" stderr = proc.stderr or "" combined = stdout + stderr if "not logged in" in combined.lower(): raise ClaudeAuthError("Claude CLI requires login. Run: claude login") if proc.returncode != 0: raise ClaudeAuthError("Claude CLI requires login. 
Run: claude login") parsed = _try_parse_json(stdout) if isinstance(parsed, dict) and parsed.get("is_error"): raise ClaudeAuthError("Claude CLI requires login. Run: claude login") def run_agent( conn: sqlite3.Connection, role: str, task_id: str, project_id: str, model: str = "sonnet", previous_output: str | None = None, brief_override: str | None = None, dry_run: bool = False, allow_write: bool = False, noninteractive: bool = False, working_dir_override: str | None = None, ) -> dict: """Run a single Claude Code agent as a subprocess. 1. Build context from DB 2. Format prompt with role template 3. Run: claude -p "{prompt}" --output-format json 4. Log result to agent_logs 5. Return {success, output, tokens_used, duration_seconds, cost_usd} """ # Build context ctx = build_context(conn, task_id, role, project_id) if previous_output: ctx["previous_output"] = previous_output if brief_override: if ctx.get("task"): ctx["task"]["brief"] = brief_override prompt = format_prompt(ctx, role) if dry_run: return { "success": True, "output": None, "prompt": prompt, "role": role, "model": model, "dry_run": True, } # Determine working directory project = models.get_project(conn, project_id) working_dir = None # Operations projects have no local path — sysadmin works via SSH is_operations = project and project.get("project_type") == "operations" if working_dir_override: working_dir = working_dir_override elif not is_operations and project and role in ("debugger", "frontend_dev", "backend_dev", "tester", "security", "constitution", "spec", "task_decomposer"): project_path = Path(project["path"]).expanduser() if project_path.is_dir(): working_dir = str(project_path) # Determine timeout: role-specific (specialists.yaml) > model-based > default role_timeout = None try: from core.context_builder import _load_specialists specs = _load_specialists().get("specialists", {}) role_spec = specs.get(role, {}) if role_spec.get("timeout_seconds"): role_timeout = int(role_spec["timeout_seconds"]) 
except Exception: pass # Run claude subprocess start = time.monotonic() result = _run_claude(prompt, model=model, working_dir=working_dir, allow_write=allow_write, noninteractive=noninteractive, timeout=role_timeout) duration = int(time.monotonic() - start) # Parse output — ensure output_text is always a string for DB storage raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) output_text = raw_output success = result["returncode"] == 0 parsed_output = _try_parse_json(output_text) # Log FULL output to DB (no truncation) models.log_agent_run( conn, project_id=project_id, task_id=task_id, agent_role=role, action="execute", input_summary=f"task={task_id}, model={model}", output_summary=output_text or None, tokens_used=result.get("tokens_used"), model=model, cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) return { "success": success, "error": result.get("error") if not success else None, "output": parsed_output if parsed_output else output_text, "raw_output": output_text, "role": role, "model": model, "duration_seconds": duration, "tokens_used": result.get("tokens_used"), "cost_usd": result.get("cost_usd"), } def _run_claude( prompt: str, model: str = "sonnet", working_dir: str | None = None, allow_write: bool = False, noninteractive: bool = False, timeout: int | None = None, ) -> dict: """Execute claude CLI as subprocess. 
Returns dict with output, returncode, etc.""" claude_cmd = _resolve_claude_cmd() cmd = [ claude_cmd, "-p", prompt, "--output-format", "json", "--model", model, ] if allow_write: cmd.append("--dangerously-skip-permissions") is_noninteractive = noninteractive or os.environ.get("KIN_NONINTERACTIVE") == "1" if timeout is None: env_timeout = os.environ.get("KIN_AGENT_TIMEOUT") if env_timeout: timeout = int(env_timeout) else: timeout = _MODEL_TIMEOUTS.get(model, _MODEL_TIMEOUTS["sonnet"]) env = _build_claude_env() try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, cwd=working_dir, env=env, stdin=subprocess.DEVNULL if is_noninteractive else None, ) except FileNotFoundError: return { "output": "", "error": "claude CLI not found in PATH", "returncode": 127, } except subprocess.TimeoutExpired: return { "output": "", "error": f"Agent timed out after {timeout}s", "returncode": 124, } # Always preserve the full raw stdout raw_stdout = proc.stdout or "" result: dict[str, Any] = { "output": raw_stdout, "error": proc.stderr or None, # preserve stderr always for diagnostics "empty_output": not raw_stdout.strip(), "returncode": proc.returncode, } # Parse JSON wrapper from claude --output-format json # Extract metadata (tokens, cost) but keep output as the full content string parsed = _try_parse_json(raw_stdout) if isinstance(parsed, dict): result["tokens_used"] = parsed.get("usage", {}).get("total_tokens") result["cost_usd"] = parsed.get("cost_usd") # Extract the agent's actual response, converting to string if needed content = parsed.get("result") or parsed.get("content") if content is not None: result["output"] = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False) return result def _try_parse_json(text: str) -> Any: """Try to parse JSON from text. 
Returns parsed obj or None.""" text = text.strip() if not text: return None # Direct parse try: return json.loads(text) except json.JSONDecodeError: pass # Try to find JSON block in markdown code fences import re m = re.search(r"```(?:json)?\s*\n(.*?)\n```", text, re.DOTALL) if m: try: return json.loads(m.group(1)) except json.JSONDecodeError: pass # Try to find first { ... } or [ ... ] for start_char, end_char in [("{", "}"), ("[", "]")]: start = text.find(start_char) if start >= 0: # Find matching close depth = 0 for i in range(start, len(text)): if text[i] == start_char: depth += 1 elif text[i] == end_char: depth -= 1 if depth == 0: try: return json.loads(text[start:i + 1]) except json.JSONDecodeError: break return None # --------------------------------------------------------------------------- # Backlog audit # --------------------------------------------------------------------------- PROMPTS_DIR = Path(__file__).parent / "prompts" _LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish", "de": "German", "fr": "French"} def run_audit( conn: sqlite3.Connection, project_id: str, noninteractive: bool = False, auto_apply: bool = False, ) -> dict: """Audit pending tasks against the actual codebase. auto_apply=True: marks already_done tasks as done in DB. auto_apply=False: returns results only (for API/GUI). Returns {success, already_done, still_pending, unclear, duration_seconds, ...} """ project = models.get_project(conn, project_id) if not project: return {"success": False, "error": f"Project '{project_id}' not found"} pending = models.list_tasks(conn, project_id=project_id, status="pending") if not pending: return { "success": True, "already_done": [], "still_pending": [], "unclear": [], "message": "No pending tasks to audit", } # Build prompt prompt_path = PROMPTS_DIR / "backlog_audit.md" template = prompt_path.read_text() if prompt_path.exists() else ( "You are a QA analyst. Check if pending tasks are already done in the code." 
) task_list = [ {"id": t["id"], "title": t["title"], "brief": t.get("brief")} for t in pending ] sections = [ template, "", f"## Project: {project['id']} — {project['name']}", ] if project.get("tech_stack"): sections.append(f"Tech stack: {', '.join(project['tech_stack'])}") sections.append(f"Path: {project['path']}") sections.append("") sections.append(f"## Pending tasks ({len(task_list)}):") sections.append(json.dumps(task_list, ensure_ascii=False, indent=2)) sections.append("") language = project.get("language", "ru") lang_name = _LANG_NAMES.get(language, language) sections.append("## Language") sections.append(f"ALWAYS respond in {lang_name}.") sections.append("") prompt = "\n".join(sections) # Determine working dir working_dir = None project_path = Path(project["path"]).expanduser() if project_path.is_dir(): working_dir = str(project_path) # Run agent — allow_write=True so claude can use Read/Bash tools # without interactive permission prompts (critical for noninteractive mode) start = time.monotonic() result = _run_claude(prompt, model="sonnet", working_dir=working_dir, allow_write=True, noninteractive=noninteractive) duration = int(time.monotonic() - start) raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) success = result["returncode"] == 0 # Log to agent_logs models.log_agent_run( conn, project_id=project_id, task_id=None, agent_role="backlog_audit", action="audit", input_summary=f"project={project_id}, pending_tasks={len(pending)}", output_summary=raw_output or None, tokens_used=result.get("tokens_used"), model="sonnet", cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) if not success: return { "success": False, "error": result.get("error", "Agent failed"), "raw_output": raw_output, "duration_seconds": duration, } # Parse structured output parsed = _try_parse_json(raw_output) if not 
isinstance(parsed, dict): return { "success": False, "error": "Agent returned non-JSON output", "raw_output": raw_output, "duration_seconds": duration, } already_done = parsed.get("already_done", []) # Auto-apply: mark already_done tasks as done in DB applied = [] if auto_apply and already_done: for item in already_done: tid = item.get("id") if tid: t = models.get_task(conn, tid) if t and t["project_id"] == project_id and t["status"] == "pending": models.update_task(conn, tid, status="done") applied.append(tid) return { "success": True, "already_done": already_done, "still_pending": parsed.get("still_pending", []), "unclear": parsed.get("unclear", []), "applied": applied, "duration_seconds": duration, "tokens_used": result.get("tokens_used"), "cost_usd": result.get("cost_usd"), } # --------------------------------------------------------------------------- # Blocked protocol detection # --------------------------------------------------------------------------- def _parse_agent_blocked(result: dict) -> dict | None: """Detect semantic blocked status from a successful agent result. Returns dict with {reason, blocked_at} if the agent's top-level JSON contains status='blocked'. Returns None otherwise. Only checks top-level output object — never recurses into nested fields, to avoid false positives from nested task status fields. 
""" from datetime import datetime if not result.get("success"): return None output = result.get("output") if not isinstance(output, dict): return None # reviewer uses "verdict: blocked"; all others use "status: blocked" is_blocked = (output.get("status") == "blocked" or output.get("verdict") == "blocked") if not is_blocked: return None return { "reason": output.get("reason") or output.get("blocked_reason") or "", "blocked_at": output.get("blocked_at") or datetime.now().isoformat(), } # --------------------------------------------------------------------------- # Permission error detection # --------------------------------------------------------------------------- def _is_permission_error(result: dict) -> bool: """Return True if agent result indicates a permission/write failure.""" from core.followup import PERMISSION_PATTERNS output = (result.get("raw_output") or result.get("output") or "") if not isinstance(output, str): output = json.dumps(output, ensure_ascii=False) error = result.get("error") or "" text = output + " " + error return any(re.search(p, text) for p in PERMISSION_PATTERNS) # --------------------------------------------------------------------------- # Autocommit: git add -A && git commit after successful pipeline # --------------------------------------------------------------------------- def _get_changed_files(project_path: str) -> list[str]: """Return files changed in the current pipeline run. Combines unstaged changes, staged changes, and the last commit diff to cover both autocommit-on and autocommit-off scenarios. Returns [] on any git error (e.g. no git repo, first commit). 
""" env = _build_claude_env() git_cmd = shutil.which("git", path=env["PATH"]) or "git" files: set[str] = set() for git_args in ( ["diff", "--name-only"], # unstaged tracked changes ["diff", "--cached", "--name-only"], # staged changes ["diff", "HEAD~1", "HEAD", "--name-only"], # last commit (post-autocommit) ): try: r = subprocess.run( [git_cmd] + git_args, cwd=project_path, capture_output=True, text=True, timeout=10, env=env, ) if r.returncode == 0: files.update(f.strip() for f in r.stdout.splitlines() if f.strip()) except Exception: pass return list(files) def _run_autocommit( conn: sqlite3.Connection, task_id: str, project_id: str, ) -> None: """Auto-commit changes after successful pipeline completion. Runs: git add -A && git commit -m 'kin: {task_id} {title}'. Silently skips if nothing to commit (exit code 1) or project path not found. Never raises — autocommit errors must never block the pipeline. Uses stderr=subprocess.DEVNULL per decision #30. """ task = models.get_task(conn, task_id) project = models.get_project(conn, project_id) if not task or not project: return if not project.get("autocommit_enabled"): return project_path = Path(project["path"]).expanduser() if not project_path.is_dir(): return working_dir = str(project_path) env = _build_claude_env() git_cmd = shutil.which("git", path=env["PATH"]) or "git" title = (task.get("title") or "").replace('"', "'").replace("\n", " ").replace("\r", "") commit_msg = f"kin: {task_id} {title}" try: subprocess.run( [git_cmd, "add", "-A"], cwd=working_dir, env=env, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, ) result = subprocess.run( [git_cmd, "commit", "-m", commit_msg], cwd=working_dir, env=env, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, ) if result.returncode == 0: _logger.info("Autocommit: %s", commit_msg) else: _logger.debug("Autocommit: nothing to commit for %s", task_id) except Exception as exc: _logger.warning("Autocommit failed for %s: %s", task_id, exc) # 
--------------------------------------------------------------------------- # Sysadmin output: save server map to decisions and modules # --------------------------------------------------------------------------- def _save_sysadmin_output( conn: sqlite3.Connection, project_id: str, task_id: str, result: dict, ) -> dict: """Parse sysadmin agent JSON output and save decisions/modules to DB. Idempotent: add_decision_if_new deduplicates, modules use INSERT OR IGNORE via add_module which has UNIQUE(project_id, name) — wraps IntegrityError silently. Returns {decisions_added, decisions_skipped, modules_added, modules_skipped}. """ raw = result.get("raw_output") or result.get("output") or "" if isinstance(raw, (dict, list)): raw = json.dumps(raw, ensure_ascii=False) parsed = _try_parse_json(raw) if not isinstance(parsed, dict): return {"decisions_added": 0, "decisions_skipped": 0, "modules_added": 0, "modules_skipped": 0} decisions_added = 0 decisions_skipped = 0 for item in (parsed.get("decisions") or []): if not isinstance(item, dict): continue d_type = item.get("type", "decision") if d_type not in VALID_DECISION_TYPES: d_type = "decision" d_title = (item.get("title") or "").strip() d_desc = (item.get("description") or "").strip() if not d_title or not d_desc: continue saved = models.add_decision_if_new( conn, project_id=project_id, type=d_type, title=d_title, description=d_desc, tags=item.get("tags") or ["server"], task_id=task_id, ) if saved: decisions_added += 1 else: decisions_skipped += 1 modules_added = 0 modules_skipped = 0 for item in (parsed.get("modules") or []): if not isinstance(item, dict): continue m_name = (item.get("name") or "").strip() m_type = (item.get("type") or "service").strip() m_path = (item.get("path") or "").strip() if not m_name: continue try: m = models.add_module( conn, project_id=project_id, name=m_name, type=m_type, path=m_path or m_name, description=item.get("description"), owner_role="sysadmin", ) if m.get("_created", True): 
modules_added += 1 else: modules_skipped += 1 except Exception: modules_skipped += 1 return { "decisions_added": decisions_added, "decisions_skipped": decisions_skipped, "modules_added": modules_added, "modules_skipped": modules_skipped, } # --------------------------------------------------------------------------- # Auto-test: detect test failure in agent output # --------------------------------------------------------------------------- _TEST_FAILURE_PATTERNS = [ r"\bFAILED\b", r"\bFAIL\b", r"\d+\s+failed", r"test(?:s)?\s+failed", r"assert(?:ion)?\s*(error|failed)", r"exception(?:s)?\s+occurred", r"returncode\s*[!=]=\s*0", r"Error:\s", r"ERRORS?\b", ] _TEST_SUCCESS_PATTERNS = [ r"no\s+failures", r"all\s+tests?\s+pass", r"0\s+failed", r"passed.*no\s+errors", ] def _is_test_failure(result: dict) -> bool: """Return True if agent output indicates test failures. Checks for failure keywords, guards against false positives from explicit success phrases (e.g. 'no failures'). """ output = result.get("raw_output") or result.get("output") or "" if not isinstance(output, str): output = json.dumps(output, ensure_ascii=False) for p in _TEST_SUCCESS_PATTERNS: if re.search(p, output, re.IGNORECASE): return False for p in _TEST_FAILURE_PATTERNS: if re.search(p, output, re.IGNORECASE): return True return False # --------------------------------------------------------------------------- # Auto-test runner: run project tests via `make test` # --------------------------------------------------------------------------- # Roles that trigger auto-test when project.auto_test_enabled is set _AUTO_TEST_ROLES = {"backend_dev", "frontend_dev"} def _run_project_tests(project_path: str, test_command: str = 'make test', timeout: int = 120) -> dict: """Run test_command in project_path. Returns {success, output, returncode}. Never raises — all errors are captured and returned in output. 
""" env = _build_claude_env() parts = shlex.split(test_command) if not parts: return {"success": False, "output": "Empty test_command", "returncode": -1} resolved = shutil.which(parts[0], path=env["PATH"]) or parts[0] cmd = [resolved] + parts[1:] try: result = subprocess.run( cmd, cwd=project_path, capture_output=True, text=True, timeout=timeout, env=env, ) output = (result.stdout or "") + (result.stderr or "") return {"success": result.returncode == 0, "output": output, "returncode": result.returncode} except subprocess.TimeoutExpired: return {"success": False, "output": f"{test_command} timed out after {timeout}s", "returncode": 124} except FileNotFoundError: return {"success": False, "output": f"{parts[0]} not found in PATH", "returncode": 127} except Exception as exc: return {"success": False, "output": f"Test run error: {exc}", "returncode": -1} # --------------------------------------------------------------------------- # Decomposer output: create child tasks from task_decomposer JSON # --------------------------------------------------------------------------- def _save_decomposer_output( conn: sqlite3.Connection, project_id: str, parent_task_id: str, result: dict, ) -> dict: """Parse task_decomposer output and create child tasks in DB. Expected output format: {tasks: [{title, brief, priority, category, acceptance_criteria}]} Idempotent: skips tasks with same parent_task_id + title (case-insensitive). Returns {created: int, skipped: int}. 
""" raw = result.get("raw_output") or result.get("output") or "" if isinstance(raw, (dict, list)): raw = json.dumps(raw, ensure_ascii=False) parsed = _try_parse_json(raw) if not isinstance(parsed, dict): return {"created": 0, "skipped": 0, "error": "non-JSON decomposer output"} task_list = parsed.get("tasks", []) if not isinstance(task_list, list): return {"created": 0, "skipped": 0, "error": "invalid tasks format"} created = 0 skipped = 0 for item in task_list: if not isinstance(item, dict): continue title = (item.get("title") or "").strip() if not title: continue # Idempotency: skip if same parent + title already exists existing = conn.execute( """SELECT id FROM tasks WHERE parent_task_id = ? AND lower(trim(title)) = lower(trim(?))""", (parent_task_id, title), ).fetchone() if existing: skipped += 1 _logger.info( "task_decomposer: skip duplicate child task '%s' (parent=%s, existing=%s)", title, parent_task_id, existing[0], ) continue category = (item.get("category") or "").strip().upper() if category not in models.TASK_CATEGORIES: category = None task_id = models.next_task_id(conn, project_id, category=category) brief_text = item.get("brief") or "" models.create_task( conn, task_id, project_id, title, priority=item.get("priority", 5), brief={"text": brief_text, "source": f"decomposer:{parent_task_id}"}, category=category, acceptance_criteria=item.get("acceptance_criteria"), parent_task_id=parent_task_id, ) created += 1 return {"created": created, "skipped": skipped} # --------------------------------------------------------------------------- # Auto-learning: extract decisions from pipeline results # --------------------------------------------------------------------------- VALID_DECISION_TYPES = {"decision", "gotcha", "convention"} def _run_learning_extraction( conn: sqlite3.Connection, task_id: str, project_id: str, step_results: list[dict], ) -> dict: """Extract and save decisions from completed pipeline results. 
Calls the learner agent with step outputs + existing decisions, parses the JSON response, and saves new decisions via add_decision_if_new. Returns a summary dict with added/skipped counts. """ learner_prompt_path = PROMPTS_DIR / "learner.md" if not learner_prompt_path.exists(): return {"added": 0, "skipped": 0, "error": "learner.md not found"} template = learner_prompt_path.read_text() # Summarize step outputs (first 2000 chars each) step_summaries = {} for r in step_results: role = r.get("role", "unknown") output = r.get("raw_output") or r.get("output") or "" if isinstance(output, (dict, list)): output = json.dumps(output, ensure_ascii=False) step_summaries[role] = output[:2000] # Fetch existing decisions for dedup hint existing = models.get_decisions(conn, project_id) existing_hints = [ {"title": d["title"], "type": d["type"]} for d in existing ] prompt_parts = [ template, "", "## PIPELINE_OUTPUTS", json.dumps(step_summaries, ensure_ascii=False, indent=2), "", "## EXISTING_DECISIONS", json.dumps(existing_hints, ensure_ascii=False, indent=2), ] prompt = "\n".join(prompt_parts) learner_timeout = int(os.environ.get("KIN_LEARNER_TIMEOUT") or 120) start = time.monotonic() result = _run_claude(prompt, model="sonnet", noninteractive=True, timeout=learner_timeout) duration = int(time.monotonic() - start) raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) success = result["returncode"] == 0 # Log to agent_logs models.log_agent_run( conn, project_id=project_id, task_id=task_id, agent_role="learner", action="learn", input_summary=f"project={project_id}, task={task_id}, steps={len(step_results)}", output_summary=raw_output or None, tokens_used=result.get("tokens_used"), model="sonnet", cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) parsed = _try_parse_json(raw_output) if not isinstance(parsed, dict): return 
{"added": 0, "skipped": 0, "error": "non-JSON learner output"} decisions = parsed.get("decisions", []) if not isinstance(decisions, list): return {"added": 0, "skipped": 0, "error": "invalid decisions format"} added = 0 skipped = 0 for item in decisions[:5]: if not isinstance(item, dict): continue d_type = item.get("type", "decision") if d_type not in VALID_DECISION_TYPES: d_type = "decision" d_title = (item.get("title") or "").strip() d_desc = (item.get("description") or "").strip() if not d_title or not d_desc: continue saved = models.add_decision_if_new( conn, project_id=project_id, type=d_type, title=d_title, description=d_desc, tags=item.get("tags") or [], task_id=task_id, ) if saved: added += 1 else: skipped += 1 return {"added": added, "skipped": skipped} # --------------------------------------------------------------------------- # Department head detection # --------------------------------------------------------------------------- # Cache of roles with execution_type=department_head from specialists.yaml _DEPT_HEAD_ROLES: set[str] | None = None def _is_department_head(role: str) -> bool: """Check if a role is a department head. Uses execution_type from specialists.yaml as primary check, falls back to role.endswith('_head') convention. 
""" global _DEPT_HEAD_ROLES if _DEPT_HEAD_ROLES is None: try: from core.context_builder import _load_specialists specs = _load_specialists() all_specs = specs.get("specialists", {}) _DEPT_HEAD_ROLES = { name for name, spec in all_specs.items() if spec.get("execution_type") == "department_head" } except Exception: _DEPT_HEAD_ROLES = set() return role in _DEPT_HEAD_ROLES or role.endswith("_head") # --------------------------------------------------------------------------- # Department head sub-pipeline execution # --------------------------------------------------------------------------- def _execute_department_head_step( conn: sqlite3.Connection, task_id: str, project_id: str, parent_pipeline_id: int | None, step: dict, dept_head_result: dict, allow_write: bool = False, noninteractive: bool = False, next_department: str | None = None, ) -> dict: """Execute sub-pipeline planned by a department head. Parses the dept head's JSON output, validates the sub_pipeline, creates a child pipeline in DB, runs it, and saves a handoff record. Returns dict with success, output, cost_usd, tokens_used, duration_seconds. 
""" raw = dept_head_result.get("raw_output") or dept_head_result.get("output") or "" if isinstance(raw, (dict, list)): raw = json.dumps(raw, ensure_ascii=False) parsed = _try_parse_json(raw) if not isinstance(parsed, dict): return { "success": False, "output": "Department head returned non-JSON output", "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, } # Blocked status from dept head if parsed.get("status") == "blocked": reason = parsed.get("blocked_reason", "Department head reported blocked") return { "success": False, "output": json.dumps(parsed, ensure_ascii=False), "blocked": True, "blocked_reason": reason, "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, } sub_pipeline = parsed.get("sub_pipeline", []) if not isinstance(sub_pipeline, list) or not sub_pipeline: return { "success": False, "output": "Department head returned empty or invalid sub_pipeline", "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, } # Recursion guard: no department head roles allowed in sub_pipeline for sub_step in sub_pipeline: if isinstance(sub_step, dict) and _is_department_head(str(sub_step.get("role", ""))): return { "success": False, "output": f"Recursion blocked: sub_pipeline contains _head role '{sub_step['role']}'", "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, } role = step["role"] dept_name = role.replace("_head", "") # Build initial context for workers: dept head's plan + artifacts dept_plan_context = json.dumps({ "department_head_plan": { "department": dept_name, "artifacts": parsed.get("artifacts", {}), "handoff_notes": parsed.get("handoff_notes", ""), }, }, ensure_ascii=False) # Run the sub-pipeline (noninteractive=True — Opus already reviewed the plan) # pass parent_pipeline_id and department so run_pipeline creates the child # pipeline with correct attributes (route_type='dept_sub') — no double create sub_result = run_pipeline( conn, task_id, sub_pipeline, dry_run=False, allow_write=allow_write, noninteractive=True, 
initial_previous_output=dept_plan_context, parent_pipeline_id=parent_pipeline_id, department=dept_name, ) # Extract decisions from sub-pipeline results for handoff decisions_made = [] sub_results = sub_result.get("results", []) for sr in sub_results: output = sr.get("output") or sr.get("raw_output") or "" if isinstance(output, str): try: output = json.loads(output) except (json.JSONDecodeError, ValueError): pass if isinstance(output, dict): # Reviewer/tester may include decisions or findings for key in ("decisions", "findings", "recommendations"): val = output.get(key) if isinstance(val, list): decisions_made.extend(val) elif isinstance(val, str) and val: decisions_made.append(val) # Determine last worker role for auto_complete tracking last_sub_role = sub_pipeline[-1].get("role", "") if sub_pipeline else "" # Save handoff for inter-department context handoff_status = "done" if sub_result.get("success") else "partial" try: models.create_handoff( conn, pipeline_id=parent_pipeline_id or child_pipeline["id"], task_id=task_id, from_department=dept_name, to_department=next_department, artifacts=parsed.get("artifacts", {}), decisions_made=decisions_made, blockers=[], status=handoff_status, ) except Exception: pass # Handoff save errors must never block pipeline # Build summary output for the next pipeline step summary = { "from_department": dept_name, "handoff_notes": parsed.get("handoff_notes", ""), "artifacts": parsed.get("artifacts", {}), "sub_pipeline_summary": { "steps_completed": sub_result.get("steps_completed", 0), "success": sub_result.get("success", False), }, } return { "success": sub_result.get("success", False), "output": json.dumps(summary, ensure_ascii=False), "cost_usd": sub_result.get("total_cost_usd", 0), "tokens_used": sub_result.get("total_tokens", 0), "duration_seconds": sub_result.get("total_duration_seconds", 0), "last_sub_role": last_sub_role, } # --------------------------------------------------------------------------- # Watchdog helpers # 
# ---------------------------------------------------------------------------

import errno as _errno


def _check_parent_alive(
    conn: sqlite3.Connection,
    pipeline: dict,
    task_id: str,
    project_id: str,
) -> bool:
    """Check whether the parent process still exists.

    Sends signal 0 to the parent PID. Only ESRCH ("no such process") is
    treated as a dead parent: the pipeline is marked failed, the task is
    marked blocked, and True is returned so the caller aborts.

    Any other OSError (e.g. EPERM — the process exists but cannot be
    signalled) is ignored and the pipeline continues.

    Args:
        conn: Open database connection used for the status updates.
        pipeline: Pipeline row; only ``pipeline["id"]`` is read.
        task_id: Task to mark as blocked when the parent is gone.
        project_id: Unused here; kept for call-site compatibility.

    Returns:
        True if the pipeline should abort, False otherwise.
    """
    ppid = os.getppid()
    try:
        os.kill(ppid, 0)
    except OSError as exc:
        if exc.errno == _errno.ESRCH:
            reason = f"Parent process died unexpectedly (PID {ppid})"
            _logger.warning("Pipeline %s: %s — aborting", pipeline["id"], reason)
            models.update_pipeline(conn, pipeline["id"], status="failed")
            models.update_task(conn, task_id, status="blocked", blocked_reason=reason)
            return True
        # PermissionError (EPERM) — process exists but we can't signal it: continue
    return False


# ---------------------------------------------------------------------------
# Pipeline executor
# ---------------------------------------------------------------------------

def run_pipeline(
    conn: sqlite3.Connection,
    task_id: str,
    steps: list[dict],
    dry_run: bool = False,
    allow_write: bool = False,
    noninteractive: bool = False,
    initial_previous_output: str | None = None,
    parent_pipeline_id: Any = None,
    department: str | None = None,
) -> dict:
    """Execute a multi-step pipeline of agents.

    steps = [
        {"role": "debugger", "model": "opus", "brief": "..."},
        {"role": "tester", "depends_on": "debugger", "brief": "..."},
    ]

    Args:
        conn: Open database connection.
        task_id: Task the pipeline runs for.
        steps: Ordered step definitions (see above).
        dry_run: When True, agents are invoked in dry-run mode and no
            pipeline row, task status, hooks or commits are touched.
        allow_write: Initial write permission passed to agents. May be
            escalated to True by the auto-mode permission retry.
        noninteractive: Passed through to every agent run.
        initial_previous_output: Context injected as previous_output for the
            first step (used by dept head sub-pipelines to pass
            artifacts/plan to workers).
        parent_pipeline_id: Set by the department-head step when this run is
            a department sub-pipeline; the pipeline row is then created with
            route_type='dept_sub'.
        department: Department name of a sub-pipeline (set together with
            parent_pipeline_id).

    Returns:
        On success: {success, steps_completed, results, total_cost_usd,
        total_tokens, total_duration_seconds, pipeline_id, dry_run, mode}.
        On failure: {success: False, error, steps_completed, results,
        total_cost_usd, total_tokens, total_duration_seconds, pipeline_id}
        plus blocked_by/blocked_reason when an agent reported itself blocked.
        Auth failures and unknown tasks return a short {success, error, ...}
        dict before any step runs.
    """
    # Auth check — skip for dry_run (dry_run never calls claude CLI)
    if not dry_run:
        try:
            check_claude_auth()
        except ClaudeAuthError as exc:
            return {
                "success": False,
                "error": "claude_auth_required",
                "message": str(exc),
                "instructions": "Run: claude login",
            }

    task = models.get_task(conn, task_id)
    if not task:
        return {"success": False, "error": f"Task '{task_id}' not found"}

    project_id = task["project_id"]

    # Determine route type from steps or task brief
    route_type = "custom"
    if task.get("brief") and isinstance(task["brief"], dict):
        route_type = task["brief"].get("route_type", "custom") or "custom"

    # Determine execution mode (auto vs review)
    mode = models.get_effective_mode(conn, project_id, task_id)

    # Create pipeline in DB
    pipeline = None
    if not dry_run:
        # Department sub-pipelines carry their parent/department so the child
        # row is created once, with the right attributes.
        # NOTE(review): assumes models.create_pipeline accepts the
        # parent_pipeline_id / department keyword arguments — confirm.
        create_kwargs: dict[str, Any] = {}
        if parent_pipeline_id is not None:
            route_type = "dept_sub"
            create_kwargs["parent_pipeline_id"] = parent_pipeline_id
        if department is not None:
            create_kwargs["department"] = department
        pipeline = models.create_pipeline(
            conn, task_id, project_id, route_type, steps, **create_kwargs,
        )
        # Save PID so watchdog can detect dead subprocesses (KIN-099)
        models.update_pipeline(conn, pipeline["id"], pid=os.getpid())
        pipeline["pid"] = os.getpid()
        models.update_task(conn, task_id, status="in_progress")

    results = []
    total_cost = 0.0
    total_tokens = 0
    total_duration = 0
    previous_output = initial_previous_output
    _last_sub_role = None  # Track last worker role from dept sub-pipelines (for auto_complete)

    # Worktree isolation: opt-in per project, for write-capable roles
    _WORKTREE_ROLES = {"backend_dev", "frontend_dev", "debugger"}

    def _mark_pipeline(status: str) -> None:
        """Persist the pipeline status together with the running totals."""
        if pipeline:
            models.update_pipeline(
                conn, pipeline["id"],
                status=status,
                total_cost_usd=total_cost,
                total_tokens=total_tokens,
                total_duration_seconds=total_duration,
            )

    def _failure(error: str, steps_done: int, **extra: Any) -> dict:
        """Build the uniform failure result returned to the caller."""
        return {
            "success": False,
            "error": error,
            **extra,
            "steps_completed": steps_done,
            "results": results,
            "total_cost_usd": total_cost,
            "total_tokens": total_tokens,
            "total_duration_seconds": total_duration,
            "pipeline_id": pipeline["id"] if pipeline else None,
        }

    def _escalate(agent_role: str, reason: str, step_no: int) -> None:
        """Send a Telegram escalation; failures are logged, never raised."""
        try:
            from core.telegram import send_telegram_escalation
            project = models.get_project(conn, project_id)
            project_name = project["name"] if project else project_id
            sent = send_telegram_escalation(
                task_id=task_id,
                project_name=project_name,
                agent_role=agent_role,
                reason=reason,
                pipeline_step=str(step_no),
            )
            if sent:
                models.mark_telegram_sent(conn, task_id)
        except Exception:
            # Telegram errors must never block pipeline
            _logger.warning("Telegram escalation failed for task %s", task_id, exc_info=True)

    def _discard_worktree(path: Any, project_row: Any) -> None:
        """Remove a step's worktree without merging; errors are logged only."""
        if not path:
            return
        try:
            from core.worktree import cleanup_worktree
            cleanup_worktree(path, str(Path(project_row["path"]).expanduser()))
        except Exception:
            _logger.warning("Worktree cleanup failed for task %s", task_id, exc_info=True)

    for i, step in enumerate(steps):
        role = step["role"]
        model = step.get("model", "sonnet")
        brief = step.get("brief")
        step_label = f"Step {i+1}/{len(steps)} ({role})"

        # Check parent process is still alive (KIN-099 watchdog)
        if not dry_run and pipeline:
            if _check_parent_alive(conn, pipeline, task_id, project_id):
                # Same key set as every other failure path; the un-suffixed
                # total_cost/total_duration keys are kept as legacy aliases.
                return _failure(
                    "parent_process_died", i,
                    total_cost=total_cost,
                    total_duration=total_duration,
                )

        worktree_path = None
        project_for_wt = models.get_project(conn, task["project_id"]) if not dry_run else None
        use_worktree = (
            not dry_run
            and role in _WORKTREE_ROLES
            and project_for_wt
            and project_for_wt.get("worktrees_enabled")
            and project_for_wt.get("path")
        )
        if use_worktree:
            try:
                from core.worktree import create_worktree, ensure_gitignore
                p_path = str(Path(project_for_wt["path"]).expanduser())
                ensure_gitignore(p_path)
                worktree_path = create_worktree(p_path, task_id, role)
            except Exception:
                # Fall back to normal execution
                _logger.warning("Worktree creation failed for task %s (%s)", task_id, role, exc_info=True)
                worktree_path = None

        try:
            result = run_agent(
                conn, role, task_id, project_id,
                model=model,
                previous_output=previous_output,
                brief_override=brief,
                dry_run=dry_run,
                allow_write=allow_write,
                noninteractive=noninteractive,
                working_dir_override=worktree_path,
            )
        except Exception as exc:
            exc_msg = f"{step_label} raised exception: {exc}"
            # The step never completed: drop its worktree instead of leaking it
            _discard_worktree(worktree_path, project_for_wt)
            _mark_pipeline("failed")
            models.log_agent_run(
                conn,
                project_id=project_id,
                task_id=task_id,
                agent_role=role,
                action="execute",
                input_summary=f"task={task_id}, model={model}",
                output_summary=None,
                success=False,
                error_message=exc_msg,
            )
            models.update_task(conn, task_id, status="blocked", blocked_reason=exc_msg)
            _escalate(role, exc_msg, i + 1)
            return _failure(exc_msg, i)

        if dry_run:
            results.append(result)
            continue

        # Accumulate stats
        total_cost += result.get("cost_usd") or 0
        total_tokens += result.get("tokens_used") or 0
        total_duration += result.get("duration_seconds") or 0

        if not result["success"]:
            # Auto mode: retry once with allow_write on permission error
            if mode == "auto_complete" and not allow_write and _is_permission_error(result):
                task_modules = models.get_modules(conn, project_id)
                try:
                    run_hooks(conn, project_id, task_id,
                              event="task_permission_retry",
                              task_modules=task_modules)
                except Exception:
                    _logger.warning("task_permission_retry hook failed for task %s", task_id, exc_info=True)
                # Audit log: record dangerous skip before retry
                try:
                    models.log_audit_event(
                        conn,
                        event_type="dangerous_skip",
                        task_id=task_id,
                        step_id=role,
                        reason=f"auto mode permission retry: step {i+1}/{len(steps)} ({role})",
                        project_id=project_id,
                    )
                    models.update_task(conn, task_id, dangerously_skipped=1)
                except Exception:
                    _logger.warning("Audit logging failed for task %s", task_id, exc_info=True)
                retry = run_agent(
                    conn, role, task_id, project_id,
                    model=model,
                    previous_output=previous_output,
                    brief_override=brief,
                    dry_run=False,
                    allow_write=True,
                    noninteractive=noninteractive,
                    # Retry in the same working dir as the first attempt so
                    # worktree isolation is not bypassed by the write retry.
                    working_dir_override=worktree_path,
                )
                allow_write = True  # subsequent steps also with allow_write
                total_cost += retry.get("cost_usd") or 0
                total_tokens += retry.get("tokens_used") or 0
                total_duration += retry.get("duration_seconds") or 0
                if retry["success"]:
                    result = retry

            if not result["success"]:
                # Still failed — block regardless of mode
                results.append(result)
                # Step failed — cleanup worktree without merging
                _discard_worktree(worktree_path, project_for_wt)
                _mark_pipeline("failed")
                agent_error = result.get("error") or ""
                error_msg = f"{step_label} failed"
                if agent_error:
                    error_msg += f": {agent_error}"
                models.update_task(conn, task_id, status="blocked", blocked_reason=error_msg)
                return _failure(error_msg, i)

        # Worktree merge/cleanup after successful step
        # (failed steps have already returned above)
        if worktree_path:
            try:
                from core.worktree import merge_worktree, cleanup_worktree
                p_path = str(Path(project_for_wt["path"]).expanduser())
                merge_result = merge_worktree(worktree_path, p_path)
                if not merge_result["success"]:
                    conflicts = merge_result.get("conflicts", [])
                    conflict_msg = f"Worktree merge conflict in files: {', '.join(conflicts)}" if conflicts else "Worktree merge failed"
                    models.update_task(conn, task_id, status="blocked", blocked_reason=conflict_msg)
                    cleanup_worktree(worktree_path, p_path)
                    _mark_pipeline("failed")
                    return _failure(conflict_msg, i)
                cleanup_worktree(worktree_path, p_path)
            except Exception:
                # Worktree errors must never block pipeline
                _logger.warning("Worktree merge/cleanup failed for task %s (%s)", task_id, role, exc_info=True)

        results.append(result)

        # Semantic blocked: agent ran successfully but returned status='blocked'
        blocked_info = _parse_agent_blocked(result)
        if blocked_info:
            _mark_pipeline("failed")
            models.update_task(
                conn, task_id,
                status="blocked",
                blocked_reason=blocked_info["reason"],
                blocked_at=blocked_info["blocked_at"],
                blocked_agent_role=role,
                blocked_pipeline_step=str(i + 1),
            )
            _escalate(role, blocked_info["reason"], i + 1)
            error_msg = f"{step_label} blocked: {blocked_info['reason']}"
            return _failure(
                error_msg, i,
                blocked_by=role,
                blocked_reason=blocked_info["reason"],
            )

        # Save sysadmin scan results immediately after a successful sysadmin step
        if role == "sysadmin" and result["success"] and not dry_run:
            try:
                _save_sysadmin_output(conn, project_id, task_id, result)
            except Exception:
                # Never block pipeline on sysadmin save errors
                _logger.warning("Saving sysadmin output failed for task %s", task_id, exc_info=True)

        # Save decomposer output: create child tasks from task_decomposer JSON
        if role == "task_decomposer" and result["success"] and not dry_run:
            try:
                _save_decomposer_output(conn, project_id, task_id, result)
            except Exception:
                # Never block pipeline on decomposer save errors
                _logger.warning("Saving decomposer output failed for task %s", task_id, exc_info=True)

        # Department head: execute sub-pipeline planned by the dept head
        if _is_department_head(role) and result["success"] and not dry_run:
            # Determine next department for handoff routing
            _next_dept = None
            if i + 1 < len(steps):
                _next_role = steps[i + 1].get("role", "")
                if _is_department_head(_next_role):
                    _next_dept = _next_role.replace("_head", "")
            dept_result = _execute_department_head_step(
                conn, task_id, project_id,
                parent_pipeline_id=pipeline["id"] if pipeline else None,
                step=step,
                dept_head_result=result,
                allow_write=allow_write,
                noninteractive=noninteractive,
                next_department=_next_dept,
            )
            # Accumulate sub-pipeline costs
            total_cost += dept_result.get("cost_usd") or 0
            total_tokens += dept_result.get("tokens_used") or 0
            total_duration += dept_result.get("duration_seconds") or 0

            if not dept_result.get("success"):
                # Sub-pipeline failed — handle as blocked
                results.append({"role": role, "_dept_sub": True, **dept_result})
                _mark_pipeline("failed")
                error_msg = f"Department {role} sub-pipeline failed"
                models.update_task(conn, task_id, status="blocked", blocked_reason=error_msg)
                return _failure(error_msg, i)

            # Track last worker role from sub-pipeline for auto_complete eligibility
            if dept_result.get("last_sub_role"):
                _last_sub_role = dept_result["last_sub_role"]

            # Override previous_output with dept handoff summary (not raw dept head JSON)
            previous_output = dept_result.get("output")
            if isinstance(previous_output, (dict, list)):
                previous_output = json.dumps(previous_output, ensure_ascii=False)
            continue

        # Project-level auto-test: run `make test` after backend_dev/frontend_dev steps.
        # Enabled per project via auto_test_enabled flag (opt-in).
        # On failure, loop fixer up to KIN_AUTO_TEST_MAX_ATTEMPTS times, then block.
        if (
            not dry_run
            and role in _AUTO_TEST_ROLES
            and result["success"]
            and project_for_wt
            and project_for_wt.get("auto_test_enabled")
            and project_for_wt.get("path")
        ):
            max_auto_test_attempts = int(os.environ.get("KIN_AUTO_TEST_MAX_ATTEMPTS") or 3)
            p_path_str = str(Path(project_for_wt["path"]).expanduser())
            p_test_cmd = project_for_wt.get("test_command") or "make test"
            test_run = _run_project_tests(p_path_str, p_test_cmd)
            results.append({"role": "_auto_test", "success": test_run["success"],
                            "output": test_run["output"], "_project_test": True})
            auto_test_attempt = 0
            while not test_run["success"] and auto_test_attempt < max_auto_test_attempts:
                auto_test_attempt += 1
                fix_context = (
                    f"Automated project test run ({p_test_cmd}) failed after your changes.\n"
                    f"Test output:\n{test_run['output'][:4000]}\n"
                    f"Fix the failing tests. Do NOT modify test files."
                )
                fix_result = run_agent(
                    conn, role, task_id, project_id,
                    model=model,
                    previous_output=fix_context,
                    dry_run=False,
                    allow_write=allow_write,
                    noninteractive=noninteractive,
                )
                total_cost += fix_result.get("cost_usd") or 0
                total_tokens += fix_result.get("tokens_used") or 0
                total_duration += fix_result.get("duration_seconds") or 0
                results.append({**fix_result, "_auto_test_fix_attempt": auto_test_attempt})
                test_run = _run_project_tests(p_path_str, p_test_cmd)
                results.append({"role": "_auto_test", "success": test_run["success"],
                                "output": test_run["output"], "_project_test": True,
                                "_attempt": auto_test_attempt})
            if not test_run["success"]:
                block_reason = (
                    f"Auto-test ({p_test_cmd}) failed after {auto_test_attempt} fix attempt(s). "
                    f"Last output: {test_run['output'][:500]}"
                )
                models.update_task(conn, task_id, status="blocked", blocked_reason=block_reason)
                _mark_pipeline("failed")
                return _failure(block_reason, i)

        # Auto-test loop: if tester step has auto_fix=true and tests failed,
        # call fix_role agent and re-run tester up to max_attempts times.
        if (
            not dry_run
            and step.get("auto_fix")
            and role == "tester"
            and result["success"]
            and _is_test_failure(result)
        ):
            max_attempts = int(step.get("max_attempts", 3))
            fix_role = step.get("fix_role", "backend_dev")
            fix_model = step.get("fix_model", model)
            attempt = 0
            while attempt < max_attempts and _is_test_failure(result):
                attempt += 1
                tester_output = result.get("raw_output") or result.get("output") or ""
                if isinstance(tester_output, (dict, list)):
                    tester_output = json.dumps(tester_output, ensure_ascii=False)

                # Run fixer
                fix_result = run_agent(
                    conn, fix_role, task_id, project_id,
                    model=fix_model,
                    previous_output=tester_output,
                    dry_run=False,
                    allow_write=allow_write,
                    noninteractive=noninteractive,
                )
                total_cost += fix_result.get("cost_usd") or 0
                total_tokens += fix_result.get("tokens_used") or 0
                total_duration += fix_result.get("duration_seconds") or 0
                results.append({**fix_result, "_auto_fix_attempt": attempt})

                # Re-run tester
                fix_output = fix_result.get("raw_output") or fix_result.get("output") or ""
                if isinstance(fix_output, (dict, list)):
                    fix_output = json.dumps(fix_output, ensure_ascii=False)
                retest = run_agent(
                    conn, role, task_id, project_id,
                    model=model,
                    previous_output=fix_output,
                    dry_run=False,
                    allow_write=allow_write,
                    noninteractive=noninteractive,
                )
                total_cost += retest.get("cost_usd") or 0
                total_tokens += retest.get("tokens_used") or 0
                total_duration += retest.get("duration_seconds") or 0
                result = retest
                results.append({**result, "_auto_retest_attempt": attempt})

            # Save final test result regardless of outcome
            try:
                final_output = result.get("raw_output") or result.get("output") or ""
                models.update_task(conn, task_id, test_result={
                    "output": final_output if isinstance(final_output, str) else str(final_output),
                    "auto_fix_attempts": attempt,
                    "passed": not _is_test_failure(result),
                })
            except Exception:
                _logger.warning("Saving test result failed for task %s", task_id, exc_info=True)

        # Chain output to next step
        previous_output = result.get("raw_output") or result.get("output")
        if isinstance(previous_output, (dict, list)):
            previous_output = json.dumps(previous_output, ensure_ascii=False)

    # Pipeline completed
    # NOTE(review): this finalisation also runs for department sub-pipelines
    # (parent_pipeline_id set), so the task status is updated before the
    # parent pipeline has finished — confirm this is intended.
    if pipeline and not dry_run:
        _mark_pipeline("completed")

        task_modules = models.get_modules(conn, project_id)

        # Compute changed files for hook filtering (frontend build trigger)
        changed_files: list[str] | None = None
        project = models.get_project(conn, project_id)
        if project and project.get("path"):
            p_path = Path(project["path"]).expanduser()
            if p_path.is_dir():
                changed_files = _get_changed_files(str(p_path))

        last_role = steps[-1].get("role", "") if steps else ""
        # For dept pipelines: if last step is a _head, check the last worker in its sub-pipeline
        effective_last_role = _last_sub_role if (_is_department_head(last_role) and _last_sub_role) else last_role
        auto_eligible = effective_last_role in {"tester", "reviewer"}

        # Guard: re-fetch current status — user may have manually changed it while pipeline ran
        current_task = models.get_task(conn, task_id)
        current_status = current_task.get("status") if current_task else None

        if current_status in ("done", "cancelled"):
            pass  # User finished manually — don't overwrite
        elif mode == "auto_complete" and auto_eligible:
            # Auto-complete mode: last step is tester/reviewer — skip review, approve immediately
            models.update_task(conn, task_id, status="done")
            try:
                run_hooks(conn, project_id, task_id,
                          event="task_auto_approved", task_modules=task_modules)
            except Exception:
                _logger.warning("task_auto_approved hook failed for task %s", task_id, exc_info=True)
            try:
                run_hooks(conn, project_id, task_id,
                          event="task_done", task_modules=task_modules)
            except Exception:
                _logger.warning("task_done hook failed for task %s", task_id, exc_info=True)

            # Auto followup: generate tasks, auto-resolve permission issues.
            # Guard: skip for followup-sourced tasks to prevent infinite recursion.
            task_brief = task.get("brief") or {}
            is_followup_task = (
                isinstance(task_brief, dict)
                and str(task_brief.get("source", "")).startswith("followup:")
            )
            if not is_followup_task:
                try:
                    from core.followup import generate_followups, auto_resolve_pending_actions
                    fu_result = generate_followups(conn, task_id)
                    if fu_result.get("pending_actions"):
                        auto_resolve_pending_actions(conn, task_id, fu_result["pending_actions"])
                except Exception:
                    _logger.warning("Follow-up generation failed for task %s", task_id, exc_info=True)
        else:
            # Review mode: wait for manual approval
            models.update_task(conn, task_id, status="review", execution_mode="review")

        # Run post-pipeline hooks (failures don't affect pipeline status)
        try:
            run_hooks(conn, project_id, task_id,
                      event="pipeline_completed", task_modules=task_modules,
                      changed_files=changed_files)
        except Exception:
            # Hook errors must never block pipeline completion
            _logger.warning("pipeline_completed hook failed for task %s", task_id, exc_info=True)

        # Auto-learning: extract decisions from pipeline results
        if results:
            try:
                _run_learning_extraction(conn, task_id, project_id, results)
            except Exception:
                # Learning errors must never block pipeline completion
                _logger.warning("Learning extraction failed for task %s", task_id, exc_info=True)

        # Auto-commit changes after successful pipeline
        try:
            _run_autocommit(conn, task_id, project_id)
        except Exception:
            # Autocommit errors must never block pipeline completion
            _logger.warning("Autocommit failed for task %s", task_id, exc_info=True)

    return {
        "success": True,
        "steps_completed": len(steps),
        "results": results,
        "total_cost_usd": total_cost,
        "total_tokens": total_tokens,
        "total_duration_seconds": total_duration,
        "pipeline_id": pipeline["id"] if pipeline else None,
        "dry_run": dry_run,
        "mode": mode,
    }