""" Kin agent runner — launches Claude Code as subprocess with role-specific context. Each agent = separate process with isolated context. """ import errno as _errno import json import logging import os import re import shlex import shutil import sqlite3 import subprocess import time from pathlib import Path from typing import Any _logger = logging.getLogger("kin.runner") # Extra PATH entries to inject when searching for claude CLI. # launchctl daemons start with a stripped PATH that may omit these. _EXTRA_PATH_DIRS = [ "/opt/homebrew/bin", "/opt/homebrew/sbin", "/usr/local/bin", "/usr/local/sbin", ] # Default timeouts per model (seconds). Override globally with KIN_AGENT_TIMEOUT # or per role via timeout_seconds in specialists.yaml. _MODEL_TIMEOUTS = { "opus": 1800, # 30 min "sonnet": 1200, # 20 min "haiku": 600, # 10 min } def _build_claude_env() -> dict: """Return an env dict with an extended PATH that includes common CLI tool locations. Merges _EXTRA_PATH_DIRS with the current process PATH, deduplicating entries. Also resolves ~/.nvm/versions/node/*/bin globs that launchctl may not expand. """ env = os.environ.copy() existing = env.get("PATH", "").split(":") extra = list(_EXTRA_PATH_DIRS) # Expand nvm node bin dirs dynamically nvm_root = Path.home() / ".nvm" / "versions" / "node" if nvm_root.is_dir(): for node_ver in sorted(nvm_root.iterdir(), reverse=True): bin_dir = node_ver / "bin" if bin_dir.is_dir(): extra.append(str(bin_dir)) seen: set[str] = set() deduped: list[str] = [] for d in extra + existing: if d and d not in seen: seen.add(d) deduped.append(d) env["PATH"] = ":".join(deduped) # Ensure SSH agent is available for agents that connect via SSH. # Under launchd, SSH_AUTH_SOCK is not inherited — detect macOS system socket. if "SSH_AUTH_SOCK" not in env: import glob socks = glob.glob("/private/tmp/com.apple.launchd.*/Listeners") if socks: env["SSH_AUTH_SOCK"] = socks[0] return env def _resolve_claude_cmd() -> str: """Return the full path to the claude CLI, or 'claude' as fallback.""" extended_env = _build_claude_env() found = shutil.which("claude", path=extended_env["PATH"]) return found or "claude" from core import models from core.context_builder import build_context, format_prompt from core.hooks import run_hooks class ClaudeAuthError(Exception): """Raised when Claude CLI is not authenticated or not available.""" pass def check_claude_auth(timeout: int = 10) -> None: """Check that claude CLI is authenticated before running a pipeline. Runs: claude -p 'ok' --output-format json with timeout. Returns None if auth is confirmed. Raises ClaudeAuthError if: - claude CLI not found in PATH (FileNotFoundError) - stdout/stderr contains 'not logged in' (case-insensitive) - returncode != 0 - is_error=true in parsed JSON output Returns silently on TimeoutExpired (ambiguous — don't block pipeline). """ claude_cmd = _resolve_claude_cmd() env = _build_claude_env() try: proc = subprocess.run( [claude_cmd, "-p", "ok", "--output-format", "json"], capture_output=True, text=True, timeout=timeout, env=env, stdin=subprocess.DEVNULL, ) except FileNotFoundError: raise ClaudeAuthError("claude CLI not found in PATH. Install it or add to PATH.") except subprocess.TimeoutExpired: return # Ambiguous — don't block pipeline on timeout stdout = proc.stdout or "" stderr = proc.stderr or "" combined = stdout + stderr if "not logged in" in combined.lower(): raise ClaudeAuthError("Claude CLI requires login. Run: claude login") if proc.returncode != 0: raise ClaudeAuthError("Claude CLI requires login. Run: claude login") parsed = _try_parse_json(stdout) if isinstance(parsed, dict) and parsed.get("is_error"): raise ClaudeAuthError("Claude CLI requires login. Run: claude login") def run_agent( conn: sqlite3.Connection, role: str, task_id: str, project_id: str, model: str = "sonnet", previous_output: str | None = None, brief_override: str | None = None, dry_run: bool = False, allow_write: bool = False, noninteractive: bool = False, working_dir_override: str | None = None, ) -> dict: """Run a single Claude Code agent as a subprocess. 1. Build context from DB 2. Format prompt with role template 3. Run: claude -p "{prompt}" --output-format json 4. Log result to agent_logs 5. Return {success, output, tokens_used, duration_seconds, cost_usd} """ # Build context ctx = build_context(conn, task_id, role, project_id) if previous_output: ctx["previous_output"] = previous_output if brief_override: if ctx.get("task"): ctx["task"]["brief"] = brief_override prompt = format_prompt(ctx, role) if dry_run: return { "success": True, "output": None, "prompt": prompt, "role": role, "model": model, "dry_run": True, } # Determine working directory project = models.get_project(conn, project_id) working_dir = None # Operations projects have no local path — sysadmin works via SSH is_operations = project and project.get("project_type") == "operations" if working_dir_override: working_dir = working_dir_override elif not is_operations and project and role in ("debugger", "frontend_dev", "backend_dev", "tester", "security", "constitution", "spec", "task_decomposer"): project_path = Path(project["path"]).expanduser() if project_path.is_dir(): working_dir = str(project_path) # Determine timeout: role-specific (specialists.yaml) > model-based > default role_timeout = None try: from core.context_builder import _load_specialists specs = _load_specialists().get("specialists", {}) role_spec = specs.get(role, {}) if role_spec.get("timeout_seconds"): role_timeout = int(role_spec["timeout_seconds"]) except Exception: pass # Run claude subprocess start = time.monotonic() result = _run_claude(prompt, model=model, working_dir=working_dir, allow_write=allow_write, noninteractive=noninteractive, timeout=role_timeout) duration = int(time.monotonic() - start) # Parse output — ensure output_text is always a string for DB storage raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) output_text = raw_output success = result["returncode"] == 0 parsed_output = _try_parse_json(output_text) # Log FULL output to DB (no truncation) models.log_agent_run( conn, project_id=project_id, task_id=task_id, agent_role=role, action="execute", input_summary=f"task={task_id}, model={model}", output_summary=output_text or None, tokens_used=result.get("tokens_used"), model=model, cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) return { "success": success, "error": result.get("error") if not success else None, "output": parsed_output if parsed_output else output_text, "raw_output": output_text, "role": role, "model": model, "duration_seconds": duration, "tokens_used": result.get("tokens_used"), "cost_usd": result.get("cost_usd"), } def _run_claude( prompt: str, model: str = "sonnet", working_dir: str | None = None, allow_write: bool = False, noninteractive: bool = False, timeout: int | None = None, ) -> dict: """Execute claude CLI as subprocess. Returns dict with output, returncode, etc.""" claude_cmd = _resolve_claude_cmd() cmd = [ claude_cmd, "-p", prompt, "--output-format", "json", "--model", model, ] if allow_write: cmd.append("--dangerously-skip-permissions") is_noninteractive = noninteractive or os.environ.get("KIN_NONINTERACTIVE") == "1" if timeout is None: env_timeout = os.environ.get("KIN_AGENT_TIMEOUT") if env_timeout: timeout = int(env_timeout) else: timeout = _MODEL_TIMEOUTS.get(model, _MODEL_TIMEOUTS["sonnet"]) env = _build_claude_env() try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, cwd=working_dir, env=env, stdin=subprocess.DEVNULL if is_noninteractive else None, ) except FileNotFoundError: return { "output": "", "error": "claude CLI not found in PATH", "returncode": 127, } except subprocess.TimeoutExpired: return { "output": "", "error": f"Agent timed out after {timeout}s", "returncode": 124, } # Always preserve the full raw stdout raw_stdout = proc.stdout or "" result: dict[str, Any] = { "output": raw_stdout, "error": proc.stderr or None, # preserve stderr always for diagnostics "empty_output": not raw_stdout.strip(), "returncode": proc.returncode, } # Parse JSON wrapper from claude --output-format json # Extract metadata (tokens, cost) but keep output as the full content string parsed = _try_parse_json(raw_stdout) if isinstance(parsed, dict): result["tokens_used"] = parsed.get("usage", {}).get("total_tokens") result["cost_usd"] = parsed.get("cost_usd") # Extract the agent's actual response, converting to string if needed content = parsed.get("result") or parsed.get("content") if content is not None: result["output"] = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False) return result def _try_parse_json(text: str) -> Any: """Try to parse JSON from text. Returns parsed obj or None.""" text = text.strip() if not text: return None # Direct parse try: return json.loads(text) except json.JSONDecodeError: pass # Try to find JSON block in markdown code fences import re m = re.search(r"```(?:json)?\s*\n(.*?)\n```", text, re.DOTALL) if m: try: return json.loads(m.group(1)) except json.JSONDecodeError: pass # Try to find first { ... } or [ ... ] for start_char, end_char in [("{", "}"), ("[", "]")]: start = text.find(start_char) if start >= 0: # Find matching close depth = 0 for i in range(start, len(text)): if text[i] == start_char: depth += 1 elif text[i] == end_char: depth -= 1 if depth == 0: try: return json.loads(text[start:i + 1]) except json.JSONDecodeError: break return None # --------------------------------------------------------------------------- # Backlog audit # --------------------------------------------------------------------------- PROMPTS_DIR = Path(__file__).parent / "prompts" _LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish", "de": "German", "fr": "French"} def run_audit( conn: sqlite3.Connection, project_id: str, noninteractive: bool = False, auto_apply: bool = False, ) -> dict: """Audit pending tasks against the actual codebase. auto_apply=True: marks already_done tasks as done in DB. auto_apply=False: returns results only (for API/GUI). Returns {success, already_done, still_pending, unclear, duration_seconds, ...} """ project = models.get_project(conn, project_id) if not project: return {"success": False, "error": f"Project '{project_id}' not found"} pending = models.list_tasks(conn, project_id=project_id, status="pending") if not pending: return { "success": True, "already_done": [], "still_pending": [], "unclear": [], "message": "No pending tasks to audit", } # Build prompt prompt_path = PROMPTS_DIR / "backlog_audit.md" template = prompt_path.read_text() if prompt_path.exists() else ( "You are a QA analyst. Check if pending tasks are already done in the code." ) task_list = [ {"id": t["id"], "title": t["title"], "brief": t.get("brief")} for t in pending ] sections = [ template, "", f"## Project: {project['id']} — {project['name']}", ] if project.get("tech_stack"): sections.append(f"Tech stack: {', '.join(project['tech_stack'])}") sections.append(f"Path: {project['path']}") sections.append("") sections.append(f"## Pending tasks ({len(task_list)}):") sections.append(json.dumps(task_list, ensure_ascii=False, indent=2)) sections.append("") language = project.get("language", "ru") lang_name = _LANG_NAMES.get(language, language) sections.append("## Language") sections.append(f"ALWAYS respond in {lang_name}.") sections.append("") prompt = "\n".join(sections) # Determine working dir working_dir = None project_path = Path(project["path"]).expanduser() if project_path.is_dir(): working_dir = str(project_path) # Run agent — allow_write=True so claude can use Read/Bash tools # without interactive permission prompts (critical for noninteractive mode) start = time.monotonic() result = _run_claude(prompt, model="sonnet", working_dir=working_dir, allow_write=True, noninteractive=noninteractive) duration = int(time.monotonic() - start) raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) success = result["returncode"] == 0 # Log to agent_logs models.log_agent_run( conn, project_id=project_id, task_id=None, agent_role="backlog_audit", action="audit", input_summary=f"project={project_id}, pending_tasks={len(pending)}", output_summary=raw_output or None, tokens_used=result.get("tokens_used"), model="sonnet", cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) if not success: return { "success": False, "error": result.get("error", "Agent failed"), "raw_output": raw_output, "duration_seconds": duration, } # Parse structured output parsed = _try_parse_json(raw_output) if not isinstance(parsed, dict): return { "success": False, "error": "Agent returned non-JSON output", "raw_output": raw_output, "duration_seconds": duration, } already_done = parsed.get("already_done", []) # Auto-apply: mark already_done tasks as done in DB applied = [] if auto_apply and already_done: for item in already_done: tid = item.get("id") if tid: t = models.get_task(conn, tid) if t and t["project_id"] == project_id and t["status"] == "pending": models.update_task(conn, tid, status="done") applied.append(tid) return { "success": True, "already_done": already_done, "still_pending": parsed.get("still_pending", []), "unclear": parsed.get("unclear", []), "applied": applied, "duration_seconds": duration, "tokens_used": result.get("tokens_used"), "cost_usd": result.get("cost_usd"), } # --------------------------------------------------------------------------- # Blocked protocol detection # --------------------------------------------------------------------------- def _parse_agent_blocked(result: dict) -> dict | None: """Detect semantic blocked status from a successful agent result. Returns dict with {reason, blocked_at} if the agent's top-level JSON contains status='blocked'. Returns None otherwise. Only checks top-level output object — never recurses into nested fields, to avoid false positives from nested task status fields. """ from datetime import datetime if not result.get("success"): return None output = result.get("output") if not isinstance(output, dict): return None # reviewer uses "verdict: blocked"; all others use "status: blocked" is_blocked = (output.get("status") == "blocked" or output.get("verdict") == "blocked") if not is_blocked: return None return { "reason": output.get("reason") or output.get("blocked_reason") or "", "blocked_at": output.get("blocked_at") or datetime.now().isoformat(), } # --------------------------------------------------------------------------- # Permission error detection # --------------------------------------------------------------------------- def _is_permission_error(result: dict) -> bool: """Return True if agent result indicates a permission/write failure.""" from core.followup import PERMISSION_PATTERNS output = (result.get("raw_output") or result.get("output") or "") if not isinstance(output, str): output = json.dumps(output, ensure_ascii=False) error = result.get("error") or "" text = output + " " + error return any(re.search(p, text) for p in PERMISSION_PATTERNS) # --------------------------------------------------------------------------- # Autocommit: git add -A && git commit after successful pipeline # --------------------------------------------------------------------------- def _get_changed_files(project_path: str) -> list[str]: """Return files changed in the current pipeline run. Combines unstaged changes, staged changes, and the last commit diff to cover both autocommit-on and autocommit-off scenarios. Returns [] on any git error (e.g. no git repo, first commit). """ env = _build_claude_env() git_cmd = shutil.which("git", path=env["PATH"]) or "git" files: set[str] = set() for git_args in ( ["diff", "--name-only"], # unstaged tracked changes ["diff", "--cached", "--name-only"], # staged changes ["diff", "HEAD~1", "HEAD", "--name-only"], # last commit (post-autocommit) ): try: r = subprocess.run( [git_cmd] + git_args, cwd=project_path, capture_output=True, text=True, timeout=10, env=env, ) if r.returncode == 0: files.update(f.strip() for f in r.stdout.splitlines() if f.strip()) except Exception: pass return list(files) def _run_autocommit( conn: sqlite3.Connection, task_id: str, project_id: str, ) -> None: """Auto-commit changes after successful pipeline completion. Runs: git add -A && git commit -m 'kin: {task_id} {title}'. Silently skips if nothing to commit (exit code 1) or project path not found. Never raises — autocommit errors must never block the pipeline. Uses stderr=subprocess.DEVNULL per decision #30. """ task = models.get_task(conn, task_id) project = models.get_project(conn, project_id) if not task or not project: return if not project.get("autocommit_enabled"): return project_path = Path(project["path"]).expanduser() if not project_path.is_dir(): return working_dir = str(project_path) env = _build_claude_env() git_cmd = shutil.which("git", path=env["PATH"]) or "git" title = (task.get("title") or "").replace('"', "'").replace("\n", " ").replace("\r", "") commit_msg = f"kin: {task_id} {title}" try: subprocess.run( [git_cmd, "add", "-A"], cwd=working_dir, env=env, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, ) result = subprocess.run( [git_cmd, "commit", "-m", commit_msg], cwd=working_dir, env=env, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, ) if result.returncode == 0: _logger.info("Autocommit: %s", commit_msg) else: _logger.debug("Autocommit: nothing to commit for %s", task_id) except Exception as exc: _logger.warning("Autocommit failed for %s: %s", task_id, exc) # --------------------------------------------------------------------------- # Sysadmin output: save server map to decisions and modules # --------------------------------------------------------------------------- def _save_sysadmin_output( conn: sqlite3.Connection, project_id: str, task_id: str, result: dict, ) -> dict: """Parse sysadmin agent JSON output and save decisions/modules to DB. Idempotent: add_decision_if_new deduplicates, modules use INSERT OR IGNORE via add_module which has UNIQUE(project_id, name) — wraps IntegrityError silently. Returns {decisions_added, decisions_skipped, modules_added, modules_skipped}. """ raw = result.get("raw_output") or result.get("output") or "" if isinstance(raw, (dict, list)): raw = json.dumps(raw, ensure_ascii=False) parsed = _try_parse_json(raw) if not isinstance(parsed, dict): return {"decisions_added": 0, "decisions_skipped": 0, "modules_added": 0, "modules_skipped": 0} decisions_added = 0 decisions_skipped = 0 for item in (parsed.get("decisions") or []): if not isinstance(item, dict): continue d_type = item.get("type", "decision") if d_type not in VALID_DECISION_TYPES: d_type = "decision" d_title = (item.get("title") or "").strip() d_desc = (item.get("description") or "").strip() if not d_title or not d_desc: continue saved = models.add_decision_if_new( conn, project_id=project_id, type=d_type, title=d_title, description=d_desc, tags=item.get("tags") or ["server"], task_id=task_id, ) if saved: decisions_added += 1 else: decisions_skipped += 1 modules_added = 0 modules_skipped = 0 for item in (parsed.get("modules") or []): if not isinstance(item, dict): continue m_name = (item.get("name") or "").strip() m_type = (item.get("type") or "service").strip() m_path = (item.get("path") or "").strip() if not m_name: continue try: m = models.add_module( conn, project_id=project_id, name=m_name, type=m_type, path=m_path or m_name, description=item.get("description"), owner_role="sysadmin", ) if m.get("_created", True): modules_added += 1 else: modules_skipped += 1 except Exception: modules_skipped += 1 return { "decisions_added": decisions_added, "decisions_skipped": decisions_skipped, "modules_added": modules_added, "modules_skipped": modules_skipped, } # --------------------------------------------------------------------------- # Auto-test: detect test failure in agent output # --------------------------------------------------------------------------- _TEST_FAILURE_PATTERNS = [ r"\bFAILED\b", r"\bFAIL\b", r"\d+\s+failed", r"test(?:s)?\s+failed", r"assert(?:ion)?\s*(error|failed)", r"exception(?:s)?\s+occurred", r"returncode\s*[!=]=\s*0", r"Error:\s", r"ERRORS?\b", ] _TEST_SUCCESS_PATTERNS = [ r"no\s+failures", r"all\s+tests?\s+pass", r"0\s+failed", r"passed.*no\s+errors", ] def _is_test_failure(result: dict) -> bool: """Return True if agent output indicates test failures. Checks for failure keywords, guards against false positives from explicit success phrases (e.g. 'no failures'). """ output = result.get("raw_output") or result.get("output") or "" if not isinstance(output, str): output = json.dumps(output, ensure_ascii=False) for p in _TEST_SUCCESS_PATTERNS: if re.search(p, output, re.IGNORECASE): return False for p in _TEST_FAILURE_PATTERNS: if re.search(p, output, re.IGNORECASE): return True return False # --------------------------------------------------------------------------- # Auto-test runner: run project tests via `make test` # --------------------------------------------------------------------------- # Roles that trigger auto-test when project.auto_test_enabled is set _AUTO_TEST_ROLES = {"backend_dev", "frontend_dev"} def _run_project_tests(project_path: str, test_command: str = 'make test', timeout: int = 120) -> dict: """Run test_command in project_path. Returns {success, output, returncode}. Never raises — all errors are captured and returned in output. """ env = _build_claude_env() parts = shlex.split(test_command) if not parts: return {"success": False, "output": "Empty test_command", "returncode": -1} resolved = shutil.which(parts[0], path=env["PATH"]) or parts[0] cmd = [resolved] + parts[1:] try: result = subprocess.run( cmd, cwd=project_path, capture_output=True, text=True, timeout=timeout, env=env, ) output = (result.stdout or "") + (result.stderr or "") return {"success": result.returncode == 0, "output": output, "returncode": result.returncode} except subprocess.TimeoutExpired: return {"success": False, "output": f"{test_command} timed out after {timeout}s", "returncode": 124} except FileNotFoundError: return {"success": False, "output": f"{parts[0]} not found in PATH", "returncode": 127} except Exception as exc: return {"success": False, "output": f"Test run error: {exc}", "returncode": -1} # --------------------------------------------------------------------------- # Decomposer output: create child tasks from task_decomposer JSON # --------------------------------------------------------------------------- def _save_decomposer_output( conn: sqlite3.Connection, project_id: str, parent_task_id: str, result: dict, ) -> dict: """Parse task_decomposer output and create child tasks in DB. Expected output format: {tasks: [{title, brief, priority, category, acceptance_criteria}]} Idempotent: skips tasks with same parent_task_id + title (case-insensitive). Returns {created: int, skipped: int}. """ raw = result.get("raw_output") or result.get("output") or "" if isinstance(raw, (dict, list)): raw = json.dumps(raw, ensure_ascii=False) parsed = _try_parse_json(raw) if not isinstance(parsed, dict): return {"created": 0, "skipped": 0, "error": "non-JSON decomposer output"} task_list = parsed.get("tasks", []) if not isinstance(task_list, list): return {"created": 0, "skipped": 0, "error": "invalid tasks format"} created = 0 skipped = 0 for item in task_list: if not isinstance(item, dict): continue title = (item.get("title") or "").strip() if not title: continue # Idempotency: skip if same parent + title already exists existing = conn.execute( """SELECT id FROM tasks WHERE parent_task_id = ? AND lower(trim(title)) = lower(trim(?))""", (parent_task_id, title), ).fetchone() if existing: skipped += 1 _logger.info( "task_decomposer: skip duplicate child task '%s' (parent=%s, existing=%s)", title, parent_task_id, existing[0], ) continue category = (item.get("category") or "").strip().upper() if category not in models.TASK_CATEGORIES: category = None task_id = models.next_task_id(conn, project_id, category=category) brief_text = item.get("brief") or "" models.create_task( conn, task_id, project_id, title, priority=item.get("priority", 5), brief={"text": brief_text, "source": f"decomposer:{parent_task_id}"}, category=category, acceptance_criteria=item.get("acceptance_criteria"), parent_task_id=parent_task_id, ) created += 1 return {"created": created, "skipped": skipped} # --------------------------------------------------------------------------- # Auto-learning: extract decisions from pipeline results # --------------------------------------------------------------------------- VALID_DECISION_TYPES = {"decision", "gotcha", "convention"} def _run_learning_extraction( conn: sqlite3.Connection, task_id: str, project_id: str, step_results: list[dict], ) -> dict: """Extract and save decisions from completed pipeline results. Calls the learner agent with step outputs + existing decisions, parses the JSON response, and saves new decisions via add_decision_if_new. Returns a summary dict with added/skipped counts. """ learner_prompt_path = PROMPTS_DIR / "learner.md" if not learner_prompt_path.exists(): return {"added": 0, "skipped": 0, "error": "learner.md not found"} template = learner_prompt_path.read_text() # Summarize step outputs (first 2000 chars each) step_summaries = {} for r in step_results: role = r.get("role", "unknown") output = r.get("raw_output") or r.get("output") or "" if isinstance(output, (dict, list)): output = json.dumps(output, ensure_ascii=False) step_summaries[role] = output[:2000] # Fetch existing decisions for dedup hint existing = models.get_decisions(conn, project_id) existing_hints = [ {"title": d["title"], "type": d["type"]} for d in existing ] prompt_parts = [ template, "", "## PIPELINE_OUTPUTS", json.dumps(step_summaries, ensure_ascii=False, indent=2), "", "## EXISTING_DECISIONS", json.dumps(existing_hints, ensure_ascii=False, indent=2), ] prompt = "\n".join(prompt_parts) learner_timeout = int(os.environ.get("KIN_LEARNER_TIMEOUT") or 120) start = time.monotonic() result = _run_claude(prompt, model="sonnet", noninteractive=True, timeout=learner_timeout) duration = int(time.monotonic() - start) raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) success = result["returncode"] == 0 # Log to agent_logs models.log_agent_run( conn, project_id=project_id, task_id=task_id, agent_role="learner", action="learn", input_summary=f"project={project_id}, task={task_id}, steps={len(step_results)}", output_summary=raw_output or None, tokens_used=result.get("tokens_used"), model="sonnet", cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) parsed = _try_parse_json(raw_output) if not isinstance(parsed, dict): return {"added": 0, "skipped": 0, "error": "non-JSON learner output"} decisions = parsed.get("decisions", []) if not isinstance(decisions, list): return {"added": 0, "skipped": 0, "error": "invalid decisions format"} added = 0 skipped = 0 for item in decisions[:5]: if not isinstance(item, dict): continue d_type = item.get("type", "decision") if d_type not in VALID_DECISION_TYPES: d_type = "decision" d_title = (item.get("title") or "").strip() d_desc = (item.get("description") or "").strip() if not d_title or not d_desc: continue saved = models.add_decision_if_new( conn, project_id=project_id, type=d_type, title=d_title, description=d_desc, tags=item.get("tags") or [], task_id=task_id, ) if saved: added += 1 else: skipped += 1 return {"added": added, "skipped": skipped} # --------------------------------------------------------------------------- # Department head detection # --------------------------------------------------------------------------- # Cache of roles with execution_type=department_head from specialists.yaml _DEPT_HEAD_ROLES: set[str] | None = None def _is_department_head(role: str) -> bool: """Check if a role is a department head. Uses execution_type from specialists.yaml as primary check, falls back to role.endswith('_head') convention. """ global _DEPT_HEAD_ROLES if _DEPT_HEAD_ROLES is None: try: from core.context_builder import _load_specialists specs = _load_specialists() all_specs = specs.get("specialists", {}) _DEPT_HEAD_ROLES = { name for name, spec in all_specs.items() if spec.get("execution_type") == "department_head" } except Exception: _DEPT_HEAD_ROLES = set() return role in _DEPT_HEAD_ROLES or role.endswith("_head") # --------------------------------------------------------------------------- # Department head sub-pipeline execution # --------------------------------------------------------------------------- def _execute_department_head_step( conn: sqlite3.Connection, task_id: str, project_id: str, parent_pipeline_id: int | None, step: dict, dept_head_result: dict, allow_write: bool = False, noninteractive: bool = False, next_department: str | None = None, ) -> dict: """Execute sub-pipeline planned by a department head. Parses the dept head's JSON output, validates the sub_pipeline, creates a child pipeline in DB, runs it, and saves a handoff record. Returns dict with success, output, cost_usd, tokens_used, duration_seconds. """ raw = dept_head_result.get("raw_output") or dept_head_result.get("output") or "" if isinstance(raw, (dict, list)): raw = json.dumps(raw, ensure_ascii=False) parsed = _try_parse_json(raw) if not isinstance(parsed, dict): return { "success": False, "output": "Department head returned non-JSON output", "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, } # Blocked status from dept head if parsed.get("status") == "blocked": reason = parsed.get("blocked_reason", "Department head reported blocked") # KIN-084: log dept head blocked if parent_pipeline_id: try: models.write_log( conn, parent_pipeline_id, f"Dept {step['role']} blocked: {reason}", level="WARN", extra={"role": step["role"], "blocked_reason": reason}, ) except Exception: pass return { "success": False, "output": json.dumps(parsed, ensure_ascii=False), "blocked": True, "blocked_reason": reason, "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, } sub_pipeline = parsed.get("sub_pipeline", []) if not isinstance(sub_pipeline, list) or not sub_pipeline: return { "success": False, "output": "Department head returned empty or invalid sub_pipeline", "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, } # Recursion guard: no department head roles allowed in sub_pipeline for sub_step in sub_pipeline: if isinstance(sub_step, dict) and _is_department_head(str(sub_step.get("role", ""))): return { "success": False, "output": f"Recursion blocked: sub_pipeline contains _head role '{sub_step['role']}'", "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, } role = step["role"] dept_name = role.replace("_head", "") # Build initial context for workers: dept head's plan + artifacts dept_plan_context = json.dumps({ "department_head_plan": { "department": dept_name, "artifacts": parsed.get("artifacts", {}), "handoff_notes": parsed.get("handoff_notes", ""), }, }, ensure_ascii=False) # KIN-084: log sub-pipeline start if parent_pipeline_id: try: sub_roles = [s.get("role", "") for s in sub_pipeline] models.write_log( conn, parent_pipeline_id, f"Sub-pipeline start: dept={dept_name}, steps={len(sub_pipeline)}", extra={"dept_name": dept_name, "sub_steps": len(sub_pipeline), "sub_roles": sub_roles}, ) except Exception: pass # Run the sub-pipeline (noninteractive=True — Opus already reviewed the plan) # pass parent_pipeline_id and department so run_pipeline creates the child # pipeline with correct attributes (route_type='dept_sub') — no double create sub_result = run_pipeline( conn, task_id, sub_pipeline, dry_run=False, allow_write=allow_write, noninteractive=True, initial_previous_output=dept_plan_context, parent_pipeline_id=parent_pipeline_id, department=dept_name, ) # KIN-084: log sub-pipeline done if parent_pipeline_id: try: sub_success = sub_result.get("success", False) models.write_log( conn, parent_pipeline_id, f"Sub-pipeline done: dept={dept_name}, success={sub_success}, steps={sub_result.get('steps_completed', 0)}", level="INFO" if sub_success else "ERROR", extra={"dept_name": dept_name, "success": sub_success, "steps_completed": sub_result.get("steps_completed", 0)}, ) except Exception: pass # Extract decisions from sub-pipeline results for handoff decisions_made = [] sub_results = sub_result.get("results", []) for sr in sub_results: output = sr.get("output") or sr.get("raw_output") or "" if isinstance(output, str): try: output = json.loads(output) except (json.JSONDecodeError, ValueError): pass if isinstance(output, dict): # Reviewer/tester may include decisions or findings for key in ("decisions", "findings", "recommendations"): val = output.get(key) if isinstance(val, list): decisions_made.extend(val) elif isinstance(val, str) and val: decisions_made.append(val) # Determine last worker role for auto_complete tracking last_sub_role = sub_pipeline[-1].get("role", "") if sub_pipeline else "" # Save handoff for inter-department context handoff_status = "done" if sub_result.get("success") else "partial" try: models.create_handoff( conn, pipeline_id=sub_result.get("pipeline_id") or parent_pipeline_id, task_id=task_id, from_department=dept_name, to_department=next_department, artifacts=parsed.get("artifacts", {}), decisions_made=decisions_made, blockers=[], status=handoff_status, ) except Exception: pass # Handoff save errors must never block pipeline # Build summary output for the next pipeline step summary = { "from_department": dept_name, "handoff_notes": parsed.get("handoff_notes", ""), "artifacts": parsed.get("artifacts", {}), "sub_pipeline_summary": { "steps_completed": sub_result.get("steps_completed", 0), "success": sub_result.get("success", False), }, } ret = { "success": sub_result.get("success", False), "output": json.dumps(summary, ensure_ascii=False), "cost_usd": sub_result.get("total_cost_usd", 0), "tokens_used": sub_result.get("total_tokens", 0), "duration_seconds": sub_result.get("total_duration_seconds", 0), "last_sub_role": last_sub_role, } if not sub_result.get("success"): ret["blocked_reason"] = sub_result.get("blocked_reason") or sub_result.get("error") ret["error"] = sub_result.get("error") return ret # --------------------------------------------------------------------------- # Watchdog helpers # --------------------------------------------------------------------------- def _check_parent_alive( conn: sqlite3.Connection, pipeline: dict, task_id: str, project_id: str, ) -> bool: """Check if parent process is alive. Returns True if pipeline should abort. Only treats ESRCH (no such process) as dead parent. PermissionError (pid 1 / init) and ValueError are ignored — pipeline continues. """ ppid = os.getppid() try: os.kill(ppid, 0) except OSError as exc: if exc.errno == _errno.ESRCH: reason = f"Parent process died unexpectedly (PID {ppid})" _logger.warning("Pipeline %s: %s — aborting", pipeline["id"], reason) models.update_pipeline(conn, pipeline["id"], status="failed") models.update_task(conn, task_id, status="blocked", blocked_reason=reason) return True # PermissionError (EPERM) — process exists but we can't signal it: continue return False # --------------------------------------------------------------------------- # Pipeline executor # --------------------------------------------------------------------------- def run_pipeline( conn: sqlite3.Connection, task_id: str, steps: list[dict], dry_run: bool = False, allow_write: bool = False, noninteractive: bool = False, initial_previous_output: str | None = None, parent_pipeline_id: int | None = None, department: str | None = None, ) -> dict: """Execute a multi-step pipeline of agents. steps = [ {"role": "debugger", "model": "opus", "brief": "..."}, {"role": "tester", "depends_on": "debugger", "brief": "..."}, ] initial_previous_output: context injected as previous_output for the first step (used by dept head sub-pipelines to pass artifacts/plan to workers). Returns {success, steps_completed, total_cost, total_tokens, total_duration, results} """ # Auth check — skip for dry_run (dry_run never calls claude CLI) if not dry_run: try: check_claude_auth() except ClaudeAuthError as exc: return { "success": False, "error": "claude_auth_required", "message": str(exc), "instructions": "Run: claude login", } task = models.get_task(conn, task_id) if not task: return {"success": False, "error": f"Task '{task_id}' not found"} project_id = task["project_id"] # Determine route type from steps or task brief route_type = "custom" if task.get("brief") and isinstance(task["brief"], dict): route_type = task["brief"].get("route_type", "custom") or "custom" # Determine execution mode (auto vs review) mode = models.get_effective_mode(conn, project_id, task_id) # Create pipeline in DB pipeline = None if not dry_run: effective_route_type = "dept_sub" if parent_pipeline_id else route_type pipeline = models.create_pipeline( conn, task_id, project_id, effective_route_type, steps, parent_pipeline_id=parent_pipeline_id, department=department, ) # Save PID so watchdog can detect dead subprocesses (KIN-099) pipeline = models.update_pipeline(conn, pipeline["id"], pid=os.getpid()) models.update_task(conn, task_id, status="in_progress") # KIN-084: log pipeline start try: models.write_log( conn, pipeline["id"], f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}", extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode}, ) except Exception: pass results = [] total_cost = 0.0 total_tokens = 0 total_duration = 0 previous_output = initial_previous_output _last_sub_role = None # Track last worker role from dept sub-pipelines (for auto_complete) for i, step in enumerate(steps): role = step["role"] model = step.get("model", "sonnet") brief = step.get("brief") # KIN-084: log step start try: if pipeline: brief_preview = (step.get("brief") or "")[:100] models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} start: role={role}, model={model}", level="INFO", extra={"role": role, "model": model, "brief_preview": brief_preview}, ) except Exception: pass # Check parent process is still alive (KIN-099 watchdog) if not dry_run and pipeline: if _check_parent_alive(conn, pipeline, task_id, project_id): return { "success": False, "error": "parent_process_died", "steps_completed": i, "total_cost": total_cost, "total_tokens": total_tokens, "total_duration": total_duration, "results": results, } # Worktree isolation: opt-in per project, for write-capable roles _WORKTREE_ROLES = {"backend_dev", "frontend_dev", "debugger"} worktree_path = None project_for_wt = models.get_project(conn, task["project_id"]) if not dry_run else None use_worktree = ( not dry_run and role in _WORKTREE_ROLES and project_for_wt and project_for_wt.get("worktrees_enabled") and project_for_wt.get("path") ) if use_worktree: try: from core.worktree import create_worktree, ensure_gitignore p_path = str(Path(project_for_wt["path"]).expanduser()) ensure_gitignore(p_path) worktree_path = create_worktree(p_path, task_id, role) except Exception: worktree_path = None # Fall back to normal execution try: result = run_agent( conn, role, task_id, project_id, model=model, previous_output=previous_output, brief_override=brief, dry_run=dry_run, allow_write=allow_write, noninteractive=noninteractive, working_dir_override=worktree_path, ) except Exception as exc: exc_msg = f"Step {i+1}/{len(steps)} ({role}) raised exception: {exc}" if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) # KIN-084: log step exception try: models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} exception: {exc}", level="ERROR", extra={"role": role, "exc_type": type(exc).__name__}, ) except Exception: pass models.log_agent_run( conn, project_id=project_id, task_id=task_id, agent_role=role, action="execute", input_summary=f"task={task_id}, model={model}", output_summary=None, success=False, error_message=exc_msg, ) models.update_task(conn, task_id, status="blocked", blocked_reason=exc_msg) try: from core.telegram import send_telegram_escalation project = models.get_project(conn, project_id) project_name = project["name"] if project else project_id sent = send_telegram_escalation( task_id=task_id, project_name=project_name, agent_role=role, reason=exc_msg, pipeline_step=str(i + 1), ) if sent: models.mark_telegram_sent(conn, task_id) except Exception: pass # Telegram errors must never block pipeline return { "success": False, "error": exc_msg, "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } if dry_run: results.append(result) continue # Accumulate stats total_cost += result.get("cost_usd") or 0 total_tokens += result.get("tokens_used") or 0 total_duration += result.get("duration_seconds") or 0 # KIN-084: log step done try: if pipeline: models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} done: role={role}, success={result.get('success')}", level="INFO", extra={ "role": role, "tokens_used": result.get("tokens_used") or 0, "cost_usd": result.get("cost_usd") or 0, "duration_seconds": result.get("duration_seconds") or 0, }, ) except Exception: pass if not result["success"]: # Auto mode: retry once with allow_write on permission error if mode == "auto_complete" and not allow_write and _is_permission_error(result): task_modules = models.get_modules(conn, project_id) try: run_hooks(conn, project_id, task_id, event="task_permission_retry", task_modules=task_modules) except Exception: pass # Audit log: record dangerous skip before retry try: models.log_audit_event( conn, event_type="dangerous_skip", task_id=task_id, step_id=role, reason=f"auto mode permission retry: step {i+1}/{len(steps)} ({role})", project_id=project_id, ) models.update_task(conn, task_id, dangerously_skipped=1) except Exception: pass retry = run_agent( conn, role, task_id, project_id, model=model, previous_output=previous_output, brief_override=brief, dry_run=False, allow_write=True, noninteractive=noninteractive, ) allow_write = True # subsequent steps also with allow_write total_cost += retry.get("cost_usd") or 0 total_tokens += retry.get("tokens_used") or 0 total_duration += retry.get("duration_seconds") or 0 if retry["success"]: result = retry if not result["success"]: # Still failed — block regardless of mode results.append(result) if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) agent_error = result.get("error") or "" # KIN-084: log step failed try: if pipeline: models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} failed: {agent_error}", level="ERROR", extra={"role": role, "error": agent_error}, ) except Exception: pass error_msg = f"Step {i+1}/{len(steps)} ({role}) failed" if agent_error: error_msg += f": {agent_error}" models.update_task(conn, task_id, status="blocked", blocked_reason=error_msg) return { "success": False, "error": error_msg, "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # Worktree merge/cleanup after successful step if worktree_path and result["success"] and not dry_run: try: from core.worktree import merge_worktree, cleanup_worktree p_path = str(Path(project_for_wt["path"]).expanduser()) merge_result = merge_worktree(worktree_path, p_path) if not merge_result["success"]: conflicts = merge_result.get("conflicts", []) conflict_msg = f"Worktree merge conflict in files: {', '.join(conflicts)}" if conflicts else "Worktree merge failed" models.update_task(conn, task_id, status="blocked", blocked_reason=conflict_msg) cleanup_worktree(worktree_path, p_path) if pipeline: models.update_pipeline(conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration) return { "success": False, "error": conflict_msg, "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } cleanup_worktree(worktree_path, p_path) except Exception: pass # Worktree errors must never block pipeline elif worktree_path and not dry_run: # Step failed — cleanup worktree without merging try: from core.worktree import cleanup_worktree p_path = str(Path(project_for_wt["path"]).expanduser()) cleanup_worktree(worktree_path, p_path) except Exception: pass results.append(result) # Semantic blocked: agent ran successfully but returned status='blocked' blocked_info = _parse_agent_blocked(result) if blocked_info: if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) # KIN-084: log step blocked try: models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} blocked: {blocked_info['reason']}", level="WARN", extra={"role": role, "reason": blocked_info["reason"]}, ) except Exception: pass models.update_task( conn, task_id, status="blocked", blocked_reason=blocked_info["reason"], blocked_at=blocked_info["blocked_at"], blocked_agent_role=role, blocked_pipeline_step=str(i + 1), ) try: from core.telegram import send_telegram_escalation project = models.get_project(conn, project_id) project_name = project["name"] if project else project_id sent = send_telegram_escalation( task_id=task_id, project_name=project_name, agent_role=role, reason=blocked_info["reason"], pipeline_step=str(i + 1), ) if sent: models.mark_telegram_sent(conn, task_id) except Exception: pass # Telegram errors must never block pipeline error_msg = f"Step {i+1}/{len(steps)} ({role}) blocked: {blocked_info['reason']}" return { "success": False, "error": error_msg, "blocked_by": role, "blocked_reason": blocked_info["reason"], "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # Save sysadmin scan results immediately after a successful sysadmin step if role == "sysadmin" and result["success"] and not dry_run: try: _save_sysadmin_output(conn, project_id, task_id, result) except Exception: pass # Never block pipeline on sysadmin save errors # Save decomposer output: create child tasks from task_decomposer JSON if role == "task_decomposer" and result["success"] and not dry_run: try: _save_decomposer_output(conn, project_id, task_id, result) except Exception: pass # Never block pipeline on decomposer save errors # Department head: execute sub-pipeline planned by the dept head if _is_department_head(role) and result["success"] and not dry_run: # Determine next department for handoff routing _next_dept = None if i + 1 < len(steps): _next_role = steps[i + 1].get("role", "") if _is_department_head(_next_role): _next_dept = _next_role.replace("_head", "") dept_result = _execute_department_head_step( conn, task_id, project_id, parent_pipeline_id=pipeline["id"] if pipeline else None, step=step, dept_head_result=result, allow_write=allow_write, noninteractive=noninteractive, next_department=_next_dept, ) # Accumulate sub-pipeline costs total_cost += dept_result.get("cost_usd") or 0 total_tokens += dept_result.get("tokens_used") or 0 total_duration += dept_result.get("duration_seconds") or 0 if not dept_result.get("success"): # Sub-pipeline failed — handle as blocked results.append({"role": role, "_dept_sub": True, **dept_result}) if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) blocked_reason = dept_result.get("blocked_reason") or dept_result.get("error") or f"Department {role} sub-pipeline failed" error_msg = f"Department {role} sub-pipeline failed: {dept_result.get('output', '')[:200]}" models.update_task(conn, task_id, status="blocked", blocked_reason=blocked_reason) return { "success": False, "error": error_msg, "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # Track last worker role from sub-pipeline for auto_complete eligibility if dept_result.get("last_sub_role"): _last_sub_role = dept_result["last_sub_role"] # Override previous_output with dept handoff summary (not raw dept head JSON) previous_output = dept_result.get("output") if isinstance(previous_output, (dict, list)): previous_output = json.dumps(previous_output, ensure_ascii=False) continue # Project-level auto-test: run `make test` after backend_dev/frontend_dev steps. # Enabled per project via auto_test_enabled flag (opt-in). # On failure, loop fixer up to KIN_AUTO_TEST_MAX_ATTEMPTS times, then block. if ( not dry_run and role in _AUTO_TEST_ROLES and result["success"] and project_for_wt and project_for_wt.get("auto_test_enabled") and project_for_wt.get("path") ): max_auto_test_attempts = int(os.environ.get("KIN_AUTO_TEST_MAX_ATTEMPTS") or 3) p_path_str = str(Path(project_for_wt["path"]).expanduser()) p_test_cmd = project_for_wt.get("test_command") or "make test" test_run = _run_project_tests(p_path_str, p_test_cmd) results.append({"role": "_auto_test", "success": test_run["success"], "output": test_run["output"], "_project_test": True}) auto_test_attempt = 0 while not test_run["success"] and auto_test_attempt < max_auto_test_attempts: auto_test_attempt += 1 fix_context = ( f"Automated project test run ({p_test_cmd}) failed after your changes.\n" f"Test output:\n{test_run['output'][:4000]}\n" f"Fix the failing tests. Do NOT modify test files." ) fix_result = run_agent( conn, role, task_id, project_id, model=model, previous_output=fix_context, dry_run=False, allow_write=allow_write, noninteractive=noninteractive, ) total_cost += fix_result.get("cost_usd") or 0 total_tokens += fix_result.get("tokens_used") or 0 total_duration += fix_result.get("duration_seconds") or 0 results.append({**fix_result, "_auto_test_fix_attempt": auto_test_attempt}) test_run = _run_project_tests(p_path_str, p_test_cmd) results.append({"role": "_auto_test", "success": test_run["success"], "output": test_run["output"], "_project_test": True, "_attempt": auto_test_attempt}) if not test_run["success"]: block_reason = ( f"Auto-test ({p_test_cmd}) failed after {auto_test_attempt} fix attempt(s). " f"Last output: {test_run['output'][:500]}" ) models.update_task(conn, task_id, status="blocked", blocked_reason=block_reason) if pipeline: models.update_pipeline(conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration) return { "success": False, "error": block_reason, "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # Auto-test loop: if tester step has auto_fix=true and tests failed, # call fix_role agent and re-run tester up to max_attempts times. if ( not dry_run and step.get("auto_fix") and role == "tester" and result["success"] and _is_test_failure(result) ): max_attempts = int(step.get("max_attempts", 3)) fix_role = step.get("fix_role", "backend_dev") fix_model = step.get("fix_model", model) attempt = 0 while attempt < max_attempts and _is_test_failure(result): attempt += 1 tester_output = result.get("raw_output") or result.get("output") or "" if isinstance(tester_output, (dict, list)): tester_output = json.dumps(tester_output, ensure_ascii=False) # Run fixer fix_result = run_agent( conn, fix_role, task_id, project_id, model=fix_model, previous_output=tester_output, dry_run=False, allow_write=allow_write, noninteractive=noninteractive, ) total_cost += fix_result.get("cost_usd") or 0 total_tokens += fix_result.get("tokens_used") or 0 total_duration += fix_result.get("duration_seconds") or 0 results.append({**fix_result, "_auto_fix_attempt": attempt}) # Re-run tester fix_output = fix_result.get("raw_output") or fix_result.get("output") or "" if isinstance(fix_output, (dict, list)): fix_output = json.dumps(fix_output, ensure_ascii=False) retest = run_agent( conn, role, task_id, project_id, model=model, previous_output=fix_output, dry_run=False, allow_write=allow_write, noninteractive=noninteractive, ) total_cost += retest.get("cost_usd") or 0 total_tokens += retest.get("tokens_used") or 0 total_duration += retest.get("duration_seconds") or 0 result = retest results.append({**result, "_auto_retest_attempt": attempt}) # Save final test result regardless of outcome try: final_output = result.get("raw_output") or result.get("output") or "" models.update_task(conn, task_id, test_result={ "output": final_output if isinstance(final_output, str) else str(final_output), "auto_fix_attempts": attempt, "passed": not _is_test_failure(result), }) except Exception: pass # Chain output to next step previous_output = result.get("raw_output") or result.get("output") if isinstance(previous_output, (dict, list)): previous_output = json.dumps(previous_output, ensure_ascii=False) # Pipeline completed if pipeline and not dry_run: models.update_pipeline( conn, pipeline["id"], status="completed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) # KIN-084: log pipeline completed try: models.write_log( conn, pipeline["id"], f"Pipeline completed: {len(steps)} steps, cost=${total_cost:.4f}, tokens={total_tokens}, duration={total_duration}s", extra={ "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "steps_count": len(steps), }, ) except Exception: pass task_modules = models.get_modules(conn, project_id) # Compute changed files for hook filtering (frontend build trigger) changed_files: list[str] | None = None project = models.get_project(conn, project_id) if project and project.get("path"): p_path = Path(project["path"]).expanduser() if p_path.is_dir(): changed_files = _get_changed_files(str(p_path)) last_role = steps[-1].get("role", "") if steps else "" # For dept pipelines: if last step is a _head, check the last worker in its sub-pipeline effective_last_role = _last_sub_role if (_is_department_head(last_role) and _last_sub_role) else last_role auto_eligible = effective_last_role in {"tester", "reviewer"} # Guard: re-fetch current status — user may have manually changed it while pipeline ran current_task = models.get_task(conn, task_id) current_status = current_task.get("status") if current_task else None if current_status in ("done", "cancelled"): pass # User finished manually — don't overwrite elif mode == "auto_complete" and auto_eligible: # Auto-complete mode: last step is tester/reviewer — skip review, approve immediately models.update_task(conn, task_id, status="done") # KIN-084: log task status try: models.write_log( conn, pipeline["id"], "Task status: done", extra={"task_status": "done", "mode": mode}, ) except Exception: pass try: run_hooks(conn, project_id, task_id, event="task_auto_approved", task_modules=task_modules) except Exception: pass try: run_hooks(conn, project_id, task_id, event="task_done", task_modules=task_modules) except Exception: pass # Auto followup: generate tasks, auto-resolve permission issues. # Guard: skip for followup-sourced tasks to prevent infinite recursion. task_brief = task.get("brief") or {} is_followup_task = ( isinstance(task_brief, dict) and str(task_brief.get("source", "")).startswith("followup:") ) if not is_followup_task: try: from core.followup import generate_followups, auto_resolve_pending_actions fu_result = generate_followups(conn, task_id) if fu_result.get("pending_actions"): auto_resolve_pending_actions(conn, task_id, fu_result["pending_actions"]) except Exception: pass else: # Review mode: wait for manual approval models.update_task(conn, task_id, status="review", execution_mode="review") # KIN-084: log task status try: models.write_log( conn, pipeline["id"], "Task status: review", extra={"task_status": "review", "mode": mode}, ) except Exception: pass # Run post-pipeline hooks (failures don't affect pipeline status) try: run_hooks(conn, project_id, task_id, event="pipeline_completed", task_modules=task_modules, changed_files=changed_files) except Exception: pass # Hook errors must never block pipeline completion # Auto-learning: extract decisions from pipeline results if results: try: _run_learning_extraction(conn, task_id, project_id, results) except Exception: pass # Learning errors must never block pipeline completion # Auto-commit changes after successful pipeline try: _run_autocommit(conn, task_id, project_id) except Exception: pass # Autocommit errors must never block pipeline completion return { "success": True, "steps_completed": len(steps), "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, "dry_run": dry_run, "mode": mode, }