kin/core/context_builder.py
2026-03-19 19:06:18 +02:00

387 lines
15 KiB
Python

"""
Kin context builder — assembles role-specific context from DB for agent prompts.
Each role gets only the information it needs, keeping prompts focused.
"""
import json
import sqlite3
from pathlib import Path
from core import models
PROMPTS_DIR = Path(__file__).parent.parent / "agents" / "prompts"
SPECIALISTS_PATH = Path(__file__).parent.parent / "agents" / "specialists.yaml"
def _load_specialists() -> dict:
"""Load specialists.yaml (lazy, no pyyaml dependency — simple parser)."""
path = SPECIALISTS_PATH
if not path.exists():
return {}
import yaml
return yaml.safe_load(path.read_text())
def build_context(
conn: sqlite3.Connection,
task_id: str,
role: str,
project_id: str,
) -> dict:
"""Build role-specific context from DB.
Returns a dict with keys: task, project, and role-specific data.
"""
task = models.get_task(conn, task_id)
project = models.get_project(conn, project_id)
ctx = {
"task": _slim_task(task) if task else None,
"project": _slim_project(project) if project else None,
"role": role,
}
# Attachments — all roles get them so debugger sees screenshots, UX sees mockups, etc.
# Initialize before conditional to guarantee key presence in ctx (#213)
attachments = models.list_attachments(conn, task_id)
ctx["attachments"] = attachments
# If task has a revise comment, fetch the last agent output for context
if task and task.get("revise_comment"):
row = conn.execute(
"""SELECT output_summary FROM agent_logs
WHERE task_id = ? AND success = 1
ORDER BY created_at DESC LIMIT 1""",
(task_id,),
).fetchone()
if row and row["output_summary"]:
ctx["last_agent_output"] = row["output_summary"]
if role == "pm":
ctx["modules"] = models.get_modules(conn, project_id)
ctx["decisions"] = models.get_decisions(conn, project_id)
ctx["active_tasks"] = models.list_tasks(conn, project_id=project_id, status="in_progress")
try:
specs = _load_specialists()
ctx["available_specialists"] = list(specs.get("specialists", {}).keys())
ctx["routes"] = specs.get("routes", {})
ctx["departments"] = specs.get("departments", {})
except Exception:
ctx["available_specialists"] = []
ctx["routes"] = {}
ctx["departments"] = {}
elif role == "architect":
ctx["modules"] = models.get_modules(conn, project_id)
ctx["decisions"] = models.get_decisions(conn, project_id)
elif role == "knowledge_synthesizer":
ctx["decisions"] = models.get_decisions(conn, project_id)
elif role == "debugger":
ctx["decisions"] = models.get_decisions(
conn, project_id, types=["gotcha", "workaround"],
)
ctx["module_hint"] = _extract_module_hint(task)
elif role in ("frontend_dev", "backend_dev"):
ctx["decisions"] = models.get_decisions(
conn, project_id, types=["gotcha", "workaround", "convention"],
)
elif role == "reviewer":
ctx["decisions"] = models.get_decisions(
conn, project_id, types=["convention"],
)
elif role == "sysadmin":
ctx["decisions"] = models.get_decisions(conn, project_id)
ctx["modules"] = models.get_modules(conn, project_id)
elif role == "tester":
# Minimal context — just the task spec
pass
elif role in ("constitution", "spec"):
ctx["modules"] = models.get_modules(conn, project_id)
ctx["decisions"] = models.get_decisions(conn, project_id)
elif role == "task_decomposer":
ctx["modules"] = models.get_modules(conn, project_id)
ctx["decisions"] = models.get_decisions(conn, project_id)
ctx["active_tasks"] = models.list_tasks(conn, project_id=project_id, status="in_progress")
elif role == "security":
ctx["decisions"] = models.get_decisions(
conn, project_id, category="security",
)
elif role.endswith("_head"):
# Department head: load department config and previous handoff
ctx["decisions"] = models.get_decisions(conn, project_id)
ctx["modules"] = models.get_modules(conn, project_id)
try:
specs = _load_specialists()
all_specs = specs.get("specialists", {})
departments = specs.get("departments", {})
spec = all_specs.get(role, {})
dept_name = spec.get("department", "")
dept_info = departments.get(dept_name, {})
ctx["department"] = dept_name
ctx["department_workers"] = dept_info.get("workers", [])
ctx["department_description"] = dept_info.get("description", "")
except Exception:
ctx["department"] = ""
ctx["department_workers"] = []
ctx["department_description"] = ""
# Previous handoff from another department (if any)
try:
dept = ctx.get("department")
last_handoff = models.get_last_handoff(conn, task_id, to_department=dept)
# Fallback: get latest handoff NOT from our own department
# (avoids picking up our own outgoing handoff)
if not last_handoff and dept:
all_handoffs = models.get_handoffs_for_task(conn, task_id)
for h in reversed(all_handoffs):
if h.get("from_department") != dept:
last_handoff = h
break
if last_handoff:
ctx["incoming_handoff"] = last_handoff
except Exception:
pass
else:
# Unknown role — give decisions as fallback
ctx["decisions"] = models.get_decisions(conn, project_id, limit=20)
return ctx
def _slim_task(task: dict) -> dict:
"""Extract only relevant fields from a task for the prompt."""
result = {
"id": task["id"],
"title": task["title"],
"status": task["status"],
"priority": task["priority"],
"assigned_role": task.get("assigned_role"),
"brief": task.get("brief"),
"spec": task.get("spec"),
}
if task.get("revise_comment"):
result["revise_comment"] = task["revise_comment"]
if task.get("acceptance_criteria"):
result["acceptance_criteria"] = task["acceptance_criteria"]
return result
def _slim_project(project: dict) -> dict:
"""Extract only relevant fields from a project."""
result = {
"id": project["id"],
"name": project["name"],
"path": project["path"],
"tech_stack": project.get("tech_stack"),
"language": project.get("language", "ru"),
"execution_mode": project.get("execution_mode"),
"project_type": project.get("project_type", "development"),
}
# Include SSH fields for operations projects
if project.get("project_type") == "operations":
result["ssh_host"] = project.get("ssh_host")
result["ssh_user"] = project.get("ssh_user")
result["ssh_key_path"] = project.get("ssh_key_path")
result["ssh_proxy_jump"] = project.get("ssh_proxy_jump")
return result
def _extract_module_hint(task: dict | None) -> str | None:
"""Try to extract module name from task brief."""
if not task:
return None
brief = task.get("brief")
if isinstance(brief, dict):
return brief.get("module")
return None
def format_prompt(context: dict, role: str, prompt_template: str | None = None) -> str:
"""Format a prompt by injecting context into a role template.
If prompt_template is None, loads from agents/prompts/{role}.md.
"""
if prompt_template is None:
prompt_path = PROMPTS_DIR / f"{role}.md"
if prompt_path.exists():
prompt_template = prompt_path.read_text()
elif role.endswith("_head"):
# Fallback: all department heads share the base department_head.md prompt
dept_head_path = PROMPTS_DIR / "department_head.md"
if dept_head_path.exists():
prompt_template = dept_head_path.read_text()
else:
prompt_template = f"You are a {role}. Complete the task described below."
else:
prompt_template = f"You are a {role}. Complete the task described below."
sections = [prompt_template, ""]
# Project info
proj = context.get("project")
if proj:
sections.append(f"## Project: {proj['id']}{proj['name']}")
if proj.get("tech_stack"):
sections.append(f"Tech stack: {', '.join(proj['tech_stack'])}")
sections.append(f"Path: {proj['path']}")
project_type = proj.get("project_type", "development")
sections.append(f"Project type: {project_type}")
sections.append("")
# SSH connection info for operations projects
if proj and proj.get("project_type") == "operations":
ssh_host = proj.get("ssh_host") or ""
ssh_user = proj.get("ssh_user") or ""
ssh_key = proj.get("ssh_key_path") or ""
ssh_proxy = proj.get("ssh_proxy_jump") or ""
sections.append("## SSH Connection")
if ssh_host:
sections.append(f"Host: {ssh_host}")
if ssh_user:
sections.append(f"User: {ssh_user}")
if ssh_key:
sections.append(f"Key: {ssh_key}")
if ssh_proxy:
sections.append(f"ProxyJump: {ssh_proxy}")
sections.append("")
# Task info
task = context.get("task")
if task:
sections.append(f"## Task: {task['id']}{task['title']}")
sections.append(f"Status: {task['status']}, Priority: {task['priority']}")
if task.get("brief"):
sections.append(f"Brief: {json.dumps(task['brief'], ensure_ascii=False)}")
if task.get("spec"):
sections.append(f"Spec: {json.dumps(task['spec'], ensure_ascii=False)}")
sections.append("")
# Acceptance criteria — shown as a dedicated section so agents use it for completeness check
if task and task.get("acceptance_criteria"):
sections.append("## Acceptance Criteria")
sections.append(task["acceptance_criteria"])
sections.append("")
# Decisions
decisions = context.get("decisions")
if decisions:
sections.append(f"## Known decisions ({len(decisions)}):")
for d in decisions[:30]: # Cap at 30 to avoid token bloat
tags = f" [{', '.join(d['tags'])}]" if d.get("tags") else ""
sections.append(f"- #{d['id']} [{d['type']}] {d['title']}{tags}")
sections.append("")
# Modules
modules = context.get("modules")
if modules:
sections.append(f"## Modules ({len(modules)}):")
for m in modules:
sections.append(f"- {m['name']} ({m['type']}) — {m['path']}")
sections.append("")
# Active tasks (PM)
active = context.get("active_tasks")
if active:
sections.append(f"## Active tasks ({len(active)}):")
for t in active:
sections.append(f"- {t['id']}: {t['title']} [{t['status']}]")
sections.append("")
# Available specialists (PM)
specialists = context.get("available_specialists")
if specialists:
sections.append(f"## Available specialists: {', '.join(specialists)}")
sections.append("")
# Routes (PM)
routes = context.get("routes")
if routes:
sections.append("## Route templates:")
for name, route in routes.items():
steps = "".join(route.get("steps", []))
sections.append(f"- {name}: {steps}")
sections.append("")
# Department context (department heads)
dept = context.get("department")
if dept:
dept_desc = context.get("department_description", "")
sections.append(f"## Department: {dept}" + (f"{dept_desc}" if dept_desc else ""))
sections.append("")
dept_workers = context.get("department_workers")
if dept_workers:
sections.append(f"## Department workers: {', '.join(dept_workers)}")
sections.append("")
incoming_handoff = context.get("incoming_handoff")
if incoming_handoff:
sections.append("## Incoming handoff from previous department:")
sections.append(json.dumps(incoming_handoff, ensure_ascii=False))
sections.append("")
# Module hint (debugger)
hint = context.get("module_hint")
if hint:
sections.append(f"## Target module: {hint}")
sections.append("")
# Revision context: director's comment + agent's previous output
if task and task.get("revise_comment"):
sections.append("## Director's revision request:")
sections.append(task["revise_comment"])
sections.append("")
last_output = context.get("last_agent_output")
if last_output:
sections.append("## Your previous output (before revision):")
sections.append(last_output)
sections.append("")
# Attachments
attachments = context.get("attachments")
if attachments:
sections.append(f"## Attachments ({len(attachments)}):")
for a in attachments:
mime = a.get("mime_type", "")
size = a.get("size", 0)
sections.append(f"- {a['filename']} ({mime}, {size} bytes): {a['path']}")
# Inline content for small text-readable files (<= 32 KB) so PM can use them immediately
_TEXT_TYPES = {"text/", "application/json", "application/xml", "application/yaml"}
_TEXT_EXTS = {".txt", ".md", ".json", ".yaml", ".yml", ".csv", ".log", ".xml", ".toml", ".ini", ".env"}
is_text = (
any(mime.startswith(t) if t.endswith("/") else mime == t for t in _TEXT_TYPES)
or Path(a["filename"]).suffix.lower() in _TEXT_EXTS
)
if is_text and 0 < size <= 32 * 1024:
try:
content = Path(a["path"]).read_text(encoding="utf-8", errors="replace")
sections.append(f"```\n{content}\n```")
except Exception:
pass
sections.append("")
# Previous step output (pipeline chaining)
prev = context.get("previous_output")
if prev:
sections.append("## Previous step output:")
sections.append(prev if isinstance(prev, str) else json.dumps(prev, ensure_ascii=False))
sections.append("")
# Language instruction — always last so it's fresh in context
proj = context.get("project")
language = proj.get("language", "ru") if proj else "ru"
_LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish", "de": "German", "fr": "French"}
lang_name = _LANG_NAMES.get(language, language)
sections.append(f"## Language")
sections.append(f"ALWAYS respond in {lang_name}. All summaries, analysis, comments, and recommendations must be in {lang_name}.")
sections.append("")
return "\n".join(sections)