# Source: kin/core/context_builder.py (Python; file viewer reported 288 lines, 10 KiB)

"""
Kin context builder — assembles role-specific context from DB for agent prompts.
Each role gets only the information it needs, keeping prompts focused.
"""
import json
import sqlite3
from pathlib import Path
from core import models
# Both resources live under the "agents" directory, a sibling of this file's
# parent directory.
_AGENTS_DIR = Path(__file__).parent.parent / "agents"
# Directory searched for per-role prompt templates ("{role}.md").
PROMPTS_DIR = _AGENTS_DIR / "prompts"
# YAML file read by _load_specialists().
SPECIALISTS_PATH = _AGENTS_DIR / "specialists.yaml"
def _load_specialists() -> dict:
    """Load and parse the specialists YAML file at ``SPECIALISTS_PATH``.

    PyYAML is imported lazily, so it is only needed when the file exists.

    Returns:
        The parsed top-level mapping, or an empty dict when the file is
        missing, empty, or does not hold a mapping at the top level.
    """
    path = SPECIALISTS_PATH
    if not path.exists():
        return {}
    import yaml

    # safe_load yields None for an empty document and may yield a list or a
    # scalar; normalise so the declared ``dict`` return type always holds.
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    return data if isinstance(data, dict) else {}
def build_context(
conn: sqlite3.Connection,
task_id: str,
role: str,
project_id: str,
) -> dict:
"""Build role-specific context from DB.
Returns a dict with keys: task, project, and role-specific data.
"""
task = models.get_task(conn, task_id)
project = models.get_project(conn, project_id)
ctx = {
"task": _slim_task(task) if task else None,
"project": _slim_project(project) if project else None,
"role": role,
}
# If task has a revise comment, fetch the last agent output for context
if task and task.get("revise_comment"):
row = conn.execute(
"""SELECT output_summary FROM agent_logs
WHERE task_id = ? AND success = 1
ORDER BY created_at DESC LIMIT 1""",
(task_id,),
).fetchone()
if row and row["output_summary"]:
ctx["last_agent_output"] = row["output_summary"]
if role == "pm":
ctx["modules"] = models.get_modules(conn, project_id)
ctx["decisions"] = models.get_decisions(conn, project_id)
ctx["active_tasks"] = models.list_tasks(conn, project_id=project_id, status="in_progress")
try:
specs = _load_specialists()
ctx["available_specialists"] = list(specs.get("specialists", {}).keys())
ctx["routes"] = specs.get("routes", {})
except Exception:
ctx["available_specialists"] = []
ctx["routes"] = {}
elif role == "architect":
ctx["modules"] = models.get_modules(conn, project_id)
ctx["decisions"] = models.get_decisions(conn, project_id)
elif role == "debugger":
ctx["decisions"] = models.get_decisions(
conn, project_id, types=["gotcha", "workaround"],
)
ctx["module_hint"] = _extract_module_hint(task)
elif role in ("frontend_dev", "backend_dev"):
ctx["decisions"] = models.get_decisions(
conn, project_id, types=["gotcha", "workaround", "convention"],
)
elif role == "reviewer":
ctx["decisions"] = models.get_decisions(
conn, project_id, types=["convention"],
)
elif role == "sysadmin":
ctx["decisions"] = models.get_decisions(conn, project_id)
ctx["modules"] = models.get_modules(conn, project_id)
elif role == "tester":
# Minimal context — just the task spec
pass
elif role == "security":
ctx["decisions"] = models.get_decisions(
conn, project_id, category="security",
)
else:
# Unknown role — give decisions as fallback
ctx["decisions"] = models.get_decisions(conn, project_id, limit=20)
return ctx
def _slim_task(task: dict) -> dict:
    """Return the subset of task fields that goes into a prompt.

    ``id``, ``title``, ``status`` and ``priority`` are required (a missing one
    raises ``KeyError``); ``assigned_role``, ``brief`` and ``spec`` default to
    ``None``. ``revise_comment`` and ``acceptance_criteria`` are copied only
    when they hold a truthy value.
    """
    slim = {key: task[key] for key in ("id", "title", "status", "priority")}
    for key in ("assigned_role", "brief", "spec"):
        slim[key] = task.get(key)
    # Optional fields are left out entirely when empty.
    for key in ("revise_comment", "acceptance_criteria"):
        value = task.get(key)
        if value:
            slim[key] = value
    return slim
def _slim_project(project: dict) -> dict:
    """Return the subset of project fields that goes into a prompt.

    ``id``, ``name`` and ``path`` are required. ``language`` falls back to
    ``"ru"`` and ``project_type`` to ``"development"`` when the key is absent.
    The four SSH fields are added only for ``"operations"`` projects.
    """
    slim = {field: project[field] for field in ("id", "name", "path")}
    slim["tech_stack"] = project.get("tech_stack")
    slim["language"] = project.get("language", "ru")
    slim["execution_mode"] = project.get("execution_mode")
    slim["project_type"] = project.get("project_type", "development")

    # Connection details are only included for operations projects.
    if project.get("project_type") == "operations":
        for field in ("ssh_host", "ssh_user", "ssh_key_path", "ssh_proxy_jump"):
            slim[field] = project.get(field)
    return slim
def _extract_module_hint(task: dict | None) -> str | None:
"""Try to extract module name from task brief."""
if not task:
return None
brief = task.get("brief")
if isinstance(brief, dict):
return brief.get("module")
return None
def format_prompt(context: dict, role: str, prompt_template: str | None = None) -> str:
"""Format a prompt by injecting context into a role template.
If prompt_template is None, loads from agents/prompts/{role}.md.
"""
if prompt_template is None:
prompt_path = PROMPTS_DIR / f"{role}.md"
if prompt_path.exists():
prompt_template = prompt_path.read_text()
else:
prompt_template = f"You are a {role}. Complete the task described below."
sections = [prompt_template, ""]
# Project info
proj = context.get("project")
if proj:
sections.append(f"## Project: {proj['id']}{proj['name']}")
if proj.get("tech_stack"):
sections.append(f"Tech stack: {', '.join(proj['tech_stack'])}")
sections.append(f"Path: {proj['path']}")
project_type = proj.get("project_type", "development")
sections.append(f"Project type: {project_type}")
sections.append("")
# SSH connection info for operations projects
if proj and proj.get("project_type") == "operations":
ssh_host = proj.get("ssh_host") or ""
ssh_user = proj.get("ssh_user") or ""
ssh_key = proj.get("ssh_key_path") or ""
ssh_proxy = proj.get("ssh_proxy_jump") or ""
sections.append("## SSH Connection")
if ssh_host:
sections.append(f"Host: {ssh_host}")
if ssh_user:
sections.append(f"User: {ssh_user}")
if ssh_key:
sections.append(f"Key: {ssh_key}")
if ssh_proxy:
sections.append(f"ProxyJump: {ssh_proxy}")
sections.append("")
# Task info
task = context.get("task")
if task:
sections.append(f"## Task: {task['id']}{task['title']}")
sections.append(f"Status: {task['status']}, Priority: {task['priority']}")
if task.get("brief"):
sections.append(f"Brief: {json.dumps(task['brief'], ensure_ascii=False)}")
if task.get("spec"):
sections.append(f"Spec: {json.dumps(task['spec'], ensure_ascii=False)}")
sections.append("")
# Acceptance criteria — shown as a dedicated section so agents use it for completeness check
if task and task.get("acceptance_criteria"):
sections.append("## Acceptance Criteria")
sections.append(task["acceptance_criteria"])
sections.append("")
# Decisions
decisions = context.get("decisions")
if decisions:
sections.append(f"## Known decisions ({len(decisions)}):")
for d in decisions[:30]: # Cap at 30 to avoid token bloat
tags = f" [{', '.join(d['tags'])}]" if d.get("tags") else ""
sections.append(f"- #{d['id']} [{d['type']}] {d['title']}{tags}")
sections.append("")
# Modules
modules = context.get("modules")
if modules:
sections.append(f"## Modules ({len(modules)}):")
for m in modules:
sections.append(f"- {m['name']} ({m['type']}) — {m['path']}")
sections.append("")
# Active tasks (PM)
active = context.get("active_tasks")
if active:
sections.append(f"## Active tasks ({len(active)}):")
for t in active:
sections.append(f"- {t['id']}: {t['title']} [{t['status']}]")
sections.append("")
# Available specialists (PM)
specialists = context.get("available_specialists")
if specialists:
sections.append(f"## Available specialists: {', '.join(specialists)}")
sections.append("")
# Routes (PM)
routes = context.get("routes")
if routes:
sections.append("## Route templates:")
for name, route in routes.items():
steps = "".join(route.get("steps", []))
sections.append(f"- {name}: {steps}")
sections.append("")
# Module hint (debugger)
hint = context.get("module_hint")
if hint:
sections.append(f"## Target module: {hint}")
sections.append("")
# Revision context: director's comment + agent's previous output
task = context.get("task")
if task and task.get("revise_comment"):
sections.append("## Director's revision request:")
sections.append(task["revise_comment"])
sections.append("")
last_output = context.get("last_agent_output")
if last_output:
sections.append("## Your previous output (before revision):")
sections.append(last_output)
sections.append("")
# Previous step output (pipeline chaining)
prev = context.get("previous_output")
if prev:
sections.append("## Previous step output:")
sections.append(prev if isinstance(prev, str) else json.dumps(prev, ensure_ascii=False))
sections.append("")
# Language instruction — always last so it's fresh in context
proj = context.get("project")
language = proj.get("language", "ru") if proj else "ru"
_LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish", "de": "German", "fr": "French"}
lang_name = _LANG_NAMES.get(language, language)
sections.append(f"## Language")
sections.append(f"ALWAYS respond in {lang_name}. All summaries, analysis, comments, and recommendations must be in {lang_name}.")
sections.append("")
return "\n".join(sections)