kin/core/context_builder.py
johnfrum1234 fabae74c19 Add context builder, agent runner, and pipeline executor
core/context_builder.py:
  build_context() — assembles role-specific context from DB.
  PM gets everything; debugger gets gotchas/workarounds; reviewer
  gets conventions only; tester gets minimal context; security
  gets security-category decisions.
  format_prompt() — injects context into role templates.

agents/runner.py:
  run_agent() — launches claude CLI as subprocess with role prompt.
  run_pipeline() — executes multi-step pipelines sequentially,
  chains output between steps, logs to agent_logs, creates/updates
  pipeline records, handles failures gracefully.

agents/specialists.yaml — 8 roles with tools, permissions, context rules.
agents/prompts/pm.md — PM prompt for task decomposition.
agents/prompts/security.md — security audit prompt (OWASP, auth, secrets).

CLI: kin run <task_id> [--dry-run]
  PM decomposes → shows pipeline → executes with confirmation.

31 new tests (15 context_builder, 11 runner, 5 JSON parsing).
92 total, all passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 14:03:32 +02:00

212 lines
6.7 KiB
Python

"""
Kin context builder — assembles role-specific context from DB for agent prompts.
Each role gets only the information it needs, keeping prompts focused.
"""
import json
import sqlite3
from pathlib import Path
from core import models
# Directory with per-role prompt templates; format_prompt() looks for
# "<role>.md" here when no explicit template is passed.
PROMPTS_DIR = Path(__file__).parent.parent / "agents" / "prompts"
# YAML file read by _load_specialists(); build_context() takes the
# "specialists" and "routes" keys from it for the PM role.
SPECIALISTS_PATH = Path(__file__).parent.parent / "agents" / "specialists.yaml"
def _load_specialists() -> dict:
    """Load and parse the specialists definition file.

    PyYAML is imported lazily, inside the function, so that merely importing
    this module does not require it; it is only needed once the file at
    SPECIALISTS_PATH actually exists.

    Returns:
        The parsed top-level mapping, or an empty dict when the file is
        missing, empty, or its top-level value is not a mapping.
    """
    path = SPECIALISTS_PATH
    if not path.exists():
        return {}

    import yaml  # lazy: third-party dependency needed only on this path

    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    # safe_load() yields None for an empty document and may yield a list or
    # scalar for other content; callers rely on dict methods such as .get().
    return data if isinstance(data, dict) else {}
def build_context(
    conn: sqlite3.Connection,
    task_id: str,
    role: str,
    project_id: str,
) -> dict:
    """Assemble the context dict handed to an agent of the given role.

    The result always carries "task", "project" (slimmed copies, or None
    when the lookup returned nothing) and "role". Further keys depend on
    the role:

    - pm: modules, decisions, in-progress tasks, available specialists
      and route templates
    - architect: modules and decisions
    - debugger: gotcha/workaround decisions plus a module hint
    - frontend_dev / backend_dev: gotcha/workaround/convention decisions
    - reviewer: convention decisions
    - tester: nothing extra
    - security: decisions of the "security" category
    - any other role: decisions fetched with limit=20
    """
    task_row = models.get_task(conn, task_id)
    project_row = models.get_project(conn, project_id)

    context = {
        "task": None if not task_row else _slim_task(task_row),
        "project": None if not project_row else _slim_project(project_row),
        "role": role,
    }

    if role == "pm" or role == "architect":
        context["modules"] = models.get_modules(conn, project_id)
        context["decisions"] = models.get_decisions(conn, project_id)
        if role == "architect":
            return context

        context["active_tasks"] = models.list_tasks(
            conn, project_id=project_id, status="in_progress"
        )
        # Best effort: a missing or malformed specialists file must not
        # prevent the PM from getting the rest of its context.
        try:
            loaded = _load_specialists()
            names = list(loaded.get("specialists", {}).keys())
            route_map = loaded.get("routes", {})
        except Exception:
            names, route_map = [], {}
        context["available_specialists"] = names
        context["routes"] = route_map
        return context

    if role == "debugger":
        context["decisions"] = models.get_decisions(
            conn, project_id, types=["gotcha", "workaround"],
        )
        context["module_hint"] = _extract_module_hint(task_row)
        return context

    if role in ("frontend_dev", "backend_dev"):
        context["decisions"] = models.get_decisions(
            conn, project_id, types=["gotcha", "workaround", "convention"],
        )
        return context

    if role == "reviewer":
        context["decisions"] = models.get_decisions(
            conn, project_id, types=["convention"],
        )
        return context

    if role == "tester":
        # The tester works from the task itself; no extra context is added.
        return context

    if role == "security":
        context["decisions"] = models.get_decisions(
            conn, project_id, category="security",
        )
        return context

    # Unrecognised role: fall back to a capped list of decisions.
    context["decisions"] = models.get_decisions(conn, project_id, limit=20)
    return context
def _slim_task(task: dict) -> dict:
"""Extract only relevant fields from a task for the prompt."""
return {
"id": task["id"],
"title": task["title"],
"status": task["status"],
"priority": task["priority"],
"assigned_role": task.get("assigned_role"),
"brief": task.get("brief"),
"spec": task.get("spec"),
}
def _slim_project(project: dict) -> dict:
"""Extract only relevant fields from a project."""
return {
"id": project["id"],
"name": project["name"],
"path": project["path"],
"tech_stack": project.get("tech_stack"),
}
def _extract_module_hint(task: dict | None) -> str | None:
"""Try to extract module name from task brief."""
if not task:
return None
brief = task.get("brief")
if isinstance(brief, dict):
return brief.get("module")
return None
def format_prompt(context: dict, role: str, prompt_template: str | None = None) -> str:
"""Format a prompt by injecting context into a role template.
If prompt_template is None, loads from agents/prompts/{role}.md.
"""
if prompt_template is None:
prompt_path = PROMPTS_DIR / f"{role}.md"
if prompt_path.exists():
prompt_template = prompt_path.read_text()
else:
prompt_template = f"You are a {role}. Complete the task described below."
sections = [prompt_template, ""]
# Project info
proj = context.get("project")
if proj:
sections.append(f"## Project: {proj['id']}{proj['name']}")
if proj.get("tech_stack"):
sections.append(f"Tech stack: {', '.join(proj['tech_stack'])}")
sections.append(f"Path: {proj['path']}")
sections.append("")
# Task info
task = context.get("task")
if task:
sections.append(f"## Task: {task['id']}{task['title']}")
sections.append(f"Status: {task['status']}, Priority: {task['priority']}")
if task.get("brief"):
sections.append(f"Brief: {json.dumps(task['brief'], ensure_ascii=False)}")
if task.get("spec"):
sections.append(f"Spec: {json.dumps(task['spec'], ensure_ascii=False)}")
sections.append("")
# Decisions
decisions = context.get("decisions")
if decisions:
sections.append(f"## Known decisions ({len(decisions)}):")
for d in decisions[:30]: # Cap at 30 to avoid token bloat
tags = f" [{', '.join(d['tags'])}]" if d.get("tags") else ""
sections.append(f"- #{d['id']} [{d['type']}] {d['title']}{tags}")
sections.append("")
# Modules
modules = context.get("modules")
if modules:
sections.append(f"## Modules ({len(modules)}):")
for m in modules:
sections.append(f"- {m['name']} ({m['type']}) — {m['path']}")
sections.append("")
# Active tasks (PM)
active = context.get("active_tasks")
if active:
sections.append(f"## Active tasks ({len(active)}):")
for t in active:
sections.append(f"- {t['id']}: {t['title']} [{t['status']}]")
sections.append("")
# Available specialists (PM)
specialists = context.get("available_specialists")
if specialists:
sections.append(f"## Available specialists: {', '.join(specialists)}")
sections.append("")
# Routes (PM)
routes = context.get("routes")
if routes:
sections.append("## Route templates:")
for name, route in routes.items():
steps = "".join(route.get("steps", []))
sections.append(f"- {name}: {steps}")
sections.append("")
# Module hint (debugger)
hint = context.get("module_hint")
if hint:
sections.append(f"## Target module: {hint}")
sections.append("")
# Previous step output (pipeline chaining)
prev = context.get("previous_output")
if prev:
sections.append("## Previous step output:")
sections.append(prev if isinstance(prev, str) else json.dumps(prev, ensure_ascii=False))
sections.append("")
return "\n".join(sections)