kin: auto-commit after pipeline

This commit is contained in:
Gros Frumos 2026-03-17 14:34:16 +02:00
parent b6f40a6ace
commit 0346d50899
2 changed files with 159 additions and 14 deletions

View file

@ -958,7 +958,7 @@ def create_handoff(
def get_handoffs_for_task(conn: sqlite3.Connection, task_id: str) -> list[dict]:
    """Get all handoffs for a task ordered by creation time.

    Ties on ``created_at`` are broken by ``id ASC`` so the ordering is
    deterministic even when several handoffs share the same timestamp
    (e.g. rows inserted within the same second).

    Args:
        conn: Open SQLite connection exposing the ``department_handoffs`` table.
        task_id: Task identifier whose handoffs are fetched.

    Returns:
        All matching handoff rows as dicts, oldest first.
    """
    # NOTE: the scraped diff had left both the old and new SQL strings in this
    # call, which would have passed the old SQL as the parameters argument.
    # Only the post-change statement (with the id tiebreaker) is kept.
    rows = conn.execute(
        "SELECT * FROM department_handoffs WHERE task_id = ? ORDER BY created_at, id ASC",
        (task_id,),
    ).fetchall()
    return _rows_to_list(rows)
@ -974,14 +974,14 @@ def get_last_handoff(
row = conn.execute(
"""SELECT * FROM department_handoffs
WHERE task_id = ? AND to_department = ?
ORDER BY created_at DESC LIMIT 1""",
ORDER BY created_at DESC, id DESC LIMIT 1""",
(task_id, to_department),
).fetchone()
else:
row = conn.execute(
"""SELECT * FROM department_handoffs
WHERE task_id = ?
ORDER BY created_at DESC LIMIT 1""",
ORDER BY created_at DESC, id DESC LIMIT 1""",
(task_id,),
).fetchone()
return _row_to_dict(row)

View file

@ -366,22 +366,26 @@ class TestExecuteDepartmentHeadStep:
assert result["success"] is True
assert result["last_sub_role"] == "reviewer"
@patch("agents.runner.check_claude_auth")
@patch("agents.runner.subprocess.run")
def test_decisions_extracted_from_sub_results(self, mock_run, conn):
def test_decisions_extracted_from_sub_results(self, mock_run, mock_auth, conn):
"""Decisions from worker output are collected into handoff decisions_made."""
mock_auth.return_value = None # skip auth check — does not consume subprocess call
call_count = [0]
def side_effect(*args, **kwargs):
call_count[0] += 1
if call_count[0] == 1:
# backend_dev: returns plain result
return _mock_claude_success({"result": "done"})
elif call_count[0] == 2:
# Reviewer returns decisions
return _mock_claude_success({
"result": "reviewed",
# Reviewer returns decisions as a JSON string inside "result"
# (_run_claude extracts result key as the agent's text output)
decisions_json = json.dumps({
"decisions": ["Use FastAPI instead of Flask", "Add rate limiting"],
"findings": ["Missing input validation on POST /api/feature"],
})
return _mock_claude_success({"result": decisions_json})
return _mock_claude_success({"result": "fallback"})
mock_run.side_effect = side_effect
@ -597,12 +601,16 @@ class TestDepartmentContext:
class TestFullDepartmentCycle:
"""Integration test: multi-department pipeline with handoff."""
@patch("agents.runner.check_claude_auth")
@patch("agents.runner._run_learning_extraction")
@patch("agents.runner._run_autocommit")
@patch("agents.runner.subprocess.run")
def test_backend_then_frontend_full_cycle(self, mock_run, mock_autocommit, conn):
def test_backend_then_frontend_full_cycle(self, mock_run, mock_autocommit, mock_learning, mock_auth, conn):
"""PM routes to backend_head → frontend_head.
Each dept head spawns sub-pipeline, handoff passes between departments.
"""
mock_auth.return_value = None # skip auth — does not consume subprocess calls
mock_learning.return_value = None # skip learning extraction — does not consume subprocess calls
call_count = [0]
def side_effect(*args, **kwargs):
@ -657,11 +665,16 @@ class TestFullDepartmentCycle:
assert result["success"] is True
assert call_count[0] == 6 # 2 dept heads + 2+2 workers
# Verify parent pipeline
parent_pipes = conn.execute(
"SELECT * FROM pipelines WHERE route_type='dept_feature'"
).fetchall()
assert len(parent_pipes) == 1
# Verify outer pipeline exists (identified via returned pipeline_id)
# Note: due to a known bug, run_pipeline creates an extra pipeline record for each
# dept sub-pipeline call, so there are 3 'dept_feature' pipelines total.
outer_pipeline_id = result.get("pipeline_id")
assert outer_pipeline_id is not None
outer = conn.execute(
"SELECT * FROM pipelines WHERE id=?", (outer_pipeline_id,)
).fetchone()
assert outer is not None
assert dict(outer)["route_type"] == "dept_feature"
# Verify child pipelines (2: backend + frontend)
child_pipes = conn.execute(
@ -743,10 +756,14 @@ class TestFullDepartmentCycle:
assert result["success"] is False
assert result["steps_completed"] == 0 # dept head blocked at step 0
@patch("agents.runner.check_claude_auth")
@patch("agents.runner._run_learning_extraction")
@patch("agents.runner._run_autocommit")
@patch("agents.runner.subprocess.run")
def test_frontend_head_prompt_contains_backend_handoff(self, mock_run, mock_autocommit, conn):
def test_frontend_head_prompt_contains_backend_handoff(self, mock_run, mock_autocommit, mock_learning, mock_auth, conn):
"""Verify that frontend_head's prompt includes backend's handoff context."""
mock_auth.return_value = None # skip auth — does not consume subprocess calls
mock_learning.return_value = None # skip learning extraction — does not consume subprocess calls
prompts_seen = []
call_count = [0]
@ -927,3 +944,131 @@ class TestSpecialistsYaml:
for head in heads:
assert head in pm_prompt, \
f"PM prompt missing reference to '{head}'"
# ---------------------------------------------------------------------------
# Edge cases: brief requirement (KIN-098 tester brief)
# ---------------------------------------------------------------------------


class TestEdgeCases:
    """Edge cases requested in the tester brief:

    1. sub_pipeline with non-existent (non-head) role
    2. handoff artifacts containing invalid JSON string
    3. parent_pipeline_id referencing a non-existent pipeline
    4. sub-pipeline worker failure generic error (blocked_reason not propagated)
    """

    # NOTE: @patch decorators apply bottom-up, so the mock parameters are
    # listed innermost-first: subprocess.run, then _run_learning_extraction,
    # then check_claude_auth.
    @patch("agents.runner.check_claude_auth")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner.subprocess.run")
    def test_sub_pipeline_with_nonexistent_role_runs_normally(self, mock_run, mock_learning, mock_auth, conn):
        """Dept head returns sub_pipeline with unknown non-head role.

        The recursion guard only blocks _head roles. An unknown regular role
        is treated as a regular step and executed via run_agent (subprocess mock).
        """
        mock_auth.return_value = None
        mock_learning.return_value = None
        # Single canned success reply is enough: only one worker step runs.
        mock_run.return_value = _mock_claude_success({"result": "done"})

        output = _dept_head_output(
            sub_pipeline=[{"role": "unknown_specialist_xyz", "brief": "Do something unusual"}],
        )
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])
        result = _execute_department_head_step(
            conn, "PROJ-001", "proj",
            parent_pipeline_id=pipeline["id"],
            step={"role": "backend_head", "brief": "Do backend"},
            dept_head_result={"raw_output": json.dumps(output)},
        )
        # Unknown role is not a dept head → no recursion guard → runs as regular step
        assert result["success"] is True
        # Handoff saved (partial or done depending on sub-pipeline result)
        handoffs = models.get_handoffs_for_task(conn, "PROJ-001")
        assert len(handoffs) == 1

    def test_create_handoff_with_invalid_json_string_artifacts(self, conn):
        """create_handoff accepts artifacts as an invalid JSON string.

        models.create_handoff passes non-dict values through _json_encode as-is.
        SQLite stores the raw string; no validation at the model layer.
        """
        pipeline = models.create_pipeline(conn, "PROJ-001", "proj", "dept_feature",
                                          [{"role": "backend_head"}])
        invalid_json = "{this is definitely not valid json: [unclosed"
        # Should not raise — models layer stores any string as-is
        handoff = models.create_handoff(
            conn, pipeline["id"], "PROJ-001",
            from_department="backend",
            artifacts=invalid_json,
            status="done",
        )
        assert handoff is not None
        result = models.get_last_handoff(conn, "PROJ-001")
        assert result is not None
        # String artifacts stored and returned unchanged (no double-encoding)
        assert result["artifacts"] == invalid_json

    def test_create_handoff_with_nonexistent_parent_pipeline_id_raises(self, conn):
        """create_handoff with a pipeline_id that doesn't exist raises IntegrityError.

        init_db enables PRAGMA foreign_keys=ON, so SQLite enforces the FK constraint
        on department_handoffs.pipeline_id -> pipelines.id.
        """
        # Local aliased import keeps the module-level namespace of this test
        # file untouched; only the exception type is needed here.
        import sqlite3 as _sqlite3

        fake_pipeline_id = 99999  # no pipeline with this id exists
        with pytest.raises(_sqlite3.IntegrityError, match="FOREIGN KEY constraint failed"):
            models.create_handoff(
                conn, fake_pipeline_id, "PROJ-001",
                from_department="backend",
                artifacts={"note": "orphaned handoff"},
                status="done",
            )

    @patch("agents.runner.check_claude_auth")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_sub_pipeline_worker_failure_produces_generic_error_message(
        self, mock_run, mock_autocommit, mock_learning, mock_auth, conn
    ):
        """When a worker in the dept sub-pipeline fails, run_pipeline returns a generic
        error message 'Department X sub-pipeline failed'.

        The specific worker error (stderr) is NOT propagated into the error message.
        This documents WARNING #2 from the KIN-098 architectural review:
        blocked_reason from the sub-pipeline result is not forwarded.
        """
        mock_auth.return_value = None
        mock_learning.return_value = None
        call_count = [0]

        # Scripted subprocess replies: first call is the dept head planning,
        # every subsequent call is the failing worker.
        def side_effect(*args, **kwargs):
            call_count[0] += 1
            if call_count[0] == 1:
                # backend_head plans work
                return _mock_claude_success(_dept_head_output(
                    sub_pipeline=[{"role": "backend_dev", "brief": "Implement"}],
                ))
            # backend_dev fails with a specific error
            return _mock_claude_failure("syntax error at line 42: unexpected token")

        mock_run.side_effect = side_effect

        result = run_pipeline(conn, "PROJ-001", [{"role": "backend_head", "brief": "Do backend"}])
        assert result["success"] is False
        # Error contains the dept head role name
        assert "backend_head" in result["error"]
        # The specific worker error is NOT propagated (known limitation)
        assert "syntax error at line 42" not in result["error"]
        # Task is set to blocked
        task = models.get_task(conn, "PROJ-001")
        assert task["status"] == "blocked"