""" Kin agent runner — launches Claude Code as subprocess with role-specific context. Each agent = separate process with isolated context. """ import errno as _errno import json import logging import os import re import shlex import shutil import sqlite3 import sys import subprocess import time from pathlib import Path from typing import Any _logger = logging.getLogger("kin.runner") # Extra PATH entries to inject when searching for claude CLI. # launchctl daemons start with a stripped PATH that may omit these. _EXTRA_PATH_DIRS = [ "/opt/homebrew/bin", "/opt/homebrew/sbin", "/usr/local/bin", "/usr/local/sbin", ] # Default timeouts per model (seconds). Override globally with KIN_AGENT_TIMEOUT # or per role via timeout_seconds in specialists.yaml. _MODEL_TIMEOUTS = { "opus": 2400, # 40 min "sonnet": 2400, # 40 min "haiku": 1200, # 20 min } def _build_claude_env() -> dict: """Return an env dict with an extended PATH that includes common CLI tool locations. Merges _EXTRA_PATH_DIRS with the current process PATH, deduplicating entries. Also resolves ~/.nvm/versions/node/*/bin globs that launchctl may not expand. """ env = os.environ.copy() # Propagate canonical DB path to agent subprocesses so they don't fall back # to module-relative path (which breaks in worktrees — KIN-129). if "KIN_DB_PATH" not in env: env["KIN_DB_PATH"] = str(Path.home() / ".kin" / "kin.db") existing = env.get("PATH", "").split(":") extra = list(_EXTRA_PATH_DIRS) # Expand nvm node bin dirs dynamically nvm_root = Path.home() / ".nvm" / "versions" / "node" if nvm_root.is_dir(): for node_ver in sorted(nvm_root.iterdir(), reverse=True): bin_dir = node_ver / "bin" if bin_dir.is_dir(): extra.append(str(bin_dir)) seen: set[str] = set() deduped: list[str] = [] for d in extra + existing: if d and d not in seen: seen.add(d) deduped.append(d) env["PATH"] = ":".join(deduped) # Ensure SSH agent is available for agents that connect via SSH. 
# Under launchd, SSH_AUTH_SOCK is not inherited — detect macOS system socket. if "SSH_AUTH_SOCK" not in env: import glob socks = glob.glob("/private/tmp/com.apple.launchd.*/Listeners") if socks: env["SSH_AUTH_SOCK"] = socks[0] return env def _resolve_claude_cmd() -> str: """Return the full path to the claude CLI, or 'claude' as fallback.""" extended_env = _build_claude_env() found = shutil.which("claude", path=extended_env["PATH"]) return found or "claude" from core import models from core.context_builder import build_context, format_prompt from core.hooks import run_hooks class ClaudeAuthError(Exception): """Raised when Claude CLI is not authenticated or not available.""" pass def check_claude_auth(timeout: int = 10) -> None: """Check that claude CLI is authenticated before running a pipeline. Runs: claude -p 'ok' --output-format json with timeout. Returns None if auth is confirmed. Raises ClaudeAuthError if: - claude CLI not found in PATH (FileNotFoundError) - stdout/stderr contains 'not logged in' (case-insensitive) - returncode != 0 - is_error=true in parsed JSON output Returns silently on TimeoutExpired (ambiguous — don't block pipeline). """ claude_cmd = _resolve_claude_cmd() env = _build_claude_env() try: proc = subprocess.run( [claude_cmd, "-p", "ok", "--output-format", "json"], capture_output=True, text=True, timeout=timeout, env=env, stdin=subprocess.DEVNULL, ) except FileNotFoundError: raise ClaudeAuthError("claude CLI not found in PATH. Install it or add to PATH.") except subprocess.TimeoutExpired: return # Ambiguous — don't block pipeline on timeout stdout = proc.stdout or "" stderr = proc.stderr or "" combined = stdout + stderr if "not logged in" in combined.lower(): raise ClaudeAuthError("Claude CLI requires login. Run: claude login") if proc.returncode != 0: raise ClaudeAuthError("Claude CLI requires login. 
Run: claude login") parsed = _try_parse_json(stdout) if isinstance(parsed, dict) and parsed.get("is_error"): raise ClaudeAuthError("Claude CLI requires login. Run: claude login") def run_agent( conn: sqlite3.Connection, role: str, task_id: str, project_id: str, model: str = "sonnet", previous_output: str | None = None, brief_override: str | None = None, dry_run: bool = False, allow_write: bool = False, noninteractive: bool = False, working_dir_override: str | None = None, ) -> dict: """Run a single Claude Code agent as a subprocess. 1. Build context from DB 2. Format prompt with role template 3. Run: claude -p "{prompt}" --output-format json 4. Log result to agent_logs 5. Return {success, output, tokens_used, duration_seconds, cost_usd} """ # Build context ctx = build_context(conn, task_id, role, project_id) if previous_output: ctx["previous_output"] = previous_output if brief_override: if ctx.get("task"): ctx["task"]["brief"] = brief_override prompt = format_prompt(ctx, role) if dry_run: return { "success": True, "output": None, "prompt": prompt, "role": role, "model": model, "dry_run": True, } # Determine working directory project = models.get_project(conn, project_id) working_dir = None # Operations projects have no local path — sysadmin works via SSH is_operations = project and project.get("project_type") == "operations" if working_dir_override: working_dir = working_dir_override elif not is_operations and project and role in ("debugger", "frontend_dev", "backend_dev", "tester", "security", "constitution", "spec", "task_decomposer"): project_path = Path(project["path"]).expanduser() if project_path.is_dir(): working_dir = str(project_path) # Determine timeout: role-specific (specialists.yaml) > model-based > default role_timeout = None try: from core.context_builder import _load_specialists specs = _load_specialists().get("specialists", {}) role_spec = specs.get(role, {}) if role_spec.get("timeout_seconds"): role_timeout = int(role_spec["timeout_seconds"]) 
except Exception: pass # Run claude subprocess start = time.monotonic() result = _run_claude(prompt, model=model, working_dir=working_dir, allow_write=allow_write, noninteractive=noninteractive, timeout=role_timeout) duration = int(time.monotonic() - start) # Parse output — ensure output_text is always a string for DB storage raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) output_text = raw_output success = result["returncode"] == 0 parsed_output = _try_parse_json(output_text) # Log FULL output to DB (no truncation) models.log_agent_run( conn, project_id=project_id, task_id=task_id, agent_role=role, action="execute", input_summary=f"task={task_id}, model={model}", output_summary=output_text or None, tokens_used=result.get("tokens_used"), model=model, cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) return { "success": success, "error": result.get("error") if not success else None, "output": output_text if parsed_output is None else parsed_output, "raw_output": output_text, "role": role, "model": model, "duration_seconds": duration, "tokens_used": result.get("tokens_used"), "cost_usd": result.get("cost_usd"), } def _run_claude( prompt: str, model: str = "sonnet", working_dir: str | None = None, allow_write: bool = False, noninteractive: bool = False, timeout: int | None = None, ) -> dict: """Execute claude CLI as subprocess. 
Returns dict with output, returncode, etc.""" claude_cmd = _resolve_claude_cmd() cmd = [ claude_cmd, "-p", prompt, "--output-format", "json", "--model", model, ] is_noninteractive = noninteractive or os.environ.get("KIN_NONINTERACTIVE") == "1" if allow_write or is_noninteractive: cmd.append("--dangerously-skip-permissions") if timeout is None: env_timeout = os.environ.get("KIN_AGENT_TIMEOUT") if env_timeout: timeout = int(env_timeout) else: timeout = _MODEL_TIMEOUTS.get(model, _MODEL_TIMEOUTS["sonnet"]) env = _build_claude_env() try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, cwd=working_dir, env=env, stdin=subprocess.DEVNULL if is_noninteractive else None, ) except FileNotFoundError: return { "output": "", "error": "claude CLI not found in PATH", "returncode": 127, } except subprocess.TimeoutExpired: return { "output": "", "error": f"Agent timed out after {timeout}s", "returncode": 124, } # Always preserve the full raw stdout raw_stdout = proc.stdout or "" result: dict[str, Any] = { "output": raw_stdout, "error": proc.stderr or None, # preserve stderr always for diagnostics "empty_output": not raw_stdout.strip(), "returncode": proc.returncode, } # Parse JSON wrapper from claude --output-format json # Extract metadata (tokens, cost) but keep output as the full content string parsed = _try_parse_json(raw_stdout) if isinstance(parsed, dict): result["tokens_used"] = parsed.get("usage", {}).get("total_tokens") result["cost_usd"] = parsed.get("cost_usd") # Extract the agent's actual response, converting to string if needed result_val = parsed.get("result") content = result_val if result_val is not None else parsed.get("content") if content is not None: result["output"] = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False) return result def _try_parse_json(text: str) -> Any: """Try to parse JSON from text. 
Returns parsed obj or None.""" text = text.strip() if not text: return None # Direct parse try: return json.loads(text) except json.JSONDecodeError: pass # Try to find JSON block in markdown code fences import re m = re.search(r"```(?:json)?\s*\n(.*?)\n```", text, re.DOTALL) if m: try: return json.loads(m.group(1)) except json.JSONDecodeError: pass # Try to find first { ... } or [ ... ] for start_char, end_char in [("{", "}"), ("[", "]")]: start = text.find(start_char) if start >= 0: # Find matching close depth = 0 for i in range(start, len(text)): if text[i] == start_char: depth += 1 elif text[i] == end_char: depth -= 1 if depth == 0: try: return json.loads(text[start:i + 1]) except json.JSONDecodeError: break return None # --------------------------------------------------------------------------- # Backlog audit # --------------------------------------------------------------------------- PROMPTS_DIR = Path(__file__).parent / "prompts" _LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish", "de": "German", "fr": "French"} def run_audit( conn: sqlite3.Connection, project_id: str, noninteractive: bool = False, auto_apply: bool = False, ) -> dict: """Audit pending tasks against the actual codebase. auto_apply=True: marks already_done tasks as done in DB. auto_apply=False: returns results only (for API/GUI). Returns {success, already_done, still_pending, unclear, duration_seconds, ...} """ project = models.get_project(conn, project_id) if not project: return {"success": False, "error": f"Project '{project_id}' not found"} pending = models.list_tasks(conn, project_id=project_id, status="pending") if not pending: return { "success": True, "already_done": [], "still_pending": [], "unclear": [], "message": "No pending tasks to audit", } # Build prompt prompt_path = PROMPTS_DIR / "backlog_audit.md" template = prompt_path.read_text() if prompt_path.exists() else ( "You are a QA analyst. Check if pending tasks are already done in the code." 
) task_list = [ {"id": t["id"], "title": t["title"], "brief": t.get("brief")} for t in pending ] sections = [ template, "", f"## Project: {project['id']} — {project['name']}", ] if project.get("tech_stack"): sections.append(f"Tech stack: {', '.join(project['tech_stack'])}") sections.append(f"Path: {project['path']}") sections.append("") sections.append(f"## Pending tasks ({len(task_list)}):") sections.append(json.dumps(task_list, ensure_ascii=False, indent=2)) sections.append("") language = project.get("language", "ru") lang_name = _LANG_NAMES.get(language, language) sections.append("## Language") sections.append(f"ALWAYS respond in {lang_name}.") sections.append("") prompt = "\n".join(sections) # Determine working dir working_dir = None project_path = Path(project["path"]).expanduser() if project_path.is_dir(): working_dir = str(project_path) # Run agent — allow_write=True so claude can use Read/Bash tools # without interactive permission prompts (critical for noninteractive mode) start = time.monotonic() result = _run_claude(prompt, model="sonnet", working_dir=working_dir, allow_write=True, noninteractive=noninteractive) duration = int(time.monotonic() - start) raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) success = result["returncode"] == 0 # Log to agent_logs models.log_agent_run( conn, project_id=project_id, task_id=None, agent_role="backlog_audit", action="audit", input_summary=f"project={project_id}, pending_tasks={len(pending)}", output_summary=raw_output or None, tokens_used=result.get("tokens_used"), model="sonnet", cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) if not success: return { "success": False, "error": result.get("error", "Agent failed"), "raw_output": raw_output, "duration_seconds": duration, } # Parse structured output parsed = _try_parse_json(raw_output) if not 
isinstance(parsed, dict): return { "success": False, "error": "Agent returned non-JSON output", "raw_output": raw_output, "duration_seconds": duration, } already_done = parsed.get("already_done", []) # Auto-apply: mark already_done tasks as done in DB applied = [] if auto_apply and already_done: for item in already_done: tid = item.get("id") if tid: t = models.get_task(conn, tid) if t and t["project_id"] == project_id and t["status"] == "pending": models.update_task(conn, tid, status="done") applied.append(tid) return { "success": True, "already_done": already_done, "still_pending": parsed.get("still_pending", []), "unclear": parsed.get("unclear", []), "applied": applied, "duration_seconds": duration, "tokens_used": result.get("tokens_used"), "cost_usd": result.get("cost_usd"), } # --------------------------------------------------------------------------- # Blocked protocol detection # --------------------------------------------------------------------------- def _parse_agent_blocked(result: dict) -> dict | None: """Detect semantic blocked status from a successful agent result. Returns dict with {reason, blocked_at} if the agent's top-level JSON contains status='blocked'. Returns None otherwise. Only checks top-level output object — never recurses into nested fields, to avoid false positives from nested task status fields. 
""" from datetime import datetime if not result.get("success"): return None output = result.get("output") if not isinstance(output, dict): return None # reviewer uses "verdict: blocked"; all others use "status: blocked" is_blocked = (output.get("status") == "blocked" or output.get("verdict") == "blocked") if not is_blocked: return None return { "reason": output.get("reason") or output.get("blocked_reason") or "", "blocked_at": output.get("blocked_at") or datetime.now().isoformat(), } # --------------------------------------------------------------------------- # Gate cannot-close detection (KIN-133) # --------------------------------------------------------------------------- def _find_gate_result(results: list[dict], role: str) -> dict | None: """Return the last successful result for the given gate role. Iterates results in reverse order to handle auto_fix retry loops where tester may appear multiple times; returns the latest successful attempt. """ for r in reversed(results): if r.get("role") == role and r.get("success"): return r return None def _parse_gate_cannot_close(result: dict, role: str) -> dict | None: """Detect gate rejection from a final gate agent (reviewer or tester). Returns dict with {reason} if the gate agent signals the task must NOT be closed. Returns None if the gate approves or if output format is unrecognised (fail-open: never block on unknown formats). Reviewer: verdict must be 'approved'; anything else ('changes_requested', 'revise', 'blocked') is a rejection. Note: 'blocked' is handled earlier by _parse_agent_blocked(), but if it somehow reaches here we treat it as cannot_close too. Tester: status must be 'passed'; 'failed' or 'blocked' are rejections. 
""" from datetime import datetime output = result.get("output") if not isinstance(output, dict): return None # fail-open: non-dict output → don't block if role == "reviewer": verdict = output.get("verdict") if verdict is None: return None # fail-open: no verdict field if verdict == "approved": return None # approved → close reason = output.get("reason") or output.get("summary") or f"Reviewer verdict: {verdict}" return {"reason": reason} if role == "tester": status = output.get("status") if status is None: return None # fail-open: no status field if status == "passed": return None # passed → close reason = output.get("reason") or output.get("blocked_reason") or f"Tester status: {status}" return {"reason": reason} return None # unknown gate role → fail-open # --------------------------------------------------------------------------- # Destructive operation detection (KIN-116) # --------------------------------------------------------------------------- # Patterns that indicate destructive operations in agent output. # Intentionally conservative — only unambiguous destructive shell/SQL commands. _DESTRUCTIVE_PATTERNS = [ # Shell: rm with recursive or force flags (alone or combined) r"\brm\s+(-[a-zA-Z]*[rf][a-zA-Z]*\s+|--recursive\s+|--force\s+)", # Shell: unlink (removes a file) r"\bunlink\s+\S", # SQL: DROP TABLE / DATABASE / INDEX / VIEW / SCHEMA r"\bDROP\s+(TABLE|DATABASE|INDEX|VIEW|SCHEMA)\b", # SQL: DELETE FROM without WHERE — full table delete is the risky form. # DELETE FROM ... WHERE ... is a targeted operation and is NOT flagged. r"\bDELETE\s+FROM\b(?![^;]*\bWHERE\b)", # Python: shutil.rmtree r"\bshutil\.rmtree\s*\(", # Python: os.remove / os.unlink r"\bos\.(remove|unlink)\s*\(", ] _DESTRUCTIVE_RE = [re.compile(p, re.DOTALL | re.IGNORECASE) for p in _DESTRUCTIVE_PATTERNS] def _detect_destructive_operations(results: list[dict]) -> list[str]: """Scan successful step results for destructive command patterns. 
Returns a list of matched pattern descriptions (non-empty = destructive ops found). Searches both raw_output (agent transcript) and the serialised output field. """ found: list[str] = [] for r in results: if not r.get("success"): continue raw = r.get("raw_output") or "" out = r.get("output") or "" if not isinstance(raw, str): raw = json.dumps(raw, ensure_ascii=False) if not isinstance(out, str): out = json.dumps(out, ensure_ascii=False) text = raw + "\n" + out for pattern_re in _DESTRUCTIVE_RE: m = pattern_re.search(text) if m: found.append(m.group(0).strip()) return found # --------------------------------------------------------------------------- # Permission error detection # --------------------------------------------------------------------------- def _is_permission_error(result: dict) -> bool: """Return True if agent result indicates a permission/write failure.""" from core.followup import PERMISSION_PATTERNS output = (result.get("raw_output") or result.get("output") or "") if not isinstance(output, str): output = json.dumps(output, ensure_ascii=False) error = result.get("error") or "" text = output + " " + error return any(re.search(p, text) for p in PERMISSION_PATTERNS) # --------------------------------------------------------------------------- # Autocommit: git add -A && git commit after successful pipeline # --------------------------------------------------------------------------- def _get_changed_files(project_path: str) -> list[str]: """Return files changed in the current pipeline run. Combines unstaged changes, staged changes, and the last commit diff to cover both autocommit-on and autocommit-off scenarios. Returns [] on any git error (e.g. no git repo, first commit). 
""" env = _build_claude_env() git_cmd = shutil.which("git", path=env["PATH"]) or "git" files: set[str] = set() for git_args in ( ["diff", "--name-only"], # unstaged tracked changes ["diff", "--cached", "--name-only"], # staged changes ["diff", "HEAD~1", "HEAD", "--name-only"], # last commit (post-autocommit) ): try: r = subprocess.run( [git_cmd] + git_args, cwd=project_path, capture_output=True, text=True, timeout=10, env=env, ) if r.returncode == 0: files.update(f.strip() for f in r.stdout.splitlines() if f.strip()) except Exception: pass return list(files) def _run_autocommit( conn: sqlite3.Connection, task_id: str, project_id: str, ) -> None: """Auto-commit changes after successful pipeline completion. Runs: git add -A && git commit -m 'kin: {task_id} {title}'. Silently skips if nothing to commit (exit code 1) or project path not found. Never raises — autocommit errors must never block the pipeline. Uses stderr=subprocess.DEVNULL per decision #30. """ task = models.get_task(conn, task_id) project = models.get_project(conn, project_id) if not task or not project: return if not project.get("autocommit_enabled"): return project_path = Path(project["path"]).expanduser() if not project_path.is_dir(): return working_dir = str(project_path) env = _build_claude_env() git_cmd = shutil.which("git", path=env["PATH"]) or "git" title = (task.get("title") or "").replace('"', "'").replace("\n", " ").replace("\r", "") commit_msg = f"kin: {task_id} {title}" try: subprocess.run( [git_cmd, "add", "-A"], cwd=working_dir, env=env, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, ) result = subprocess.run( [git_cmd, "commit", "-m", commit_msg], cwd=working_dir, env=env, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, ) if result.returncode == 0: _logger.info("Autocommit: %s", commit_msg) else: _logger.debug("Autocommit: nothing to commit for %s", task_id) except Exception as exc: _logger.warning("Autocommit failed for %s: %s", task_id, exc) # 
# ---------------------------------------------------------------------------
# Sysadmin output: save server map to decisions and modules
# ---------------------------------------------------------------------------

def _save_sysadmin_output(
    conn: sqlite3.Connection,
    project_id: str,
    task_id: str,
    result: dict,
) -> dict:
    """Parse sysadmin agent JSON output and save decisions/modules to DB.

    Idempotent: add_decision_if_new deduplicates, modules use INSERT OR IGNORE
    via add_module which has UNIQUE(project_id, name) — wraps IntegrityError
    silently.
    Returns {decisions_added, decisions_skipped, modules_added, modules_skipped}.
    """
    raw = result.get("raw_output") or result.get("output") or ""
    if isinstance(raw, (dict, list)):
        raw = json.dumps(raw, ensure_ascii=False)
    parsed = _try_parse_json(raw)
    if not isinstance(parsed, dict):
        # Non-JSON output: nothing to save, report all-zero counters.
        return {"decisions_added": 0, "decisions_skipped": 0, "modules_added": 0, "modules_skipped": 0}
    decisions_added = 0
    decisions_skipped = 0
    for item in (parsed.get("decisions") or []):
        if not isinstance(item, dict):
            continue
        # Unknown decision types are coerced to the generic "decision".
        d_type = item.get("type", "decision")
        if d_type not in VALID_DECISION_TYPES:
            d_type = "decision"
        d_title = (item.get("title") or "").strip()
        d_desc = (item.get("description") or "").strip()
        if not d_title or not d_desc:
            continue
        saved = models.add_decision_if_new(
            conn,
            project_id=project_id,
            type=d_type,
            title=d_title,
            description=d_desc,
            tags=item.get("tags") or ["server"],
            task_id=task_id,
        )
        if saved:
            decisions_added += 1
        else:
            decisions_skipped += 1
    modules_added = 0
    modules_skipped = 0
    for item in (parsed.get("modules") or []):
        if not isinstance(item, dict):
            continue
        m_name = (item.get("name") or "").strip()
        m_type = (item.get("type") or "service").strip()
        m_path = (item.get("path") or "").strip()
        if not m_name:
            continue
        try:
            m = models.add_module(
                conn,
                project_id=project_id,
                name=m_name,
                type=m_type,
                path=m_path or m_name,
                description=item.get("description"),
                owner_role="sysadmin",
            )
            # add_module reports whether a row was actually inserted via "_created";
            # missing key defaults to created (backward compatibility).
            if m.get("_created", True):
                modules_added += 1
            else:
                modules_skipped += 1
        except Exception:
            modules_skipped += 1
    return {
        "decisions_added": decisions_added,
        "decisions_skipped": decisions_skipped,
        "modules_added": modules_added,
        "modules_skipped": modules_skipped,
    }


# ---------------------------------------------------------------------------
# Auto-test: detect test failure in agent output
# ---------------------------------------------------------------------------

# Failure keywords scanned case-insensitively in agent output.
_TEST_FAILURE_PATTERNS = [
    r"\bFAILED\b",
    r"\bFAIL\b",
    r"\d+\s+failed",
    r"test(?:s)?\s+failed",
    r"assert(?:ion)?\s*(error|failed)",
    r"exception(?:s)?\s+occurred",
    r"returncode\s*[!=]=\s*0",
    r"Error:\s",
    r"ERRORS?\b",
]

# Explicit success phrases that override failure keywords (checked first).
_TEST_SUCCESS_PATTERNS = [
    r"no\s+failures",
    r"all\s+tests?\s+pass",
    r"0\s+failed",
    r"passed.*no\s+errors",
]


def _is_test_failure(result: dict) -> bool:
    """Return True if agent output indicates test failures.

    Checks for failure keywords, guards against false positives from explicit
    success phrases (e.g. 'no failures') — success patterns win over failure
    patterns because they are evaluated first.
    """
    output = result.get("raw_output") or result.get("output") or ""
    if not isinstance(output, str):
        output = json.dumps(output, ensure_ascii=False)
    for p in _TEST_SUCCESS_PATTERNS:
        if re.search(p, output, re.IGNORECASE):
            return False
    for p in _TEST_FAILURE_PATTERNS:
        if re.search(p, output, re.IGNORECASE):
            return True
    return False


# ---------------------------------------------------------------------------
# Auto-test runner: run project tests via `make test`
# ---------------------------------------------------------------------------

# Roles that trigger auto-test when project.auto_test_enabled is set
_AUTO_TEST_ROLES = {"backend_dev", "frontend_dev"}
# Roles that run in a git worktree / get dev-guard checks (used elsewhere in the pipeline).
_WORKTREE_ROLES = {"backend_dev", "frontend_dev", "debugger"}
_DEV_GUARD_ROLES = {"backend_dev", "frontend_dev", "debugger"}


def _detect_test_command(project_path: str, role: str | None = None) -> str | None:
    """Auto-detect test command by inspecting project files.

    Candidates (in priority order):
    1. make test — Makefile exists and has a 'test' target
    2. npm test — package.json exists and has scripts.test
    3. pytest — pyproject.toml or setup.py exists
    4. npx tsc --noEmit — tsconfig.json exists

    When role='backend_dev' and a Python project marker (pyproject.toml /
    setup.py) is present, pytest is returned directly — bypassing make test.
    This prevents false-positive failures in mixed projects whose Makefile test
    target also runs frontend (e.g. vitest) commands that may be unrelated to
    backend changes.

    Returns the first matching command, or None if no framework is detected.
    """
    path = Path(project_path)
    # For backend_dev: Python project marker takes precedence over Makefile.
    # Rationale: make test in mixed projects often runs frontend tests too;
    # backend changes should only be validated by the Python test runner.
    if role == "backend_dev":
        if (path / "pyproject.toml").is_file() or (path / "setup.py").is_file():
            # sys.executable keeps pytest in the same interpreter/venv as the runner.
            return f"{sys.executable} -m pytest"
    # 1. make test
    makefile = path / "Makefile"
    if makefile.is_file():
        try:
            content = makefile.read_text(errors="ignore")
            # A 'test:' rule at the start of a line marks the target.
            if re.search(r"^test\s*:", content, re.MULTILINE):
                return "make test"
        except OSError:
            pass
    # 2. npm test
    pkg_json = path / "package.json"
    if pkg_json.is_file():
        try:
            pkg = json.loads(pkg_json.read_text())
            if pkg.get("scripts", {}).get("test"):
                return "npm test"
        except (json.JSONDecodeError, OSError):
            pass
    # 3. pytest
    if (path / "pyproject.toml").is_file() or (path / "setup.py").is_file():
        return f"{sys.executable} -m pytest"
    # 4. npx tsc --noEmit
    if (path / "tsconfig.json").is_file():
        return "npx tsc --noEmit"
    return None


def _run_project_tests(project_path: str, test_command: str = 'make test', timeout: int | None = None) -> dict:
    """Run test_command in project_path.

    Returns {success, output, returncode}. Never raises — all errors are
    captured and returned in output. Timeout defaults to KIN_AUTO_TEST_TIMEOUT
    env (seconds) or 600.
    """
    if timeout is None:
        timeout = int(os.environ.get("KIN_AUTO_TEST_TIMEOUT") or 600)
    env = _build_claude_env()
    # shlex.split honours quoting in the command string; the first token is
    # resolved against the extended PATH so launchd-started daemons find it.
    parts = shlex.split(test_command)
    if not parts:
        return {"success": False, "output": "Empty test_command", "returncode": -1}
    resolved = shutil.which(parts[0], path=env["PATH"]) or parts[0]
    cmd = [resolved] + parts[1:]
    try:
        result = subprocess.run(
            cmd,
            cwd=project_path,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
        )
        output = (result.stdout or "") + (result.stderr or "")
        return {"success": result.returncode == 0, "output": output, "returncode": result.returncode}
    except subprocess.TimeoutExpired:
        return {"success": False, "output": f"{test_command} timed out after {timeout}s", "returncode": 124}
    except FileNotFoundError:
        return {"success": False, "output": f"{parts[0]} not found in PATH", "returncode": 127}
    except Exception as exc:
        return {"success": False, "output": f"Test run error: {exc}", "returncode": -1}


# ---------------------------------------------------------------------------
# Decomposer output: create child tasks from task_decomposer JSON
# ---------------------------------------------------------------------------

def _save_decomposer_output(
    conn: sqlite3.Connection,
    project_id: str,
    parent_task_id: str,
    result: dict,
) -> dict:
    """Parse task_decomposer output and create child tasks in DB.

    Expected output format:
    {tasks: [{title, brief, priority, category, acceptance_criteria}]}
    Idempotent: skips tasks with same parent_task_id + title (case-insensitive).
    Returns {created: int, skipped: int} (plus "error" on malformed output).
    """
    raw = result.get("raw_output") or result.get("output") or ""
    if isinstance(raw, (dict, list)):
        raw = json.dumps(raw, ensure_ascii=False)
    parsed = _try_parse_json(raw)
    if not isinstance(parsed, dict):
        return {"created": 0, "skipped": 0, "error": "non-JSON decomposer output"}
    task_list = parsed.get("tasks", [])
    if not isinstance(task_list, list):
        return {"created": 0, "skipped": 0, "error": "invalid tasks format"}
    created = 0
    skipped = 0
    for item in task_list:
        if not isinstance(item, dict):
            continue
        title = (item.get("title") or "").strip()
        if not title:
            continue
        # Idempotency: skip if same parent + title already exists
        existing = conn.execute(
            """SELECT id FROM tasks WHERE parent_task_id = ? AND lower(trim(title)) = lower(trim(?))""",
            (parent_task_id, title),
        ).fetchone()
        if existing:
            skipped += 1
            _logger.info(
                "task_decomposer: skip duplicate child task '%s' (parent=%s, existing=%s)",
                title, parent_task_id, existing[0],
            )
            continue
        # Unknown categories fall back to None (uncategorised task id).
        category = (item.get("category") or "").strip().upper()
        if category not in models.TASK_CATEGORIES:
            category = None
        task_id = models.next_task_id(conn, project_id, category=category)
        brief_text = item.get("brief") or ""
        models.create_task(
            conn,
            task_id,
            project_id,
            title,
            priority=item.get("priority", 5),
            brief={"text": brief_text, "source": f"decomposer:{parent_task_id}"},
            category=category,
            acceptance_criteria=item.get("acceptance_criteria"),
            parent_task_id=parent_task_id,
        )
        created += 1
    return {"created": created, "skipped": skipped}


# ---------------------------------------------------------------------------
# Tech debt: create followup child task from dev agent output
# ---------------------------------------------------------------------------

# Roles whose output is parsed for tech_debt (KIN-128)
_TECH_DEBT_ROLES = {"backend_dev", "frontend_dev", "debugger", "sysadmin"}


def _save_tech_debt_output(
    conn: sqlite3.Connection,
    project_id: str,
    task_id: str,
    result: dict,
) -> dict:
    """Parse dev agent JSON output for tech_debt field and create a child task.

    If the agent output contains a non-empty 'tech_debt' object with a
    'description', creates one child task with title='[TECH DEBT] {description}'.
    At most 1 tech_debt task per call (prevents runaway task creation).
    Returns {created: bool, task_id: str | None}.
    """
    raw = result.get("raw_output") or result.get("output") or ""
    if isinstance(raw, (dict, list)):
        raw = json.dumps(raw, ensure_ascii=False)
    try:
        parsed = _try_parse_json(raw)
    except Exception:
        # Defensive: _try_parse_json should not raise, but never let parsing
        # break the pipeline here.
        return {"created": False, "task_id": None}
    if not isinstance(parsed, dict):
        return {"created": False, "task_id": None}
    tech_debt = parsed.get("tech_debt")
    if not isinstance(tech_debt, dict):
        return {"created": False, "task_id": None}
    description = (tech_debt.get("description") or "").strip()
    if not description:
        return {"created": False, "task_id": None}
    reason_temporary = (tech_debt.get("reason_temporary") or "").strip()
    proper_fix = (tech_debt.get("proper_fix") or "").strip()
    # Idempotency: skip if a [TECH DEBT] child with same description already exists
    title = f"[TECH DEBT] {description}"
    existing = conn.execute(
        """SELECT id FROM tasks WHERE parent_task_id = ? AND lower(trim(title)) = lower(trim(?))""",
        (task_id, title),
    ).fetchone()
    if existing:
        return {"created": False, "task_id": existing[0]}
    # Tech-debt defaults to the FIX category when the agent's category is unknown.
    category = (tech_debt.get("category") or "").strip().upper()
    if category not in models.TASK_CATEGORIES:
        category = "FIX"
    brief_text = f"Технический долг из задачи {task_id}."
    if reason_temporary:
        brief_text += f"\n\nПричина временного решения: {reason_temporary}"
    if proper_fix:
        brief_text += f"\n\nПравильный фикс: {proper_fix}"
    new_task_id = models.next_task_id(conn, project_id, category=category)
    models.create_task(
        conn,
        new_task_id,
        project_id,
        title,
        priority=7,
        brief={"text": brief_text, "source": f"tech_debt:{task_id}"},
        category=category,
        parent_task_id=task_id,
    )
    _logger.info("tech_debt: created task %s for parent %s", new_task_id, task_id)
    return {"created": True, "task_id": new_task_id}


# ---------------------------------------------------------------------------
# Auto-learning: extract decisions from pipeline results
# ---------------------------------------------------------------------------

# Accepted values for a decision's "type" field; anything else is coerced
# to "decision" by _save_sysadmin_output.
VALID_DECISION_TYPES = {"decision", "gotcha", "convention"}


def _run_learning_extraction(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
    step_results: list[dict],
) -> dict:
    """Extract and save decisions from completed pipeline results.

    Calls the learner agent with step outputs + existing decisions, parses the
    JSON response, and saves new decisions via add_decision_if_new.
    Returns a summary dict with added/skipped counts.
    """
    learner_prompt_path = PROMPTS_DIR / "learner.md"
    if not learner_prompt_path.exists():
        return {"added": 0, "skipped": 0, "error": "learner.md not found"}
    template = learner_prompt_path.read_text()
    # Summarize step outputs (first 2000 chars each)
    step_summaries = {}
    for r in step_results:
        role = r.get("role", "unknown")
        output = r.get("raw_output") or r.get("output") or ""
        if isinstance(output, (dict, list)):
            output = json.dumps(output, ensure_ascii=False)
        step_summaries[role] = output[:2000]
    # Fetch existing decisions for dedup hint
    existing = models.get_decisions(conn, project_id)
    existing_hints = [
        {"title": d["title"], "type": d["type"]} for d in existing
    ]
    prompt_parts = [
        template,
        "",
        "## PIPELINE_OUTPUTS",
        json.dumps(step_summaries, ensure_ascii=False, indent=2),
        "",
        "## EXISTING_DECISIONS",
        json.dumps(existing_hints, ensure_ascii=False, indent=2),
    ]
    prompt = "\n".join(prompt_parts)
    learner_timeout = int(os.environ.get("KIN_LEARNER_TIMEOUT") or 120)
    start = time.monotonic()
    result = _run_claude(prompt, model="sonnet", noninteractive=True, timeout=learner_timeout)
    duration = int(time.monotonic() - start)
    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, ensure_ascii=False)
    success = result["returncode"] == 0
    # Log to agent_logs
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=task_id,
        agent_role="learner",
        action="learn",
        input_summary=f"project={project_id}, task={task_id}, steps={len(step_results)}",
        output_summary=raw_output or None,
        tokens_used=result.get("tokens_used"),
        model="sonnet",
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )
    parsed = _try_parse_json(raw_output)
    if not isinstance(parsed, dict):
        return {"added": 0, "skipped": 0, "error": "non-JSON learner output"}
    decisions = parsed.get("decisions", [])
    if not isinstance(decisions, list):
        return {"added": 0, "skipped": 0, "error":
"invalid decisions format"} added = 0 skipped = 0 for item in decisions[:5]: if not isinstance(item, dict): continue d_type = item.get("type", "decision") if d_type not in VALID_DECISION_TYPES: d_type = "decision" d_title = (item.get("title") or "").strip() d_desc = (item.get("description") or "").strip() if not d_title or not d_desc: continue saved = models.add_decision_if_new( conn, project_id=project_id, type=d_type, title=d_title, description=d_desc, tags=item.get("tags") or [], task_id=task_id, ) if saved: added += 1 else: skipped += 1 return {"added": added, "skipped": skipped} # --------------------------------------------------------------------------- # Department head detection # --------------------------------------------------------------------------- # Cache of roles with execution_type=department_head from specialists.yaml _DEPT_HEAD_ROLES: set[str] | None = None def _is_department_head(role: str) -> bool: """Check if a role is a department head. Uses execution_type from specialists.yaml as primary check, falls back to role.endswith('_head') convention. """ global _DEPT_HEAD_ROLES if _DEPT_HEAD_ROLES is None: try: from core.context_builder import _load_specialists specs = _load_specialists() all_specs = specs.get("specialists", {}) _DEPT_HEAD_ROLES = { name for name, spec in all_specs.items() if spec.get("execution_type") == "department_head" } except Exception: _DEPT_HEAD_ROLES = set() return role in _DEPT_HEAD_ROLES or role.endswith("_head") # --------------------------------------------------------------------------- # Department head sub-pipeline execution # --------------------------------------------------------------------------- def _execute_department_head_step( conn: sqlite3.Connection, task_id: str, project_id: str, parent_pipeline_id: int | None, step: dict, dept_head_result: dict, allow_write: bool = False, noninteractive: bool = False, next_department: str | None = None, ) -> dict: """Execute sub-pipeline planned by a department head. 
def _execute_department_head_step(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
    parent_pipeline_id: int | None,
    step: dict,
    dept_head_result: dict,
    allow_write: bool = False,
    noninteractive: bool = False,
    next_department: str | None = None,
) -> dict:
    """Execute sub-pipeline planned by a department head.

    Parses the dept head's JSON output, validates the sub_pipeline, creates a
    child pipeline in DB, runs it, and saves a handoff record.

    Args:
        conn: open SQLite connection.
        task_id: parent task being executed.
        project_id: owning project (used only for context; DB writes key on ids).
        parent_pipeline_id: id of the top-level pipeline, or None; when set,
            progress/blocked events are mirrored into its log.
        step: the pipeline step dict for the dept head (must have 'role').
        dept_head_result: raw result dict from the dept head agent run.
        allow_write: propagated to the worker sub-pipeline.
        noninteractive: accepted for signature parity; the sub-pipeline is
            always run with noninteractive=True (plan was already reviewed).
        next_department: downstream department name for handoff routing.

    Returns dict with success, output, cost_usd, tokens_used,
    duration_seconds (and blocked/error details on failure).
    """
    # Normalize the dept head output to a string, then parse as JSON.
    raw = dept_head_result.get("raw_output") or dept_head_result.get("output") or ""
    if isinstance(raw, (dict, list)):
        raw = json.dumps(raw, ensure_ascii=False)
    parsed = _try_parse_json(raw)
    if not isinstance(parsed, dict):
        return {
            "success": False,
            "output": "Department head returned non-JSON output",
            "cost_usd": 0,
            "tokens_used": 0,
            "duration_seconds": 0,
        }
    # Blocked status from dept head — no sub-pipeline is attempted.
    if parsed.get("status") == "blocked":
        reason = parsed.get("blocked_reason", "Department head reported blocked")
        # KIN-084: log dept head blocked (best-effort, never fatal)
        if parent_pipeline_id:
            try:
                models.write_log(
                    conn,
                    parent_pipeline_id,
                    f"Dept {step['role']} blocked: {reason}",
                    level="WARN",
                    extra={"role": step["role"], "blocked_reason": reason},
                )
            except Exception:
                pass
        return {
            "success": False,
            "output": json.dumps(parsed, ensure_ascii=False),
            "blocked": True,
            "blocked_reason": reason,
            "cost_usd": 0,
            "tokens_used": 0,
            "duration_seconds": 0,
        }
    sub_pipeline = parsed.get("sub_pipeline", [])
    if not isinstance(sub_pipeline, list) or not sub_pipeline:
        return {
            "success": False,
            "output": "Department head returned empty or invalid sub_pipeline",
            "cost_usd": 0,
            "tokens_used": 0,
            "duration_seconds": 0,
        }
    # Recursion guard: no department head roles allowed in sub_pipeline,
    # otherwise heads could plan heads indefinitely.
    for sub_step in sub_pipeline:
        if isinstance(sub_step, dict) and _is_department_head(str(sub_step.get("role", ""))):
            return {
                "success": False,
                "output": f"Recursion blocked: sub_pipeline contains _head role '{sub_step['role']}'",
                "cost_usd": 0,
                "tokens_used": 0,
                "duration_seconds": 0,
            }
    role = step["role"]
    # Department name derives from the role by convention: '<dept>_head'.
    dept_name = role.replace("_head", "")
    # Build initial context for workers: dept head's plan + artifacts
    dept_plan_context = json.dumps({
        "department_head_plan": {
            "department": dept_name,
            "artifacts": parsed.get("artifacts", {}),
            "handoff_notes": parsed.get("handoff_notes", ""),
        },
    }, ensure_ascii=False)
    # KIN-084: log sub-pipeline start (best-effort)
    if parent_pipeline_id:
        try:
            sub_roles = [s.get("role", "") for s in sub_pipeline]
            models.write_log(
                conn,
                parent_pipeline_id,
                f"Sub-pipeline start: dept={dept_name}, steps={len(sub_pipeline)}",
                extra={"dept_name": dept_name, "sub_steps": len(sub_pipeline), "sub_roles": sub_roles},
            )
        except Exception:
            pass
    # Run the sub-pipeline (noninteractive=True — Opus already reviewed the plan)
    # pass parent_pipeline_id and department so run_pipeline creates the child
    # pipeline with correct attributes (route_type='dept_sub') — no double create
    sub_result = run_pipeline(
        conn,
        task_id,
        sub_pipeline,
        dry_run=False,
        allow_write=allow_write,
        noninteractive=True,
        initial_previous_output=dept_plan_context,
        parent_pipeline_id=parent_pipeline_id,
        department=dept_name,
    )
    # KIN-084: log sub-pipeline done (best-effort)
    if parent_pipeline_id:
        try:
            sub_success = sub_result.get("success", False)
            models.write_log(
                conn,
                parent_pipeline_id,
                f"Sub-pipeline done: dept={dept_name}, success={sub_success}, steps={sub_result.get('steps_completed', 0)}",
                level="INFO" if sub_success else "ERROR",
                extra={"dept_name": dept_name, "success": sub_success, "steps_completed": sub_result.get("steps_completed", 0)},
            )
        except Exception:
            pass
    # Extract decisions from sub-pipeline results for handoff
    decisions_made = []
    sub_results = sub_result.get("results", [])
    for sr in sub_results:
        output = sr.get("output") or sr.get("raw_output") or ""
        if isinstance(output, str):
            # String outputs may themselves be JSON — best-effort decode.
            try:
                output = json.loads(output)
            except (json.JSONDecodeError, ValueError):
                pass
        if isinstance(output, dict):
            # Reviewer/tester may include decisions or findings
            for key in ("decisions", "findings", "recommendations"):
                val = output.get(key)
                if isinstance(val, list):
                    decisions_made.extend(val)
                elif isinstance(val, str) and val:
                    decisions_made.append(val)
    # Determine last worker role for auto_complete tracking
    last_sub_role = sub_pipeline[-1].get("role", "") if sub_pipeline else ""
    # Save handoff for inter-department context
    handoff_status = "done" if sub_result.get("success") else "partial"
    try:
        models.create_handoff(
            conn,
            pipeline_id=sub_result.get("pipeline_id") or parent_pipeline_id,
            task_id=task_id,
            from_department=dept_name,
            to_department=next_department,
            artifacts=parsed.get("artifacts", {}),
            decisions_made=decisions_made,
            blockers=[],
            status=handoff_status,
        )
    except Exception:
        pass  # Handoff save errors must never block pipeline
    # Build summary output for the next pipeline step (compact, not raw JSON)
    summary = {
        "from_department": dept_name,
        "handoff_notes": parsed.get("handoff_notes", ""),
        "artifacts": parsed.get("artifacts", {}),
        "sub_pipeline_summary": {
            "steps_completed": sub_result.get("steps_completed", 0),
            "success": sub_result.get("success", False),
        },
    }
    ret = {
        "success": sub_result.get("success", False),
        "output": json.dumps(summary, ensure_ascii=False),
        "cost_usd": sub_result.get("total_cost_usd", 0),
        "tokens_used": sub_result.get("total_tokens", 0),
        "duration_seconds": sub_result.get("total_duration_seconds", 0),
        "last_sub_role": last_sub_role,
    }
    if not sub_result.get("success"):
        # Surface the sub-pipeline's blocking/error detail to the caller.
        ret["blocked_reason"] = sub_result.get("blocked_reason") or sub_result.get("error")
        ret["error"] = sub_result.get("error")
    return ret
""" ppid = os.getppid() try: os.kill(ppid, 0) except OSError as exc: if exc.errno == _errno.ESRCH: reason = f"Parent process died unexpectedly (PID {ppid})" _logger.warning("Pipeline %s: %s — aborting", pipeline["id"], reason) models.update_pipeline(conn, pipeline["id"], status="failed") models.update_task(conn, task_id, status="blocked", blocked_reason=reason) return True # PermissionError (EPERM) — process exists but we can't signal it: continue return False # --------------------------------------------------------------------------- # Pipeline executor # --------------------------------------------------------------------------- def run_pipeline( conn: sqlite3.Connection, task_id: str, steps: list[dict], dry_run: bool = False, allow_write: bool = False, noninteractive: bool = False, initial_previous_output: str | None = None, parent_pipeline_id: int | None = None, department: str | None = None, pm_result: dict | None = None, pm_started_at: str | None = None, pm_ended_at: str | None = None, ) -> dict: """Execute a multi-step pipeline of agents. steps = [ {"role": "debugger", "model": "opus", "brief": "..."}, {"role": "tester", "depends_on": "debugger", "brief": "..."}, ] initial_previous_output: context injected as previous_output for the first step (used by dept head sub-pipelines to pass artifacts/plan to workers). 
Returns {success, steps_completed, total_cost, total_tokens, total_duration, results} """ # Guard: empty pipeline — return immediately without touching DB or task state if not steps: return {"success": False, "error": "empty_pipeline"} # Auth check — skip for dry_run (dry_run never calls claude CLI) if not dry_run: try: check_claude_auth() except ClaudeAuthError as exc: return { "success": False, "error": "claude_auth_required", "message": str(exc), "instructions": "Run: claude login", } task = models.get_task(conn, task_id) if not task: return {"success": False, "error": f"Task '{task_id}' not found"} project_id = task["project_id"] # Determine route type from steps or task brief route_type = "custom" if task.get("brief") and isinstance(task["brief"], dict): route_type = task["brief"].get("route_type", "custom") or "custom" # Determine execution mode (auto vs review) mode = models.get_effective_mode(conn, project_id, task_id) # Create pipeline in DB pipeline = None if not dry_run: effective_route_type = "dept_sub" if parent_pipeline_id else route_type pipeline = models.create_pipeline( conn, task_id, project_id, effective_route_type, steps, parent_pipeline_id=parent_pipeline_id, department=department, ) # Save PID so watchdog can detect dead subprocesses (KIN-099) pipeline = models.update_pipeline(conn, pipeline["id"], pid=os.getpid()) models.update_task(conn, task_id, status="in_progress") # KIN-084: log pipeline start try: models.write_log( conn, pipeline["id"], f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}", extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode}, ) except Exception: pass # KIN-OBS-025: log PM step retrospectively (after create_pipeline FK is satisfied) # Only for top-level pipelines — sub-pipelines have no PM step. 
if pm_result is not None and not parent_pipeline_id: try: models.write_log( conn, pipeline["id"], "PM start: task planning", level="INFO", ts=pm_started_at, extra={"role": "pm"}, ) except Exception: pass try: _n_steps = len(steps) _success = pm_result.get("success") _cost = pm_result.get("cost_usd") or 0.0 _tokens = pm_result.get("tokens_used") or 0 models.write_log( conn, pipeline["id"], f"PM done: {_n_steps} steps planned, success={_success}, cost=${_cost:.4f}, tokens={_tokens}", level="INFO", ts=pm_ended_at, extra={ "role": "pm", "steps_count": _n_steps, "success": _success, "cost_usd": _cost, "tokens_used": _tokens, }, ) except Exception: pass results = [] total_cost = 0.0 total_tokens = 0 total_duration = 0 previous_output = initial_previous_output _last_sub_role = None # Track last worker role from dept sub-pipelines (for auto_complete) for i, step in enumerate(steps): role = step["role"] model = step.get("model", "sonnet") brief = step.get("brief") # KIN-084: log step start try: if pipeline: brief_preview = (step.get("brief") or "")[:100] models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} start: role={role}, model={model}", level="INFO", extra={"role": role, "model": model, "brief_preview": brief_preview}, ) except Exception: pass # Check parent process is still alive (KIN-099 watchdog) if not dry_run and pipeline: if _check_parent_alive(conn, pipeline, task_id, project_id): return { "success": False, "error": "parent_process_died", "steps_completed": i, "total_cost": total_cost, "total_tokens": total_tokens, "total_duration": total_duration, "results": results, } # Worktree isolation: opt-in per project, for write-capable roles worktree_path = None project_for_wt = models.get_project(conn, task["project_id"]) if not dry_run else None use_worktree = ( not dry_run and role in _WORKTREE_ROLES and project_for_wt and project_for_wt.get("worktrees_enabled") and project_for_wt.get("path") ) if use_worktree: try: from core.worktree import 
create_worktree, ensure_gitignore p_path = str(Path(project_for_wt["path"]).expanduser()) ensure_gitignore(p_path) worktree_path = create_worktree(p_path, task_id, role) except Exception: worktree_path = None # Fall back to normal execution try: result = run_agent( conn, role, task_id, project_id, model=model, previous_output=previous_output, brief_override=brief, dry_run=dry_run, allow_write=allow_write, noninteractive=noninteractive, working_dir_override=worktree_path, ) except Exception as exc: exc_msg = f"Step {i+1}/{len(steps)} ({role}) raised exception: {exc}" if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) # KIN-084: log step exception try: models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} exception: {exc}", level="ERROR", extra={"role": role, "exc_type": type(exc).__name__}, ) except Exception: pass models.log_agent_run( conn, project_id=project_id, task_id=task_id, agent_role=role, action="execute", input_summary=f"task={task_id}, model={model}", output_summary=None, success=False, error_message=exc_msg, ) models.update_task(conn, task_id, status="blocked", blocked_reason=exc_msg) try: from core.telegram import send_telegram_escalation project = models.get_project(conn, project_id) project_name = project["name"] if project else project_id sent = send_telegram_escalation( task_id=task_id, project_name=project_name, agent_role=role, reason=exc_msg, pipeline_step=str(i + 1), ) if sent: models.mark_telegram_sent(conn, task_id) except Exception: pass # Telegram errors must never block pipeline return { "success": False, "error": exc_msg, "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } if dry_run: results.append(result) continue # Accumulate stats total_cost += 
result.get("cost_usd") or 0 total_tokens += result.get("tokens_used") or 0 total_duration += result.get("duration_seconds") or 0 # KIN-084: log step done try: if pipeline: models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} done: role={role}, success={result.get('success')}", level="INFO", extra={ "role": role, "tokens_used": result.get("tokens_used") or 0, "cost_usd": result.get("cost_usd") or 0, "duration_seconds": result.get("duration_seconds") or 0, }, ) except Exception: pass if not result["success"]: # Auto mode: retry once with allow_write on permission error if mode == "auto_complete" and not allow_write and _is_permission_error(result): task_modules = models.get_modules(conn, project_id) try: run_hooks(conn, project_id, task_id, event="task_permission_retry", task_modules=task_modules) except Exception: pass # Audit log: record dangerous skip before retry try: models.log_audit_event( conn, event_type="dangerous_skip", task_id=task_id, step_id=role, reason=f"auto mode permission retry: step {i+1}/{len(steps)} ({role})", project_id=project_id, ) models.update_task(conn, task_id, dangerously_skipped=1) except Exception: pass retry = run_agent( conn, role, task_id, project_id, model=model, previous_output=previous_output, brief_override=brief, dry_run=False, allow_write=True, noninteractive=noninteractive, working_dir_override=worktree_path, ) allow_write = True # subsequent steps also with allow_write total_cost += retry.get("cost_usd") or 0 total_tokens += retry.get("tokens_used") or 0 total_duration += retry.get("duration_seconds") or 0 if retry["success"]: result = retry if not result["success"]: # Still failed — block regardless of mode results.append(result) if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) agent_error = result.get("error") or "" # KIN-084: log step failed try: if pipeline: models.write_log( conn, 
pipeline["id"], f"Step {i+1}/{len(steps)} failed: {agent_error}", level="ERROR", extra={"role": role, "error": agent_error}, ) except Exception: pass error_msg = f"Step {i+1}/{len(steps)} ({role}) failed" if agent_error: error_msg += f": {agent_error}" models.update_task(conn, task_id, status="blocked", blocked_reason=error_msg) # Worktree cleanup on step failure (KIN-103) if worktree_path and not dry_run: try: from core.worktree import cleanup_worktree p_path = str(Path(project_for_wt["path"]).expanduser()) cleanup_worktree(worktree_path, p_path) except Exception: pass return { "success": False, "error": error_msg, "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # Worktree merge/cleanup after successful step if worktree_path and result["success"] and not dry_run: try: from core.worktree import merge_worktree, cleanup_worktree p_path = str(Path(project_for_wt["path"]).expanduser()) merge_result = merge_worktree(worktree_path, p_path) if not merge_result["success"]: conflicts = merge_result.get("conflicts", []) conflict_msg = f"Worktree merge conflict in files: {', '.join(conflicts)}" if conflicts else "Worktree merge failed" models.update_task(conn, task_id, status="blocked", blocked_reason=conflict_msg) cleanup_worktree(worktree_path, p_path) if pipeline: models.update_pipeline(conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration) return { "success": False, "error": conflict_msg, "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # Fix D: suspicious guard — empty merged_files means agent made no changes if not merge_result.get("merged_files"): result["suspicious"] = True 
result["suspicious_reason"] = "agent reported success but no file changes detected" _logger.warning("KIN-117: suspicious step %s — no merged files after worktree merge", role) try: if pipeline: models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} suspicious: {role} reported success but merged no files", level="WARN", extra={"role": role}, ) except Exception: pass cleanup_worktree(worktree_path, p_path) except Exception: pass # Worktree errors must never block pipeline results.append(result) # Semantic blocked: agent ran successfully but returned status='blocked' blocked_info = _parse_agent_blocked(result) if blocked_info: if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) # KIN-084: log step blocked try: models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} blocked: {blocked_info['reason']}", level="WARN", extra={"role": role, "reason": blocked_info["reason"]}, ) except Exception: pass models.update_task( conn, task_id, status="blocked", blocked_reason=blocked_info["reason"], blocked_at=blocked_info["blocked_at"], blocked_agent_role=role, blocked_pipeline_step=str(i + 1), ) try: from core.telegram import send_telegram_escalation project = models.get_project(conn, project_id) project_name = project["name"] if project else project_id sent = send_telegram_escalation( task_id=task_id, project_name=project_name, agent_role=role, reason=blocked_info["reason"], pipeline_step=str(i + 1), ) if sent: models.mark_telegram_sent(conn, task_id) except Exception: pass # Telegram errors must never block pipeline error_msg = f"Step {i+1}/{len(steps)} ({role}) blocked: {blocked_info['reason']}" return { "success": False, "error": error_msg, "blocked_by": role, "blocked_reason": blocked_info["reason"], "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": 
total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # Save sysadmin scan results immediately after a successful sysadmin step if role == "sysadmin" and result["success"] and not dry_run: try: _save_sysadmin_output(conn, project_id, task_id, result) except Exception: pass # Never block pipeline on sysadmin save errors # Save decomposer output: create child tasks from task_decomposer JSON if role == "task_decomposer" and result["success"] and not dry_run: try: _save_decomposer_output(conn, project_id, task_id, result) except Exception: pass # Never block pipeline on decomposer save errors # Smoke tester: parse result and escalate if cannot_confirm (KIN-128) if role == "smoke_tester" and result["success"] and not dry_run: smoke_output = result.get("output") or result.get("raw_output") or "" smoke_parsed = None try: if isinstance(smoke_output, dict): smoke_parsed = smoke_output elif isinstance(smoke_output, str): smoke_parsed = _try_parse_json(smoke_output) except Exception: pass if isinstance(smoke_parsed, dict): # Save smoke_test_result regardless of outcome try: models.update_task(conn, task_id, smoke_test_result=smoke_parsed) except Exception: pass smoke_status = smoke_parsed.get("status", "") if smoke_status == "cannot_confirm": reason = smoke_parsed.get("reason") or "smoke_tester: cannot confirm — no proof of working service" blocked_reason = f"smoke_test: cannot_confirm — {reason}" models.update_task( conn, task_id, status="blocked", blocked_reason=blocked_reason, blocked_agent_role="smoke_tester", blocked_pipeline_step=str(i + 1), ) if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) try: models.write_log( conn, pipeline["id"], f"Smoke test cannot_confirm: {reason}", level="WARN", extra={"role": "smoke_tester", "reason": reason}, ) except Exception: pass return { "success": False, "error": blocked_reason, "blocked_by": 
"smoke_tester", "blocked_reason": blocked_reason, "steps_completed": i + 1, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # status == 'confirmed': smoke test passed, continue pipeline # Constitutional validator: gate before implementation (KIN-DOCS-001) if role == "constitutional_validator" and result["success"] and not dry_run: cv_output = result.get("output") or result.get("raw_output") or "" cv_parsed = None try: if isinstance(cv_output, dict): cv_parsed = cv_output elif isinstance(cv_output, str): cv_parsed = _try_parse_json(cv_output) except Exception: pass if isinstance(cv_parsed, dict): cv_verdict = cv_parsed.get("verdict", "") if cv_verdict in ("changes_required", "escalated"): if cv_verdict == "escalated": reason = cv_parsed.get("escalation_reason") or "constitutional_validator: принципы конфликтуют — требуется решение директора" blocked_reason = f"constitutional_validator: escalated — {reason}" else: violations = cv_parsed.get("violations") or [] if violations: violations_summary = "; ".join( f"{v.get('principle', '?')} ({v.get('severity', '?')}): {v.get('description', '')}" for v in violations[:3] ) else: violations_summary = cv_parsed.get("summary") or "changes required" blocked_reason = f"constitutional_validator: changes_required — {violations_summary}" models.update_task( conn, task_id, status="blocked", blocked_reason=blocked_reason, blocked_agent_role="constitutional_validator", blocked_pipeline_step=str(i + 1), ) if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) try: models.write_log( conn, pipeline["id"], f"Constitutional validator blocked pipeline: {blocked_reason}", level="WARN", extra={"role": "constitutional_validator", "verdict": cv_verdict, "reason": blocked_reason}, ) except Exception: pass 
return { "success": False, "error": blocked_reason, "blocked_by": "constitutional_validator", "blocked_reason": blocked_reason, "steps_completed": i + 1, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # verdict == 'approved': constitutional check passed, continue pipeline # Tech debt: create followup child task from dev agent output (KIN-128) if role in _TECH_DEBT_ROLES and result["success"] and not dry_run: try: _save_tech_debt_output(conn, project_id, task_id, result) except Exception: pass # Never block pipeline on tech_debt save errors # Department head: execute sub-pipeline planned by the dept head if _is_department_head(role) and result["success"] and not dry_run: # Determine next department for handoff routing _next_dept = None if i + 1 < len(steps): _next_role = steps[i + 1].get("role", "") if _is_department_head(_next_role): _next_dept = _next_role.replace("_head", "") dept_result = _execute_department_head_step( conn, task_id, project_id, parent_pipeline_id=pipeline["id"] if pipeline else None, step=step, dept_head_result=result, allow_write=allow_write, noninteractive=noninteractive, next_department=_next_dept, ) # Accumulate sub-pipeline costs total_cost += dept_result.get("cost_usd") or 0 total_tokens += dept_result.get("tokens_used") or 0 total_duration += dept_result.get("duration_seconds") or 0 if not dept_result.get("success"): # Sub-pipeline failed — handle as blocked results.append({"role": role, "_dept_sub": True, **dept_result}) if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) blocked_reason = dept_result.get("blocked_reason") or dept_result.get("error") or f"Department {role} sub-pipeline failed" error_msg = f"Department {role} sub-pipeline failed: {dept_result.get('output', '')[:200]}" 
models.update_task(conn, task_id, status="blocked", blocked_reason=blocked_reason) return { "success": False, "error": error_msg, "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # Track last worker role from sub-pipeline for auto_complete eligibility if dept_result.get("last_sub_role"): _last_sub_role = dept_result["last_sub_role"] # Override previous_output with dept handoff summary (not raw dept head JSON) previous_output = dept_result.get("output") if isinstance(previous_output, (dict, list)): previous_output = json.dumps(previous_output, ensure_ascii=False) continue # Project-level auto-test: run tests after backend_dev/frontend_dev steps. # Enabled per project via auto_test_enabled flag (opt-in). # test_command priority: project.test_command (explicit) → auto-detect → skip. # On failure, loop fixer up to KIN_AUTO_TEST_MAX_ATTEMPTS times, then block. 
        # NOTE(review): `role`, `result`, `dry_run` and `project_for_wt` are set
        # earlier in this function (outside this view) — presumably the current
        # step's role, its agent result, and the project row used for worktrees.
        if (
            not dry_run
            and role in _AUTO_TEST_ROLES
            and result["success"]
            and project_for_wt
            and project_for_wt.get("auto_test_enabled")
            and project_for_wt.get("path")
        ):
            p_path_str = str(Path(project_for_wt["path"]).expanduser())
            # Explicit project test_command wins over auto-detection.
            p_test_cmd_override = project_for_wt.get("test_command")
            if p_test_cmd_override:
                p_test_cmd = p_test_cmd_override
            else:
                p_test_cmd = _detect_test_command(p_path_str, role=role)
            if p_test_cmd is None:
                # No test framework detected — skip without blocking pipeline
                _logger.info("auto-test: no test framework detected in %s, skipping", p_path_str)
                results.append({
                    "role": "_auto_test",
                    "success": True,
                    "output": "no test framework detected",
                    "_project_test": True,
                    "_skipped": True,
                })
            else:
                # Env overrides; `or` fallback also covers empty-string values.
                max_auto_test_attempts = int(os.environ.get("KIN_AUTO_TEST_MAX_ATTEMPTS") or 3)
                auto_test_timeout = int(os.environ.get("KIN_AUTO_TEST_TIMEOUT") or 600)
                # Initial test run; every run (including retries) is appended to results.
                test_run = _run_project_tests(p_path_str, p_test_cmd, timeout=auto_test_timeout)
                results.append({"role": "_auto_test", "success": test_run["success"], "output": test_run["output"], "_project_test": True})
                auto_test_attempt = 0
                # Fix loop: re-run the same dev agent with the failing test output,
                # then re-run the tests, up to max_auto_test_attempts times.
                while not test_run["success"] and auto_test_attempt < max_auto_test_attempts:
                    auto_test_attempt += 1
                    # Test output is truncated to 4000 chars to bound prompt size.
                    fix_context = (
                        f"Automated project test run ({p_test_cmd}) failed after your changes.\n"
                        f"Test output:\n{test_run['output'][:4000]}\n"
                        f"Fix the failing tests. Do NOT modify test files."
                    )
                    fix_result = run_agent(
                        conn, role, task_id, project_id,
                        model=model, previous_output=fix_context, dry_run=False,
                        allow_write=allow_write, noninteractive=noninteractive,
                    )
                    # `or 0` treats missing/None metrics as zero when accumulating.
                    total_cost += fix_result.get("cost_usd") or 0
                    total_tokens += fix_result.get("tokens_used") or 0
                    total_duration += fix_result.get("duration_seconds") or 0
                    results.append({**fix_result, "_auto_test_fix_attempt": auto_test_attempt})
                    test_run = _run_project_tests(p_path_str, p_test_cmd, timeout=auto_test_timeout)
                    results.append({"role": "_auto_test", "success": test_run["success"], "output": test_run["output"], "_project_test": True, "_attempt": auto_test_attempt})
                if not test_run["success"]:
                    # Still failing after all fix attempts: block the task, fail the
                    # pipeline record (if any), and abort with totals so far.
                    block_reason = (
                        f"Auto-test ({p_test_cmd}) failed after {auto_test_attempt} fix attempt(s). "
                        f"Last output: {test_run['output'][:500]}"
                    )
                    models.update_task(conn, task_id, status="blocked", blocked_reason=block_reason)
                    if pipeline:
                        models.update_pipeline(conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration)
                    return {
                        "success": False,
                        "error": block_reason,
                        "steps_completed": i,
                        "results": results,
                        "total_cost_usd": total_cost,
                        "total_tokens": total_tokens,
                        "total_duration_seconds": total_duration,
                        "pipeline_id": pipeline["id"] if pipeline else None,
                    }

        # Auto-test loop: if tester step has auto_fix=true and tests failed,
        # call fix_role agent and re-run tester up to max_attempts times.
        # Tester auto-fix: only when the tester step itself succeeded (agent ran)
        # but its output indicates failing tests (per _is_test_failure).
        if (
            not dry_run
            and step.get("auto_fix")
            and role == "tester"
            and result["success"]
            and _is_test_failure(result)
        ):
            max_attempts = int(step.get("max_attempts", 3))
            fix_role = step.get("fix_role", "backend_dev")
            fix_model = step.get("fix_model", model)
            attempt = 0
            while attempt < max_attempts and _is_test_failure(result):
                attempt += 1
                # Feed the tester's (possibly structured) output to the fixer.
                tester_output = result.get("raw_output") or result.get("output") or ""
                if isinstance(tester_output, (dict, list)):
                    tester_output = json.dumps(tester_output, ensure_ascii=False)
                # Run fixer
                fix_result = run_agent(
                    conn, fix_role, task_id, project_id,
                    model=fix_model, previous_output=tester_output, dry_run=False,
                    allow_write=allow_write, noninteractive=noninteractive,
                )
                total_cost += fix_result.get("cost_usd") or 0
                total_tokens += fix_result.get("tokens_used") or 0
                total_duration += fix_result.get("duration_seconds") or 0
                results.append({**fix_result, "_auto_fix_attempt": attempt})
                # Re-run tester
                fix_output = fix_result.get("raw_output") or fix_result.get("output") or ""
                if isinstance(fix_output, (dict, list)):
                    fix_output = json.dumps(fix_output, ensure_ascii=False)
                retest = run_agent(
                    conn, role, task_id, project_id,
                    model=model, previous_output=fix_output, dry_run=False,
                    allow_write=allow_write, noninteractive=noninteractive,
                )
                total_cost += retest.get("cost_usd") or 0
                total_tokens += retest.get("tokens_used") or 0
                total_duration += retest.get("duration_seconds") or 0
                # The retest becomes the step's result for loop condition and chaining.
                result = retest
                results.append({**result, "_auto_retest_attempt": attempt})
            # Save final test result regardless of outcome
            try:
                final_output = result.get("raw_output") or result.get("output") or ""
                models.update_task(conn, task_id, test_result={
                    "output": final_output if isinstance(final_output, str) else str(final_output),
                    "auto_fix_attempts": attempt,
                    "passed": not _is_test_failure(result),
                })
            except Exception:
                # Best-effort persistence — must not fail the pipeline.
                pass

        # Chain output to next step
        previous_output = result.get("raw_output") or result.get("output")
        if isinstance(previous_output, (dict, list)):
            previous_output = json.dumps(previous_output, ensure_ascii=False)

    # Pipeline completed
    if pipeline and not dry_run:
        models.update_pipeline(
            conn, pipeline["id"], status="completed",
            total_cost_usd=total_cost, total_tokens=total_tokens,
            total_duration_seconds=total_duration,
        )
        # KIN-084: log pipeline completed
        try:
            models.write_log(
                conn, pipeline["id"],
                f"Pipeline completed: {len(steps)} steps, cost=${total_cost:.4f}, tokens={total_tokens}, duration={total_duration}s",
                extra={
                    "total_cost_usd": total_cost,
                    "total_tokens": total_tokens,
                    "total_duration_seconds": total_duration,
                    "steps_count": len(steps),
                },
            )
        except Exception:
            pass

    task_modules = models.get_modules(conn, project_id)

    # Compute changed files for hook filtering (frontend build trigger)
    changed_files: list[str] | None = None  # None = project path unknown/missing
    project = models.get_project(conn, project_id)
    if project and project.get("path"):
        p_path = Path(project["path"]).expanduser()
        if p_path.is_dir():
            changed_files = _get_changed_files(str(p_path))

    # Fix D: retroactive suspicious guard — if no files changed after pipeline,
    # mark successful dev-agent steps as suspicious (agent may have not written files)
    if changed_files is not None and not changed_files:
        for _r in results:
            if (_r.get("role") in _DEV_GUARD_ROLES and _r.get("success") and not _r.get("suspicious")):
                _r["suspicious"] = True
                _r["suspicious_reason"] = "agent reported success but no file changes detected"
                _logger.warning(
                    "KIN-117: suspicious step %s — no file changes after pipeline",
                    _r.get("role"),
                )

    last_role = steps[-1].get("role", "") if steps else ""
    # For dept pipelines: if last step is a _head, check the last worker in its sub-pipeline
    effective_last_role = _last_sub_role if (_is_department_head(last_role) and _last_sub_role) else last_role
    # Only gate roles may auto-close a task.
    auto_eligible = effective_last_role in {"tester", "reviewer"}

    # Guard: re-fetch current status — user may have manually changed it while pipeline ran
    current_task = models.get_task(conn, task_id)
    current_status = current_task.get("status") if current_task else None

    # KIN-116: detect destructive ops — force review even in auto mode
    destructive_ops = _detect_destructive_operations(results) if results else []
    if destructive_ops and mode == "auto_complete":
        mode = "review"  # Downgrade to review for this pipeline run
        _logger.warning(
            "KIN-116: destructive operations detected in pipeline output — "
            "forcing task %s to review. Patterns: %s",
            task_id,
            destructive_ops[:5],
        )
        # Audit trail is best-effort; never block completion on it.
        try:
            models.log_audit_event(
                conn,
                event_type="destructive_ops_detected",
                task_id=task_id,
                step_id="runner",
                reason=f"Destructive operations detected: {destructive_ops[:5]}",
                project_id=project_id,
            )
        except Exception:
            pass

    if current_status in ("done", "cancelled"):
        pass  # User finished manually — don't overwrite
    elif mode == "auto_complete" and auto_eligible:
        # KIN-133: gate check — if final gate agent rejects, block instead of closing
        _gate_result = _find_gate_result(results, effective_last_role)
        _cannot_close = (
            _parse_gate_cannot_close(_gate_result, effective_last_role)
            if _gate_result else None
        )
        if _cannot_close is not None:
            # Gate refused: block the task, fail the pipeline, escalate, and abort.
            _block_reason = _cannot_close["reason"]
            models.update_task(
                conn, task_id,
                status="blocked",
                blocked_reason=_block_reason,
                blocked_agent_role=effective_last_role,
                blocked_pipeline_step=str(len(steps)),
            )
            if pipeline:
                models.update_pipeline(
                    conn, pipeline["id"], status="failed",
                    total_cost_usd=total_cost, total_tokens=total_tokens,
                    total_duration_seconds=total_duration,
                )
            try:
                models.write_log(
                    conn,
                    pipeline["id"] if pipeline else None,
                    f"Gate cannot_close: {effective_last_role} refused to approve — {_block_reason}",
                    level="WARN",
                    extra={"role": effective_last_role, "reason": _block_reason},
                )
            except Exception:
                pass
            # Telegram escalation — best-effort; import kept local so a missing/
            # broken telegram module cannot break the runner.
            try:
                from core.telegram import send_telegram_escalation
                _project = models.get_project(conn, project_id)
                _project_name = _project["name"] if _project else project_id
                _sent = send_telegram_escalation(
                    task_id=task_id,
                    project_name=_project_name,
                    agent_role=effective_last_role,
                    reason=_block_reason,
                    pipeline_step=str(len(steps)),
                )
                if _sent:
                    models.mark_telegram_sent(conn, task_id)
            except Exception:
                pass
            return {
                "success": False,
                "error": f"Gate {effective_last_role} refused to close task: {_block_reason}",
                "blocked_by": effective_last_role,
                "blocked_reason": _block_reason,
                "steps_completed": len(steps),
                "results": results,
                "total_cost_usd": total_cost,
                "total_tokens": total_tokens,
                "total_duration_seconds": total_duration,
                "pipeline_id": pipeline["id"] if pipeline else None,
            }

        # Auto-complete mode: gate approved — close task immediately
        models.update_task(conn, task_id, status="done")
        # KIN-084: log task status
        try:
            models.write_log(
                conn, pipeline["id"],
                "Task status: done",
                extra={"task_status": "done", "mode": mode},
            )
        except Exception:
            pass
        # Hook failures are deliberately swallowed — see pipeline_completed below.
        try:
            run_hooks(conn, project_id, task_id, event="task_auto_approved", task_modules=task_modules)
        except Exception:
            pass
        try:
            run_hooks(conn, project_id, task_id, event="task_done", task_modules=task_modules)
        except Exception:
            pass

        # Auto followup: generate tasks, auto-resolve permission issues.
        # Guard: skip for followup-sourced tasks to prevent infinite recursion.
        # NOTE(review): `task` is fetched earlier in this function (outside this
        # view); a followup-generated task is marked via brief["source"]
        # starting with "followup:".
        task_brief = task.get("brief") or {}
        is_followup_task = (
            isinstance(task_brief, dict)
            and str(task_brief.get("source", "")).startswith("followup:")
        )
        if not is_followup_task:
            # Best-effort: followup generation must never fail the pipeline.
            # Local import keeps the runner usable if the followup module breaks.
            try:
                from core.followup import generate_followups, auto_resolve_pending_actions
                fu_result = generate_followups(conn, task_id)
                if fu_result.get("pending_actions"):
                    auto_resolve_pending_actions(conn, task_id, fu_result["pending_actions"])
            except Exception:
                pass
    else:
        # Review mode: wait for manual approval
        models.update_task(conn, task_id, status="review", execution_mode="review")
        # KIN-084: log task status
        try:
            models.write_log(
                conn, pipeline["id"],
                "Task status: review",
                extra={"task_status": "review", "mode": mode},
            )
        except Exception:
            pass

    # Run post-pipeline hooks (failures don't affect pipeline status)
    try:
        run_hooks(conn, project_id, task_id, event="pipeline_completed", task_modules=task_modules, changed_files=changed_files)
    except Exception:
        pass  # Hook errors must never block pipeline completion

    # Auto-learning: extract decisions from pipeline results
    if results:
        try:
            _run_learning_extraction(conn, task_id, project_id, results)
        except Exception:
            pass  # Learning errors must never block pipeline completion

    # Auto-commit changes after successful pipeline
    try:
        _run_autocommit(conn, task_id, project_id)
    except Exception:
        pass  # Autocommit errors must never block pipeline completion

    # Success summary for the caller: per-step results plus aggregate metrics.
    return {
        "success": True,
        "steps_completed": len(steps),
        "results": results,
        "total_cost_usd": total_cost,
        "total_tokens": total_tokens,
        "total_duration_seconds": total_duration,
        "pipeline_id": pipeline["id"] if pipeline else None,
        "dry_run": dry_run,
        "mode": mode,
    }