""" Kin agent runner — launches Claude Code as subprocess with role-specific context. Each agent = separate process with isolated context. """ import json import os import sqlite3 import subprocess import time from pathlib import Path from typing import Any import re from core import models from core.context_builder import build_context, format_prompt from core.hooks import run_hooks def run_agent( conn: sqlite3.Connection, role: str, task_id: str, project_id: str, model: str = "sonnet", previous_output: str | None = None, brief_override: str | None = None, dry_run: bool = False, allow_write: bool = False, noninteractive: bool = False, ) -> dict: """Run a single Claude Code agent as a subprocess. 1. Build context from DB 2. Format prompt with role template 3. Run: claude -p "{prompt}" --output-format json 4. Log result to agent_logs 5. Return {success, output, tokens_used, duration_seconds, cost_usd} """ # Build context ctx = build_context(conn, task_id, role, project_id) if previous_output: ctx["previous_output"] = previous_output if brief_override: if ctx.get("task"): ctx["task"]["brief"] = brief_override prompt = format_prompt(ctx, role) if dry_run: return { "success": True, "output": None, "prompt": prompt, "role": role, "model": model, "dry_run": True, } # Determine working directory project = models.get_project(conn, project_id) working_dir = None if project and role in ("debugger", "frontend_dev", "backend_dev", "tester", "security"): project_path = Path(project["path"]).expanduser() if project_path.is_dir(): working_dir = str(project_path) # Run claude subprocess start = time.monotonic() result = _run_claude(prompt, model=model, working_dir=working_dir, allow_write=allow_write, noninteractive=noninteractive) duration = int(time.monotonic() - start) # Parse output — ensure output_text is always a string for DB storage raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) output_text = raw_output success = result["returncode"] == 0 parsed_output = _try_parse_json(output_text) # Log FULL output to DB (no truncation) models.log_agent_run( conn, project_id=project_id, task_id=task_id, agent_role=role, action="execute", input_summary=f"task={task_id}, model={model}", output_summary=output_text or None, tokens_used=result.get("tokens_used"), model=model, cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) return { "success": success, "error": result.get("error") if not success else None, "output": parsed_output if parsed_output else output_text, "raw_output": output_text, "role": role, "model": model, "duration_seconds": duration, "tokens_used": result.get("tokens_used"), "cost_usd": result.get("cost_usd"), } def _run_claude( prompt: str, model: str = "sonnet", working_dir: str | None = None, allow_write: bool = False, noninteractive: bool = False, ) -> dict: """Execute claude CLI as subprocess. 

def _run_claude(
    prompt: str,
    model: str = "sonnet",
    working_dir: str | None = None,
    allow_write: bool = False,
    noninteractive: bool = False,
) -> dict:
    """Execute claude CLI as subprocess.

    Returns dict with output, returncode, etc.
    """
    cmd = [
        "claude",
        "-p", prompt,
        "--output-format", "json",
        "--model", model,
    ]
    if allow_write:
        cmd.append("--dangerously-skip-permissions")

    is_noninteractive = noninteractive or os.environ.get("KIN_NONINTERACTIVE") == "1"
    timeout = 300 if is_noninteractive else 600

    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=working_dir,
            stdin=subprocess.DEVNULL if is_noninteractive else None,
        )
    except FileNotFoundError:
        return {
            "output": "",
            "error": "claude CLI not found in PATH",
            "returncode": 127,
        }
    except subprocess.TimeoutExpired:
        return {
            "output": "",
            "error": f"Agent timed out after {timeout}s",
            "returncode": 124,
        }

    # Always preserve the full raw stdout
    raw_stdout = proc.stdout or ""
    result: dict[str, Any] = {
        "output": raw_stdout,
        "error": proc.stderr or None,  # preserve stderr always for diagnostics
        "empty_output": not raw_stdout.strip(),
        "returncode": proc.returncode,
    }

    # Parse JSON wrapper from claude --output-format json.
    # Extract metadata (tokens, cost) but keep output as the full content string.
    parsed = _try_parse_json(raw_stdout)
    if isinstance(parsed, dict):
        result["tokens_used"] = parsed.get("usage", {}).get("total_tokens")
        result["cost_usd"] = parsed.get("cost_usd")
        # Extract the agent's actual response, converting to string if needed.
        # Check each key against None explicitly so falsy-but-valid content
        # (e.g. an empty string) is still honoured.
        content = parsed.get("result")
        if content is None:
            content = parsed.get("content")
        if content is not None:
            result["output"] = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False)

    return result


def _try_parse_json(text: str) -> Any:
    """Try to parse JSON from text. Returns parsed obj or None."""
    text = text.strip()
    if not text:
        return None

    # Direct parse
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Try to find a JSON block in markdown code fences
    m = re.search(r"```(?:json)?\s*\n(.*?)\n```", text, re.DOTALL)
    if m:
        try:
            return json.loads(m.group(1))
        except json.JSONDecodeError:
            pass

    # Try to find the first balanced { ... } or [ ... ] span.
    # Note: this is a heuristic; braces inside string literals are not skipped.
    for start_char, end_char in [("{", "}"), ("[", "]")]:
        start = text.find(start_char)
        if start >= 0:
            depth = 0
            for i in range(start, len(text)):
                if text[i] == start_char:
                    depth += 1
                elif text[i] == end_char:
                    depth -= 1
                    if depth == 0:
                        try:
                            return json.loads(text[start:i + 1])
                        except json.JSONDecodeError:
                            break

    return None
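
# Behaviour sketch for _try_parse_json (hypothetical inputs, for illustration):
#
#     _try_parse_json('{"ok": true}')                      -> {"ok": True}
#     _try_parse_json('```json\n{"a": 1}\n```')            -> {"a": 1}  (fenced block)
#     _try_parse_json('prose before {"a": 1} prose after') -> {"a": 1}  (brace scan)
#     _try_parse_json("no json here")                      -> None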

# ---------------------------------------------------------------------------
# Backlog audit
# ---------------------------------------------------------------------------

PROMPTS_DIR = Path(__file__).parent / "prompts"

_LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish", "de": "German", "fr": "French"}


def run_audit(
    conn: sqlite3.Connection,
    project_id: str,
    noninteractive: bool = False,
    auto_apply: bool = False,
) -> dict:
    """Audit pending tasks against the actual codebase.

    auto_apply=True: marks already_done tasks as done in DB.
    auto_apply=False: returns results only (for API/GUI).

    Returns {success, already_done, still_pending, unclear, duration_seconds, ...}
    """
    project = models.get_project(conn, project_id)
    if not project:
        return {"success": False, "error": f"Project '{project_id}' not found"}

    pending = models.list_tasks(conn, project_id=project_id, status="pending")
    if not pending:
        return {
            "success": True,
            "already_done": [],
            "still_pending": [],
            "unclear": [],
            "message": "No pending tasks to audit",
        }

    # Build prompt
    prompt_path = PROMPTS_DIR / "backlog_audit.md"
    template = prompt_path.read_text() if prompt_path.exists() else (
        "You are a QA analyst. Check if pending tasks are already done in the code."
    )

    task_list = [
        {"id": t["id"], "title": t["title"], "brief": t.get("brief")}
        for t in pending
    ]

    sections = [
        template,
        "",
        f"## Project: {project['id']} — {project['name']}",
    ]
    if project.get("tech_stack"):
        sections.append(f"Tech stack: {', '.join(project['tech_stack'])}")
    sections.append(f"Path: {project['path']}")
    sections.append("")
    sections.append(f"## Pending tasks ({len(task_list)}):")
    sections.append(json.dumps(task_list, ensure_ascii=False, indent=2))
    sections.append("")

    language = project.get("language", "ru")
    lang_name = _LANG_NAMES.get(language, language)
    sections.append("## Language")
    sections.append(f"ALWAYS respond in {lang_name}.")
    sections.append("")

    prompt = "\n".join(sections)

    # Determine working dir
    working_dir = None
    project_path = Path(project["path"]).expanduser()
    if project_path.is_dir():
        working_dir = str(project_path)

    # Run agent — allow_write=True so claude can use Read/Bash tools
    # without interactive permission prompts (critical for noninteractive mode)
    start = time.monotonic()
    result = _run_claude(
        prompt,
        model="sonnet",
        working_dir=working_dir,
        allow_write=True,
        noninteractive=noninteractive,
    )
    duration = int(time.monotonic() - start)

    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, ensure_ascii=False)

    success = result["returncode"] == 0

    # Log to agent_logs
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=None,
        agent_role="backlog_audit",
        action="audit",
        input_summary=f"project={project_id}, pending_tasks={len(pending)}",
        output_summary=raw_output or None,
        tokens_used=result.get("tokens_used"),
        model="sonnet",
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )

    if not success:
        return {
            "success": False,
            "error": result.get("error", "Agent failed"),
            "raw_output": raw_output,
            "duration_seconds": duration,
        }

    # Parse structured output
    parsed = _try_parse_json(raw_output)
    if not isinstance(parsed, dict):
        return {
            "success": False,
            "error": "Agent returned non-JSON output",
            "raw_output": raw_output,
            "duration_seconds": duration,
        }

    already_done = parsed.get("already_done", [])

    # Auto-apply: mark already_done tasks as done in DB
    applied = []
    if auto_apply and already_done:
        for item in already_done:
            tid = item.get("id")
            if tid:
                t = models.get_task(conn, tid)
                if t and t["project_id"] == project_id and t["status"] == "pending":
                    models.update_task(conn, tid, status="done")
                    applied.append(tid)

    return {
        "success": True,
        "already_done": already_done,
        "still_pending": parsed.get("still_pending", []),
        "unclear": parsed.get("unclear", []),
        "applied": applied,
        "duration_seconds": duration,
        "tokens_used": result.get("tokens_used"),
        "cost_usd": result.get("cost_usd"),
    }


# ---------------------------------------------------------------------------
# Permission error detection
# ---------------------------------------------------------------------------

def _is_permission_error(result: dict) -> bool:
    """Return True if agent result indicates a permission/write failure."""
    from core.followup import PERMISSION_PATTERNS

    output = (result.get("raw_output") or result.get("output") or "")
    if not isinstance(output, str):
        output = json.dumps(output, ensure_ascii=False)
    error = result.get("error") or ""
    text = output + " " + error
    return any(re.search(p, text) for p in PERMISSION_PATTERNS)
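
# Illustrative check (the result dict is a hypothetical example; whether a given
# message actually matches depends on PERMISSION_PATTERNS in core.followup):
#
#     failed = {"raw_output": "", "output": "", "error": "write permission denied"}
#     if _is_permission_error(failed):
#         ...  # run_pipeline retries the step once with allow_write=True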

# ---------------------------------------------------------------------------
# Pipeline executor
# ---------------------------------------------------------------------------

def run_pipeline(
    conn: sqlite3.Connection,
    task_id: str,
    steps: list[dict],
    dry_run: bool = False,
    allow_write: bool = False,
    noninteractive: bool = False,
) -> dict:
    """Execute a multi-step pipeline of agents.

    steps = [
        {"role": "debugger", "model": "opus", "brief": "..."},
        {"role": "tester", "depends_on": "debugger", "brief": "..."},
    ]

    Returns {success, steps_completed, total_cost, total_tokens, total_duration, results}
    """
    task = models.get_task(conn, task_id)
    if not task:
        return {"success": False, "error": f"Task '{task_id}' not found"}
    project_id = task["project_id"]

    # Determine route type from steps or task brief
    route_type = "custom"
    if task.get("brief") and isinstance(task["brief"], dict):
        route_type = task["brief"].get("route_type", "custom") or "custom"

    # Determine execution mode (auto vs review)
    mode = models.get_effective_mode(conn, project_id, task_id)

    # Create pipeline in DB
    pipeline = None
    if not dry_run:
        pipeline = models.create_pipeline(conn, task_id, project_id, route_type, steps)
        models.update_task(conn, task_id, status="in_progress")

    results = []
    total_cost = 0.0
    total_tokens = 0
    total_duration = 0
    previous_output = None

    for i, step in enumerate(steps):
        role = step["role"]
        model = step.get("model", "sonnet")
        brief = step.get("brief")

        try:
            result = run_agent(
                conn,
                role,
                task_id,
                project_id,
                model=model,
                previous_output=previous_output,
                brief_override=brief,
                dry_run=dry_run,
                allow_write=allow_write,
                noninteractive=noninteractive,
            )
        except Exception as exc:
            exc_msg = f"Step {i+1}/{len(steps)} ({role}) raised exception: {exc}"
            if pipeline:
                models.update_pipeline(
                    conn,
                    pipeline["id"],
                    status="failed",
                    total_cost_usd=total_cost,
                    total_tokens=total_tokens,
                    total_duration_seconds=total_duration,
                )
            models.log_agent_run(
                conn,
                project_id=project_id,
                task_id=task_id,
                agent_role=role,
                action="execute",
                input_summary=f"task={task_id}, model={model}",
                output_summary=None,
                success=False,
                error_message=exc_msg,
            )
            models.update_task(conn, task_id, status="blocked", blocked_reason=exc_msg)
            return {
                "success": False,
                "error": exc_msg,
                "steps_completed": i,
                "results": results,
                "total_cost_usd": total_cost,
                "total_tokens": total_tokens,
                "total_duration_seconds": total_duration,
                "pipeline_id": pipeline["id"] if pipeline else None,
            }

        if dry_run:
            results.append(result)
            continue

        # Accumulate stats
        total_cost += result.get("cost_usd") or 0
        total_tokens += result.get("tokens_used") or 0
        total_duration += result.get("duration_seconds") or 0

        if not result["success"]:
            # Auto mode: retry once with allow_write on permission error
            if mode == "auto" and not allow_write and _is_permission_error(result):
                task_modules = models.get_modules(conn, project_id)
                try:
                    run_hooks(conn, project_id, task_id, event="task_permission_retry", task_modules=task_modules)
                except Exception:
                    pass
                retry = run_agent(
                    conn,
                    role,
                    task_id,
                    project_id,
                    model=model,
                    previous_output=previous_output,
                    brief_override=brief,
                    dry_run=False,
                    allow_write=True,
                    noninteractive=noninteractive,
                )
                allow_write = True  # subsequent steps also run with allow_write
                total_cost += retry.get("cost_usd") or 0
                total_tokens += retry.get("tokens_used") or 0
                total_duration += retry.get("duration_seconds") or 0
                if retry["success"]:
                    result = retry

            if not result["success"]:
                # Still failed — block regardless of mode
                results.append(result)
                if pipeline:
                    models.update_pipeline(
                        conn,
                        pipeline["id"],
                        status="failed",
                        total_cost_usd=total_cost,
                        total_tokens=total_tokens,
                        total_duration_seconds=total_duration,
                    )
                agent_error = result.get("error") or ""
                error_msg = f"Step {i+1}/{len(steps)} ({role}) failed"
                if agent_error:
                    error_msg += f": {agent_error}"
                models.update_task(conn, task_id, status="blocked", blocked_reason=error_msg)
                return {
                    "success": False,
                    "error": error_msg,
                    "steps_completed": i,
                    "results": results,
                    "total_cost_usd": total_cost,
                    "total_tokens": total_tokens,
                    "total_duration_seconds": total_duration,
                    "pipeline_id": pipeline["id"] if pipeline else None,
                }

        results.append(result)

        # Chain output to next step
        previous_output = result.get("raw_output") or result.get("output")
        if isinstance(previous_output, (dict, list)):
            previous_output = json.dumps(previous_output, ensure_ascii=False)
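
    # Reaching this point means no step hard-failed (or this was a dry run).
    # In "auto" mode the task is approved immediately and follow-ups are
    # generated; in "review" mode it waits for manual approval.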
    # Pipeline completed
    if pipeline and not dry_run:
        models.update_pipeline(
            conn,
            pipeline["id"],
            status="completed",
            total_cost_usd=total_cost,
            total_tokens=total_tokens,
            total_duration_seconds=total_duration,
        )

        task_modules = models.get_modules(conn, project_id)

        if mode == "auto":
            # Auto mode: skip review, approve immediately
            models.update_task(conn, task_id, status="done")
            try:
                run_hooks(conn, project_id, task_id, event="task_auto_approved", task_modules=task_modules)
            except Exception:
                pass
            try:
                run_hooks(conn, project_id, task_id, event="task_done", task_modules=task_modules)
            except Exception:
                pass

            # Auto followup: generate tasks, auto-resolve permission issues.
            # Guard: skip for followup-sourced tasks to prevent infinite recursion.
            task_brief = task.get("brief") or {}
            is_followup_task = (
                isinstance(task_brief, dict)
                and str(task_brief.get("source", "")).startswith("followup:")
            )
            if not is_followup_task:
                try:
                    from core.followup import generate_followups, auto_resolve_pending_actions

                    fu_result = generate_followups(conn, task_id)
                    if fu_result.get("pending_actions"):
                        auto_resolve_pending_actions(conn, task_id, fu_result["pending_actions"])
                except Exception:
                    pass
        else:
            # Review mode: wait for manual approval
            models.update_task(conn, task_id, status="review")

        # Run post-pipeline hooks (failures don't affect pipeline status)
        try:
            run_hooks(conn, project_id, task_id, event="pipeline_completed", task_modules=task_modules)
        except Exception:
            pass  # Hook errors must never block pipeline completion

    return {
        "success": True,
        "steps_completed": len(steps),
        "results": results,
        "total_cost_usd": total_cost,
        "total_tokens": total_tokens,
        "total_duration_seconds": total_duration,
        "pipeline_id": pipeline["id"] if pipeline else None,
        "dry_run": dry_run,
        "mode": mode,
    }
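
# A minimal end-to-end sketch (task ID, roles, and briefs are hypothetical
# placeholders; dry_run skips DB pipeline creation and the claude subprocess):
#
#     steps = [
#         {"role": "backend_dev", "model": "sonnet", "brief": "Implement the endpoint"},
#         {"role": "tester", "depends_on": "backend_dev", "brief": "Write tests for it"},
#     ]
#     summary = run_pipeline(conn, "task-123", steps, dry_run=True)
#     for r in summary["results"]:
#         print(r["prompt"])  # inspect the rendered prompt for each step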