diff --git a/agents/prompts/department_head.md b/agents/prompts/department_head.md new file mode 100644 index 0000000..be5a9d3 --- /dev/null +++ b/agents/prompts/department_head.md @@ -0,0 +1,109 @@ +You are a Department Head for the Kin multi-agent orchestrator. + +Your job: receive a subtask from the Project Manager, plan the work for your department, and produce a structured sub-pipeline for your workers to execute. + +## Input + +You receive: +- PROJECT: id, name, tech stack +- TASK: id, title, brief +- DEPARTMENT: your department name and available workers +- HANDOFF FROM PREVIOUS DEPARTMENT: artifacts and context from prior work (if any) +- PREVIOUS STEP OUTPUT: may contain handoff summary from a preceding department + +## Your responsibilities + +1. Analyze the task in context of your department's domain +2. Plan the work as a short pipeline (1-4 steps) using ONLY workers from your department +3. Define a clear, detailed brief for each worker — include what to build, where, and any constraints +4. Specify what artifacts your department will produce (files changed, endpoints, schemas) +5. 
Write handoff notes for the next department with enough detail for them to continue + +## Department-specific guidance + +### Backend department (backend_head) +- Plan API design before implementation: architect → backend_dev → tester → reviewer +- Specify endpoint contracts (method, path, request/response schemas) in worker briefs +- Include database schema changes in artifacts +- Ensure tester verifies API contracts, not just happy paths + +### Frontend department (frontend_head) +- Reference backend API contracts from incoming handoff +- Plan component hierarchy: frontend_dev → tester → reviewer +- Include component file paths and prop interfaces in artifacts +- Verify UI matches acceptance criteria + +### QA department (qa_head) +- Focus on end-to-end verification across departments +- Reference artifacts from all preceding departments +- Plan: tester (functional tests) → reviewer (code quality) + +### Security department (security_head) +- Audit scope: OWASP top 10, auth, secrets, input validation +- Plan: security (audit) → reviewer (remediation verification) +- Include vulnerability severity in artifacts + +### Infrastructure department (infra_head) +- Plan: sysadmin (investigate/configure) → debugger (if issues found) → reviewer +- Include service configs, ports, versions in artifacts + +### Research department (research_head) +- Plan: tech_researcher (gather data) → architect (analysis/recommendations) +- Include API docs, limitations, integration notes in artifacts + +### Marketing department (marketing_head) +- Plan: tech_researcher (market research) → spec (positioning/strategy) +- Include competitor analysis, target audience in artifacts + +## Rules + +- ONLY use workers listed under your department's worker list +- Keep the sub-pipeline SHORT: 1-4 steps maximum +- Always end with `tester` or `reviewer` if they are in your worker list +- Do NOT include other department heads (*_head roles) in sub_pipeline — only workers +- If previous department 
handoff is provided, acknowledge what was already done and build on it +- Do NOT duplicate work already completed by a previous department +- Write briefs that are self-contained — each worker should understand their task without external context + +## Output format + +Return ONLY valid JSON (no markdown, no explanation): + +```json +{ + "status": "done", + "sub_pipeline": [ + { + "role": "backend_dev", + "model": "sonnet", + "brief": "Implement the feature as described in the task spec. Expose POST /api/feature endpoint." + }, + { + "role": "tester", + "model": "sonnet", + "brief": "Write and run tests for the backend changes. Verify POST /api/feature works correctly." + } + ], + "artifacts": { + "files_changed": ["core/models.py", "web/api.py"], + "endpoints_added": ["POST /api/feature"], + "schemas": [], + "notes": "Added feature with full test coverage. All tests pass." + }, + "handoff_notes": "Backend implementation complete. Tests passing. Frontend needs to call POST /api/feature with {field: value} body." +} +``` + +Valid values for `status`: `"done"`, `"blocked"`. + +If status is "blocked", include `"blocked_reason": "..."`. + +## Blocked Protocol + +If you cannot plan the work (task is ambiguous, unclear requirements, outside your department's scope, or missing critical information from previous steps), return: + +```json +{"status": "blocked", "blocked_reason": "<why you cannot plan the work>", "blocked_at": "<current datetime>"} +``` + +Use current datetime for `blocked_at`. Do NOT guess — return blocked immediately. diff --git a/agents/prompts/pm.md b/agents/prompts/pm.md index d95fe5f..a42787b 100644 --- a/agents/prompts/pm.md +++ b/agents/prompts/pm.md @@ -32,6 +32,31 @@ You receive: - If a task is blocked or unclear, say so — don't guess. - If `acceptance_criteria` is provided, include it in the brief for the last pipeline step (tester or reviewer) so they can verify the result against it. Do NOT use acceptance_criteria to describe current task state. 
+## Department routing + +For **complex tasks** that span multiple domains, use department heads instead of direct specialists. Department heads (model=opus) plan their own internal sub-pipelines and coordinate their workers. + +**Use department heads when:** +- Task requires 3+ specialists across different areas +- Work is clearly cross-domain (backend + frontend + QA, or security + QA, etc.) +- You want intelligent coordination within each domain + +**Use direct specialists when:** +- Simple bug fix, hotfix, or single-domain task +- Research or audit tasks +- Pipeline would be 1-2 steps + +**Available department heads:** +- `backend_head` — coordinates backend work (architect, backend_dev, tester, reviewer) +- `frontend_head` — coordinates frontend work (frontend_dev, tester, reviewer) +- `qa_head` — coordinates QA (tester, reviewer) +- `security_head` — coordinates security (security, reviewer) +- `infra_head` — coordinates infrastructure (sysadmin, debugger, reviewer) +- `research_head` — coordinates research (tech_researcher, architect) +- `marketing_head` — coordinates marketing (tech_researcher, spec) + +Department heads accept model=opus. Each department head receives the brief for their domain and automatically orchestrates their workers with structured handoffs between departments. + ## Project type routing **If project_type == "operations":** diff --git a/agents/runner.py b/agents/runner.py index 0b5ae2b..ccc4060 100644 --- a/agents/runner.py +++ b/agents/runner.py @@ -27,6 +27,14 @@ _EXTRA_PATH_DIRS = [ "/usr/local/sbin", ] +# Default timeouts per model (seconds). Override globally with KIN_AGENT_TIMEOUT +# or per role via timeout_seconds in specialists.yaml. +_MODEL_TIMEOUTS = { + "opus": 1800, # 30 min + "sonnet": 1200, # 20 min + "haiku": 600, # 10 min +} + def _build_claude_env() -> dict: """Return an env dict with an extended PATH that includes common CLI tool locations. 
@@ -182,10 +190,22 @@ def run_agent( if project_path.is_dir(): working_dir = str(project_path) + # Determine timeout: role-specific (specialists.yaml) > KIN_AGENT_TIMEOUT env > model-based default (the last two are resolved in _run_claude when timeout is None) + role_timeout = None + try: + from core.context_builder import _load_specialists + specs = _load_specialists().get("specialists", {}) + role_spec = specs.get(role, {}) + if role_spec.get("timeout_seconds"): + role_timeout = int(role_spec["timeout_seconds"]) + except Exception: + pass + # Run claude subprocess start = time.monotonic() result = _run_claude(prompt, model=model, working_dir=working_dir, - allow_write=allow_write, noninteractive=noninteractive) + allow_write=allow_write, noninteractive=noninteractive, + timeout=role_timeout) duration = int(time.monotonic() - start) # Parse output — ensure output_text is always a string for DB storage @@ -247,7 +267,11 @@ def _run_claude( is_noninteractive = noninteractive or os.environ.get("KIN_NONINTERACTIVE") == "1" if timeout is None: - timeout = int(os.environ.get("KIN_AGENT_TIMEOUT") or 600) + env_timeout = os.environ.get("KIN_AGENT_TIMEOUT") + if env_timeout: + timeout = int(env_timeout) + else: + timeout = _MODEL_TIMEOUTS.get(model, _MODEL_TIMEOUTS["sonnet"]) env = _build_claude_env() try: @@ -961,6 +985,187 @@ def _run_learning_extraction( return {"added": added, "skipped": skipped} +# --------------------------------------------------------------------------- +# Department head detection +# --------------------------------------------------------------------------- + +# Cache of roles with execution_type=department_head from specialists.yaml +_DEPT_HEAD_ROLES: set[str] | None = None + + +def _is_department_head(role: str) -> bool: + """Check if a role is a department head. + + Uses execution_type from specialists.yaml as primary check, + falls back to role.endswith('_head') convention. 
+ """ + global _DEPT_HEAD_ROLES + if _DEPT_HEAD_ROLES is None: + try: + from core.context_builder import _load_specialists + specs = _load_specialists() + all_specs = specs.get("specialists", {}) + _DEPT_HEAD_ROLES = { + name for name, spec in all_specs.items() + if spec.get("execution_type") == "department_head" + } + except Exception: + _DEPT_HEAD_ROLES = set() + return role in _DEPT_HEAD_ROLES or role.endswith("_head") + + +# --------------------------------------------------------------------------- +# Department head sub-pipeline execution +# --------------------------------------------------------------------------- + +def _execute_department_head_step( + conn: sqlite3.Connection, + task_id: str, + project_id: str, + parent_pipeline_id: int | None, + step: dict, + dept_head_result: dict, + allow_write: bool = False, + noninteractive: bool = False, + next_department: str | None = None, +) -> dict: + """Execute sub-pipeline planned by a department head. + + Parses the dept head's JSON output, validates the sub_pipeline, + creates a child pipeline in DB, runs it, and saves a handoff record. + + Returns dict with success, output, cost_usd, tokens_used, duration_seconds. 
+ """ + raw = dept_head_result.get("raw_output") or dept_head_result.get("output") or "" + if isinstance(raw, (dict, list)): + raw = json.dumps(raw, ensure_ascii=False) + + parsed = _try_parse_json(raw) + if not isinstance(parsed, dict): + return { + "success": False, + "output": "Department head returned non-JSON output", + "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, + } + + # Blocked status from dept head + if parsed.get("status") == "blocked": + reason = parsed.get("blocked_reason") or "Department head reported blocked" # `or` also covers an empty blocked_reason string + return { + "success": False, + "output": json.dumps(parsed, ensure_ascii=False), + "blocked": True, + "blocked_reason": reason, + "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, + } + + sub_pipeline = parsed.get("sub_pipeline", []) + if not isinstance(sub_pipeline, list) or not sub_pipeline or not all(isinstance(s, dict) and s.get("role") for s in sub_pipeline): # every step must be a dict with a role — run_pipeline indexes step["role"] + return { + "success": False, + "output": "Department head returned empty or invalid sub_pipeline", + "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, + } + + # Recursion guard: no department head roles allowed in sub_pipeline + for sub_step in sub_pipeline: + if isinstance(sub_step, dict) and _is_department_head(str(sub_step.get("role", ""))): + return { + "success": False, + "output": f"Recursion blocked: sub_pipeline contains _head role '{sub_step['role']}'", + "cost_usd": 0, "tokens_used": 0, "duration_seconds": 0, + } + + role = step["role"] + dept_name = role.replace("_head", "") + + # Create child pipeline in DB + child_pipeline = models.create_pipeline( + conn, task_id, project_id, + route_type="dept_sub", + steps=sub_pipeline, + parent_pipeline_id=parent_pipeline_id, + department=dept_name, + ) + + # Build initial context for workers: dept head's plan + artifacts + dept_plan_context = json.dumps({ + "department_head_plan": { + "department": dept_name, + "artifacts": parsed.get("artifacts", {}), + "handoff_notes": parsed.get("handoff_notes", ""), + }, + }, ensure_ascii=False) + + # Run the sub-pipeline (noninteractive=True 
— Opus already reviewed the plan) + sub_result = run_pipeline( + conn, task_id, sub_pipeline, + dry_run=False, + allow_write=allow_write, + noninteractive=True, + initial_previous_output=dept_plan_context, + ) + + # Extract decisions from sub-pipeline results for handoff + decisions_made = [] + sub_results = sub_result.get("results", []) + for sr in sub_results: + output = sr.get("output") or sr.get("raw_output") or "" + if isinstance(output, str): + try: + output = json.loads(output) + except (json.JSONDecodeError, ValueError): + pass + if isinstance(output, dict): + # Reviewer/tester may include decisions or findings + for key in ("decisions", "findings", "recommendations"): + val = output.get(key) + if isinstance(val, list): + decisions_made.extend(val) + elif isinstance(val, str) and val: + decisions_made.append(val) + + # Determine last worker role for auto_complete tracking + last_sub_role = sub_pipeline[-1].get("role", "") if sub_pipeline else "" + + # Save handoff for inter-department context + handoff_status = "done" if sub_result.get("success") else "partial" + try: + models.create_handoff( + conn, + pipeline_id=parent_pipeline_id or child_pipeline["id"], + task_id=task_id, + from_department=dept_name, + to_department=next_department, + artifacts=parsed.get("artifacts", {}), + decisions_made=decisions_made, + blockers=[], + status=handoff_status, + ) + except Exception: + pass # Handoff save errors must never block pipeline + + # Build summary output for the next pipeline step + summary = { + "from_department": dept_name, + "handoff_notes": parsed.get("handoff_notes", ""), + "artifacts": parsed.get("artifacts", {}), + "sub_pipeline_summary": { + "steps_completed": sub_result.get("steps_completed", 0), + "success": sub_result.get("success", False), + }, + } + + return { + "success": sub_result.get("success", False), + "output": json.dumps(summary, ensure_ascii=False), + "cost_usd": sub_result.get("total_cost_usd", 0), + "tokens_used": 
sub_result.get("total_tokens", 0), + "duration_seconds": sub_result.get("total_duration_seconds", 0), + "last_sub_role": last_sub_role, + } + + # --------------------------------------------------------------------------- # Pipeline executor # --------------------------------------------------------------------------- @@ -972,6 +1177,7 @@ def run_pipeline( dry_run: bool = False, allow_write: bool = False, noninteractive: bool = False, + initial_previous_output: str | None = None, ) -> dict: """Execute a multi-step pipeline of agents. @@ -980,6 +1186,9 @@ def run_pipeline( {"role": "tester", "depends_on": "debugger", "brief": "..."}, ] + initial_previous_output: context injected as previous_output for the first step + (used by dept head sub-pipelines to pass artifacts/plan to workers). + Returns {success, steps_completed, total_cost, total_tokens, total_duration, results} """ # Auth check — skip for dry_run (dry_run never calls claude CLI) @@ -1020,7 +1229,8 @@ def run_pipeline( total_cost = 0.0 total_tokens = 0 total_duration = 0 - previous_output = None + previous_output = initial_previous_output + _last_sub_role = None # Track last worker role from dept sub-pipelines (for auto_complete) for i, step in enumerate(steps): role = step["role"] @@ -1283,6 +1493,62 @@ def run_pipeline( except Exception: pass # Never block pipeline on decomposer save errors + # Department head: execute sub-pipeline planned by the dept head + if _is_department_head(role) and result["success"] and not dry_run: + # Determine next department for handoff routing + _next_dept = None + if i + 1 < len(steps): + _next_role = steps[i + 1].get("role", "") + if _is_department_head(_next_role): + _next_dept = _next_role.replace("_head", "") + dept_result = _execute_department_head_step( + conn, task_id, project_id, + parent_pipeline_id=pipeline["id"] if pipeline else None, + step=step, + dept_head_result=result, + allow_write=allow_write, + noninteractive=noninteractive, + next_department=_next_dept, 
+ ) + # Accumulate sub-pipeline costs + total_cost += dept_result.get("cost_usd") or 0 + total_tokens += dept_result.get("tokens_used") or 0 + total_duration += dept_result.get("duration_seconds") or 0 + + if not dept_result.get("success"): + # Sub-pipeline failed — handle as blocked + results.append({"role": role, "_dept_sub": True, **dept_result}) + if pipeline: + models.update_pipeline( + conn, pipeline["id"], + status="failed", + total_cost_usd=total_cost, + total_tokens=total_tokens, + total_duration_seconds=total_duration, + ) + error_msg = f"Department {role} sub-pipeline failed" + models.update_task(conn, task_id, status="blocked", blocked_reason=error_msg) + return { + "success": False, + "error": error_msg, + "steps_completed": i, + "results": results, + "total_cost_usd": total_cost, + "total_tokens": total_tokens, + "total_duration_seconds": total_duration, + "pipeline_id": pipeline["id"] if pipeline else None, + } + + # Track last worker role from sub-pipeline for auto_complete eligibility + if dept_result.get("last_sub_role"): + _last_sub_role = dept_result["last_sub_role"] + + # Override previous_output with dept handoff summary (not raw dept head JSON) + previous_output = dept_result.get("output") + if isinstance(previous_output, (dict, list)): + previous_output = json.dumps(previous_output, ensure_ascii=False) + continue + # Project-level auto-test: run `make test` after backend_dev/frontend_dev steps. # Enabled per project via auto_test_enabled flag (opt-in). # On failure, loop fixer up to KIN_AUTO_TEST_MAX_ATTEMPTS times, then block. 
@@ -1433,7 +1699,9 @@ def run_pipeline( changed_files = _get_changed_files(str(p_path)) last_role = steps[-1].get("role", "") if steps else "" - auto_eligible = last_role in {"tester", "reviewer"} + # For dept pipelines: if last step is a _head, check the last worker in its sub-pipeline + effective_last_role = _last_sub_role if (_is_department_head(last_role) and _last_sub_role) else last_role + auto_eligible = effective_last_role in {"tester", "reviewer"} # Guard: re-fetch current status — user may have manually changed it while pipeline ran current_task = models.get_task(conn, task_id) diff --git a/agents/specialists.yaml b/agents/specialists.yaml index 6c8f07c..e3c49eb 100644 --- a/agents/specialists.yaml +++ b/agents/specialists.yaml @@ -151,6 +151,126 @@ specialists: output_schema: tasks: "array of { title, brief, priority, category, acceptance_criteria }" + # Department heads — Opus-level coordinators that plan work within their department + # and spawn internal sub-pipelines of Sonnet workers. 
+ + backend_head: + name: "Backend Department Head" + model: opus + execution_type: department_head + department: backend + tools: [Read, Grep, Glob] + description: "Plans backend work, coordinates architect/backend_dev/tester within backend department" + permissions: read_only + context_rules: + decisions: all + modules: all + + frontend_head: + name: "Frontend Department Head" + model: opus + execution_type: department_head + department: frontend + tools: [Read, Grep, Glob] + description: "Plans frontend work, coordinates frontend_dev/tester within frontend department" + permissions: read_only + context_rules: + decisions: all + modules: all + + qa_head: + name: "QA Department Head" + model: opus + execution_type: department_head + department: qa + tools: [Read, Grep, Glob] + description: "Plans QA work, coordinates tester/reviewer within QA department" + permissions: read_only + context_rules: + decisions: all + + security_head: + name: "Security Department Head" + model: opus + execution_type: department_head + department: security + tools: [Read, Grep, Glob] + description: "Plans security work, coordinates security engineer within security department" + permissions: read_only + context_rules: + decisions_category: security + + infra_head: + name: "Infrastructure Department Head" + model: opus + execution_type: department_head + department: infra + tools: [Read, Grep, Glob] + description: "Plans infrastructure work, coordinates sysadmin/debugger within infra department" + permissions: read_only + context_rules: + decisions: all + + research_head: + name: "Research Department Head" + model: opus + execution_type: department_head + department: research + tools: [Read, Grep, Glob] + description: "Plans research work, coordinates tech_researcher/architect within research department" + permissions: read_only + context_rules: + decisions: all + + marketing_head: + name: "Marketing Department Head" + model: opus + execution_type: department_head + department: 
marketing + tools: [Read, Grep, Glob] + description: "Plans marketing work, coordinates tech_researcher/spec within marketing department" + permissions: read_only + context_rules: + decisions: all + modules: all + +# Departments — PM uses these when routing complex cross-domain tasks to department heads +departments: + backend: + head: backend_head + workers: [architect, backend_dev, tester, reviewer] + description: "Backend development: API, database, business logic" + + frontend: + head: frontend_head + workers: [frontend_dev, tester, reviewer] + description: "Frontend development: Vue, CSS, components, composables" + + qa: + head: qa_head + workers: [tester, reviewer] + description: "Quality assurance: testing and code review" + + security: + head: security_head + workers: [security, reviewer] + description: "Security: OWASP audit, vulnerability analysis, remediation" + + infra: + head: infra_head + workers: [sysadmin, debugger, reviewer] + description: "Infrastructure: DevOps, deployment, server management" + + research: + head: research_head + workers: [tech_researcher, architect] + description: "Technical research and architecture planning" + + marketing: + head: marketing_head + workers: [tech_researcher, spec] + description: "Marketing: market research, positioning, content strategy, SEO" + # Route templates — PM uses these to build pipelines routes: debug: @@ -188,3 +308,27 @@ routes: spec_driven: steps: [constitution, spec, architect, task_decomposer] description: "Constitution → spec → implementation plan → decompose into tasks" + + dept_feature: + steps: [backend_head, frontend_head, qa_head] + description: "Full-stack feature: backend dept → frontend dept → QA dept" + + dept_fullstack: + steps: [backend_head, frontend_head] + description: "Full-stack feature without dedicated QA pass" + + dept_security_audit: + steps: [security_head, qa_head] + description: "Security audit followed by QA verification" + + dept_backend: + steps: [backend_head] + 
description: "Backend-only task routed through department head" + + dept_frontend: + steps: [frontend_head] + description: "Frontend-only task routed through department head" + + dept_marketing: + steps: [marketing_head] + description: "Marketing task routed through department head" diff --git a/core/context_builder.py b/core/context_builder.py index 88e6c9d..a7215c2 100644 --- a/core/context_builder.py +++ b/core/context_builder.py @@ -65,9 +65,11 @@ def build_context( specs = _load_specialists() ctx["available_specialists"] = list(specs.get("specialists", {}).keys()) ctx["routes"] = specs.get("routes", {}) + ctx["departments"] = specs.get("departments", {}) except Exception: ctx["available_specialists"] = [] ctx["routes"] = {} + ctx["departments"] = {} elif role == "architect": ctx["modules"] = models.get_modules(conn, project_id) @@ -111,6 +113,41 @@ def build_context( conn, project_id, category="security", ) + elif role.endswith("_head"): + # Department head: load department config and previous handoff + ctx["decisions"] = models.get_decisions(conn, project_id) + ctx["modules"] = models.get_modules(conn, project_id) + try: + specs = _load_specialists() + all_specs = specs.get("specialists", {}) + departments = specs.get("departments", {}) + spec = all_specs.get(role, {}) + dept_name = spec.get("department", "") + dept_info = departments.get(dept_name, {}) + ctx["department"] = dept_name + ctx["department_workers"] = dept_info.get("workers", []) + ctx["department_description"] = dept_info.get("description", "") + except Exception: + ctx["department"] = "" + ctx["department_workers"] = [] + ctx["department_description"] = "" + # Previous handoff from another department (if any) + try: + dept = ctx.get("department") + last_handoff = models.get_last_handoff(conn, task_id, to_department=dept) + # Fallback: get latest handoff NOT from our own department + # (avoids picking up our own outgoing handoff) + if not last_handoff and dept: + all_handoffs = 
models.get_handoffs_for_task(conn, task_id) + for h in reversed(all_handoffs): + if h.get("from_department") != dept: + last_handoff = h + break + if last_handoff: + ctx["incoming_handoff"] = last_handoff + except Exception: + pass + else: # Unknown role — give decisions as fallback ctx["decisions"] = models.get_decisions(conn, project_id, limit=20) @@ -175,6 +212,13 @@ def format_prompt(context: dict, role: str, prompt_template: str | None = None) prompt_path = PROMPTS_DIR / f"{role}.md" if prompt_path.exists(): prompt_template = prompt_path.read_text() + elif role.endswith("_head"): + # Fallback: all department heads share the base department_head.md prompt + dept_head_path = PROMPTS_DIR / "department_head.md" + if dept_head_path.exists(): + prompt_template = dept_head_path.read_text() + else: + prompt_template = f"You are a {role}. Complete the task described below." else: prompt_template = f"You are a {role}. Complete the task described below." @@ -265,6 +309,22 @@ def format_prompt(context: dict, role: str, prompt_template: str | None = None) sections.append(f"- {name}: {steps}") sections.append("") + # Department context (department heads) + dept = context.get("department") + if dept: + dept_desc = context.get("department_description", "") + sections.append(f"## Department: {dept}" + (f" — {dept_desc}" if dept_desc else "")) + sections.append("") + dept_workers = context.get("department_workers") + if dept_workers: + sections.append(f"## Department workers: {', '.join(dept_workers)}") + sections.append("") + incoming_handoff = context.get("incoming_handoff") + if incoming_handoff: + sections.append("## Incoming handoff from previous department:") + sections.append(json.dumps(incoming_handoff, ensure_ascii=False)) + sections.append("") + # Module hint (debugger) hint = context.get("module_hint") if hint: diff --git a/core/db.py b/core/db.py index 8bbb5f1..23413c4 100644 --- a/core/db.py +++ b/core/db.py @@ -140,10 +140,29 @@ CREATE TABLE IF NOT EXISTS 
pipelines ( total_cost_usd REAL, total_tokens INTEGER, total_duration_seconds INTEGER, + parent_pipeline_id INTEGER REFERENCES pipelines(id), + department TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, completed_at DATETIME ); +-- Межотдельные handoff-ы (KIN-098) +CREATE TABLE IF NOT EXISTS department_handoffs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline_id INTEGER NOT NULL REFERENCES pipelines(id), + task_id TEXT NOT NULL REFERENCES tasks(id), + from_department TEXT NOT NULL, + to_department TEXT, + artifacts JSON, + decisions_made JSON, + blockers JSON, + status TEXT DEFAULT 'pending', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_handoffs_pipeline ON department_handoffs(pipeline_id); +CREATE INDEX IF NOT EXISTS idx_handoffs_task ON department_handoffs(task_id); + -- Post-pipeline хуки CREATE TABLE IF NOT EXISTS hooks ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -591,6 +610,35 @@ def _migrate(conn: sqlite3.Connection): """) conn.commit() + # Migrate pipelines: add parent_pipeline_id and department columns (KIN-098) + pipeline_cols = {r[1] for r in conn.execute("PRAGMA table_info(pipelines)").fetchall()} + if "parent_pipeline_id" not in pipeline_cols: + conn.execute("ALTER TABLE pipelines ADD COLUMN parent_pipeline_id INTEGER REFERENCES pipelines(id)") + conn.commit() + if "department" not in pipeline_cols: + conn.execute("ALTER TABLE pipelines ADD COLUMN department TEXT") + conn.commit() + + # Create department_handoffs table (KIN-098) + if "department_handoffs" not in existing_tables: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS department_handoffs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline_id INTEGER NOT NULL REFERENCES pipelines(id), + task_id TEXT NOT NULL REFERENCES tasks(id), + from_department TEXT NOT NULL, + to_department TEXT, + artifacts JSON, + decisions_made JSON, + blockers JSON, + status TEXT DEFAULT 'pending', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX 
IF NOT EXISTS idx_handoffs_pipeline ON department_handoffs(pipeline_id); + CREATE INDEX IF NOT EXISTS idx_handoffs_task ON department_handoffs(task_id); + """) + conn.commit() + # Rename legacy 'auto' → 'auto_complete' (KIN-063) conn.execute( "UPDATE projects SET execution_mode = 'auto_complete' WHERE execution_mode = 'auto'" diff --git a/core/models.py b/core/models.py index e06cf9b..dfad33d 100644 --- a/core/models.py +++ b/core/models.py @@ -473,12 +473,14 @@ def create_pipeline( project_id: str, route_type: str, steps: list | dict, + parent_pipeline_id: int | None = None, + department: str | None = None, ) -> dict: """Create a new pipeline run.""" cur = conn.execute( - """INSERT INTO pipelines (task_id, project_id, route_type, steps) - VALUES (?, ?, ?, ?)""", - (task_id, project_id, route_type, _json_encode(steps)), + """INSERT INTO pipelines (task_id, project_id, route_type, steps, parent_pipeline_id, department) + VALUES (?, ?, ?, ?, ?, ?)""", + (task_id, project_id, route_type, _json_encode(steps), parent_pipeline_id, department), ) conn.commit() row = conn.execute( @@ -923,6 +925,68 @@ def delete_attachment(conn: sqlite3.Connection, attachment_id: int) -> bool: return cur.rowcount > 0 +# --------------------------------------------------------------------------- +# Department Handoffs (KIN-098) +# --------------------------------------------------------------------------- + +def create_handoff( + conn: sqlite3.Connection, + pipeline_id: int, + task_id: str, + from_department: str, + to_department: str | None = None, + artifacts: dict | None = None, + decisions_made: list | None = None, + blockers: list | None = None, + status: str = "pending", +) -> dict: + """Record a department handoff with artifacts for inter-department context.""" + cur = conn.execute( + """INSERT INTO department_handoffs + (pipeline_id, task_id, from_department, to_department, artifacts, decisions_made, blockers, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + (pipeline_id, task_id, 
from_department, to_department, + _json_encode(artifacts), _json_encode(decisions_made), _json_encode(blockers), status), + ) + conn.commit() + row = conn.execute( + "SELECT * FROM department_handoffs WHERE id = ?", (cur.lastrowid,) + ).fetchone() + return _row_to_dict(row) + + +def get_handoffs_for_task(conn: sqlite3.Connection, task_id: str) -> list[dict]: + """Get all handoffs for a task, oldest first (id breaks ties: created_at only has one-second resolution).""" + rows = conn.execute( + "SELECT * FROM department_handoffs WHERE task_id = ? ORDER BY created_at, id", + (task_id,), + ).fetchall() + return _rows_to_list(rows) + + +def get_last_handoff( + conn: sqlite3.Connection, + task_id: str, + to_department: str | None = None, +) -> dict | None: + """Get the most recent handoff for a task, optionally filtered by destination department (id breaks created_at ties).""" + if to_department: + row = conn.execute( + """SELECT * FROM department_handoffs + WHERE task_id = ? AND to_department = ? + ORDER BY created_at DESC, id DESC LIMIT 1""", + (task_id, to_department), + ).fetchone() + else: + row = conn.execute( + """SELECT * FROM department_handoffs + WHERE task_id = ? + ORDER BY created_at DESC, id DESC LIMIT 1""", + (task_id,), + ).fetchone() + return _row_to_dict(row) + + +def get_chat_messages( conn: sqlite3.Connection, project_id: str, diff --git a/tests/test_department_heads.py b/tests/test_department_heads.py new file mode 100644 index 0000000..d797853 --- /dev/null +++ b/tests/test_department_heads.py @@ -0,0 +1,929 @@ +"""Tests for KIN-098: Three-level hierarchy — PM → department heads → workers. 
+ +Covers: +- _execute_department_head_step() logic +- Sub-pipeline creation and execution +- Recursion guard (no _head roles in sub-pipeline) +- Blocked status propagation +- Inter-department handoff via DB (to_department routing) +- Context builder: department workers, description, incoming handoff +- Full cycle: PM → backend_head → workers → handoff → frontend_head → workers +- Artifacts passed to sub-pipeline workers via initial_previous_output +- decisions_made extraction from sub-pipeline results +- auto_complete eligibility for dept pipelines +- _is_department_head() with execution_type from YAML +- format_prompt fallback to department_head.md for _head roles +""" + +import json +import pytest +from unittest.mock import patch, MagicMock + +from core.db import init_db +from core import models +from core.context_builder import build_context, format_prompt +from agents.runner import run_pipeline, _execute_department_head_step, _is_department_head + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def conn(): + c = init_db(":memory:") + models.create_project(c, "proj", "TestProject", "~/projects/test", + tech_stack=["python", "vue3"]) + models.create_task(c, "PROJ-001", "proj", "Full-stack feature", + brief={"route_type": "dept_feature"}) + yield c + c.close() + + +def _mock_claude_success(output_data): + mock = MagicMock() + mock.stdout = json.dumps(output_data) if isinstance(output_data, dict) else output_data + mock.stderr = "" + mock.returncode = 0 + return mock + + +def _mock_claude_failure(error_msg="error"): + mock = MagicMock() + mock.stdout = "" + mock.stderr = error_msg + mock.returncode = 1 + return mock + + +# Valid department head output +def _dept_head_output(sub_pipeline, artifacts=None, handoff_notes="", status="done"): + return { + "status": status, + "sub_pipeline": sub_pipeline, + "artifacts": artifacts 
or {"files_changed": ["api.py"], "notes": "done"}, + "handoff_notes": handoff_notes, + } + + +# --------------------------------------------------------------------------- +# _is_department_head — execution_type detection +# --------------------------------------------------------------------------- + +class TestIsDepartmentHead: + """Tests for _is_department_head() helper.""" + + def test_known_heads_detected(self): + """All _head roles from specialists.yaml are detected.""" + # Reset cache to force reload + import agents.runner as runner + runner._DEPT_HEAD_ROLES = None + + assert _is_department_head("backend_head") is True + assert _is_department_head("frontend_head") is True + assert _is_department_head("qa_head") is True + assert _is_department_head("security_head") is True + assert _is_department_head("infra_head") is True + assert _is_department_head("research_head") is True + assert _is_department_head("marketing_head") is True + + def test_non_heads_not_detected(self): + """Regular roles are not department heads.""" + import agents.runner as runner + runner._DEPT_HEAD_ROLES = None + + assert _is_department_head("backend_dev") is False + assert _is_department_head("tester") is False + assert _is_department_head("pm") is False + assert _is_department_head("reviewer") is False + + def test_suffix_fallback(self): + """Unknown _head roles detected via suffix fallback.""" + import agents.runner as runner + runner._DEPT_HEAD_ROLES = None + + assert _is_department_head("custom_head") is True + + +# --------------------------------------------------------------------------- +# _execute_department_head_step — unit tests +# --------------------------------------------------------------------------- + +class TestExecuteDepartmentHeadStep: + """Unit tests for _execute_department_head_step().""" + + @patch("agents.runner.subprocess.run") + def test_valid_sub_pipeline_creates_child_and_runs(self, mock_run, conn): + """Dept head returns valid sub_pipeline → child pipeline 
created, workers executed.""" + # Workers succeed + mock_run.return_value = _mock_claude_success({"result": "implemented"}) + + dept_output = _dept_head_output( + sub_pipeline=[ + {"role": "backend_dev", "model": "sonnet", "brief": "Implement API"}, + {"role": "tester", "model": "sonnet", "brief": "Test API"}, + ], + artifacts={"files_changed": ["api.py"]}, + handoff_notes="API ready for frontend", + ) + + # Create parent pipeline first + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", "proj", + parent_pipeline_id=pipeline["id"], + step={"role": "backend_head", "brief": "Do backend"}, + dept_head_result={"raw_output": json.dumps(dept_output)}, + next_department="frontend", + ) + + assert result["success"] is True + # Verify output contains handoff summary + output = json.loads(result["output"]) + assert output["from_department"] == "backend" + assert output["handoff_notes"] == "API ready for frontend" + assert output["artifacts"]["files_changed"] == ["api.py"] + + # Verify child pipeline in DB + pipes = conn.execute( + "SELECT * FROM pipelines WHERE route_type='dept_sub'" + ).fetchall() + assert len(pipes) == 1 + child = dict(pipes[0]) + assert child["department"] == "backend" + assert child["parent_pipeline_id"] == pipeline["id"] + + # Verify handoff record in DB + handoffs = models.get_handoffs_for_task(conn, "PROJ-001") + assert len(handoffs) == 1 + assert handoffs[0]["from_department"] == "backend" + assert handoffs[0]["to_department"] == "frontend" + assert handoffs[0]["status"] == "done" + + def test_non_json_output_returns_error(self, conn): + """Dept head returns non-JSON → error, no sub-pipeline created.""" + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", "proj", + parent_pipeline_id=pipeline["id"], + step={"role": 
"backend_head", "brief": "Do backend"}, + dept_head_result={"raw_output": "This is not JSON at all"}, + ) + + assert result["success"] is False + assert "non-JSON" in result["output"] + + def test_blocked_status_returns_failure(self, conn): + """Dept head returns status=blocked → failure with blocked info.""" + blocked_output = { + "status": "blocked", + "blocked_reason": "Missing database schema specification", + "blocked_at": "2026-03-17T12:00:00", + } + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", "proj", + parent_pipeline_id=pipeline["id"], + step={"role": "backend_head", "brief": "Do backend"}, + dept_head_result={"raw_output": json.dumps(blocked_output)}, + ) + + assert result["success"] is False + assert result.get("blocked") is True + assert "Missing database schema" in result["blocked_reason"] + + def test_empty_sub_pipeline_returns_error(self, conn): + """Dept head returns empty sub_pipeline → error.""" + output = {"status": "done", "sub_pipeline": [], "artifacts": {}, "handoff_notes": ""} + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", "proj", + parent_pipeline_id=pipeline["id"], + step={"role": "backend_head", "brief": "Do backend"}, + dept_head_result={"raw_output": json.dumps(output)}, + ) + + assert result["success"] is False + assert "empty or invalid" in result["output"] + + def test_recursion_guard_blocks_head_roles(self, conn): + """Sub-pipeline containing _head role → recursion blocked.""" + output = _dept_head_output( + sub_pipeline=[ + {"role": "frontend_head", "model": "opus", "brief": "Delegate to frontend"}, + ], + ) + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", "proj", + 
parent_pipeline_id=pipeline["id"], + step={"role": "backend_head", "brief": "Do backend"}, + dept_head_result={"raw_output": json.dumps(output)}, + ) + + assert result["success"] is False + assert "Recursion blocked" in result["output"] + assert "frontend_head" in result["output"] + + @patch("agents.runner.subprocess.run") + def test_sub_pipeline_failure_propagates(self, mock_run, conn): + """If a worker in sub-pipeline fails → dept step fails.""" + mock_run.return_value = _mock_claude_failure("compilation error") + + output = _dept_head_output( + sub_pipeline=[{"role": "backend_dev", "model": "sonnet", "brief": "Implement"}], + ) + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", "proj", + parent_pipeline_id=pipeline["id"], + step={"role": "backend_head", "brief": "Do backend"}, + dept_head_result={"raw_output": json.dumps(output)}, + ) + + assert result["success"] is False + # Handoff saved with status=partial + handoffs = models.get_handoffs_for_task(conn, "PROJ-001") + assert len(handoffs) == 1 + assert handoffs[0]["status"] == "partial" + + @patch("agents.runner.subprocess.run") + def test_cost_tokens_duration_aggregated(self, mock_run, conn): + """Sub-pipeline cost/tokens/duration are returned for aggregation.""" + mock_run.return_value = _mock_claude_success({"result": "done"}) + + output = _dept_head_output( + sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}], + ) + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", "proj", + parent_pipeline_id=pipeline["id"], + step={"role": "backend_head", "brief": "Do backend"}, + dept_head_result={"raw_output": json.dumps(output)}, + ) + + assert result["success"] is True + assert "cost_usd" in result + assert "tokens_used" in result + assert "duration_seconds" in result + 
+ @patch("agents.runner.subprocess.run") + def test_next_department_none_when_last_step(self, mock_run, conn): + """When no next dept head step, to_department should be None.""" + mock_run.return_value = _mock_claude_success({"result": "done"}) + + output = _dept_head_output( + sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}], + ) + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_backend", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", "proj", + parent_pipeline_id=pipeline["id"], + step={"role": "backend_head", "brief": "Do backend"}, + dept_head_result={"raw_output": json.dumps(output)}, + next_department=None, + ) + + assert result["success"] is True + handoffs = models.get_handoffs_for_task(conn, "PROJ-001") + assert len(handoffs) == 1 + assert handoffs[0]["to_department"] is None + + @patch("agents.runner.subprocess.run") + def test_artifacts_passed_to_first_worker(self, mock_run, conn): + """Dept head artifacts are passed as initial_previous_output to first worker.""" + prompts_seen = [] + + def side_effect(*args, **kwargs): + cmd = args[0] + for i, arg in enumerate(cmd): + if arg == "-p" and i + 1 < len(cmd): + prompts_seen.append(cmd[i + 1]) + break + return _mock_claude_success({"result": "done"}) + + mock_run.side_effect = side_effect + + output = _dept_head_output( + sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}], + artifacts={"files_changed": ["api.py"], "unique_marker": "DEPT_ARTIFACTS_123"}, + handoff_notes="Build the API using FastAPI", + ) + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", "proj", + parent_pipeline_id=pipeline["id"], + step={"role": "backend_head", "brief": "Do backend"}, + dept_head_result={"raw_output": json.dumps(output)}, + ) + + assert result["success"] is True + # First worker's prompt should contain dept head artifacts 
+ assert len(prompts_seen) >= 1 + first_worker_prompt = prompts_seen[0] + assert "DEPT_ARTIFACTS_123" in first_worker_prompt or "department_head_plan" in first_worker_prompt + + @patch("agents.runner.subprocess.run") + def test_last_sub_role_returned(self, mock_run, conn): + """Dept head result includes last_sub_role for auto_complete tracking.""" + mock_run.return_value = _mock_claude_success({"result": "done"}) + + output = _dept_head_output( + sub_pipeline=[ + {"role": "backend_dev", "brief": "Implement"}, + {"role": "reviewer", "brief": "Review"}, + ], + ) + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", "proj", + parent_pipeline_id=pipeline["id"], + step={"role": "backend_head", "brief": "Do backend"}, + dept_head_result={"raw_output": json.dumps(output)}, + ) + + assert result["success"] is True + assert result["last_sub_role"] == "reviewer" + + @patch("agents.runner.subprocess.run") + def test_decisions_extracted_from_sub_results(self, mock_run, conn): + """Decisions from worker output are collected into handoff decisions_made.""" + call_count = [0] + + def side_effect(*args, **kwargs): + call_count[0] += 1 + if call_count[0] == 1: + return _mock_claude_success({"result": "done"}) + elif call_count[0] == 2: + # Reviewer returns decisions + return _mock_claude_success({ + "result": "reviewed", + "decisions": ["Use FastAPI instead of Flask", "Add rate limiting"], + "findings": ["Missing input validation on POST /api/feature"], + }) + return _mock_claude_success({"result": "fallback"}) + + mock_run.side_effect = side_effect + + output = _dept_head_output( + sub_pipeline=[ + {"role": "backend_dev", "brief": "Implement"}, + {"role": "reviewer", "brief": "Review"}, + ], + ) + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}]) + + result = _execute_department_head_step( + conn, "PROJ-001", 
# ---------------------------------------------------------------------------
# Inter-department handoff routing
# ---------------------------------------------------------------------------

class TestHandoffRouting:
    """Tests for to_department routing in create_handoff/get_last_handoff."""

    def test_handoff_with_to_department_found_by_filter(self, conn):
        """A handoff addressed to a department is returned when filtering by it."""
        parent = models.create_pipeline(
            conn, "PROJ-001", "proj", "dept_feature", [{"role": "backend_head"}]
        )
        handoff_fields = dict(
            from_department="backend",
            to_department="frontend",
            artifacts={"files_changed": ["api.py"]},
            status="done",
        )
        models.create_handoff(conn, parent["id"], "PROJ-001", **handoff_fields)

        found = models.get_last_handoff(conn, "PROJ-001", to_department="frontend")

        assert found is not None
        assert found["from_department"] == "backend"
        assert found["to_department"] == "frontend"

    def test_handoff_without_to_department_found_without_filter(self, conn):
        """A handoff with no destination is visible only to the unfiltered lookup."""
        parent = models.create_pipeline(
            conn, "PROJ-001", "proj", "dept_backend", [{"role": "backend_head"}]
        )
        models.create_handoff(
            conn,
            parent["id"],
            "PROJ-001",
            from_department="backend",
            artifacts={"notes": "done"},
            status="done",
        )

        # Unfiltered lookup sees the row.
        unfiltered = models.get_last_handoff(conn, "PROJ-001")
        assert unfiltered is not None
        assert unfiltered["from_department"] == "backend"

        # to_department is NULL in the row, so a "frontend" filter matches nothing.
        filtered = models.get_last_handoff(conn, "PROJ-001", to_department="frontend")
        assert filtered is None

    def test_multiple_handoffs_returns_latest(self, conn):
        """With several handoffs stored, get_last_handoff returns the newest match."""
        parent = models.create_pipeline(
            conn,
            "PROJ-001",
            "proj",
            "dept_feature",
            [{"role": "backend_head"}, {"role": "frontend_head"}],
        )
        # (from, to, notes) in insertion order.
        chain = (
            ("backend", "frontend", "first"),
            ("frontend", "qa", "second"),
        )
        for source, target, note in chain:
            models.create_handoff(
                conn,
                parent["id"],
                "PROJ-001",
                from_department=source,
                to_department=target,
                artifacts={"notes": note},
                status="done",
            )

        # Without a filter the most recently inserted handoff wins.
        newest = models.get_last_handoff(conn, "PROJ-001")
        assert newest["from_department"] == "frontend"

        # With a filter only handoffs addressed to that department are candidates.
        to_frontend = models.get_last_handoff(conn, "PROJ-001", to_department="frontend")
        assert to_frontend["from_department"] == "backend"
department left a handoff, next dept head sees it.""" + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}, {"role": "frontend_head"}]) + models.create_handoff( + conn, pipeline["id"], "PROJ-001", + from_department="backend", + to_department="frontend", + artifacts={"files_changed": ["api.py"], "notes": "API ready"}, + status="done", + ) + + ctx = build_context(conn, "PROJ-001", "frontend_head", "proj") + assert "incoming_handoff" in ctx + assert ctx["incoming_handoff"]["from_department"] == "backend" + assert ctx["incoming_handoff"]["to_department"] == "frontend" + + def test_dept_head_fallback_handoff_from_different_dept(self, conn): + """Handoff with to_department=NULL is found via fallback (from different dept).""" + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}, {"role": "frontend_head"}]) + # Old-style handoff without to_department (before bugfix) + models.create_handoff( + conn, pipeline["id"], "PROJ-001", + from_department="backend", + artifacts={"notes": "API ready"}, + status="done", + ) + + ctx = build_context(conn, "PROJ-001", "frontend_head", "proj") + # Fallback: should still find the handoff (from different dept) + assert "incoming_handoff" in ctx + assert ctx["incoming_handoff"]["from_department"] == "backend" + + def test_dept_head_fallback_ignores_own_dept_handoff(self, conn): + """Fallback should NOT pick up handoff FROM our own department.""" + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_backend", + [{"role": "backend_head"}]) + # Only handoff is from backend itself — should not be picked up + models.create_handoff( + conn, pipeline["id"], "PROJ-001", + from_department="backend", + artifacts={"notes": "done"}, + status="done", + ) + + ctx = build_context(conn, "PROJ-001", "backend_head", "proj") + # Should NOT find handoff from own department + assert "incoming_handoff" not in ctx + + def 
test_dept_head_no_handoff_when_first_in_chain(self, conn): + """First department head in chain has no incoming handoff.""" + ctx = build_context(conn, "PROJ-001", "backend_head", "proj") + assert "incoming_handoff" not in ctx + + def test_non_head_role_no_department_context(self, conn): + """Regular specialist doesn't get department context.""" + ctx = build_context(conn, "PROJ-001", "backend_dev", "proj") + assert "department" not in ctx + assert "department_workers" not in ctx + + def test_department_context_in_formatted_prompt(self, conn): + """Department info appears in formatted prompt string.""" + pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature", + [{"role": "backend_head"}, {"role": "frontend_head"}]) + models.create_handoff( + conn, pipeline["id"], "PROJ-001", + from_department="backend", to_department="frontend", + artifacts={"files_changed": ["api.py"]}, + status="done", + ) + + ctx = build_context(conn, "PROJ-001", "frontend_head", "proj") + prompt = format_prompt(ctx, "frontend_head") + + assert "Department: frontend" in prompt + assert "Department workers:" in prompt + assert "frontend_dev" in prompt + assert "Incoming handoff from previous department:" in prompt + assert "api.py" in prompt + + def test_format_prompt_uses_department_head_md(self, conn): + """format_prompt for _head roles falls back to department_head.md.""" + ctx = build_context(conn, "PROJ-001", "backend_head", "proj") + prompt = format_prompt(ctx, "backend_head") + + # Should contain department_head.md content, not generic fallback + assert "Department Head" in prompt + assert "sub_pipeline" in prompt + + +# --------------------------------------------------------------------------- +# Full cycle smoke test: PM → dept_head → workers → handoff → next dept +# --------------------------------------------------------------------------- + +class TestFullDepartmentCycle: + """Integration test: multi-department pipeline with handoff.""" + + 
@patch("agents.runner._run_autocommit") + @patch("agents.runner.subprocess.run") + def test_backend_then_frontend_full_cycle(self, mock_run, mock_autocommit, conn): + """PM routes to backend_head → frontend_head. + Each dept head spawns sub-pipeline, handoff passes between departments. + """ + call_count = [0] + + def side_effect(*args, **kwargs): + call_count[0] += 1 + # Call 1: backend_head (Opus) plans work + if call_count[0] == 1: + return _mock_claude_success(_dept_head_output( + sub_pipeline=[ + {"role": "backend_dev", "model": "sonnet", "brief": "Implement API"}, + {"role": "tester", "model": "sonnet", "brief": "Test API"}, + ], + artifacts={"files_changed": ["api.py", "models.py"], + "endpoints_added": ["POST /api/feature"]}, + handoff_notes="Backend API ready. POST /api/feature accepts {name, value}.", + )) + # Call 2: backend_dev executes + elif call_count[0] == 2: + return _mock_claude_success({"result": "API implemented"}) + # Call 3: tester in backend dept executes + elif call_count[0] == 3: + return _mock_claude_success({"result": "Tests pass"}) + # Call 4: frontend_head (Opus) plans work + elif call_count[0] == 4: + return _mock_claude_success(_dept_head_output( + sub_pipeline=[ + {"role": "frontend_dev", "model": "sonnet", + "brief": "Build UI calling POST /api/feature"}, + {"role": "tester", "model": "sonnet", "brief": "Test UI"}, + ], + artifacts={"files_changed": ["FeatureForm.vue"], + "notes": "UI calls backend API"}, + handoff_notes="Frontend complete.", + )) + # Call 5: frontend_dev executes + elif call_count[0] == 5: + return _mock_claude_success({"result": "UI built"}) + # Call 6: tester in frontend dept executes + elif call_count[0] == 6: + return _mock_claude_success({"result": "UI tests pass"}) + return _mock_claude_success({"result": "fallback"}) + + mock_run.side_effect = side_effect + + steps = [ + {"role": "backend_head", "model": "opus", "brief": "Implement backend"}, + {"role": "frontend_head", "model": "opus", "brief": "Implement 
frontend"}, + ] + + result = run_pipeline(conn, "PROJ-001", steps) + + # Pipeline succeeded + assert result["success"] is True + assert call_count[0] == 6 # 2 dept heads + 2+2 workers + + # Verify parent pipeline + parent_pipes = conn.execute( + "SELECT * FROM pipelines WHERE route_type='dept_feature'" + ).fetchall() + assert len(parent_pipes) == 1 + + # Verify child pipelines (2: backend + frontend) + child_pipes = conn.execute( + "SELECT * FROM pipelines WHERE route_type='dept_sub' ORDER BY id" + ).fetchall() + assert len(child_pipes) == 2 + assert dict(child_pipes[0])["department"] == "backend" + assert dict(child_pipes[1])["department"] == "frontend" + + # Verify handoff records (2: backend→frontend, frontend→None) + handoffs = models.get_handoffs_for_task(conn, "PROJ-001") + assert len(handoffs) == 2 + + # First handoff: backend → frontend + assert handoffs[0]["from_department"] == "backend" + assert handoffs[0]["to_department"] == "frontend" + assert handoffs[0]["status"] == "done" + artifacts = handoffs[0]["artifacts"] + if isinstance(artifacts, str): + artifacts = json.loads(artifacts) + assert "api.py" in artifacts["files_changed"] + + # Second handoff: frontend → None (last in chain) + assert handoffs[1]["from_department"] == "frontend" + assert handoffs[1]["to_department"] is None + + @patch("agents.runner._run_autocommit") + @patch("agents.runner.subprocess.run") + def test_first_dept_fails_blocks_pipeline(self, mock_run, mock_autocommit, conn): + """If first dept head's sub-pipeline fails → entire pipeline blocked.""" + call_count = [0] + + def side_effect(*args, **kwargs): + call_count[0] += 1 + if call_count[0] == 1: + # backend_head plans work + return _mock_claude_success(_dept_head_output( + sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}], + )) + elif call_count[0] == 2: + # backend_dev fails + return _mock_claude_failure("compilation error") + return _mock_claude_success({"result": "should not reach"}) + + mock_run.side_effect = 
side_effect + + steps = [ + {"role": "backend_head", "model": "opus", "brief": "Do backend"}, + {"role": "frontend_head", "model": "opus", "brief": "Do frontend"}, + ] + + result = run_pipeline(conn, "PROJ-001", steps) + + assert result["success"] is False + assert "backend_head" in result["error"] + assert call_count[0] == 2 # frontend_head never called + + # Task is blocked + task = models.get_task(conn, "PROJ-001") + assert task["status"] == "blocked" + + @patch("agents.runner._run_autocommit") + @patch("agents.runner.subprocess.run") + def test_dept_head_blocked_blocks_pipeline(self, mock_run, mock_autocommit, conn): + """Dept head returns status=blocked → entire pipeline blocked.""" + mock_run.return_value = _mock_claude_success({ + "status": "blocked", + "blocked_reason": "No DB schema", + "blocked_at": "2026-03-17T12:00:00", + }) + + steps = [ + {"role": "backend_head", "model": "opus", "brief": "Do backend"}, + {"role": "frontend_head", "model": "opus", "brief": "Do frontend"}, + ] + + result = run_pipeline(conn, "PROJ-001", steps) + + assert result["success"] is False + assert result["steps_completed"] == 0 # dept head blocked at step 0 + + @patch("agents.runner._run_autocommit") + @patch("agents.runner.subprocess.run") + def test_frontend_head_prompt_contains_backend_handoff(self, mock_run, mock_autocommit, conn): + """Verify that frontend_head's prompt includes backend's handoff context.""" + prompts_seen = [] + call_count = [0] + + def side_effect(*args, **kwargs): + call_count[0] += 1 + cmd = args[0] + # Extract -p argument + for i, arg in enumerate(cmd): + if arg == "-p" and i + 1 < len(cmd): + prompts_seen.append(cmd[i + 1]) + break + + if call_count[0] == 1: + return _mock_claude_success(_dept_head_output( + sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}], + artifacts={"unique_marker": "BACKEND_ARTIFACTS_XYZ"}, + handoff_notes="HANDOFF_NOTES_ABC", + )) + elif call_count[0] == 2: + return _mock_claude_success({"result": "done"}) + elif 
call_count[0] == 3: + # frontend_head — check its prompt + return _mock_claude_success(_dept_head_output( + sub_pipeline=[{"role": "frontend_dev", "brief": "Build UI"}], + )) + elif call_count[0] == 4: + return _mock_claude_success({"result": "done"}) + return _mock_claude_success({"result": "fallback"}) + + mock_run.side_effect = side_effect + + steps = [ + {"role": "backend_head", "model": "opus", "brief": "Backend"}, + {"role": "frontend_head", "model": "opus", "brief": "Frontend"}, + ] + + result = run_pipeline(conn, "PROJ-001", steps) + assert result["success"] is True + + # The frontend_head prompt (3rd call) should contain handoff from backend + assert len(prompts_seen) >= 3 + frontend_head_prompt = prompts_seen[2] + # The handoff summary is passed as previous_output in the prompt + assert "HANDOFF_NOTES_ABC" in frontend_head_prompt or \ + "BACKEND_ARTIFACTS_XYZ" in frontend_head_prompt or \ + "backend" in frontend_head_prompt.lower() + + @patch("agents.runner._run_autocommit") + @patch("agents.runner.subprocess.run") + def test_initial_previous_output_in_sub_pipeline(self, mock_run, mock_autocommit, conn): + """Workers in sub-pipeline receive dept head plan as initial context.""" + prompts_seen = [] + call_count = [0] + + def side_effect(*args, **kwargs): + call_count[0] += 1 + cmd = args[0] + for i, arg in enumerate(cmd): + if arg == "-p" and i + 1 < len(cmd): + prompts_seen.append(cmd[i + 1]) + break + + if call_count[0] == 1: + return _mock_claude_success(_dept_head_output( + sub_pipeline=[ + {"role": "backend_dev", "brief": "Implement"}, + {"role": "tester", "brief": "Test"}, + ], + artifacts={"files_changed": ["api.py"], "marker": "DEPT_PLAN_MARKER"}, + handoff_notes="Use FastAPI for the endpoint", + )) + return _mock_claude_success({"result": "done"}) + + mock_run.side_effect = side_effect + + steps = [{"role": "backend_head", "model": "opus", "brief": "Do backend"}] + result = run_pipeline(conn, "PROJ-001", steps) + + assert result["success"] is True 
# ---------------------------------------------------------------------------
# YAML structure validation
# ---------------------------------------------------------------------------

class TestSpecialistsYaml:
    """Validate specialists.yaml department head structure."""

    @staticmethod
    def _load_specialists_yaml():
        """Parse agents/specialists.yaml (path is relative to the working directory).

        The file is opened as UTF-8 explicitly so the result does not depend on
        the platform's locale default encoding.
        """
        import yaml  # kept function-local, as in the individual tests before

        with open("agents/specialists.yaml", encoding="utf-8") as f:
            return yaml.safe_load(f)

    def test_all_department_heads_have_required_fields(self):
        """Every _head specialist must have model=opus, execution_type=department_head, department."""
        data = self._load_specialists_yaml()

        specialists = data["specialists"]
        heads = {k: v for k, v in specialists.items() if k.endswith("_head")}

        assert len(heads) >= 7, f"Expected >=7 dept heads, got {len(heads)}"

        for name, spec in heads.items():
            assert spec["model"] == "opus", f"{name} should use opus model"
            assert spec["execution_type"] == "department_head", \
                f"{name} missing execution_type=department_head"
            assert "department" in spec, f"{name} missing department field"

    def test_all_departments_have_head_and_workers(self):
        """Every department must reference a valid head and have workers list."""
        data = self._load_specialists_yaml()

        departments = data["departments"]
        specialists = data["specialists"]

        assert len(departments) >= 7

        for dept_name, dept in departments.items():
            assert "head" in dept, f"Department '{dept_name}' missing 'head'"
            assert "workers" in dept, f"Department '{dept_name}' missing 'workers'"
            assert len(dept["workers"]) > 0, f"Department '{dept_name}' has no workers"

            # Head exists as specialist
            head_role = dept["head"]
            assert head_role in specialists, \
                f"Department '{dept_name}' head '{head_role}' not in specialists"

            # All workers exist as specialists
            for worker in dept["workers"]:
                assert worker in specialists, \
                    f"Department '{dept_name}' worker '{worker}' not in specialists"

    def test_department_routes_exist(self):
        """dept_* routes reference valid _head roles."""
        data = self._load_specialists_yaml()

        routes = data["routes"]
        specialists = data["specialists"]
        dept_routes = {k: v for k, v in routes.items() if k.startswith("dept_")}

        assert len(dept_routes) >= 6, f"Expected >=6 dept routes, got {len(dept_routes)}"

        for route_name, route in dept_routes.items():
            for step_role in route["steps"]:
                assert step_role in specialists, \
                    f"Route '{route_name}' references unknown role '{step_role}'"
                assert step_role.endswith("_head"), \
                    f"Route '{route_name}' step '{step_role}' should be a dept head"

    def test_key_departments_present(self):
        """AC4: marketing, infra (sysadmin), frontend, backend, qa (testers), security."""
        data = self._load_specialists_yaml()

        departments = data["departments"]
        required = ["marketing", "infra", "frontend", "backend", "qa", "security", "research"]
        for dept in required:
            assert dept in departments, f"Required department '{dept}' missing"

    def test_pm_prompt_references_all_department_heads(self):
        """PM prompt must list all department heads."""
        with open("agents/prompts/pm.md", encoding="utf-8") as f:
            pm_prompt = f.read()

        data = self._load_specialists_yaml()

        specialists = data["specialists"]
        heads = [k for k in specialists if k.endswith("_head")]

        for head in heads:
            assert head in pm_prompt, \
                f"PM prompt missing reference to '{head}'"
@patch.dict("os.environ", {"KIN_AGENT_TIMEOUT": ""}, clear=False) @patch("agents.runner.subprocess.run") - def test_noninteractive_uses_600s_timeout(self, mock_run, conn): + def test_noninteractive_uses_model_timeout(self, mock_run, conn): mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", noninteractive=True) call_kwargs = mock_run.call_args[1] - assert call_kwargs.get("timeout") == 600 + assert call_kwargs.get("timeout") == 1200 # sonnet default @patch.dict("os.environ", {"KIN_NONINTERACTIVE": ""}) @patch("agents.runner.subprocess.run") - def test_interactive_uses_600s_timeout(self, mock_run, conn): + def test_interactive_uses_model_timeout(self, mock_run, conn): mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", noninteractive=False) call_kwargs = mock_run.call_args[1] - assert call_kwargs.get("timeout") == 600 + assert call_kwargs.get("timeout") == 1200 # sonnet default @patch.dict("os.environ", {"KIN_NONINTERACTIVE": ""}) @patch("agents.runner.subprocess.run") @@ -630,17 +630,44 @@ class TestNonInteractive: run_agent(conn, "debugger", "VDOL-001", "vdol", noninteractive=False) call_kwargs = mock_run.call_args[1] assert call_kwargs.get("stdin") == subprocess.DEVNULL - assert call_kwargs.get("timeout") == 600 + assert call_kwargs.get("timeout") == 1200 # sonnet default @patch.dict("os.environ", {"KIN_AGENT_TIMEOUT": "900"}) @patch("agents.runner.subprocess.run") def test_custom_timeout_via_env_var(self, mock_run, conn): - """KIN_AGENT_TIMEOUT overrides the default 600s timeout.""" + """KIN_AGENT_TIMEOUT=900 takes precedence over the model-based timeout.""" mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol") call_kwargs = mock_run.call_args[1] assert call_kwargs.get("timeout") == 900 + @patch.dict("os.environ", {"KIN_AGENT_TIMEOUT": ""}) + @patch("agents.runner.subprocess.run") + def 
test_opus_timeout_1800(self, mock_run, conn): + """Opus model gets 1800s (30 min) timeout.""" + mock_run.return_value = _mock_claude_success({"result": "ok"}) + run_agent(conn, "debugger", "VDOL-001", "vdol", model="opus") + call_kwargs = mock_run.call_args[1] + assert call_kwargs.get("timeout") == 1800 + + @patch.dict("os.environ", {"KIN_AGENT_TIMEOUT": ""}) + @patch("agents.runner.subprocess.run") + def test_haiku_timeout_600(self, mock_run, conn): + """Haiku model gets 600s (10 min) timeout.""" + mock_run.return_value = _mock_claude_success({"result": "ok"}) + run_agent(conn, "debugger", "VDOL-001", "vdol", model="haiku") + call_kwargs = mock_run.call_args[1] + assert call_kwargs.get("timeout") == 600 + + @patch.dict("os.environ", {"KIN_AGENT_TIMEOUT": "999"}) + @patch("agents.runner.subprocess.run") + def test_env_timeout_overrides_model(self, mock_run, conn): + """KIN_AGENT_TIMEOUT=999 overrides the timeout even when model='opus' is passed.""" + mock_run.return_value = _mock_claude_success({"result": "ok"}) + run_agent(conn, "debugger", "VDOL-001", "vdol", model="opus") + call_kwargs = mock_run.call_args[1] + assert call_kwargs.get("timeout") == 999 + @patch("agents.runner.subprocess.run") def test_allow_write_adds_skip_permissions(self, mock_run, conn): mock_run.return_value = _mock_claude_success({"result": "ok"}) @@ -1236,12 +1263,12 @@ class TestRegressionKIN056: @patch.dict("os.environ", {"KIN_NONINTERACTIVE": "1", "KIN_AGENT_TIMEOUT": ""}) @patch("agents.runner.subprocess.run") - def test_web_noninteractive_timeout_is_600(self, mock_run, conn): - """Web путь: KIN_NONINTERACTIVE=1 → timeout = 600s (не 300s).""" + def test_web_noninteractive_timeout_uses_model_default(self, mock_run, conn): + """Web path: KIN_NONINTERACTIVE=1 → timeout = model default (sonnet=1200s).""" mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol") call_kwargs = mock_run.call_args[1] - assert call_kwargs.get("timeout") == 600 + assert 
call_kwargs.get("timeout") == 1200 @patch("agents.runner.subprocess.run") def test_web_and_cli_paths_use_same_timeout(self, mock_run, conn):