"""
Kin context builder — assembles role-specific context from DB for agent prompts.
Each role gets only the information it needs, keeping prompts focused.
"""

import json
import logging
import sqlite3
from pathlib import Path

from core import models

logger = logging.getLogger(__name__)

PROMPTS_DIR = Path(__file__).parent.parent / "agents" / "prompts"
SPECIALISTS_PATH = Path(__file__).parent.parent / "agents" / "specialists.yaml"

# Attachments at or below this recorded size may be inlined into the prompt.
_INLINE_MAX_BYTES = 32 * 1024
# Exact MIME types treated as text (any "text/*" type is accepted as well).
_TEXT_MIME_TYPES = frozenset({"application/json", "application/xml", "application/yaml"})
# File suffixes treated as text when the MIME type alone does not say so.
_TEXT_EXTS = frozenset({
    ".txt", ".md", ".json", ".yaml", ".yml", ".csv",
    ".log", ".xml", ".toml", ".ini", ".env",
})
# Language code -> name used in the final "respond in ..." instruction.
_LANG_NAMES = {
    "ru": "Russian",
    "en": "English",
    "es": "Spanish",
    "de": "German",
    "fr": "French",
}


def _load_specialists() -> dict:
    """Load and parse agents/specialists.yaml.

    PyYAML is imported lazily, only when the file actually exists.

    Returns:
        The parsed YAML document, or an empty dict when the file is missing
        or parses to an empty/None document.
    """
    path = SPECIALISTS_PATH
    if not path.exists():
        return {}
    import yaml

    # safe_load() returns None for an empty document; normalise that to {}
    # so callers can always use .get() on the result.
    return yaml.safe_load(path.read_text(encoding="utf-8")) or {}


def build_context(
    conn: sqlite3.Connection,
    task_id: str,
    role: str,
    project_id: str,
) -> dict:
    """Build role-specific context from DB.

    Args:
        conn: Open SQLite connection. Rows are read by column name
            (``row["output_summary"]``), so a name-addressable row factory
            is expected.
        task_id: Task the agent is working on.
        role: Agent role; selects which extra data is loaded.
        project_id: Project the task belongs to.

    Returns:
        A dict that always has the keys ``task``, ``project``, ``role`` and
        ``attachments``, plus role-specific keys (``decisions``, ``modules``,
        ``active_tasks``, ``return_history``, ``department`` ...).
    """
    task = models.get_task(conn, task_id)
    project = models.get_project(conn, project_id)

    ctx = {
        "task": _slim_task(task) if task else None,
        "project": _slim_project(project) if project else None,
        "role": role,
    }

    # Attachments — all roles get them so debugger sees screenshots, UX sees mockups, etc.
    # Set unconditionally to guarantee key presence in ctx (#213)
    ctx["attachments"] = models.list_attachments(conn, task_id)

    # If task has a revise comment, fetch the last successful agent output for context
    if task and task.get("revise_comment"):
        row = conn.execute(
            """SELECT output_summary FROM agent_logs
               WHERE task_id = ? AND success = 1
               ORDER BY created_at DESC LIMIT 1""",
            (task_id,),
        ).fetchone()
        if row and row["output_summary"]:
            ctx["last_agent_output"] = row["output_summary"]

    if role == "pm":
        ctx["modules"] = models.get_modules(conn, project_id)
        ctx["decisions"] = models.get_decisions(conn, project_id)
        ctx["active_tasks"] = models.list_tasks(conn, project_id=project_id, status="in_progress")
        try:
            specs = _load_specialists()
            ctx["available_specialists"] = list(specs.get("specialists", {}).keys())
            ctx["routes"] = specs.get("routes", {})
            ctx["departments"] = specs.get("departments", {})
        except Exception:
            # Best effort: a broken specialists config must not block the PM.
            logger.warning("Failed to load specialists config for PM context", exc_info=True)
            ctx["available_specialists"] = []
            ctx["routes"] = {}
            ctx["departments"] = {}
        # KIN-135: return history for escalation routing
        try:
            return_count = (task or {}).get("return_count") or 0
            ctx["return_count"] = return_count
            if return_count > 0:
                ctx["return_history"] = models.get_task_returns(conn, task_id, limit=5)
            else:
                ctx["return_history"] = []
        except Exception:
            logger.warning("Failed to load return history for task %s", task_id, exc_info=True)
            ctx["return_count"] = 0
            ctx["return_history"] = []

    elif role == "architect":
        ctx["modules"] = models.get_modules(conn, project_id)
        ctx["decisions"] = models.get_decisions(conn, project_id)

    elif role == "knowledge_synthesizer":
        ctx["decisions"] = models.get_decisions(conn, project_id)

    elif role == "debugger":
        ctx["decisions"] = models.get_decisions(
            conn, project_id, types=["gotcha", "workaround"],
        )
        ctx["module_hint"] = _extract_module_hint(task)

    elif role in ("frontend_dev", "backend_dev"):
        ctx["decisions"] = models.get_decisions(
            conn, project_id, types=["gotcha", "workaround", "convention"],
        )

    elif role == "reviewer":
        ctx["decisions"] = models.get_decisions(
            conn, project_id, types=["convention"],
        )

    elif role == "sysadmin":
        ctx["decisions"] = models.get_decisions(conn, project_id)
        ctx["modules"] = models.get_modules(conn, project_id)

    elif role == "tester":
        # Minimal context — just the task spec
        pass

    elif role in ("constitution", "spec"):
        ctx["modules"] = models.get_modules(conn, project_id)
        ctx["decisions"] = models.get_decisions(conn, project_id)

    elif role == "task_decomposer":
        ctx["modules"] = models.get_modules(conn, project_id)
        ctx["decisions"] = models.get_decisions(conn, project_id)
        ctx["active_tasks"] = models.list_tasks(conn, project_id=project_id, status="in_progress")

    elif role == "security":
        ctx["decisions"] = models.get_decisions(
            conn, project_id, category="security",
        )

    elif role.endswith("_head"):
        # Department head: load department config and previous handoff
        ctx["decisions"] = models.get_decisions(conn, project_id)
        ctx["modules"] = models.get_modules(conn, project_id)
        try:
            specs = _load_specialists()
            all_specs = specs.get("specialists", {})
            departments = specs.get("departments", {})
            spec = all_specs.get(role, {})
            dept_name = spec.get("department", "")
            dept_info = departments.get(dept_name, {})
            ctx["department"] = dept_name
            ctx["department_workers"] = dept_info.get("workers", [])
            ctx["department_description"] = dept_info.get("description", "")
        except Exception:
            logger.warning("Failed to load department config for role %s", role, exc_info=True)
            ctx["department"] = ""
            ctx["department_workers"] = []
            ctx["department_description"] = ""
        # Previous handoff from another department (if any)
        try:
            dept = ctx.get("department")
            last_handoff = models.get_last_handoff(conn, task_id, to_department=dept)
            # Fallback: get latest handoff NOT from our own department
            # (avoids picking up our own outgoing handoff)
            if not last_handoff and dept:
                all_handoffs = models.get_handoffs_for_task(conn, task_id)
                for h in reversed(all_handoffs):
                    if h.get("from_department") != dept:
                        last_handoff = h
                        break
            if last_handoff:
                ctx["incoming_handoff"] = last_handoff
        except Exception:
            # Best effort: the head can still work without an incoming handoff.
            logger.warning("Failed to load incoming handoff for task %s", task_id, exc_info=True)

    elif role == "return_analyst":
        # KIN-135: return analyst needs full return history and decisions
        ctx["decisions"] = models.get_decisions(conn, project_id)
        try:
            return_count = (task or {}).get("return_count") or 0
            ctx["return_count"] = return_count
            ctx["return_history"] = models.get_task_returns(conn, task_id, limit=20)
        except Exception:
            logger.warning("Failed to load return history for task %s", task_id, exc_info=True)
            ctx["return_count"] = 0
            ctx["return_history"] = []

    else:
        # Unknown role — give decisions as fallback
        ctx["decisions"] = models.get_decisions(conn, project_id, limit=20)

    return ctx


def _slim_task(task: dict) -> dict:
    """Extract only relevant fields from a task for the prompt.

    ``revise_comment`` and ``acceptance_criteria`` are copied only when they
    hold a truthy value.
    """
    result = {
        "id": task["id"],
        "title": task["title"],
        "status": task["status"],
        "priority": task["priority"],
        "assigned_role": task.get("assigned_role"),
        "brief": task.get("brief"),
        "spec": task.get("spec"),
    }
    if task.get("revise_comment"):
        result["revise_comment"] = task["revise_comment"]
    if task.get("acceptance_criteria"):
        result["acceptance_criteria"] = task["acceptance_criteria"]
    return result


def _slim_project(project: dict) -> dict:
    """Extract only relevant fields from a project.

    SSH connection fields are added only for ``project_type == "operations"``.
    """
    result = {
        "id": project["id"],
        "name": project["name"],
        "path": project["path"],
        "tech_stack": project.get("tech_stack"),
        "language": project.get("language", "ru"),
        "execution_mode": project.get("execution_mode"),
        "project_type": project.get("project_type", "development"),
    }
    # Include SSH fields for operations projects
    if project.get("project_type") == "operations":
        result["ssh_host"] = project.get("ssh_host")
        result["ssh_user"] = project.get("ssh_user")
        result["ssh_key_path"] = project.get("ssh_key_path")
        result["ssh_proxy_jump"] = project.get("ssh_proxy_jump")
    return result


def _extract_module_hint(task: dict | None) -> str | None:
    """Return ``brief["module"]`` when the task brief is a dict, else None."""
    if not task:
        return None
    brief = task.get("brief")
    if isinstance(brief, dict):
        return brief.get("module")
    return None


def _load_prompt_template(role: str) -> str:
    """Return the prompt template text for *role*.

    Lookup order: ``{role}.md`` in PROMPTS_DIR, then the shared
    ``department_head.md`` for ``*_head`` roles, then a generic one-liner.
    """
    prompt_path = PROMPTS_DIR / f"{role}.md"
    if prompt_path.exists():
        return prompt_path.read_text(encoding="utf-8")
    if role.endswith("_head"):
        # Fallback: all department heads share the base department_head.md prompt
        dept_head_path = PROMPTS_DIR / "department_head.md"
        if dept_head_path.exists():
            return dept_head_path.read_text(encoding="utf-8")
    return f"You are a {role}. Complete the task described below."


def _is_inlineable_text(mime: str, filename: str) -> bool:
    """Tell whether an attachment looks like text, by MIME type or file suffix."""
    if mime.startswith("text/") or mime in _TEXT_MIME_TYPES:
        return True
    return Path(filename).suffix.lower() in _TEXT_EXTS


def _format_attachments(attachments: list) -> list[str]:
    """Render the attachments section as a list of prompt lines.

    Small text-readable files (recorded size <= 32 KB) are inlined in a code
    fence so the agent can use them immediately. Reading is best effort: a
    file that cannot be read is listed without its content.
    """
    lines = [f"## Attachments ({len(attachments)}):"]
    for a in attachments:
        # `or` (not a .get default) so an explicit None/NULL is normalised too.
        mime = a.get("mime_type") or ""
        size = a.get("size") or 0
        lines.append(f"- {a['filename']} ({mime}, {size} bytes): {a['path']}")
        if _is_inlineable_text(mime, a["filename"]) and 0 < size <= _INLINE_MAX_BYTES:
            try:
                content = Path(a["path"]).read_text(encoding="utf-8", errors="replace")
            except Exception:
                logger.debug("Could not inline attachment %s", a.get("path"), exc_info=True)
            else:
                lines.append(f"```\n{content}\n```")
    lines.append("")
    return lines


def format_prompt(context: dict, role: str, prompt_template: str | None = None) -> str:
    """Format a prompt by injecting context into a role template.

    Args:
        context: Dict with the keys produced by build_context(); an optional
            ``previous_output`` key is rendered too when present.
        role: Agent role, used to locate the template file.
        prompt_template: Template text. If None, loads from
            agents/prompts/{role}.md (with fallbacks, see
            _load_prompt_template).

    Returns:
        The template followed by one markdown section per non-empty context
        entry, joined with newlines. The language instruction is always last.
    """
    if prompt_template is None:
        prompt_template = _load_prompt_template(role)

    sections = [prompt_template, ""]

    # Project info
    proj = context.get("project")
    if proj:
        sections.append(f"## Project: {proj['id']} — {proj['name']}")
        tech_stack = proj.get("tech_stack")
        if tech_stack:
            # A plain string must not be joined character by character.
            if isinstance(tech_stack, str):
                stack_text = tech_stack
            else:
                stack_text = ", ".join(str(item) for item in tech_stack)
            sections.append(f"Tech stack: {stack_text}")
        sections.append(f"Path: {proj['path']}")
        project_type = proj.get("project_type", "development")
        sections.append(f"Project type: {project_type}")
        sections.append("")

    # SSH connection info for operations projects
    if proj and proj.get("project_type") == "operations":
        ssh_host = proj.get("ssh_host") or ""
        ssh_user = proj.get("ssh_user") or ""
        ssh_key = proj.get("ssh_key_path") or ""
        ssh_proxy = proj.get("ssh_proxy_jump") or ""
        sections.append("## SSH Connection")
        if ssh_host:
            sections.append(f"Host: {ssh_host}")
        if ssh_user:
            sections.append(f"User: {ssh_user}")
        if ssh_key:
            sections.append(f"Key: {ssh_key}")
        if ssh_proxy:
            sections.append(f"ProxyJump: {ssh_proxy}")
        sections.append("")

    # Task info
    task = context.get("task")
    if task:
        sections.append(f"## Task: {task['id']} — {task['title']}")
        sections.append(f"Status: {task['status']}, Priority: {task['priority']}")
        if task.get("brief"):
            sections.append(f"Brief: {json.dumps(task['brief'], ensure_ascii=False)}")
        if task.get("spec"):
            sections.append(f"Spec: {json.dumps(task['spec'], ensure_ascii=False)}")
        sections.append("")

    # Acceptance criteria — shown as a dedicated section so agents use it for completeness check
    if task and task.get("acceptance_criteria"):
        sections.append("## Acceptance Criteria")
        sections.append(task["acceptance_criteria"])
        sections.append("")

    # Decisions
    decisions = context.get("decisions")
    if decisions:
        sections.append(f"## Known decisions ({len(decisions)}):")
        for d in decisions[:30]:  # Cap at 30 to avoid token bloat
            tags = f" [{', '.join(d['tags'])}]" if d.get("tags") else ""
            sections.append(f"- #{d['id']} [{d['type']}] {d['title']}{tags}")
        sections.append("")

    # Modules
    modules = context.get("modules")
    if modules:
        sections.append(f"## Modules ({len(modules)}):")
        for m in modules:
            sections.append(f"- {m['name']} ({m['type']}) — {m['path']}")
        sections.append("")

    # Active tasks (PM)
    active = context.get("active_tasks")
    if active:
        sections.append(f"## Active tasks ({len(active)}):")
        for t in active:
            sections.append(f"- {t['id']}: {t['title']} [{t['status']}]")
        sections.append("")

    # Return history (PM) — KIN-135
    return_count = context.get("return_count", 0)
    return_history = context.get("return_history")
    if return_count and return_count > 0:
        sections.append(f"## Return History (return_count={return_count}):")
        if return_history:
            for r in return_history:
                reason_text = f" — {r['reason_text']}" if r.get("reason_text") else ""
                sections.append(
                    f"- #{r['return_number']} [{r['reason_category']}]{reason_text} "
                    f"(returned_by={r.get('returned_by', 'system')}, at={r.get('returned_at', '')})"
                )
        sections.append("")

    # Available specialists (PM)
    specialists = context.get("available_specialists")
    if specialists:
        sections.append(f"## Available specialists: {', '.join(specialists)}")
        sections.append("")

    # Routes (PM)
    routes = context.get("routes")
    if routes:
        sections.append("## Route templates:")
        for name, route in routes.items():
            steps = " → ".join(route.get("steps", []))
            sections.append(f"- {name}: {steps}")
        sections.append("")

    # Department context (department heads)
    dept = context.get("department")
    if dept:
        dept_desc = context.get("department_description", "")
        sections.append(f"## Department: {dept}" + (f" — {dept_desc}" if dept_desc else ""))
        sections.append("")

    dept_workers = context.get("department_workers")
    if dept_workers:
        sections.append(f"## Department workers: {', '.join(dept_workers)}")
        sections.append("")

    incoming_handoff = context.get("incoming_handoff")
    if incoming_handoff:
        sections.append("## Incoming handoff from previous department:")
        sections.append(json.dumps(incoming_handoff, ensure_ascii=False))
        sections.append("")

    # Module hint (debugger)
    hint = context.get("module_hint")
    if hint:
        sections.append(f"## Target module: {hint}")
        sections.append("")

    # Revision context: director's comment + agent's previous output
    if task and task.get("revise_comment"):
        sections.append("## Director's revision request:")
        sections.append(task["revise_comment"])
        sections.append("")
        last_output = context.get("last_agent_output")
        if last_output:
            sections.append("## Your previous output (before revision):")
            sections.append(last_output)
            sections.append("")

    # Attachments
    attachments = context.get("attachments")
    if attachments:
        sections.extend(_format_attachments(attachments))

    # Previous step output (pipeline chaining)
    prev = context.get("previous_output")
    if prev:
        sections.append("## Previous step output:")
        sections.append(prev if isinstance(prev, str) else json.dumps(prev, ensure_ascii=False))
        sections.append("")

    # Language instruction — always last so it's fresh in context.
    # `or "ru"` also covers a project whose language is stored as None.
    language = (proj.get("language") or "ru") if proj else "ru"
    lang_name = _LANG_NAMES.get(language, language)
    sections.append("## Language")
    sections.append(
        f"ALWAYS respond in {lang_name}. "
        f"All summaries, analysis, comments, and recommendations must be in {lang_name}."
    )
    sections.append("")

    return "\n".join(sections)