"""Tests for KIN-098: Three-level hierarchy — PM → department heads → workers.

Covers:
- _execute_department_head_step() logic
- Sub-pipeline creation and execution
- Recursion guard (no _head roles in sub-pipeline)
- Blocked status propagation
- Inter-department handoff via DB (to_department routing)
- Context builder: department workers, description, incoming handoff
- Full cycle: PM → backend_head → workers → handoff → frontend_head → workers
- Artifacts passed to sub-pipeline workers via initial_previous_output
- decisions_made extraction from sub-pipeline results
- auto_complete eligibility for dept pipelines
- _is_department_head() with execution_type from YAML
- format_prompt fallback to department_head.md for _head roles
"""

import json
import pytest
from unittest.mock import patch, MagicMock

from core.db import init_db
from core import models
from core.context_builder import build_context, format_prompt
from agents.runner import run_pipeline, _execute_department_head_step, _is_department_head


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def conn():
    """Yield an in-memory DB handle seeded with project "proj" and task "PROJ-001".

    The task brief carries route_type "dept_feature". The handle is closed
    once the test that requested the fixture has finished.
    """
    c = init_db(":memory:")
    models.create_project(c, "proj", "TestProject", "~/projects/test",
                          tech_stack=["python", "vue3"])
    models.create_task(c, "PROJ-001", "proj", "Full-stack feature",
                       brief={"route_type": "dept_feature"})
    yield c
    c.close()


def _mock_claude_success(output_data):
    """Build a MagicMock shaped like a successful subprocess result.

    A dict ``output_data`` is JSON-encoded into ``stdout``; any other value is
    stored in ``stdout`` unchanged. ``stderr`` is empty and ``returncode`` is 0.
    """
    mock = MagicMock()
    mock.stdout = json.dumps(output_data) if isinstance(output_data, dict) else output_data
    mock.stderr = ""
    mock.returncode = 0
    return mock


def _mock_claude_failure(error_msg="error"):
    """Build a MagicMock shaped like a failed subprocess result.

    ``stdout`` is empty, ``stderr`` carries ``error_msg`` and ``returncode`` is 1.
    """
    mock = MagicMock()
    mock.stdout = ""
    mock.stderr = error_msg
    mock.returncode = 1
    return mock


# Valid department head output
def _dept_head_output(sub_pipeline, artifacts=None, handoff_notes="", status="done"):
    """Return a dict with the keys the tests use as a department head's output.

    When ``artifacts`` is falsy (None or empty), a default artifacts dict
    naming "api.py" is substituted.
    """
    return {
        "status": status,
        "sub_pipeline": sub_pipeline,
        "artifacts": artifacts or {"files_changed": ["api.py"], "notes": "done"},
        "handoff_notes": handoff_notes,
    }


# ---------------------------------------------------------------------------
# _is_department_head — execution_type detection
# ---------------------------------------------------------------------------

class TestIsDepartmentHead:
    """Tests for _is_department_head() helper."""

    def test_known_heads_detected(self):
        """All _head roles from specialists.yaml are detected."""
        # Reset cache to force reload
        import agents.runner as runner
        runner._DEPT_HEAD_ROLES = None

        assert _is_department_head("backend_head") is True
        assert _is_department_head("frontend_head") is True
        assert _is_department_head("qa_head") is True
        assert _is_department_head("security_head") is True
        assert _is_department_head("infra_head") is True
        assert _is_department_head("research_head") is True
        assert _is_department_head("marketing_head") is True

    def test_non_heads_not_detected(self):
        """Regular roles are not department heads."""
        # Reset cache to force reload
        import agents.runner as runner
        runner._DEPT_HEAD_ROLES = None

        assert _is_department_head("backend_dev") is False
        assert _is_department_head("tester") is False
        assert _is_department_head("pm") is False
        assert _is_department_head("reviewer") is False

    def test_suffix_fallback(self):
        """Unknown _head roles detected via suffix fallback."""
        # Reset cache to force reload
        import agents.runner as runner
        runner._DEPT_HEAD_ROLES = None
        assert _is_department_head("custom_head") is True


# ---------------------------------------------------------------------------
# _execute_department_head_step — unit tests
# ---------------------------------------------------------------------------

class TestExecuteDepartmentHeadStep:
    """Unit tests for _execute_department_head_step()."""

    @patch("agents.runner.subprocess.run")
    def test_valid_sub_pipeline_creates_child_and_runs(self, mock_run, conn):
        """Dept head returns valid sub_pipeline → child pipeline created, workers executed."""
        # Workers succeed
        mock_run.return_value = _mock_claude_success({"result": "implemented"})

        dept_output = _dept_head_output(
            sub_pipeline=[
                {"role": "backend_dev", "model": "sonnet", "brief": "Implement API"},
                {"role": "tester", "model": "sonnet", "brief": "Test API"},
            ],
            artifacts={"files_changed": ["api.py"]},
            handoff_notes="API ready for frontend",
        )

        # Create parent pipeline first
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(dept_output)},
            next_department="frontend",
        )

        assert result["success"] is True
        # Verify output contains handoff summary
        output = json.loads(result["output"])
        assert output["from_department"] == "backend"
        assert output["handoff_notes"] == "API ready for frontend"
        assert output["artifacts"]["files_changed"] == ["api.py"]

        # Verify child pipeline in DB
        pipes = conn.execute(
            "SELECT * FROM pipelines WHERE route_type='dept_sub'"
        ).fetchall()
        assert len(pipes) == 1
        child = dict(pipes[0])
        assert child["department"] == "backend"
        assert child["parent_pipeline_id"] == pipeline["id"]

        # Verify handoff record in DB
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 1
        assert handoffs[0]["from_department"] == "backend"
        assert handoffs[0]["to_department"] == "frontend"
        assert handoffs[0]["status"] == "done"

    def test_non_json_output_returns_error(self, conn):
        """Dept head returns non-JSON → error, no sub-pipeline created."""
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": "This is not JSON at all"},
        )

        assert result["success"] is False
        assert "non-JSON" in result["output"]

    def test_blocked_status_returns_failure(self, conn):
        """Dept head returns status=blocked → failure with blocked info."""
        blocked_output = {
            "status": "blocked",
            "blocked_reason": "Missing database schema specification",
            "blocked_at": "2026-03-17T12:00:00",
        }
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(blocked_output)},
        )

        assert result["success"] is False
        assert result.get("blocked") is True
        assert "Missing database schema" in result["blocked_reason"]

    def test_empty_sub_pipeline_returns_error(self, conn):
        """Dept head returns empty sub_pipeline → error."""
        output = {"status": "done", "sub_pipeline": [], "artifacts": {}, "handoff_notes": ""}
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(output)},
        )

        assert result["success"] is False
        assert "empty or invalid" in result["output"]

    def test_recursion_guard_blocks_head_roles(self, conn):
        """Sub-pipeline containing _head role → recursion blocked."""
        output = _dept_head_output(
            sub_pipeline=[
                {"role": "frontend_head", "model": "opus", "brief": "Delegate to frontend"},
            ],
        )
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(output)},
        )

        assert result["success"] is False
        assert "Recursion blocked" in result["output"]
        assert "frontend_head" in result["output"]

    @patch("agents.runner.subprocess.run")
    def test_sub_pipeline_failure_propagates(self, mock_run, conn):
        """If a worker in sub-pipeline fails → dept step fails."""
        mock_run.return_value = _mock_claude_failure("compilation error")

        output = _dept_head_output(
            sub_pipeline=[{"role": "backend_dev", "model": "sonnet", "brief": "Implement"}],
        )
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(output)},
        )

        assert result["success"] is False
        # Handoff saved with status=partial
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 1
        assert handoffs[0]["status"] == "partial"

    @patch("agents.runner.subprocess.run")
    def test_cost_tokens_duration_aggregated(self, mock_run, conn):
        """Sub-pipeline cost/tokens/duration are returned for aggregation."""
        mock_run.return_value = _mock_claude_success({"result": "done"})

        output = _dept_head_output(
            sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
        )
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(output)},
        )

        # Only the presence of the aggregation keys is checked, not their values.
        assert result["success"] is True
        assert "cost_usd" in result
        assert "tokens_used" in result
        assert "duration_seconds" in result

    @patch("agents.runner.subprocess.run")
    def test_next_department_none_when_last_step(self, mock_run, conn):
        """When no next dept head step, to_department should be None."""
        mock_run.return_value = _mock_claude_success({"result": "done"})

        output = _dept_head_output(
            sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
        )
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_backend",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(output)},
            next_department=None,
        )

        assert result["success"] is True
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 1
        assert handoffs[0]["to_department"] is None

    @patch("agents.runner.subprocess.run")
    def test_artifacts_passed_to_first_worker(self, mock_run, conn):
        """Dept head artifacts are passed as initial_previous_output to first worker."""
        prompts_seen = []

        def side_effect(*args, **kwargs):
            # Record the argument that follows "-p" in the command list.
            cmd = args[0]
            for i, arg in enumerate(cmd):
                if arg == "-p" and i + 1 < len(cmd):
                    prompts_seen.append(cmd[i + 1])
                    break
            return _mock_claude_success({"result": "done"})

        mock_run.side_effect = side_effect

        output = _dept_head_output(
            sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
            artifacts={"files_changed": ["api.py"], "unique_marker": "DEPT_ARTIFACTS_123"},
            handoff_notes="Build the API using FastAPI",
        )
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(output)},
        )

        assert result["success"] is True
        # First worker's prompt should contain dept head artifacts
        assert len(prompts_seen) >= 1
        first_worker_prompt = prompts_seen[0]
        assert "DEPT_ARTIFACTS_123" in first_worker_prompt or "department_head_plan" in first_worker_prompt

    @patch("agents.runner.subprocess.run")
    def test_last_sub_role_returned(self, mock_run, conn):
        """Dept head result includes last_sub_role for auto_complete tracking."""
        mock_run.return_value = _mock_claude_success({"result": "done"})

        output = _dept_head_output(
            sub_pipeline=[
                {"role": "backend_dev", "brief": "Implement"},
                {"role": "reviewer", "brief": "Review"},
            ],
        )
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(output)},
        )

        assert result["success"] is True
        assert result["last_sub_role"] == "reviewer"

    # Stacked @patch decorators inject their mocks bottom-up, so the
    # subprocess.run mock arrives first (mock_run), then the auth mock.
    @patch("agents.runner.check_claude_auth")
    @patch("agents.runner.subprocess.run")
    def test_decisions_extracted_from_sub_results(self, mock_run, mock_auth, conn):
        """Decisions from worker output are collected into handoff decisions_made."""
        mock_auth.return_value = None  # skip auth check — does not consume subprocess call
        call_count = [0]

        def side_effect(*args, **kwargs):
            call_count[0] += 1
            if call_count[0] == 1:
                # backend_dev: returns plain result
                return _mock_claude_success({"result": "done"})
            elif call_count[0] == 2:
                # Reviewer returns decisions as a JSON string inside "result"
                # (_run_claude extracts result key as the agent's text output)
                decisions_json = json.dumps({
                    "decisions": ["Use FastAPI instead of Flask", "Add rate limiting"],
                    "findings": ["Missing input validation on POST /api/feature"],
                })
                return _mock_claude_success({"result": decisions_json})
            return _mock_claude_success({"result": "fallback"})

        mock_run.side_effect = side_effect

        output = _dept_head_output(
            sub_pipeline=[
                {"role": "backend_dev", "brief": "Implement"},
                {"role": "reviewer", "brief": "Review"},
            ],
        )
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(output)},
        )

        assert result["success"] is True
        # Verify decisions_made in handoff
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 1
        decisions = handoffs[0]["decisions_made"]
        # Accept either an already-decoded value or its JSON string form.
        if isinstance(decisions, str):
            decisions = json.loads(decisions)
        assert len(decisions) >= 2
        assert "Use FastAPI instead of Flask" in decisions


# ---------------------------------------------------------------------------
# Inter-department handoff routing
# ---------------------------------------------------------------------------

class TestHandoffRouting:
    """Tests for to_department routing in create_handoff/get_last_handoff."""

    def test_handoff_with_to_department_found_by_filter(self, conn):
        """Handoff with to_department set → found when filtering by that department."""
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])
        models.create_handoff(
            conn, pipeline["id"], "PROJ-001",
            from_department="backend",
            to_department="frontend",
            artifacts={"files_changed": ["api.py"]},
            status="done",
        )

        result = models.get_last_handoff(conn, "PROJ-001", to_department="frontend")
        assert result is not None
        assert result["from_department"] == "backend"
        assert result["to_department"] == "frontend"

    def test_handoff_without_to_department_found_without_filter(self, conn):
        """Handoff with to_department=None → found when no filter applied."""
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_backend",
                                          [{"role": "backend_head"}])
        models.create_handoff(
            conn, pipeline["id"], "PROJ-001",
            from_department="backend",
            artifacts={"notes": "done"},
            status="done",
        )

        # Without filter — found
        result = models.get_last_handoff(conn, "PROJ-001")
        assert result is not None
        assert result["from_department"] == "backend"

        # With filter — not found (to_department is NULL, filter is "frontend")
        result = models.get_last_handoff(conn, "PROJ-001", to_department="frontend")
        assert result is None

    def test_multiple_handoffs_returns_latest(self, conn):
        """Multiple handoffs → get_last_handoff returns the most recent."""
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}, {"role": "frontend_head"}])
        models.create_handoff(
            conn, pipeline["id"], "PROJ-001",
            from_department="backend",
            to_department="frontend",
            artifacts={"notes": "first"},
            status="done",
        )
        models.create_handoff(
            conn, pipeline["id"], "PROJ-001",
            from_department="frontend",
            to_department="qa",
            artifacts={"notes": "second"},
            status="done",
        )

        # Latest without filter
        result = models.get_last_handoff(conn, "PROJ-001")
        assert result["from_department"] == "frontend"

        # Filter by specific target
        result = models.get_last_handoff(conn, "PROJ-001", to_department="frontend")
        assert result["from_department"] == "backend"


# ---------------------------------------------------------------------------
# Context builder — department context injection
# ---------------------------------------------------------------------------

class TestDepartmentContext:
    """Tests for context_builder department head context injection."""

    def test_dept_head_receives_department_info(self, conn):
        """Department head gets department name, workers, and description."""
        ctx = build_context(conn, "PROJ-001", "backend_head", "proj")

        assert ctx["department"] == "backend"
        assert "backend_dev" in ctx["department_workers"]
        assert "architect" in ctx["department_workers"]
        assert "tester" in ctx["department_workers"]
        assert "reviewer" in ctx["department_workers"]
        assert "Backend development" in ctx["department_description"]

    def test_dept_head_receives_incoming_handoff(self, conn):
        """If previous department left a handoff, next dept head sees it."""
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}, {"role": "frontend_head"}])
        models.create_handoff(
            conn, pipeline["id"], "PROJ-001",
            from_department="backend",
            to_department="frontend",
            artifacts={"files_changed": ["api.py"], "notes": "API ready"},
            status="done",
        )

        ctx = build_context(conn, "PROJ-001", "frontend_head", "proj")
        assert "incoming_handoff" in ctx
        assert ctx["incoming_handoff"]["from_department"] == "backend"
        assert ctx["incoming_handoff"]["to_department"] == "frontend"

    def test_dept_head_fallback_handoff_from_different_dept(self, conn):
        """Handoff with to_department=NULL is found via fallback (from different dept)."""
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}, {"role": "frontend_head"}])
        # Old-style handoff without to_department (before bugfix)
        models.create_handoff(
            conn, pipeline["id"], "PROJ-001",
            from_department="backend",
            artifacts={"notes": "API ready"},
            status="done",
        )

        ctx = build_context(conn, "PROJ-001", "frontend_head", "proj")
        # Fallback: should still find the handoff (from different dept)
        assert "incoming_handoff" in ctx
        assert ctx["incoming_handoff"]["from_department"] == "backend"

    def test_dept_head_fallback_ignores_own_dept_handoff(self, conn):
        """Fallback should NOT pick up handoff FROM our own department."""
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_backend",
                                          [{"role": "backend_head"}])
        # Only handoff is from backend itself — should not be picked up
        models.create_handoff(
            conn, pipeline["id"], "PROJ-001",
            from_department="backend",
            artifacts={"notes": "done"},
            status="done",
        )

        ctx = build_context(conn, "PROJ-001", "backend_head", "proj")
        # Should NOT find handoff from own department
        assert "incoming_handoff" not in ctx

    def test_dept_head_no_handoff_when_first_in_chain(self, conn):
        """First department head in chain has no incoming handoff."""
        ctx = build_context(conn, "PROJ-001", "backend_head", "proj")
        assert "incoming_handoff" not in ctx

    def test_non_head_role_no_department_context(self, conn):
        """Regular specialist doesn't get department context."""
        ctx = build_context(conn, "PROJ-001", "backend_dev", "proj")
        assert "department" not in ctx
        assert "department_workers" not in ctx

    def test_department_context_in_formatted_prompt(self, conn):
        """Department info appears in formatted prompt string."""
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}, {"role": "frontend_head"}])
        models.create_handoff(
            conn, pipeline["id"], "PROJ-001",
            from_department="backend",
            to_department="frontend",
            artifacts={"files_changed": ["api.py"]},
            status="done",
        )

        ctx = build_context(conn, "PROJ-001", "frontend_head", "proj")
        prompt = format_prompt(ctx, "frontend_head")

        assert "Department: frontend" in prompt
        assert "Department workers:" in prompt
        assert "frontend_dev" in prompt
        assert "Incoming handoff from previous department:" in prompt
        assert "api.py" in prompt

    def test_format_prompt_uses_department_head_md(self, conn):
        """format_prompt for _head roles falls back to department_head.md."""
        ctx = build_context(conn, "PROJ-001", "backend_head", "proj")
        prompt = format_prompt(ctx, "backend_head")

        # Should contain department_head.md content, not generic fallback
        assert "Department Head" in prompt
        assert "sub_pipeline" in prompt


# ---------------------------------------------------------------------------
# Full cycle smoke test: PM → dept_head → workers → handoff → next dept
# ---------------------------------------------------------------------------

class TestFullDepartmentCycle:
    """Integration test: multi-department pipeline with handoff."""

    @patch("agents.runner.check_claude_auth")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_backend_then_frontend_full_cycle(self, mock_run, mock_autocommit, mock_learning, mock_auth, conn):
        """PM routes to backend_head → frontend_head.

        Each dept head spawns sub-pipeline, handoff passes between departments.
        """
        mock_auth.return_value = None  # skip auth — does not consume subprocess calls
        mock_learning.return_value = None  # skip learning extraction — does not consume subprocess calls
        call_count = [0]

        def side_effect(*args, **kwargs):
            call_count[0] += 1
            # Call 1: backend_head (Opus) plans work
            if call_count[0] == 1:
                return _mock_claude_success(_dept_head_output(
                    sub_pipeline=[
                        {"role": "backend_dev", "model": "sonnet", "brief": "Implement API"},
                        {"role": "tester", "model": "sonnet", "brief": "Test API"},
                    ],
                    artifacts={"files_changed": ["api.py", "models.py"],
                               "endpoints_added": ["POST /api/feature"]},
                    handoff_notes="Backend API ready. POST /api/feature accepts {name, value}.",
                ))
            # Call 2: backend_dev executes
            elif call_count[0] == 2:
                return _mock_claude_success({"result": "API implemented"})
            # Call 3: tester in backend dept executes
            elif call_count[0] == 3:
                return _mock_claude_success({"result": "Tests pass"})
            # Call 4: frontend_head (Opus) plans work
            elif call_count[0] == 4:
                return _mock_claude_success(_dept_head_output(
                    sub_pipeline=[
                        {"role": "frontend_dev", "model": "sonnet", "brief": "Build UI calling POST /api/feature"},
                        {"role": "tester", "model": "sonnet", "brief": "Test UI"},
                    ],
                    artifacts={"files_changed": ["FeatureForm.vue"],
                               "notes": "UI calls backend API"},
                    handoff_notes="Frontend complete.",
                ))
            # Call 5: frontend_dev executes
            elif call_count[0] == 5:
                return _mock_claude_success({"result": "UI built"})
            # Call 6: tester in frontend dept executes
            elif call_count[0] == 6:
                return _mock_claude_success({"result": "UI tests pass"})
            return _mock_claude_success({"result": "fallback"})

        mock_run.side_effect = side_effect

        steps = [
            {"role": "backend_head", "model": "opus", "brief": "Implement backend"},
            {"role": "frontend_head", "model": "opus", "brief": "Implement frontend"},
        ]

        result = run_pipeline(conn, "PROJ-001", steps)

        # Pipeline succeeded
        assert result["success"] is True
        assert call_count[0] == 6  # 2 dept heads + 2+2 workers

        # Verify outer pipeline exists (identified via returned pipeline_id)
        # Note: due to a known bug, run_pipeline creates an extra pipeline record for each
        # dept sub-pipeline call, so there are 3 'dept_feature' pipelines total.
        outer_pipeline_id = result.get("pipeline_id")
        assert outer_pipeline_id is not None
        outer = conn.execute(
            "SELECT * FROM pipelines WHERE id=?", (outer_pipeline_id,)
        ).fetchone()
        assert outer is not None
        assert dict(outer)["route_type"] == "dept_feature"

        # Verify child pipelines (2: backend + frontend)
        child_pipes = conn.execute(
            "SELECT * FROM pipelines WHERE route_type='dept_sub' ORDER BY id"
        ).fetchall()
        assert len(child_pipes) == 2
        assert dict(child_pipes[0])["department"] == "backend"
        assert dict(child_pipes[1])["department"] == "frontend"

        # Verify handoff records (2: backend→frontend, frontend→None)
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 2

        # First handoff: backend → frontend
        assert handoffs[0]["from_department"] == "backend"
        assert handoffs[0]["to_department"] == "frontend"
        assert handoffs[0]["status"] == "done"
        artifacts = handoffs[0]["artifacts"]
        # Accept either an already-decoded value or its JSON string form.
        if isinstance(artifacts, str):
            artifacts = json.loads(artifacts)
        assert "api.py" in artifacts["files_changed"]

        # Second handoff: frontend → None (last in chain)
        assert handoffs[1]["from_department"] == "frontend"
        assert handoffs[1]["to_department"] is None

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_first_dept_fails_blocks_pipeline(self, mock_run, mock_autocommit, conn):
        """If first dept head's sub-pipeline fails → entire pipeline blocked."""
        call_count = [0]

        def side_effect(*args, **kwargs):
            call_count[0] += 1
            if call_count[0] == 1:
                # backend_head plans work
                return _mock_claude_success(_dept_head_output(
                    sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
                ))
            elif call_count[0] == 2:
                # backend_dev fails
                return _mock_claude_failure("compilation error")
            return _mock_claude_success({"result": "should not reach"})

        mock_run.side_effect = side_effect

        steps = [
            {"role": "backend_head", "model": "opus", "brief": "Do backend"},
            {"role": "frontend_head", "model": "opus", "brief": "Do frontend"},
        ]

        result = run_pipeline(conn, "PROJ-001", steps)

        assert result["success"] is False
        assert "backend_head" in result["error"]
        assert call_count[0] == 2  # frontend_head never called

        # Task is blocked
        task = models.get_task(conn, "PROJ-001")
        assert task["status"] == "blocked"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_dept_head_blocked_blocks_pipeline(self, mock_run, mock_autocommit, conn):
        """Dept head returns status=blocked → entire pipeline blocked."""
        mock_run.return_value = _mock_claude_success({
            "status": "blocked",
            "blocked_reason": "No DB schema",
            "blocked_at": "2026-03-17T12:00:00",
        })

        steps = [
            {"role": "backend_head", "model": "opus", "brief": "Do backend"},
            {"role": "frontend_head", "model": "opus", "brief": "Do frontend"},
        ]

        result = run_pipeline(conn, "PROJ-001", steps)

        assert result["success"] is False
        assert result["steps_completed"] == 0  # dept head blocked at step 0

    @patch("agents.runner.check_claude_auth")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_frontend_head_prompt_contains_backend_handoff(self, mock_run, mock_autocommit, mock_learning, mock_auth, conn):
        """Verify that frontend_head's prompt includes backend's handoff context."""
        mock_auth.return_value = None  # skip auth — does not consume subprocess calls
        mock_learning.return_value = None  # skip learning extraction — does not consume subprocess calls
        prompts_seen = []
        call_count = [0]

        def side_effect(*args, **kwargs):
            call_count[0] += 1
            cmd = args[0]
            # Extract -p argument
            for i, arg in enumerate(cmd):
                if arg == "-p" and i + 1 < len(cmd):
                    prompts_seen.append(cmd[i + 1])
                    break

            if call_count[0] == 1:
                return _mock_claude_success(_dept_head_output(
                    sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
                    artifacts={"unique_marker": "BACKEND_ARTIFACTS_XYZ"},
                    handoff_notes="HANDOFF_NOTES_ABC",
                ))
            elif call_count[0] == 2:
                return _mock_claude_success({"result": "done"})
            elif call_count[0] == 3:
                # frontend_head — check its prompt
                return _mock_claude_success(_dept_head_output(
                    sub_pipeline=[{"role": "frontend_dev", "brief": "Build UI"}],
                ))
            elif call_count[0] == 4:
                return _mock_claude_success({"result": "done"})
            return _mock_claude_success({"result": "fallback"})

        mock_run.side_effect = side_effect

        steps = [
            {"role": "backend_head", "model": "opus", "brief": "Backend"},
            {"role": "frontend_head", "model": "opus", "brief": "Frontend"},
        ]

        result = run_pipeline(conn, "PROJ-001", steps)
        assert result["success"] is True

        # The frontend_head prompt (3rd call) should contain handoff from backend
        assert len(prompts_seen) >= 3
        frontend_head_prompt = prompts_seen[2]
        # The handoff summary is passed as previous_output in the prompt
        # NOTE(review): the last alternative passes for any prompt that merely
        # contains the word "backend", so this assertion is weak — consider
        # requiring one of the unique markers instead.
        assert "HANDOFF_NOTES_ABC" in frontend_head_prompt or \
            "BACKEND_ARTIFACTS_XYZ" in frontend_head_prompt or \
            "backend" in frontend_head_prompt.lower()

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_initial_previous_output_in_sub_pipeline(self, mock_run, mock_autocommit, conn):
        """Workers in sub-pipeline receive dept head plan as initial context."""
        prompts_seen = []
        call_count = [0]

        def side_effect(*args, **kwargs):
            call_count[0] += 1
            cmd = args[0]
            # Record the argument that follows "-p" in the command list.
            for i, arg in enumerate(cmd):
                if arg == "-p" and i + 1 < len(cmd):
                    prompts_seen.append(cmd[i + 1])
                    break

            if call_count[0] == 1:
                return _mock_claude_success(_dept_head_output(
                    sub_pipeline=[
                        {"role": "backend_dev", "brief": "Implement"},
                        {"role": "tester", "brief": "Test"},
                    ],
                    artifacts={"files_changed": ["api.py"], "marker": "DEPT_PLAN_MARKER"},
                    handoff_notes="Use FastAPI for the endpoint",
                ))
            return _mock_claude_success({"result": "done"})

        mock_run.side_effect = side_effect

        steps = [{"role": "backend_head", "model": "opus", "brief": "Do backend"}]
        result = run_pipeline(conn, "PROJ-001", steps)

        assert result["success"] is True
        # Worker prompts (calls 2 and 3) should contain dept head plan context
        assert len(prompts_seen) >= 2
        first_worker_prompt = prompts_seen[1]  # prompts_seen[0] is backend_head
        assert "department_head_plan" in first_worker_prompt or "DEPT_PLAN_MARKER" in first_worker_prompt


# ---------------------------------------------------------------------------
# YAML structure validation
# ---------------------------------------------------------------------------

class TestSpecialistsYaml:
    """Validate specialists.yaml department head structure.

    NOTE(review): the files are opened by relative path, so these tests
    depend on the working directory pytest is started from.
    """

    def test_all_department_heads_have_required_fields(self):
        """Every _head specialist must have model=opus, execution_type=department_head, department."""
        import yaml
        with open("agents/specialists.yaml") as f:
            data = yaml.safe_load(f)

        specialists = data["specialists"]
        heads = {k: v for k, v in specialists.items() if k.endswith("_head")}

        assert len(heads) >= 7, f"Expected >=7 dept heads, got {len(heads)}"

        for name, spec in heads.items():
            assert spec["model"] == "opus", f"{name} should use opus model"
            assert spec["execution_type"] == "department_head", \
                f"{name} missing execution_type=department_head"
            assert "department" in spec, f"{name} missing department field"

    def test_all_departments_have_head_and_workers(self):
        """Every department must reference a valid head and have workers list."""
        import yaml
        with open("agents/specialists.yaml") as f:
            data = yaml.safe_load(f)

        departments = data["departments"]
        specialists = data["specialists"]

        assert len(departments) >= 7

        for dept_name, dept in departments.items():
            assert "head" in dept, f"Department '{dept_name}' missing 'head'"
            assert "workers" in dept, f"Department '{dept_name}' missing 'workers'"
            assert len(dept["workers"]) > 0, f"Department '{dept_name}' has no workers"

            # Head exists as specialist
            head_role = dept["head"]
            assert head_role in specialists, \
                f"Department '{dept_name}' head '{head_role}' not in specialists"

            # All workers exist as specialists
            for worker in dept["workers"]:
                assert worker in specialists, \
                    f"Department '{dept_name}' worker '{worker}' not in specialists"

    def test_department_routes_exist(self):
        """dept_* routes reference valid _head roles."""
        import yaml
        with open("agents/specialists.yaml") as f:
            data = yaml.safe_load(f)

        routes = data["routes"]
        specialists = data["specialists"]
        dept_routes = {k: v for k, v in routes.items() if k.startswith("dept_")}

        assert len(dept_routes) >= 6, f"Expected >=6 dept routes, got {len(dept_routes)}"

        for route_name, route in dept_routes.items():
            for step_role in route["steps"]:
                assert step_role in specialists, \
                    f"Route '{route_name}' references unknown role '{step_role}'"
                assert step_role.endswith("_head"), \
                    f"Route '{route_name}' step '{step_role}' should be a dept head"

    def test_key_departments_present(self):
        """AC4: marketing, infra (sysadmin), frontend, backend, qa (testers), security."""
        import yaml
        with open("agents/specialists.yaml") as f:
            data = yaml.safe_load(f)

        departments = data["departments"]
        required = ["marketing", "infra", "frontend", "backend", "qa", "security", "research"]
        for dept in required:
            assert dept in departments, f"Required department '{dept}' missing"

    def test_pm_prompt_references_all_department_heads(self):
        """PM prompt must list all department heads."""
        with open("agents/prompts/pm.md") as f:
            pm_prompt = f.read()

        import yaml
        with open("agents/specialists.yaml") as f:
            data = yaml.safe_load(f)

        specialists = data["specialists"]
        heads = [k for k in specialists if k.endswith("_head")]

        for head in heads:
            assert head in pm_prompt, \
                f"PM prompt missing reference to '{head}'"


# ---------------------------------------------------------------------------
# Edge cases: brief requirement (KIN-098 tester brief)
# ---------------------------------------------------------------------------

class TestEdgeCases:
    """Edge cases requested in the tester brief:

    1. sub_pipeline with non-existent (non-head) role
    2. handoff artifacts containing invalid JSON string
    3. parent_pipeline_id referencing a non-existent pipeline
    4. sub-pipeline worker failure → generic error (blocked_reason not propagated)
    """

    @patch("agents.runner.check_claude_auth")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner.subprocess.run")
    def test_sub_pipeline_with_nonexistent_role_runs_normally(self, mock_run, mock_learning, mock_auth, conn):
        """Dept head returns sub_pipeline with unknown non-head role.

        The recursion guard only blocks _head roles. An unknown regular role is
        treated as a regular step and executed via run_agent (subprocess mock).
        """
        mock_auth.return_value = None
        mock_learning.return_value = None
        mock_run.return_value = _mock_claude_success({"result": "done"})

        output = _dept_head_output(
            sub_pipeline=[{"role": "unknown_specialist_xyz", "brief": "Do something unusual"}],
        )
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])

        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(output)},
        )

        # Unknown role is not a dept head → no recursion guard → runs as regular step
        assert result["success"] is True
        # Handoff saved (partial or done depending on sub-pipeline result)
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 1

    def test_create_handoff_with_invalid_json_string_artifacts(self, conn):
        """create_handoff accepts artifacts as an invalid JSON string.

        models.create_handoff passes non-dict values through _json_encode as-is.
        SQLite stores the raw string; no validation at the model layer.
        """
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])
        invalid_json = "{this is definitely not valid json: [unclosed"

        # Should not raise — models layer stores any string as-is
        handoff = models.create_handoff(
            conn, pipeline["id"], "PROJ-001",
            from_department="backend",
            artifacts=invalid_json,
            status="done",
        )

        assert handoff is not None
        result = models.get_last_handoff(conn, "PROJ-001")
        assert result is not None
        # String artifacts stored and returned unchanged (no double-encoding)
        assert result["artifacts"] == invalid_json

    def test_create_handoff_with_nonexistent_parent_pipeline_id_raises(self, conn):
        """create_handoff with a pipeline_id that doesn't exist raises IntegrityError.

        init_db enables PRAGMA foreign_keys=ON, so SQLite enforces the FK constraint
        on department_handoffs.pipeline_id → pipelines.id.
        """
        import sqlite3 as _sqlite3

        fake_pipeline_id = 99999  # no pipeline with this id exists

        with pytest.raises(_sqlite3.IntegrityError, match="FOREIGN KEY constraint failed"):
            models.create_handoff(
                conn, fake_pipeline_id, "PROJ-001",
                from_department="backend",
                artifacts={"note": "orphaned handoff"},
                status="done",
            )

    @patch("agents.runner.check_claude_auth")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_sub_pipeline_worker_failure_produces_generic_error_message(
        self, mock_run, mock_autocommit, mock_learning, mock_auth, conn
    ):
        """When a worker in the dept sub-pipeline fails, run_pipeline returns a generic
        error message 'Department X sub-pipeline failed'. The specific worker error
        (stderr) is NOT propagated into the error message.

        This documents WARNING #2 from the KIN-098 architectural review:
        blocked_reason from the sub-pipeline result is not forwarded.
""" mock_auth.return_value = None mock_learning.return_value = None call_count = [0] def side_effect(*args, **kwargs): call_count[0] += 1 if call_count[0] == 1: # backend_head plans work return _mock_claude_success(_dept_head_output( sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}], )) # backend_dev fails with a specific error return _mock_claude_failure("syntax error at line 42: unexpected token") mock_run.side_effect = side_effect result = run_pipeline(conn, "PROJ-001", [{"role": "backend_head", "brief": "Do backend"}]) assert result["success"] is False # Error contains the dept head role name assert "backend_head" in result["error"] # The specific worker error is NOT propagated (known limitation) assert "syntax error at line 42" not in result["error"] # Task is set to blocked task = models.get_task(conn, "PROJ-001") assert task["status"] == "blocked"