"""
Kin context builder — assembles role-specific context from DB for agent prompts.
Each role gets only the information it needs, keeping prompts focused.
"""

import json
import logging
import sqlite3
from pathlib import Path

from core import models

logger = logging.getLogger(__name__)

PROMPTS_DIR = Path(__file__).parent.parent / "agents" / "prompts"
SPECIALISTS_PATH = Path(__file__).parent.parent / "agents" / "specialists.yaml"

# Fallbacks used when a project row carries no (or a NULL) value.
_DEFAULT_LANGUAGE = "ru"
_DEFAULT_PROJECT_TYPE = "development"

# Upper bound on decisions rendered into a prompt, to avoid token bloat.
_MAX_DECISIONS_IN_PROMPT = 30

# Language code -> name used in the final "respond in ..." instruction.
_LANG_NAMES = {
    "ru": "Russian",
    "en": "English",
    "es": "Spanish",
    "de": "German",
    "fr": "French",
}


def _load_specialists() -> dict:
    """Load and parse agents/specialists.yaml.

    PyYAML is imported lazily, and only when the file exists, so importing
    this module does not require it.

    Returns:
        The parsed top-level mapping, or ``{}`` when the file is missing,
        empty, or its top level is not a mapping.

    Raises:
        ImportError: if the file exists but PyYAML is not installed.
        yaml.YAMLError: if the file is not valid YAML.
    """
    path = SPECIALISTS_PATH
    if not path.exists():
        return {}
    import yaml

    # safe_load yields None for an empty document; normalise so callers can
    # always use .get() on the result.
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    return data if isinstance(data, dict) else {}


def build_context(
    conn: sqlite3.Connection,
    task_id: str,
    role: str,
    project_id: str,
) -> dict:
    """Build role-specific context from DB.

    Args:
        conn: Open database connection passed through to ``core.models``.
        task_id: Task the agent is working on.
        role: Agent role; selects which extra data is attached.
        project_id: Project the task belongs to.

    Returns:
        A dict with keys ``task``, ``project`` and ``role``, plus optional
        ``attachments`` / ``last_agent_output`` and role-specific entries
        (``decisions``, ``modules``, ``active_tasks``,
        ``available_specialists``, ``routes``, ``module_hint``).
    """
    task = models.get_task(conn, task_id)
    project = models.get_project(conn, project_id)

    ctx = {
        "task": _slim_task(task) if task else None,
        "project": _slim_project(project) if project else None,
        "role": role,
    }

    # Attachments — all roles get them so debugger sees screenshots,
    # UX sees mockups, etc.
    attachments = models.list_attachments(conn, task_id)
    if attachments:
        ctx["attachments"] = attachments

    # If task has a revise comment, fetch the last successful agent output
    # so the agent can see what is being revised.
    if task and task.get("revise_comment"):
        row = conn.execute(
            """SELECT output_summary FROM agent_logs
               WHERE task_id = ? AND success = 1
               ORDER BY created_at DESC LIMIT 1""",
            (task_id,),
        ).fetchone()
        # NOTE(review): access by column name assumes conn uses a row
        # factory such as sqlite3.Row — confirm against the caller.
        if row and row["output_summary"]:
            ctx["last_agent_output"] = row["output_summary"]

    if role == "pm":
        ctx["modules"] = models.get_modules(conn, project_id)
        ctx["decisions"] = models.get_decisions(conn, project_id)
        ctx["active_tasks"] = models.list_tasks(
            conn, project_id=project_id, status="in_progress"
        )
        try:
            specs = _load_specialists()
        except Exception:
            # Best-effort: the PM can still plan without the specialist
            # catalogue, but the failure should not go unnoticed.
            logger.warning(
                "Failed to load specialists from %s",
                SPECIALISTS_PATH,
                exc_info=True,
            )
            specs = {}
        # "or {}" guards against keys that are present but empty (None).
        ctx["available_specialists"] = list(specs.get("specialists") or {})
        ctx["routes"] = specs.get("routes") or {}

    elif role == "architect":
        ctx["modules"] = models.get_modules(conn, project_id)
        ctx["decisions"] = models.get_decisions(conn, project_id)

    elif role == "debugger":
        ctx["decisions"] = models.get_decisions(
            conn, project_id, types=["gotcha", "workaround"],
        )
        ctx["module_hint"] = _extract_module_hint(task)

    elif role in ("frontend_dev", "backend_dev"):
        ctx["decisions"] = models.get_decisions(
            conn, project_id, types=["gotcha", "workaround", "convention"],
        )

    elif role == "reviewer":
        ctx["decisions"] = models.get_decisions(
            conn, project_id, types=["convention"],
        )

    elif role == "sysadmin":
        ctx["decisions"] = models.get_decisions(conn, project_id)
        ctx["modules"] = models.get_modules(conn, project_id)

    elif role == "tester":
        # Minimal context — just the task spec
        pass

    elif role == "security":
        ctx["decisions"] = models.get_decisions(
            conn, project_id, category="security",
        )

    else:
        # Unknown role — give decisions as fallback
        ctx["decisions"] = models.get_decisions(conn, project_id, limit=20)

    return ctx


def _slim_task(task: dict) -> dict:
    """Extract only relevant fields from a task for the prompt.

    ``revise_comment`` and ``acceptance_criteria`` are included only when
    they are set to a truthy value.
    """
    result = {
        "id": task["id"],
        "title": task["title"],
        "status": task["status"],
        "priority": task["priority"],
        "assigned_role": task.get("assigned_role"),
        "brief": task.get("brief"),
        "spec": task.get("spec"),
    }
    for optional_key in ("revise_comment", "acceptance_criteria"):
        if task.get(optional_key):
            result[optional_key] = task[optional_key]
    return result


def _slim_project(project: dict) -> dict:
    """Extract only relevant fields from a project.

    ``language`` and ``project_type`` fall back to their defaults both when
    the key is missing and when its value is empty/None. SSH fields are
    added only for projects whose ``project_type`` is ``"operations"``.
    """
    result = {
        "id": project["id"],
        "name": project["name"],
        "path": project["path"],
        "tech_stack": project.get("tech_stack"),
        "language": project.get("language") or _DEFAULT_LANGUAGE,
        "execution_mode": project.get("execution_mode"),
        "project_type": project.get("project_type") or _DEFAULT_PROJECT_TYPE,
    }
    # Include SSH fields for operations projects
    if project.get("project_type") == "operations":
        for ssh_key in ("ssh_host", "ssh_user", "ssh_key_path", "ssh_proxy_jump"):
            result[ssh_key] = project.get(ssh_key)
    return result


def _extract_module_hint(task: dict | None) -> str | None:
    """Try to extract module name from task brief.

    Returns ``brief["module"]`` when the brief is a dict, otherwise None.
    """
    if not task:
        return None
    brief = task.get("brief")
    if isinstance(brief, dict):
        return brief.get("module")
    return None


def _to_text(value) -> str:
    """Return *value* unchanged if it is a string, else its JSON encoding."""
    if isinstance(value, str):
        return value
    return json.dumps(value, ensure_ascii=False)


def _join_items(value) -> str:
    """Render a collection as a comma-separated string.

    A plain string is returned as-is so it is not split into characters.
    """
    if isinstance(value, str):
        return value
    return ", ".join(str(item) for item in value)


def format_prompt(context: dict, role: str, prompt_template: str | None = None) -> str:
    """Format a prompt by injecting context into a role template.

    If prompt_template is None, loads from agents/prompts/{role}.md; when
    that file does not exist a generic one-line instruction is used.

    Args:
        context: Context dict as produced by ``build_context`` (optionally
            extended with ``previous_output`` for pipeline chaining).
        role: Agent role, used to locate the template file.
        prompt_template: Explicit template text overriding the file lookup.

    Returns:
        The template followed by markdown sections for every context entry
        that is present, joined with newlines. The language instruction is
        always the last section.
    """
    if prompt_template is None:
        prompt_path = PROMPTS_DIR / f"{role}.md"
        if prompt_path.exists():
            # Explicit encoding: templates may contain non-ASCII text and the
            # platform default is not guaranteed to be UTF-8.
            prompt_template = prompt_path.read_text(encoding="utf-8")
        else:
            prompt_template = f"You are a {role}. Complete the task described below."

    sections = [prompt_template, ""]
    proj = context.get("project")
    task = context.get("task")

    # Project info
    if proj:
        sections.append(f"## Project: {proj['id']} — {proj['name']}")
        if proj.get("tech_stack"):
            sections.append(f"Tech stack: {_join_items(proj['tech_stack'])}")
        sections.append(f"Path: {proj['path']}")
        project_type = proj.get("project_type") or _DEFAULT_PROJECT_TYPE
        sections.append(f"Project type: {project_type}")
        sections.append("")

    # SSH connection info for operations projects
    if proj and proj.get("project_type") == "operations":
        sections.append("## SSH Connection")
        ssh_fields = (
            ("Host", "ssh_host"),
            ("User", "ssh_user"),
            ("Key", "ssh_key_path"),
            ("ProxyJump", "ssh_proxy_jump"),
        )
        for label, key in ssh_fields:
            value = proj.get(key) or ""
            if value:
                sections.append(f"{label}: {value}")
        sections.append("")

    # Task info
    if task:
        sections.append(f"## Task: {task['id']} — {task['title']}")
        sections.append(f"Status: {task['status']}, Priority: {task['priority']}")
        if task.get("brief"):
            sections.append(f"Brief: {json.dumps(task['brief'], ensure_ascii=False)}")
        if task.get("spec"):
            sections.append(f"Spec: {json.dumps(task['spec'], ensure_ascii=False)}")
        sections.append("")

    # Acceptance criteria — shown as a dedicated section so agents use it
    # for completeness check
    if task and task.get("acceptance_criteria"):
        sections.append("## Acceptance Criteria")
        sections.append(_to_text(task["acceptance_criteria"]))
        sections.append("")

    # Decisions. The header reports the total; only the first
    # _MAX_DECISIONS_IN_PROMPT entries are listed.
    decisions = context.get("decisions")
    if decisions:
        sections.append(f"## Known decisions ({len(decisions)}):")
        for d in decisions[:_MAX_DECISIONS_IN_PROMPT]:
            tags = f" [{_join_items(d['tags'])}]" if d.get("tags") else ""
            sections.append(f"- #{d['id']} [{d['type']}] {d['title']}{tags}")
        sections.append("")

    # Modules
    modules = context.get("modules")
    if modules:
        sections.append(f"## Modules ({len(modules)}):")
        for m in modules:
            sections.append(f"- {m['name']} ({m['type']}) — {m['path']}")
        sections.append("")

    # Active tasks (PM)
    active = context.get("active_tasks")
    if active:
        sections.append(f"## Active tasks ({len(active)}):")
        for t in active:
            sections.append(f"- {t['id']}: {t['title']} [{t['status']}]")
        sections.append("")

    # Available specialists (PM)
    specialists = context.get("available_specialists")
    if specialists:
        sections.append(f"## Available specialists: {', '.join(specialists)}")
        sections.append("")

    # Routes (PM)
    routes = context.get("routes")
    if routes:
        sections.append("## Route templates:")
        for name, route in routes.items():
            steps = " → ".join(route.get("steps", []))
            sections.append(f"- {name}: {steps}")
        sections.append("")

    # Module hint (debugger)
    hint = context.get("module_hint")
    if hint:
        sections.append(f"## Target module: {hint}")
        sections.append("")

    # Revision context: director's comment + agent's previous output
    if task and task.get("revise_comment"):
        sections.append("## Director's revision request:")
        sections.append(_to_text(task["revise_comment"]))
        sections.append("")

    last_output = context.get("last_agent_output")
    if last_output:
        sections.append("## Your previous output (before revision):")
        sections.append(_to_text(last_output))
        sections.append("")

    # Attachments
    attachments = context.get("attachments")
    if attachments:
        sections.append(f"## Attachments ({len(attachments)}):")
        for a in attachments:
            sections.append(f"- {a['filename']}: {a['path']}")
        sections.append("")

    # Previous step output (pipeline chaining)
    prev = context.get("previous_output")
    if prev:
        sections.append("## Previous step output:")
        sections.append(_to_text(prev))
        sections.append("")

    # Language instruction — always last so it's fresh in context.
    # "or" (not a .get default) so an explicit None/empty language still
    # falls back instead of rendering "respond in None".
    language = (proj.get("language") if proj else None) or _DEFAULT_LANGUAGE
    lang_name = _LANG_NAMES.get(language, language)
    sections.append("## Language")
    sections.append(
        f"ALWAYS respond in {lang_name}. All summaries, analysis, comments, "
        f"and recommendations must be in {lang_name}."
    )
    sections.append("")

    return "\n".join(sections)