"""
Kin agent runner — launches Claude Code as subprocess with role-specific context.

Each agent = separate process with isolated context.
"""

import json
import re
import sqlite3
import subprocess
import time
from pathlib import Path
from typing import Any

from core import models
from core.context_builder import build_context, format_prompt


# Roles whose CLI process is started inside the project directory (when the
# project's path exists), so the agent can work on the project files.
_PROJECT_CWD_ROLES = ("debugger", "frontend_dev", "backend_dev", "tester", "security")

# Hard cap on a single agent run, in seconds.
_DEFAULT_TIMEOUT_SECONDS = 600

# Per-category token counters that Claude Code reports under "usage" in its
# `--output-format json` result; summed when no explicit total is present.
_USAGE_TOKEN_KEYS = (
    "input_tokens",
    "output_tokens",
    "cache_creation_input_tokens",
    "cache_read_input_tokens",
)

# A fenced code block, optionally tagged "json", whose body is captured.
_CODE_FENCE_RE = re.compile(r"```(?:json)?\s*\n(.*?)\n```", re.DOTALL)


def run_agent(
    conn: sqlite3.Connection,
    role: str,
    task_id: str,
    project_id: str,
    model: str = "sonnet",
    previous_output: str | None = None,
    brief_override: str | None = None,
    dry_run: bool = False,
    allow_write: bool = False,
) -> dict:
    """Run a single Claude Code agent as a subprocess.

    1. Build context from DB
    2. Format prompt with role template
    3. Run: claude -p "{prompt}" --output-format json
    4. Log result to agent_logs (via ``models.log_agent_run``)
    5. Return a summary dict

    Args:
        conn: open SQLite connection used for context building and logging.
        role: agent role; selects the prompt template and, for the roles in
            ``_PROJECT_CWD_ROLES``, runs the CLI inside the project directory.
        task_id: task the agent works on.
        project_id: project the task belongs to.
        model: model name passed to ``claude --model``.
        previous_output: output of a previous pipeline step; stored in the
            context as ``previous_output`` when truthy.
        brief_override: replaces ``ctx["task"]["brief"]`` when truthy and the
            built context contains a task.
        dry_run: if True, only build and return the prompt — no subprocess,
            no DB logging.
        allow_write: forwarded to ``_run_claude`` (adds
            ``--dangerously-skip-permissions``).

    Returns:
        Dry run: ``{success: True, output: None, prompt, role, model,
        dry_run: True}``.
        Otherwise: ``{success, output, raw_output, role, model,
        duration_seconds, tokens_used, cost_usd}``, where ``output`` is the
        JSON value parsed from the agent's response when parsing succeeds and
        the raw response text otherwise; ``raw_output`` is always the text.
    """
    ctx = build_context(conn, task_id, role, project_id)
    if previous_output:
        ctx["previous_output"] = previous_output
    if brief_override and ctx.get("task"):
        ctx["task"]["brief"] = brief_override

    prompt = format_prompt(ctx, role)

    if dry_run:
        return {
            "success": True,
            "output": None,
            "prompt": prompt,
            "role": role,
            "model": model,
            "dry_run": True,
        }

    # Determine working directory
    project = models.get_project(conn, project_id)
    working_dir = None
    if project and role in _PROJECT_CWD_ROLES:
        project_path = Path(project["path"]).expanduser()
        if project_path.is_dir():
            working_dir = str(project_path)

    # Run claude subprocess
    start = time.monotonic()
    result = _run_claude(prompt, model=model, working_dir=working_dir, allow_write=allow_write)
    duration = int(time.monotonic() - start)

    # Parse output — ensure output_text is always a string for DB storage
    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, ensure_ascii=False)
    output_text = raw_output
    success = result["returncode"] == 0
    parsed_output = _try_parse_json(output_text)

    # Log FULL output to DB (no truncation)
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=task_id,
        agent_role=role,
        action="execute",
        input_summary=f"task={task_id}, model={model}",
        output_summary=output_text or None,
        tokens_used=result.get("tokens_used"),
        model=model,
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )

    return {
        "success": success,
        # Compare with None so valid-but-falsy JSON ({}, [], 0) is still
        # returned as a parsed value rather than as raw text.
        "output": parsed_output if parsed_output is not None else output_text,
        "raw_output": output_text,
        "role": role,
        "model": model,
        "duration_seconds": duration,
        "tokens_used": result.get("tokens_used"),
        "cost_usd": result.get("cost_usd"),
    }


def _run_claude(
    prompt: str,
    model: str = "sonnet",
    working_dir: str | None = None,
    allow_write: bool = False,
    timeout: int = _DEFAULT_TIMEOUT_SECONDS,
) -> dict:
    """Execute the claude CLI as a subprocess.

    Args:
        prompt: prompt text, passed as the argument of ``-p``.
        model: model name passed to ``--model``.
        working_dir: cwd for the subprocess (None = inherit).
        allow_write: if True, adds ``--dangerously-skip-permissions``.
        timeout: seconds before the run is aborted.

    Returns:
        ``{output, error, returncode}`` and, when stdout is a JSON object,
        also ``tokens_used`` and ``cost_usd``. ``output`` is the agent's
        response text (the ``result``/``content`` field of the JSON wrapper
        when present, else the full raw stdout). Launch failures never raise:
        127 = CLI not found, 124 = timeout, 126 = other OS-level failure.
    """
    cmd = [
        "claude",
        "-p", prompt,
        "--output-format", "json",
        "--model", model,
    ]
    if allow_write:
        cmd.append("--dangerously-skip-permissions")

    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            # Decode explicitly as UTF-8 instead of the locale encoding so
            # non-ASCII agent output cannot raise UnicodeDecodeError.
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
            cwd=working_dir,
        )
    except FileNotFoundError:
        return {
            "output": "",
            "error": "claude CLI not found in PATH",
            "returncode": 127,
        }
    except subprocess.TimeoutExpired:
        return {
            "output": "",
            "error": f"Agent timed out after {timeout}s",
            "returncode": 124,
        }
    except OSError as exc:
        # Any other launch failure, e.g. "Argument list too long" when the
        # prompt exceeds the OS limit for a single argv entry, or a
        # permission error. Reported in the same shape as the cases above.
        return {
            "output": "",
            "error": f"Failed to start claude CLI: {exc}",
            "returncode": 126,
        }

    # Always preserve the full raw stdout
    raw_stdout = proc.stdout or ""
    result: dict[str, Any] = {
        "output": raw_stdout,
        "error": (proc.stderr or f"claude exited with code {proc.returncode}")
        if proc.returncode != 0
        else None,
        "returncode": proc.returncode,
    }

    # Parse JSON wrapper from claude --output-format json.
    # Extract metadata (tokens, cost) but keep output as the full content string.
    parsed = _try_parse_json(raw_stdout)
    if isinstance(parsed, dict):
        result["tokens_used"] = _extract_total_tokens(parsed)
        cost = parsed.get("cost_usd")
        if cost is None:
            # Newer CLI versions report the cost as "total_cost_usd".
            cost = parsed.get("total_cost_usd")
        result["cost_usd"] = cost

        # Extract the agent's actual response, converting to string if needed
        content = parsed.get("result") or parsed.get("content")
        if content is not None:
            result["output"] = (
                content if isinstance(content, str) else json.dumps(content, ensure_ascii=False)
            )

    return result


def _extract_total_tokens(payload: dict) -> int | None:
    """Return the total token count from a CLI JSON result, or None.

    Uses ``usage.total_tokens`` when present; otherwise sums the integer
    counters listed in ``_USAGE_TOKEN_KEYS``. Returns None when ``usage`` is
    missing/not an object or carries none of those counters.
    """
    usage = payload.get("usage")
    if not isinstance(usage, dict):
        return None
    total = usage.get("total_tokens")
    if total is not None:
        return total
    counts = [usage[key] for key in _USAGE_TOKEN_KEYS if isinstance(usage.get(key), int)]
    return sum(counts) if counts else None


def _try_parse_json(text: str) -> Any:
    """Try to parse JSON from text. Returns the parsed object or None.

    Strategies, in order:
      1. the whole (stripped) text;
      2. the body of the first fenced code block (optionally tagged json);
      3. the first position of ``{`` — then of ``[`` — at which a complete
         JSON value can be decoded. ``raw_decode`` respects string literals,
         so braces inside JSON strings do not confuse the scan, and trailing
         prose after the value is ignored.
    """
    text = text.strip()
    if not text:
        return None

    # Direct parse
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Try to find JSON block in markdown code fences
    fence = _CODE_FENCE_RE.search(text)
    if fence:
        try:
            return json.loads(fence.group(1))
        except json.JSONDecodeError:
            pass

    # Try each { ... } candidate first, then each [ ... ] candidate.
    decoder = json.JSONDecoder()
    for start_char in ("{", "["):
        start = text.find(start_char)
        while start >= 0:
            try:
                value, _end = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                start = text.find(start_char, start + 1)
                continue
            return value

    return None


# ---------------------------------------------------------------------------
# Pipeline executor
# ---------------------------------------------------------------------------

def run_pipeline(
    conn: sqlite3.Connection,
    task_id: str,
    steps: list[dict],
    dry_run: bool = False,
    allow_write: bool = False,
) -> dict:
    """Execute a multi-step pipeline of agents.

    steps = [
        {"role": "debugger", "model": "opus", "brief": "..."},
        {"role": "tester", "depends_on": "debugger", "brief": "..."},
    ]

    Each step runs via ``run_agent``; the text output of a successful step is
    passed to the next one as ``previous_output``. The first failing step
    stops the pipeline: the pipeline row is marked "failed" and the task
    "blocked". On success the pipeline is marked "completed" and the task
    "review". A dry run builds every step's prompt but writes nothing to the
    DB.

    Returns:
        {success, steps_completed, results, total_cost_usd, total_tokens,
        total_duration_seconds, pipeline_id} (+ ``dry_run`` on success,
        + ``error`` on failure). If the task does not exist or a step has no
        "role", returns ``{success: False, error}`` before running anything.
    """
    task = models.get_task(conn, task_id)
    if not task:
        return {"success": False, "error": f"Task '{task_id}' not found"}

    # Validate every step up front: a malformed step found mid-run would only
    # raise after earlier (paid) agent steps had executed, leaving the task
    # in "in_progress".
    for idx, step in enumerate(steps):
        if not isinstance(step, dict) or "role" not in step:
            return {
                "success": False,
                "error": f"Step {idx + 1}/{len(steps)} has no 'role'",
            }

    project_id = task["project_id"]

    # Determine route type from the task brief (when it is a dict)
    route_type = "custom"
    task_brief = task.get("brief")
    if isinstance(task_brief, dict):
        route_type = task_brief.get("route_type") or "custom"

    # Create pipeline in DB
    pipeline = None
    if not dry_run:
        pipeline = models.create_pipeline(
            conn, task_id, project_id, route_type, steps,
        )
        models.update_task(conn, task_id, status="in_progress")

    results = []
    total_cost = 0.0
    total_tokens = 0
    total_duration = 0
    previous_output = None

    for i, step in enumerate(steps):
        role = step["role"]
        model = step.get("model", "sonnet")
        brief = step.get("brief")

        result = run_agent(
            conn, role, task_id, project_id,
            model=model,
            previous_output=previous_output,
            brief_override=brief,
            dry_run=dry_run,
            allow_write=allow_write,
        )
        results.append(result)

        if dry_run:
            continue

        # Accumulate stats
        total_cost += result.get("cost_usd") or 0
        total_tokens += result.get("tokens_used") or 0
        total_duration += result.get("duration_seconds") or 0

        if not result["success"]:
            # Pipeline failed — stop and mark as failed
            if pipeline:
                models.update_pipeline(
                    conn, pipeline["id"],
                    status="failed",
                    total_cost_usd=total_cost,
                    total_tokens=total_tokens,
                    total_duration_seconds=total_duration,
                )
            models.update_task(conn, task_id, status="blocked")
            return {
                "success": False,
                "error": f"Step {i+1}/{len(steps)} ({role}) failed",
                "steps_completed": i,
                "results": results,
                "total_cost_usd": total_cost,
                "total_tokens": total_tokens,
                "total_duration_seconds": total_duration,
                "pipeline_id": pipeline["id"] if pipeline else None,
            }

        # Chain output to next step (always as text)
        previous_output = result.get("raw_output") or result.get("output")
        if isinstance(previous_output, (dict, list)):
            previous_output = json.dumps(previous_output, ensure_ascii=False)

    # Pipeline completed
    if not dry_run:
        if pipeline:
            models.update_pipeline(
                conn, pipeline["id"],
                status="completed",
                total_cost_usd=total_cost,
                total_tokens=total_tokens,
                total_duration_seconds=total_duration,
            )
        models.update_task(conn, task_id, status="review")

    return {
        "success": True,
        "steps_completed": len(steps),
        "results": results,
        "total_cost_usd": total_cost,
        "total_tokens": total_tokens,
        "total_duration_seconds": total_duration,
        "pipeline_id": pipeline["id"] if pipeline else None,
        "dry_run": dry_run,
    }