kin/agents/runner.py
Gros Frumos 3cb516193b feat(KIN-012): auto followup generation and pending_actions auto-resolution
Auto mode now calls generate_followups() after task_auto_approved hook.
Permission-blocked followup items are auto-resolved: rerun first, fallback
to manual_task on failure. Recursion guard skips followup-sourced tasks.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 19:49:34 +02:00

564 lines
19 KiB
Python

"""
Kin agent runner — launches Claude Code as subprocess with role-specific context.
Each agent = separate process with isolated context.
"""
import json
import os
import sqlite3
import subprocess
import time
from pathlib import Path
from typing import Any
import re
from core import models
from core.context_builder import build_context, format_prompt
from core.hooks import run_hooks
def run_agent(
    conn: sqlite3.Connection,
    role: str,
    task_id: str,
    project_id: str,
    model: str = "sonnet",
    previous_output: str | None = None,
    brief_override: str | None = None,
    dry_run: bool = False,
    allow_write: bool = False,
    noninteractive: bool = False,
) -> dict:
    """Run a single Claude Code agent as a subprocess and log the outcome.

    Flow: build the context from the DB, render the role-specific prompt,
    invoke the claude CLI, persist the full run to agent_logs, and return
    a summary dict: {success, output, raw_output, role, model,
    duration_seconds, tokens_used, cost_usd}.
    """
    # Assemble the prompt context; optionally thread in the previous
    # step's output and a per-step brief override.
    ctx = build_context(conn, task_id, role, project_id)
    if previous_output:
        ctx["previous_output"] = previous_output
    if brief_override and ctx.get("task"):
        ctx["task"]["brief"] = brief_override
    prompt = format_prompt(ctx, role)

    # Dry run: return the rendered prompt without touching the CLI or DB.
    if dry_run:
        return {
            "success": True,
            "output": None,
            "prompt": prompt,
            "role": role,
            "model": model,
            "dry_run": True,
        }

    # Code-touching roles execute inside the project checkout when it exists.
    project = models.get_project(conn, project_id)
    code_roles = ("debugger", "frontend_dev", "backend_dev", "tester", "security")
    working_dir = None
    if project and role in code_roles:
        candidate = Path(project["path"]).expanduser()
        if candidate.is_dir():
            working_dir = str(candidate)

    # Invoke the CLI and measure wall-clock duration in whole seconds.
    started = time.monotonic()
    cli_result = _run_claude(prompt, model=model, working_dir=working_dir,
                             allow_write=allow_write, noninteractive=noninteractive)
    elapsed = int(time.monotonic() - started)

    # Normalize the output to a string so it can always be stored in the DB.
    output_text = cli_result.get("output", "")
    if not isinstance(output_text, str):
        output_text = json.dumps(output_text, ensure_ascii=False)
    ok = cli_result["returncode"] == 0
    structured = _try_parse_json(output_text)

    # Persist the FULL output (no truncation).
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=task_id,
        agent_role=role,
        action="execute",
        input_summary=f"task={task_id}, model={model}",
        output_summary=output_text or None,
        tokens_used=cli_result.get("tokens_used"),
        model=model,
        cost_usd=cli_result.get("cost_usd"),
        success=ok,
        error_message=cli_result.get("error") if not ok else None,
        duration_seconds=elapsed,
    )
    return {
        "success": ok,
        # Prefer the parsed structure; fall back to the raw text.
        "output": structured or output_text,
        "raw_output": output_text,
        "role": role,
        "model": model,
        "duration_seconds": elapsed,
        "tokens_used": cli_result.get("tokens_used"),
        "cost_usd": cli_result.get("cost_usd"),
    }
def _run_claude(
    prompt: str,
    model: str = "sonnet",
    working_dir: str | None = None,
    allow_write: bool = False,
    noninteractive: bool = False,
) -> dict:
    """Execute the claude CLI as a subprocess.

    Returns a dict with at least {output, error, returncode}; when the CLI
    emits its JSON envelope, tokens_used and cost_usd are added and the
    agent's actual reply is unwrapped into "output".

    Non-interactive runs (flag or KIN_NONINTERACTIVE=1) get a shorter
    timeout and a closed stdin so the CLI can never block on a prompt.
    """
    argv = [
        "claude",
        "-p", prompt,
        "--output-format", "json",
        "--model", model,
    ]
    if allow_write:
        argv.append("--dangerously-skip-permissions")

    headless = noninteractive or os.environ.get("KIN_NONINTERACTIVE") == "1"
    limit = 300 if headless else 600
    try:
        proc = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=limit,
            cwd=working_dir,
            stdin=subprocess.DEVNULL if headless else None,
        )
    except FileNotFoundError:
        # CLI missing entirely — mimic the shell's 127 "command not found".
        return {
            "output": "",
            "error": "claude CLI not found in PATH",
            "returncode": 127,
        }
    except subprocess.TimeoutExpired:
        # 124 matches the coreutils timeout(1) convention.
        return {
            "output": "",
            "error": f"Agent timed out after {limit}s",
            "returncode": 124,
        }

    # Always preserve the full raw stdout as the baseline output.
    stdout = proc.stdout or ""
    outcome: dict[str, Any] = {
        "output": stdout,
        "error": proc.stderr if proc.returncode != 0 else None,
        "returncode": proc.returncode,
    }
    # --output-format json wraps the reply in a metadata envelope: pull
    # out token/cost stats and unwrap the agent's actual response text.
    envelope = _try_parse_json(stdout)
    if isinstance(envelope, dict):
        outcome["tokens_used"] = envelope.get("usage", {}).get("total_tokens")
        outcome["cost_usd"] = envelope.get("cost_usd")
        body = envelope.get("result") or envelope.get("content")
        if body is not None:
            outcome["output"] = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False)
    return outcome
def _try_parse_json(text: str) -> Any:
"""Try to parse JSON from text. Returns parsed obj or None."""
text = text.strip()
if not text:
return None
# Direct parse
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Try to find JSON block in markdown code fences
import re
m = re.search(r"```(?:json)?\s*\n(.*?)\n```", text, re.DOTALL)
if m:
try:
return json.loads(m.group(1))
except json.JSONDecodeError:
pass
# Try to find first { ... } or [ ... ]
for start_char, end_char in [("{", "}"), ("[", "]")]:
start = text.find(start_char)
if start >= 0:
# Find matching close
depth = 0
for i in range(start, len(text)):
if text[i] == start_char:
depth += 1
elif text[i] == end_char:
depth -= 1
if depth == 0:
try:
return json.loads(text[start:i + 1])
except json.JSONDecodeError:
break
return None
# ---------------------------------------------------------------------------
# Backlog audit
# ---------------------------------------------------------------------------
# Directory holding prompt templates (e.g. backlog_audit.md) that ship
# alongside this module.
PROMPTS_DIR = Path(__file__).parent / "prompts"
# ISO 639-1 code -> English language name; used to tell the agent which
# language to answer in (see run_audit). Unknown codes fall back to the
# raw code.
_LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish",
               "de": "German", "fr": "French"}
def run_audit(
    conn: sqlite3.Connection,
    project_id: str,
    noninteractive: bool = False,
    auto_apply: bool = False,
) -> dict:
    """Audit pending tasks against the actual codebase.

    Asks the agent to classify each pending task as already implemented,
    still pending, or unclear.

    auto_apply=True: marks already_done tasks as done in DB.
    auto_apply=False: returns results only (for API/GUI).

    Returns {success, already_done, still_pending, unclear, applied,
    duration_seconds, tokens_used, cost_usd} on success, or
    {success: False, error, ...} on failure.
    """
    project = models.get_project(conn, project_id)
    if not project:
        return {"success": False, "error": f"Project '{project_id}' not found"}
    pending = models.list_tasks(conn, project_id=project_id, status="pending")
    if not pending:
        # Nothing to audit — succeed with empty buckets.
        return {
            "success": True,
            "already_done": [],
            "still_pending": [],
            "unclear": [],
            "message": "No pending tasks to audit",
        }
    # Build the prompt from the template; fall back to a minimal instruction.
    prompt_path = PROMPTS_DIR / "backlog_audit.md"
    template = prompt_path.read_text() if prompt_path.exists() else (
        "You are a QA analyst. Check if pending tasks are already done in the code."
    )
    task_list = [
        {"id": t["id"], "title": t["title"], "brief": t.get("brief")}
        for t in pending
    ]
    sections = [
        template,
        "",
        # Fix: id and name were concatenated with no separator
        # (e.g. "## Project: kinKin") — keep them visibly distinct.
        f"## Project: {project['id']} ({project['name']})",
    ]
    if project.get("tech_stack"):
        sections.append(f"Tech stack: {', '.join(project['tech_stack'])}")
    sections.append(f"Path: {project['path']}")
    sections.append("")
    sections.append(f"## Pending tasks ({len(task_list)}):")
    sections.append(json.dumps(task_list, ensure_ascii=False, indent=2))
    sections.append("")
    language = project.get("language", "ru")
    lang_name = _LANG_NAMES.get(language, language)
    sections.append("## Language")
    sections.append(f"ALWAYS respond in {lang_name}.")
    sections.append("")
    prompt = "\n".join(sections)
    # Run inside the project checkout when the path exists.
    working_dir = None
    project_path = Path(project["path"]).expanduser()
    if project_path.is_dir():
        working_dir = str(project_path)
    # Run agent — allow_write=True so claude can use Read/Bash tools
    # without interactive permission prompts (critical for noninteractive mode).
    start = time.monotonic()
    result = _run_claude(prompt, model="sonnet", working_dir=working_dir,
                         allow_write=True, noninteractive=noninteractive)
    duration = int(time.monotonic() - start)
    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        raw_output = json.dumps(raw_output, ensure_ascii=False)
    success = result["returncode"] == 0
    # Log the full run to agent_logs (task_id=None: project-level action).
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=None,
        agent_role="backlog_audit",
        action="audit",
        input_summary=f"project={project_id}, pending_tasks={len(pending)}",
        output_summary=raw_output or None,
        tokens_used=result.get("tokens_used"),
        model="sonnet",
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )
    if not success:
        return {
            "success": False,
            "error": result.get("error", "Agent failed"),
            "raw_output": raw_output,
            "duration_seconds": duration,
        }
    # The agent must answer with a JSON object; anything else is a failure.
    parsed = _try_parse_json(raw_output)
    if not isinstance(parsed, dict):
        return {
            "success": False,
            "error": "Agent returned non-JSON output",
            "raw_output": raw_output,
            "duration_seconds": duration,
        }
    already_done = parsed.get("already_done", [])
    # Auto-apply: flip verified tasks to done, but only tasks that still
    # exist, belong to this project, and are still pending — defends
    # against stale or hallucinated ids from the agent.
    applied = []
    if auto_apply and already_done:
        for item in already_done:
            tid = item.get("id")
            if tid:
                t = models.get_task(conn, tid)
                if t and t["project_id"] == project_id and t["status"] == "pending":
                    models.update_task(conn, tid, status="done")
                    applied.append(tid)
    return {
        "success": True,
        "already_done": already_done,
        "still_pending": parsed.get("still_pending", []),
        "unclear": parsed.get("unclear", []),
        "applied": applied,
        "duration_seconds": duration,
        "tokens_used": result.get("tokens_used"),
        "cost_usd": result.get("cost_usd"),
    }
# ---------------------------------------------------------------------------
# Permission error detection
# ---------------------------------------------------------------------------
def _is_permission_error(result: dict) -> bool:
    """Return True if agent result indicates a permission/write failure.

    Matches PERMISSION_PATTERNS (regexes) against the agent's output and
    error message combined.
    """
    # Imported lazily to avoid a module-level import cycle with core.followup.
    from core.followup import PERMISSION_PATTERNS

    raw = result.get("raw_output") or result.get("output") or ""
    if not isinstance(raw, str):
        raw = json.dumps(raw, ensure_ascii=False)
    haystack = raw + " " + (result.get("error_message") or "")
    return any(re.search(pattern, haystack) for pattern in PERMISSION_PATTERNS)
# ---------------------------------------------------------------------------
# Pipeline executor
# ---------------------------------------------------------------------------
def run_pipeline(
    conn: sqlite3.Connection,
    task_id: str,
    steps: list[dict],
    dry_run: bool = False,
    allow_write: bool = False,
    noninteractive: bool = False,
) -> dict:
    """Execute a multi-step pipeline of agents.

    steps = [
        {"role": "debugger", "model": "opus", "brief": "..."},
        {"role": "tester", "depends_on": "debugger", "brief": "..."},
    ]

    Each step's output is chained into the next step's context. In "auto"
    mode a permission-style failure triggers one retry with
    allow_write=True and a completed pipeline is auto-approved (with
    followup generation); in "review" mode the task waits for manual
    approval.

    Returns {success, steps_completed, total_cost, total_tokens, total_duration, results}
    """
    task = models.get_task(conn, task_id)
    if not task:
        return {"success": False, "error": f"Task '{task_id}' not found"}
    project_id = task["project_id"]
    # Determine route type from steps or task brief
    route_type = "custom"
    if task.get("brief") and isinstance(task["brief"], dict):
        route_type = task["brief"].get("route_type", "custom") or "custom"
    # Determine execution mode (auto vs review)
    mode = models.get_effective_mode(conn, project_id, task_id)
    # Create pipeline in DB (skipped for dry runs, which must not touch state)
    pipeline = None
    if not dry_run:
        pipeline = models.create_pipeline(
            conn, task_id, project_id, route_type, steps,
        )
        models.update_task(conn, task_id, status="in_progress")
    results: list[dict] = []
    total_cost = 0.0
    total_tokens = 0
    total_duration = 0
    previous_output: str | None = None
    for i, step in enumerate(steps):
        role = step["role"]
        model = step.get("model", "sonnet")
        brief = step.get("brief")
        result = run_agent(
            conn, role, task_id, project_id,
            model=model,
            previous_output=previous_output,
            brief_override=brief,
            dry_run=dry_run,
            allow_write=allow_write,
            noninteractive=noninteractive,
        )
        if dry_run:
            # Dry runs only collect rendered prompts; no stats, no chaining.
            results.append(result)
            continue
        # Accumulate stats
        total_cost += result.get("cost_usd") or 0
        total_tokens += result.get("tokens_used") or 0
        total_duration += result.get("duration_seconds") or 0
        if not result["success"]:
            # Auto mode: retry once with allow_write on permission error
            if mode == "auto" and not allow_write and _is_permission_error(result):
                task_modules = models.get_modules(conn, project_id)
                try:
                    run_hooks(conn, project_id, task_id,
                              event="task_permission_retry",
                              task_modules=task_modules)
                except Exception:
                    pass  # hook failures must not abort the retry
                retry = run_agent(
                    conn, role, task_id, project_id,
                    model=model,
                    previous_output=previous_output,
                    brief_override=brief,
                    dry_run=False,
                    allow_write=True,
                    noninteractive=noninteractive,
                )
                allow_write = True  # subsequent steps also with allow_write
                # Retry cost counts toward the pipeline totals even on failure.
                total_cost += retry.get("cost_usd") or 0
                total_tokens += retry.get("tokens_used") or 0
                total_duration += retry.get("duration_seconds") or 0
                if retry["success"]:
                    result = retry
            if not result["success"]:
                # Still failed — block regardless of mode
                results.append(result)
                if pipeline:
                    models.update_pipeline(
                        conn, pipeline["id"],
                        status="failed",
                        total_cost_usd=total_cost,
                        total_tokens=total_tokens,
                        total_duration_seconds=total_duration,
                    )
                models.update_task(conn, task_id, status="blocked")
                return {
                    "success": False,
                    "error": f"Step {i+1}/{len(steps)} ({role}) failed",
                    "steps_completed": i,
                    "results": results,
                    "total_cost_usd": total_cost,
                    "total_tokens": total_tokens,
                    "total_duration_seconds": total_duration,
                    "pipeline_id": pipeline["id"] if pipeline else None,
                }
        results.append(result)
        # Chain output to next step
        previous_output = result.get("raw_output") or result.get("output")
        if isinstance(previous_output, (dict, list)):
            previous_output = json.dumps(previous_output, ensure_ascii=False)
    # Pipeline completed
    if pipeline and not dry_run:
        models.update_pipeline(
            conn, pipeline["id"],
            status="completed",
            total_cost_usd=total_cost,
            total_tokens=total_tokens,
            total_duration_seconds=total_duration,
        )
        task_modules = models.get_modules(conn, project_id)
        if mode == "auto":
            # Auto mode: skip review, approve immediately
            models.update_task(conn, task_id, status="done")
            try:
                run_hooks(conn, project_id, task_id,
                          event="task_auto_approved", task_modules=task_modules)
            except Exception:
                pass
            # Auto followup: generate tasks, auto-resolve permission issues.
            # Guard: skip for followup-sourced tasks to prevent infinite recursion.
            task_brief = task.get("brief") or {}
            is_followup_task = (
                isinstance(task_brief, dict)
                and str(task_brief.get("source", "")).startswith("followup:")
            )
            if not is_followup_task:
                try:
                    # Imported lazily to avoid a module-level import cycle.
                    from core.followup import generate_followups, auto_resolve_pending_actions
                    fu_result = generate_followups(conn, task_id)
                    if fu_result.get("pending_actions"):
                        auto_resolve_pending_actions(conn, task_id, fu_result["pending_actions"])
                except Exception:
                    pass  # followup generation is best-effort
        else:
            # Review mode: wait for manual approval
            models.update_task(conn, task_id, status="review")
        # Run post-pipeline hooks (failures don't affect pipeline status)
        try:
            run_hooks(conn, project_id, task_id,
                      event="pipeline_completed", task_modules=task_modules)
        except Exception:
            pass  # Hook errors must never block pipeline completion
    return {
        "success": True,
        "steps_completed": len(steps),
        "results": results,
        "total_cost_usd": total_cost,
        "total_tokens": total_tokens,
        "total_duration_seconds": total_duration,
        "pipeline_id": pipeline["id"] if pipeline else None,
        "dry_run": dry_run,
        "mode": mode,
    }