kin/tests/test_department_heads.py
2026-03-17 14:03:53 +02:00

929 lines
39 KiB
Python

"""Tests for KIN-098: Three-level hierarchy — PM → department heads → workers.
Covers:
- _execute_department_head_step() logic
- Sub-pipeline creation and execution
- Recursion guard (no _head roles in sub-pipeline)
- Blocked status propagation
- Inter-department handoff via DB (to_department routing)
- Context builder: department workers, description, incoming handoff
- Full cycle: PM → backend_head → workers → handoff → frontend_head → workers
- Artifacts passed to sub-pipeline workers via initial_previous_output
- decisions_made extraction from sub-pipeline results
- auto_complete eligibility for dept pipelines
- _is_department_head() with execution_type from YAML
- format_prompt fallback to department_head.md for _head roles
"""
import json
import pytest
from unittest.mock import patch, MagicMock
from core.db import init_db
from core import models
from core.context_builder import build_context, format_prompt
from agents.runner import run_pipeline, _execute_department_head_step, _is_department_head
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
    """Yield a DB connection seeded with project ``proj`` and task ``PROJ-001``.

    The database is opened with ``init_db(":memory:")``. The connection is
    closed during teardown, and also when seeding raises before the test
    starts (pytest skips the code after ``yield`` if setup fails, so the
    cleanup lives in ``finally``).
    """
    c = init_db(":memory:")
    try:
        models.create_project(c, "proj", "TestProject", "~/projects/test",
                              tech_stack=["python", "vue3"])
        models.create_task(c, "PROJ-001", "proj", "Full-stack feature",
                           brief={"route_type": "dept_feature"})
        yield c
    finally:
        c.close()
def _mock_claude_success(output_data):
mock = MagicMock()
mock.stdout = json.dumps(output_data) if isinstance(output_data, dict) else output_data
mock.stderr = ""
mock.returncode = 0
return mock
def _mock_claude_failure(error_msg="error"):
mock = MagicMock()
mock.stdout = ""
mock.stderr = error_msg
mock.returncode = 1
return mock
# Valid department head output
def _dept_head_output(sub_pipeline, artifacts=None, handoff_notes="", status="done"):
return {
"status": status,
"sub_pipeline": sub_pipeline,
"artifacts": artifacts or {"files_changed": ["api.py"], "notes": "done"},
"handoff_notes": handoff_notes,
}
# ---------------------------------------------------------------------------
# _is_department_head — execution_type detection
# ---------------------------------------------------------------------------
class TestIsDepartmentHead:
    """Checks for the _is_department_head() role classifier."""

    @staticmethod
    def _drop_role_cache():
        """Set the runner's ``_DEPT_HEAD_ROLES`` cache to None to force a reload."""
        import agents.runner as runner_module
        runner_module._DEPT_HEAD_ROLES = None

    def test_known_heads_detected(self):
        """Each _head role listed here is recognised as a department head."""
        self._drop_role_cache()
        known_heads = (
            "backend_head",
            "frontend_head",
            "qa_head",
            "security_head",
            "infra_head",
            "research_head",
            "marketing_head",
        )
        for role in known_heads:
            assert _is_department_head(role) is True

    def test_non_heads_not_detected(self):
        """Ordinary specialist roles are not classified as department heads."""
        self._drop_role_cache()
        for role in ("backend_dev", "tester", "pm", "reviewer"):
            assert _is_department_head(role) is False

    def test_suffix_fallback(self):
        """A role that is not a known head but ends in _head is still detected."""
        self._drop_role_cache()
        assert _is_department_head("custom_head") is True
# ---------------------------------------------------------------------------
# _execute_department_head_step — unit tests
# ---------------------------------------------------------------------------
class TestExecuteDepartmentHeadStep:
    """Unit-level checks for _execute_department_head_step()."""

    @staticmethod
    def _run_backend_head(conn, raw_output, route_type="dept_feature", **step_kwargs):
        """Create a one-step backend_head parent pipeline and run the head step.

        *raw_output* is handed over as the department head's ``raw_output``.
        Extra keyword arguments (e.g. ``next_department``) are forwarded only
        when the caller supplies them, so the callee's own defaults still apply.

        Returns ``(parent_pipeline, step_result)``.
        """
        parent = models.create_pipeline(conn, "PROJ-001", "proj", route_type,
                                        [{"role": "backend_head"}])
        outcome = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=parent["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": raw_output},
            **step_kwargs,
        )
        return parent, outcome

    @staticmethod
    def _prompt_args(cmd):
        """Return ``[value]`` for the first ``-p <value>`` pair in *cmd*, else ``[]``."""
        for flag, value in zip(cmd, cmd[1:]):
            if flag == "-p":
                return [value]
        return []

    @patch("agents.runner.subprocess.run")
    def test_valid_sub_pipeline_creates_child_and_runs(self, mock_run, conn):
        """A valid sub_pipeline yields a child pipeline, a handoff row and a summary."""
        # Every worker invocation succeeds
        mock_run.return_value = _mock_claude_success({"result": "implemented"})
        workers = [
            {"role": "backend_dev", "model": "sonnet", "brief": "Implement API"},
            {"role": "tester", "model": "sonnet", "brief": "Test API"},
        ]
        head_plan = _dept_head_output(
            sub_pipeline=workers,
            artifacts={"files_changed": ["api.py"]},
            handoff_notes="API ready for frontend",
        )

        parent, result = self._run_backend_head(
            conn, json.dumps(head_plan), next_department="frontend",
        )

        assert result["success"] is True

        # The step output is a JSON handoff summary
        summary = json.loads(result["output"])
        assert summary["from_department"] == "backend"
        assert summary["handoff_notes"] == "API ready for frontend"
        assert summary["artifacts"]["files_changed"] == ["api.py"]

        # Exactly one child pipeline, linked to the parent
        child_rows = conn.execute(
            "SELECT * FROM pipelines WHERE route_type='dept_sub'"
        ).fetchall()
        assert len(child_rows) == 1
        child = dict(child_rows[0])
        assert child["department"] == "backend"
        assert child["parent_pipeline_id"] == parent["id"]

        # Exactly one handoff record, routed backend -> frontend
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 1
        handoff = handoffs[0]
        assert handoff["from_department"] == "backend"
        assert handoff["to_department"] == "frontend"
        assert handoff["status"] == "done"

    def test_non_json_output_returns_error(self, conn):
        """Non-JSON head output is reported as an error."""
        _, result = self._run_backend_head(conn, "This is not JSON at all")

        assert result["success"] is False
        assert "non-JSON" in result["output"]

    def test_blocked_status_returns_failure(self, conn):
        """status=blocked from the head surfaces as a blocked failure."""
        blocked_payload = json.dumps({
            "status": "blocked",
            "blocked_reason": "Missing database schema specification",
            "blocked_at": "2026-03-17T12:00:00",
        })

        _, result = self._run_backend_head(conn, blocked_payload)

        assert result["success"] is False
        assert result.get("blocked") is True
        assert "Missing database schema" in result["blocked_reason"]

    def test_empty_sub_pipeline_returns_error(self, conn):
        """An empty sub_pipeline list is rejected."""
        # Built by hand: _dept_head_output() would replace the empty artifacts dict
        empty_plan = {"status": "done", "sub_pipeline": [], "artifacts": {}, "handoff_notes": ""}

        _, result = self._run_backend_head(conn, json.dumps(empty_plan))

        assert result["success"] is False
        assert "empty or invalid" in result["output"]

    def test_recursion_guard_blocks_head_roles(self, conn):
        """A _head role inside the sub_pipeline trips the recursion guard."""
        nested_plan = _dept_head_output(
            sub_pipeline=[
                {"role": "frontend_head", "model": "opus", "brief": "Delegate to frontend"},
            ],
        )

        _, result = self._run_backend_head(conn, json.dumps(nested_plan))

        assert result["success"] is False
        assert "Recursion blocked" in result["output"]
        assert "frontend_head" in result["output"]

    @patch("agents.runner.subprocess.run")
    def test_sub_pipeline_failure_propagates(self, mock_run, conn):
        """A failing worker makes the department step fail."""
        mock_run.return_value = _mock_claude_failure("compilation error")
        head_plan = _dept_head_output(
            sub_pipeline=[{"role": "backend_dev", "model": "sonnet", "brief": "Implement"}],
        )

        _, result = self._run_backend_head(conn, json.dumps(head_plan))

        assert result["success"] is False
        # The handoff is still recorded, marked partial
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 1
        assert handoffs[0]["status"] == "partial"

    @patch("agents.runner.subprocess.run")
    def test_cost_tokens_duration_aggregated(self, mock_run, conn):
        """The step result exposes cost, token and duration keys."""
        mock_run.return_value = _mock_claude_success({"result": "done"})
        head_plan = _dept_head_output(
            sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
        )

        _, result = self._run_backend_head(conn, json.dumps(head_plan))

        assert result["success"] is True
        for metric in ("cost_usd", "tokens_used", "duration_seconds"):
            assert metric in result

    @patch("agents.runner.subprocess.run")
    def test_next_department_none_when_last_step(self, mock_run, conn):
        """With next_department=None the stored handoff has no target department."""
        mock_run.return_value = _mock_claude_success({"result": "done"})
        head_plan = _dept_head_output(
            sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
        )

        _, result = self._run_backend_head(
            conn, json.dumps(head_plan),
            route_type="dept_backend",
            next_department=None,
        )

        assert result["success"] is True
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 1
        assert handoffs[0]["to_department"] is None

    @patch("agents.runner.subprocess.run")
    def test_artifacts_passed_to_first_worker(self, mock_run, conn):
        """The first worker's prompt carries the department head's artifacts."""
        prompts_seen = []

        def record_prompt(*args, **kwargs):
            prompts_seen.extend(self._prompt_args(args[0]))
            return _mock_claude_success({"result": "done"})

        mock_run.side_effect = record_prompt
        head_plan = _dept_head_output(
            sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
            artifacts={"files_changed": ["api.py"], "unique_marker": "DEPT_ARTIFACTS_123"},
            handoff_notes="Build the API using FastAPI",
        )

        _, result = self._run_backend_head(conn, json.dumps(head_plan))

        assert result["success"] is True
        # At least one worker prompt was captured; inspect the first
        assert len(prompts_seen) >= 1
        worker_prompt = prompts_seen[0]
        assert ("DEPT_ARTIFACTS_123" in worker_prompt
                or "department_head_plan" in worker_prompt)

    @patch("agents.runner.subprocess.run")
    def test_last_sub_role_returned(self, mock_run, conn):
        """The result names the last role of the sub-pipeline."""
        mock_run.return_value = _mock_claude_success({"result": "done"})
        head_plan = _dept_head_output(
            sub_pipeline=[
                {"role": "backend_dev", "brief": "Implement"},
                {"role": "reviewer", "brief": "Review"},
            ],
        )

        _, result = self._run_backend_head(conn, json.dumps(head_plan))

        assert result["success"] is True
        assert result["last_sub_role"] == "reviewer"

    @patch("agents.runner.subprocess.run")
    def test_decisions_extracted_from_sub_results(self, mock_run, conn):
        """Worker-reported decisions end up in the handoff's decisions_made."""
        # Payloads returned by successive subprocess calls: developer, then reviewer
        scripted_payloads = iter([
            {"result": "done"},
            {
                "result": "reviewed",
                "decisions": ["Use FastAPI instead of Flask", "Add rate limiting"],
                "findings": ["Missing input validation on POST /api/feature"],
            },
        ])

        def next_response(*args, **kwargs):
            return _mock_claude_success(next(scripted_payloads, {"result": "fallback"}))

        mock_run.side_effect = next_response
        head_plan = _dept_head_output(
            sub_pipeline=[
                {"role": "backend_dev", "brief": "Implement"},
                {"role": "reviewer", "brief": "Review"},
            ],
        )

        _, result = self._run_backend_head(conn, json.dumps(head_plan))

        assert result["success"] is True
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 1
        decisions = handoffs[0]["decisions_made"]
        # The column may come back as a JSON string; decode before inspecting
        if isinstance(decisions, str):
            decisions = json.loads(decisions)
        assert len(decisions) >= 2
        assert "Use FastAPI instead of Flask" in decisions
# ---------------------------------------------------------------------------
# Inter-department handoff routing
# ---------------------------------------------------------------------------
class TestHandoffRouting:
    """Checks to_department routing through create_handoff/get_last_handoff."""

    @staticmethod
    def _new_pipeline(conn, route_type, *roles):
        """Create a PROJ-001 pipeline whose steps are the given roles, in order."""
        steps = [{"role": role} for role in roles]
        return models.create_pipeline(conn, "PROJ-001", "proj", route_type, steps)

    def test_handoff_with_to_department_found_by_filter(self, conn):
        """A handoff addressed to a department is returned when filtering by it."""
        parent = self._new_pipeline(conn, "dept_feature", "backend_head")
        models.create_handoff(
            conn, parent["id"], "PROJ-001",
            from_department="backend",
            to_department="frontend",
            artifacts={"files_changed": ["api.py"]},
            status="done",
        )

        found = models.get_last_handoff(conn, "PROJ-001", to_department="frontend")

        assert found is not None
        assert found["from_department"] == "backend"
        assert found["to_department"] == "frontend"

    def test_handoff_without_to_department_found_without_filter(self, conn):
        """An unaddressed handoff is visible only to the unfiltered lookup."""
        parent = self._new_pipeline(conn, "dept_backend", "backend_head")
        models.create_handoff(
            conn, parent["id"], "PROJ-001",
            from_department="backend",
            artifacts={"notes": "done"},
            status="done",
        )

        # Unfiltered lookup sees it
        unfiltered = models.get_last_handoff(conn, "PROJ-001")
        assert unfiltered is not None
        assert unfiltered["from_department"] == "backend"

        # Filtering by "frontend" does not match a handoff created without to_department
        filtered = models.get_last_handoff(conn, "PROJ-001", to_department="frontend")
        assert filtered is None

    def test_multiple_handoffs_returns_latest(self, conn):
        """With several handoffs, get_last_handoff returns the newest match."""
        parent = self._new_pipeline(conn, "dept_feature", "backend_head", "frontend_head")
        handoff_specs = (
            dict(from_department="backend", to_department="frontend",
                 artifacts={"notes": "first"}, status="done"),
            dict(from_department="frontend", to_department="qa",
                 artifacts={"notes": "second"}, status="done"),
        )
        for spec in handoff_specs:
            models.create_handoff(conn, parent["id"], "PROJ-001", **spec)

        # No filter: the most recent handoff overall
        newest = models.get_last_handoff(conn, "PROJ-001")
        assert newest["from_department"] == "frontend"

        # Filter by target: the handoff addressed to frontend
        to_frontend = models.get_last_handoff(conn, "PROJ-001", to_department="frontend")
        assert to_frontend["from_department"] == "backend"
# ---------------------------------------------------------------------------
# Context builder — department context injection
# ---------------------------------------------------------------------------
class TestDepartmentContext:
    """Checks the department context that build_context adds for head roles."""

    @staticmethod
    def _seed_backend_handoff(conn, artifacts, route_type="dept_feature",
                              roles=("backend_head", "frontend_head"), **routing):
        """Create a pipeline plus one completed handoff coming from backend.

        *routing* carries optional ``to_department``; it is forwarded only when
        given, so a handoff can be created without a target department.
        """
        parent = models.create_pipeline(conn, "PROJ-001", "proj", route_type,
                                        [{"role": role} for role in roles])
        models.create_handoff(
            conn, parent["id"], "PROJ-001",
            from_department="backend",
            **routing,
            artifacts=artifacts,
            status="done",
        )

    def test_dept_head_receives_department_info(self, conn):
        """A head's context has its department name, workers and description."""
        ctx = build_context(conn, "PROJ-001", "backend_head", "proj")

        assert ctx["department"] == "backend"
        for worker in ("backend_dev", "architect", "tester", "reviewer"):
            assert worker in ctx["department_workers"]
        assert "Backend development" in ctx["department_description"]

    def test_dept_head_receives_incoming_handoff(self, conn):
        """A handoff addressed to the head's department shows up in its context."""
        self._seed_backend_handoff(
            conn,
            {"files_changed": ["api.py"], "notes": "API ready"},
            to_department="frontend",
        )

        ctx = build_context(conn, "PROJ-001", "frontend_head", "proj")

        assert "incoming_handoff" in ctx
        incoming = ctx["incoming_handoff"]
        assert incoming["from_department"] == "backend"
        assert incoming["to_department"] == "frontend"

    def test_dept_head_fallback_handoff_from_different_dept(self, conn):
        """A handoff without to_department is still picked up from another dept."""
        # Old-style handoff: created without to_department (before bugfix)
        self._seed_backend_handoff(conn, {"notes": "API ready"})

        ctx = build_context(conn, "PROJ-001", "frontend_head", "proj")

        # Fallback path: the handoff comes from a different department
        assert "incoming_handoff" in ctx
        assert ctx["incoming_handoff"]["from_department"] == "backend"

    def test_dept_head_fallback_ignores_own_dept_handoff(self, conn):
        """The fallback must not return a handoff from the head's own department."""
        # The only handoff on the task originates from backend itself
        self._seed_backend_handoff(
            conn, {"notes": "done"},
            route_type="dept_backend",
            roles=("backend_head",),
        )

        ctx = build_context(conn, "PROJ-001", "backend_head", "proj")

        assert "incoming_handoff" not in ctx

    def test_dept_head_no_handoff_when_first_in_chain(self, conn):
        """With no handoffs recorded, the context has no incoming_handoff."""
        ctx = build_context(conn, "PROJ-001", "backend_head", "proj")

        assert "incoming_handoff" not in ctx

    def test_non_head_role_no_department_context(self, conn):
        """A regular specialist's context carries no department keys."""
        ctx = build_context(conn, "PROJ-001", "backend_dev", "proj")

        for key in ("department", "department_workers"):
            assert key not in ctx

    def test_department_context_in_formatted_prompt(self, conn):
        """Department details and the incoming handoff are rendered into the prompt."""
        self._seed_backend_handoff(
            conn,
            {"files_changed": ["api.py"]},
            to_department="frontend",
        )

        ctx = build_context(conn, "PROJ-001", "frontend_head", "proj")
        prompt = format_prompt(ctx, "frontend_head")

        expected_fragments = (
            "Department: frontend",
            "Department workers:",
            "frontend_dev",
            "Incoming handoff from previous department:",
            "api.py",
        )
        for fragment in expected_fragments:
            assert fragment in prompt

    def test_format_prompt_uses_department_head_md(self, conn):
        """format_prompt for a _head role falls back to department_head.md."""
        ctx = build_context(conn, "PROJ-001", "backend_head", "proj")
        prompt = format_prompt(ctx, "backend_head")

        # Expect department_head.md content rather than the generic fallback
        assert "Department Head" in prompt
        assert "sub_pipeline" in prompt
# ---------------------------------------------------------------------------
# Full cycle smoke test: PM → dept_head → workers → handoff → next dept
# ---------------------------------------------------------------------------
class TestFullDepartmentCycle:
    """Integration checks for multi-department pipelines run via run_pipeline()."""

    @staticmethod
    def _prompt_args(cmd):
        """Return ``[value]`` for the first ``-p <value>`` pair in *cmd*, else ``[]``."""
        for flag, value in zip(cmd, cmd[1:]):
            if flag == "-p":
                return [value]
        return []

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_backend_then_frontend_full_cycle(self, mock_run, mock_autocommit, conn):
        """backend_head then frontend_head: each runs a sub-pipeline and leaves a handoff."""
        # Payloads for successive subprocess calls, in invocation order
        scripted_payloads = iter([
            # 1: backend_head (Opus) plans work
            _dept_head_output(
                sub_pipeline=[
                    {"role": "backend_dev", "model": "sonnet", "brief": "Implement API"},
                    {"role": "tester", "model": "sonnet", "brief": "Test API"},
                ],
                artifacts={"files_changed": ["api.py", "models.py"],
                           "endpoints_added": ["POST /api/feature"]},
                handoff_notes="Backend API ready. POST /api/feature accepts {name, value}.",
            ),
            # 2: backend_dev executes
            {"result": "API implemented"},
            # 3: tester in the backend department executes
            {"result": "Tests pass"},
            # 4: frontend_head (Opus) plans work
            _dept_head_output(
                sub_pipeline=[
                    {"role": "frontend_dev", "model": "sonnet",
                     "brief": "Build UI calling POST /api/feature"},
                    {"role": "tester", "model": "sonnet", "brief": "Test UI"},
                ],
                artifacts={"files_changed": ["FeatureForm.vue"],
                           "notes": "UI calls backend API"},
                handoff_notes="Frontend complete.",
            ),
            # 5: frontend_dev executes
            {"result": "UI built"},
            # 6: tester in the frontend department executes
            {"result": "UI tests pass"},
        ])

        def next_response(*args, **kwargs):
            return _mock_claude_success(next(scripted_payloads, {"result": "fallback"}))

        mock_run.side_effect = next_response
        route = [
            {"role": "backend_head", "model": "opus", "brief": "Implement backend"},
            {"role": "frontend_head", "model": "opus", "brief": "Implement frontend"},
        ]

        result = run_pipeline(conn, "PROJ-001", route)

        assert result["success"] is True
        assert mock_run.call_count == 6  # 2 dept heads + 2+2 workers

        # One parent pipeline
        parent_rows = conn.execute(
            "SELECT * FROM pipelines WHERE route_type='dept_feature'"
        ).fetchall()
        assert len(parent_rows) == 1

        # Two child pipelines, backend first, then frontend
        child_rows = conn.execute(
            "SELECT * FROM pipelines WHERE route_type='dept_sub' ORDER BY id"
        ).fetchall()
        assert len(child_rows) == 2
        child_departments = [dict(row)["department"] for row in child_rows]
        assert child_departments[0] == "backend"
        assert child_departments[1] == "frontend"

        # Two handoffs: backend -> frontend, then frontend -> None
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 2
        backend_handoff, frontend_handoff = handoffs[0], handoffs[1]

        assert backend_handoff["from_department"] == "backend"
        assert backend_handoff["to_department"] == "frontend"
        assert backend_handoff["status"] == "done"
        backend_artifacts = backend_handoff["artifacts"]
        # Artifacts may come back as a JSON string; decode before inspecting
        if isinstance(backend_artifacts, str):
            backend_artifacts = json.loads(backend_artifacts)
        assert "api.py" in backend_artifacts["files_changed"]

        # Last department in the chain hands off to nobody
        assert frontend_handoff["from_department"] == "frontend"
        assert frontend_handoff["to_department"] is None

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_first_dept_fails_blocks_pipeline(self, mock_run, mock_autocommit, conn):
        """A failing worker under the first head blocks the whole pipeline."""
        scripted_responses = iter([
            # 1: backend_head plans work
            lambda: _mock_claude_success(_dept_head_output(
                sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
            )),
            # 2: backend_dev fails
            lambda: _mock_claude_failure("compilation error"),
        ])

        def next_response(*args, **kwargs):
            build = next(
                scripted_responses,
                lambda: _mock_claude_success({"result": "should not reach"}),
            )
            return build()

        mock_run.side_effect = next_response
        route = [
            {"role": "backend_head", "model": "opus", "brief": "Do backend"},
            {"role": "frontend_head", "model": "opus", "brief": "Do frontend"},
        ]

        result = run_pipeline(conn, "PROJ-001", route)

        assert result["success"] is False
        assert "backend_head" in result["error"]
        assert mock_run.call_count == 2  # frontend_head never called

        # The task ends up blocked
        assert models.get_task(conn, "PROJ-001")["status"] == "blocked"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_dept_head_blocked_blocks_pipeline(self, mock_run, mock_autocommit, conn):
        """status=blocked from the first head stops the pipeline at step 0."""
        blocked_payload = {
            "status": "blocked",
            "blocked_reason": "No DB schema",
            "blocked_at": "2026-03-17T12:00:00",
        }
        mock_run.return_value = _mock_claude_success(blocked_payload)
        route = [
            {"role": "backend_head", "model": "opus", "brief": "Do backend"},
            {"role": "frontend_head", "model": "opus", "brief": "Do frontend"},
        ]

        result = run_pipeline(conn, "PROJ-001", route)

        assert result["success"] is False
        assert result["steps_completed"] == 0  # dept head blocked at step 0

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_frontend_head_prompt_contains_backend_handoff(self, mock_run, mock_autocommit, conn):
        """The frontend_head prompt carries context from the backend handoff."""
        prompts_seen = []
        scripted_payloads = iter([
            # 1: backend_head
            _dept_head_output(
                sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
                artifacts={"unique_marker": "BACKEND_ARTIFACTS_XYZ"},
                handoff_notes="HANDOFF_NOTES_ABC",
            ),
            # 2: backend_dev
            {"result": "done"},
            # 3: frontend_head, whose prompt is inspected below
            _dept_head_output(
                sub_pipeline=[{"role": "frontend_dev", "brief": "Build UI"}],
            ),
            # 4: frontend_dev
            {"result": "done"},
        ])

        def record_and_respond(*args, **kwargs):
            # Capture the -p argument before answering
            prompts_seen.extend(self._prompt_args(args[0]))
            return _mock_claude_success(next(scripted_payloads, {"result": "fallback"}))

        mock_run.side_effect = record_and_respond
        route = [
            {"role": "backend_head", "model": "opus", "brief": "Backend"},
            {"role": "frontend_head", "model": "opus", "brief": "Frontend"},
        ]

        result = run_pipeline(conn, "PROJ-001", route)

        assert result["success"] is True
        # The third captured prompt belongs to frontend_head
        assert len(prompts_seen) >= 3
        head_prompt = prompts_seen[2]
        lowered_prompt = head_prompt.lower()
        # NOTE(review): the last alternative accepts any mention of "backend",
        # which makes this check very permissive; confirm that is intended.
        assert ("HANDOFF_NOTES_ABC" in head_prompt
                or "BACKEND_ARTIFACTS_XYZ" in head_prompt
                or "backend" in lowered_prompt)

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_initial_previous_output_in_sub_pipeline(self, mock_run, mock_autocommit, conn):
        """The first worker's prompt includes the department head's plan."""
        prompts_seen = []
        # Only the first call (backend_head) is scripted; workers get the default
        head_payloads = iter([
            _dept_head_output(
                sub_pipeline=[
                    {"role": "backend_dev", "brief": "Implement"},
                    {"role": "tester", "brief": "Test"},
                ],
                artifacts={"files_changed": ["api.py"], "marker": "DEPT_PLAN_MARKER"},
                handoff_notes="Use FastAPI for the endpoint",
            ),
        ])

        def record_and_respond(*args, **kwargs):
            prompts_seen.extend(self._prompt_args(args[0]))
            return _mock_claude_success(next(head_payloads, {"result": "done"}))

        mock_run.side_effect = record_and_respond
        route = [{"role": "backend_head", "model": "opus", "brief": "Do backend"}]

        result = run_pipeline(conn, "PROJ-001", route)

        assert result["success"] is True
        # prompts_seen[0] is backend_head itself; index 1 is the first worker
        assert len(prompts_seen) >= 2
        worker_prompt = prompts_seen[1]
        assert ("department_head_plan" in worker_prompt
                or "DEPT_PLAN_MARKER" in worker_prompt)
# ---------------------------------------------------------------------------
# YAML structure validation
# ---------------------------------------------------------------------------
class TestSpecialistsYaml:
    """Validate specialists.yaml department head structure.

    Both files are opened relative to the current working directory and are
    read explicitly as UTF-8, so the result does not depend on the platform's
    default locale encoding.
    """

    _SPECIALISTS_PATH = "agents/specialists.yaml"
    _PM_PROMPT_PATH = "agents/prompts/pm.md"

    @classmethod
    def _load_config(cls):
        """Parse specialists.yaml and return the loaded mapping."""
        # Imported here, as the original tests did, rather than at module level
        import yaml
        with open(cls._SPECIALISTS_PATH, encoding="utf-8") as f:
            return yaml.safe_load(f)

    @classmethod
    def _read_pm_prompt(cls):
        """Return the full text of the PM prompt file."""
        with open(cls._PM_PROMPT_PATH, encoding="utf-8") as f:
            return f.read()

    def test_all_department_heads_have_required_fields(self):
        """Every _head specialist must have model=opus, execution_type=department_head, department."""
        specialists = self._load_config()["specialists"]
        heads = {k: v for k, v in specialists.items() if k.endswith("_head")}
        assert len(heads) >= 7, f"Expected >=7 dept heads, got {len(heads)}"
        for name, spec in heads.items():
            assert spec["model"] == "opus", f"{name} should use opus model"
            assert spec["execution_type"] == "department_head", \
                f"{name} missing execution_type=department_head"
            assert "department" in spec, f"{name} missing department field"

    def test_all_departments_have_head_and_workers(self):
        """Every department must reference a valid head and have workers list."""
        data = self._load_config()
        departments = data["departments"]
        specialists = data["specialists"]
        assert len(departments) >= 7
        for dept_name, dept in departments.items():
            assert "head" in dept, f"Department '{dept_name}' missing 'head'"
            assert "workers" in dept, f"Department '{dept_name}' missing 'workers'"
            assert len(dept["workers"]) > 0, f"Department '{dept_name}' has no workers"
            # The head must be a declared specialist
            head_role = dept["head"]
            assert head_role in specialists, \
                f"Department '{dept_name}' head '{head_role}' not in specialists"
            # Every worker must be a declared specialist
            for worker in dept["workers"]:
                assert worker in specialists, \
                    f"Department '{dept_name}' worker '{worker}' not in specialists"

    def test_department_routes_exist(self):
        """dept_* routes reference valid _head roles."""
        data = self._load_config()
        routes = data["routes"]
        specialists = data["specialists"]
        dept_routes = {k: v for k, v in routes.items() if k.startswith("dept_")}
        assert len(dept_routes) >= 6, f"Expected >=6 dept routes, got {len(dept_routes)}"
        for route_name, route in dept_routes.items():
            for step_role in route["steps"]:
                assert step_role in specialists, \
                    f"Route '{route_name}' references unknown role '{step_role}'"
                assert step_role.endswith("_head"), \
                    f"Route '{route_name}' step '{step_role}' should be a dept head"

    def test_key_departments_present(self):
        """AC4: marketing, infra (sysadmin), frontend, backend, qa (testers), security."""
        departments = self._load_config()["departments"]
        required = ["marketing", "infra", "frontend", "backend", "qa", "security", "research"]
        for dept in required:
            assert dept in departments, f"Required department '{dept}' missing"

    def test_pm_prompt_references_all_department_heads(self):
        """PM prompt must list all department heads."""
        pm_prompt = self._read_pm_prompt()
        specialists = self._load_config()["specialists"]
        heads = [k for k in specialists if k.endswith("_head")]
        for head in heads:
            assert head in pm_prompt, \
                f"PM prompt missing reference to '{head}'"