""" Kin agent runner — launches Claude Code as subprocess with role-specific context. Each agent = separate process with isolated context. """ import json import os import sqlite3 import subprocess import time from pathlib import Path from typing import Any from core import models from core.context_builder import build_context, format_prompt from core.hooks import run_hooks def run_agent( conn: sqlite3.Connection, role: str, task_id: str, project_id: str, model: str = "sonnet", previous_output: str | None = None, brief_override: str | None = None, dry_run: bool = False, allow_write: bool = False, noninteractive: bool = False, ) -> dict: """Run a single Claude Code agent as a subprocess. 1. Build context from DB 2. Format prompt with role template 3. Run: claude -p "{prompt}" --output-format json 4. Log result to agent_logs 5. Return {success, output, tokens_used, duration_seconds, cost_usd} """ # Build context ctx = build_context(conn, task_id, role, project_id) if previous_output: ctx["previous_output"] = previous_output if brief_override: if ctx.get("task"): ctx["task"]["brief"] = brief_override prompt = format_prompt(ctx, role) if dry_run: return { "success": True, "output": None, "prompt": prompt, "role": role, "model": model, "dry_run": True, } # Determine working directory project = models.get_project(conn, project_id) working_dir = None if project and role in ("debugger", "frontend_dev", "backend_dev", "tester", "security"): project_path = Path(project["path"]).expanduser() if project_path.is_dir(): working_dir = str(project_path) # Run claude subprocess start = time.monotonic() result = _run_claude(prompt, model=model, working_dir=working_dir, allow_write=allow_write, noninteractive=noninteractive) duration = int(time.monotonic() - start) # Parse output — ensure output_text is always a string for DB storage raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) output_text = raw_output success = result["returncode"] == 0 parsed_output = _try_parse_json(output_text) # Log FULL output to DB (no truncation) models.log_agent_run( conn, project_id=project_id, task_id=task_id, agent_role=role, action="execute", input_summary=f"task={task_id}, model={model}", output_summary=output_text or None, tokens_used=result.get("tokens_used"), model=model, cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) return { "success": success, "output": parsed_output if parsed_output else output_text, "raw_output": output_text, "role": role, "model": model, "duration_seconds": duration, "tokens_used": result.get("tokens_used"), "cost_usd": result.get("cost_usd"), } def _run_claude( prompt: str, model: str = "sonnet", working_dir: str | None = None, allow_write: bool = False, noninteractive: bool = False, ) -> dict: """Execute claude CLI as subprocess. Returns dict with output, returncode, etc.""" cmd = [ "claude", "-p", prompt, "--output-format", "json", "--model", model, ] if allow_write: cmd.append("--dangerously-skip-permissions") is_noninteractive = noninteractive or os.environ.get("KIN_NONINTERACTIVE") == "1" timeout = 300 if is_noninteractive else 600 try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, cwd=working_dir, stdin=subprocess.DEVNULL if is_noninteractive else None, ) except FileNotFoundError: return { "output": "", "error": "claude CLI not found in PATH", "returncode": 127, } except subprocess.TimeoutExpired: return { "output": "", "error": f"Agent timed out after {timeout}s", "returncode": 124, } # Always preserve the full raw stdout raw_stdout = proc.stdout or "" result: dict[str, Any] = { "output": raw_stdout, "error": proc.stderr if proc.returncode != 0 else None, "returncode": proc.returncode, } # Parse JSON wrapper from claude --output-format json # Extract metadata (tokens, cost) but keep output as the full content string parsed = _try_parse_json(raw_stdout) if isinstance(parsed, dict): result["tokens_used"] = parsed.get("usage", {}).get("total_tokens") result["cost_usd"] = parsed.get("cost_usd") # Extract the agent's actual response, converting to string if needed content = parsed.get("result") or parsed.get("content") if content is not None: result["output"] = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False) return result def _try_parse_json(text: str) -> Any: """Try to parse JSON from text. Returns parsed obj or None.""" text = text.strip() if not text: return None # Direct parse try: return json.loads(text) except json.JSONDecodeError: pass # Try to find JSON block in markdown code fences import re m = re.search(r"```(?:json)?\s*\n(.*?)\n```", text, re.DOTALL) if m: try: return json.loads(m.group(1)) except json.JSONDecodeError: pass # Try to find first { ... } or [ ... ] for start_char, end_char in [("{", "}"), ("[", "]")]: start = text.find(start_char) if start >= 0: # Find matching close depth = 0 for i in range(start, len(text)): if text[i] == start_char: depth += 1 elif text[i] == end_char: depth -= 1 if depth == 0: try: return json.loads(text[start:i + 1]) except json.JSONDecodeError: break return None # --------------------------------------------------------------------------- # Backlog audit # --------------------------------------------------------------------------- PROMPTS_DIR = Path(__file__).parent / "prompts" _LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish", "de": "German", "fr": "French"} def run_audit( conn: sqlite3.Connection, project_id: str, noninteractive: bool = False, auto_apply: bool = False, ) -> dict: """Audit pending tasks against the actual codebase. auto_apply=True: marks already_done tasks as done in DB. auto_apply=False: returns results only (for API/GUI). Returns {success, already_done, still_pending, unclear, duration_seconds, ...} """ project = models.get_project(conn, project_id) if not project: return {"success": False, "error": f"Project '{project_id}' not found"} pending = models.list_tasks(conn, project_id=project_id, status="pending") if not pending: return { "success": True, "already_done": [], "still_pending": [], "unclear": [], "message": "No pending tasks to audit", } # Build prompt prompt_path = PROMPTS_DIR / "backlog_audit.md" template = prompt_path.read_text() if prompt_path.exists() else ( "You are a QA analyst. Check if pending tasks are already done in the code." ) task_list = [ {"id": t["id"], "title": t["title"], "brief": t.get("brief")} for t in pending ] sections = [ template, "", f"## Project: {project['id']} — {project['name']}", ] if project.get("tech_stack"): sections.append(f"Tech stack: {', '.join(project['tech_stack'])}") sections.append(f"Path: {project['path']}") sections.append("") sections.append(f"## Pending tasks ({len(task_list)}):") sections.append(json.dumps(task_list, ensure_ascii=False, indent=2)) sections.append("") language = project.get("language", "ru") lang_name = _LANG_NAMES.get(language, language) sections.append("## Language") sections.append(f"ALWAYS respond in {lang_name}.") sections.append("") prompt = "\n".join(sections) # Determine working dir working_dir = None project_path = Path(project["path"]).expanduser() if project_path.is_dir(): working_dir = str(project_path) # Run agent — allow_write=True so claude can use Read/Bash tools # without interactive permission prompts (critical for noninteractive mode) start = time.monotonic() result = _run_claude(prompt, model="sonnet", working_dir=working_dir, allow_write=True, noninteractive=noninteractive) duration = int(time.monotonic() - start) raw_output = result.get("output", "") if not isinstance(raw_output, str): raw_output = json.dumps(raw_output, ensure_ascii=False) success = result["returncode"] == 0 # Log to agent_logs models.log_agent_run( conn, project_id=project_id, task_id=None, agent_role="backlog_audit", action="audit", input_summary=f"project={project_id}, pending_tasks={len(pending)}", output_summary=raw_output or None, tokens_used=result.get("tokens_used"), model="sonnet", cost_usd=result.get("cost_usd"), success=success, error_message=result.get("error") if not success else None, duration_seconds=duration, ) if not success: return { "success": False, "error": result.get("error", "Agent failed"), "raw_output": raw_output, "duration_seconds": duration, } # Parse structured output parsed = _try_parse_json(raw_output) if not isinstance(parsed, dict): return { "success": False, "error": "Agent returned non-JSON output", "raw_output": raw_output, "duration_seconds": duration, } already_done = parsed.get("already_done", []) # Auto-apply: mark already_done tasks as done in DB applied = [] if auto_apply and already_done: for item in already_done: tid = item.get("id") if tid: t = models.get_task(conn, tid) if t and t["project_id"] == project_id and t["status"] == "pending": models.update_task(conn, tid, status="done") applied.append(tid) return { "success": True, "already_done": already_done, "still_pending": parsed.get("still_pending", []), "unclear": parsed.get("unclear", []), "applied": applied, "duration_seconds": duration, "tokens_used": result.get("tokens_used"), "cost_usd": result.get("cost_usd"), } # --------------------------------------------------------------------------- # Pipeline executor # --------------------------------------------------------------------------- def run_pipeline( conn: sqlite3.Connection, task_id: str, steps: list[dict], dry_run: bool = False, allow_write: bool = False, noninteractive: bool = False, ) -> dict: """Execute a multi-step pipeline of agents. steps = [ {"role": "debugger", "model": "opus", "brief": "..."}, {"role": "tester", "depends_on": "debugger", "brief": "..."}, ] Returns {success, steps_completed, total_cost, total_tokens, total_duration, results} """ task = models.get_task(conn, task_id) if not task: return {"success": False, "error": f"Task '{task_id}' not found"} project_id = task["project_id"] # Determine route type from steps or task brief route_type = "custom" if task.get("brief") and isinstance(task["brief"], dict): route_type = task["brief"].get("route_type", "custom") or "custom" # Create pipeline in DB pipeline = None if not dry_run: pipeline = models.create_pipeline( conn, task_id, project_id, route_type, steps, ) models.update_task(conn, task_id, status="in_progress") results = [] total_cost = 0.0 total_tokens = 0 total_duration = 0 previous_output = None for i, step in enumerate(steps): role = step["role"] model = step.get("model", "sonnet") brief = step.get("brief") result = run_agent( conn, role, task_id, project_id, model=model, previous_output=previous_output, brief_override=brief, dry_run=dry_run, allow_write=allow_write, noninteractive=noninteractive, ) results.append(result) if dry_run: continue # Accumulate stats total_cost += result.get("cost_usd") or 0 total_tokens += result.get("tokens_used") or 0 total_duration += result.get("duration_seconds") or 0 if not result["success"]: # Pipeline failed — stop and mark as failed if pipeline: models.update_pipeline( conn, pipeline["id"], status="failed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) models.update_task(conn, task_id, status="blocked") return { "success": False, "error": f"Step {i+1}/{len(steps)} ({role}) failed", "steps_completed": i, "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, } # Chain output to next step previous_output = result.get("raw_output") or result.get("output") if isinstance(previous_output, (dict, list)): previous_output = json.dumps(previous_output, ensure_ascii=False) # Pipeline completed if pipeline and not dry_run: models.update_pipeline( conn, pipeline["id"], status="completed", total_cost_usd=total_cost, total_tokens=total_tokens, total_duration_seconds=total_duration, ) models.update_task(conn, task_id, status="review") # Run post-pipeline hooks (failures don't affect pipeline status) task_modules = models.get_modules(conn, project_id) run_hooks(conn, project_id, task_id, event="pipeline_completed", task_modules=task_modules) return { "success": True, "steps_completed": len(steps), "results": results, "total_cost_usd": total_cost, "total_tokens": total_tokens, "total_duration_seconds": total_duration, "pipeline_id": pipeline["id"] if pipeline else None, "dry_run": dry_run, }