diff --git a/agents/runner.py b/agents/runner.py index d2b67c2..2c2bbb5 100644 --- a/agents/runner.py +++ b/agents/runner.py @@ -1177,6 +1177,38 @@ def _execute_department_head_step( } +# --------------------------------------------------------------------------- +# Watchdog helpers +# --------------------------------------------------------------------------- + +import errno as _errno + + +def _check_parent_alive( + conn: sqlite3.Connection, + pipeline: dict, + task_id: str, + project_id: str, +) -> bool: + """Check if parent process is alive. Returns True if pipeline should abort. + + Only treats ESRCH (no such process) as dead parent. + PermissionError (pid 1 / init) and ValueError are ignored — pipeline continues. + """ + ppid = os.getppid() + try: + os.kill(ppid, 0) + except OSError as exc: + if exc.errno == _errno.ESRCH: + reason = f"Parent process died unexpectedly (PID {ppid})" + _logger.warning("Pipeline %s: %s — aborting", pipeline["id"], reason) + models.update_pipeline(conn, pipeline["id"], status="failed") + models.update_task(conn, task_id, status="blocked", blocked_reason=reason) + return True + # PermissionError (EPERM) — process exists but we can't signal it: continue + return False + + # --------------------------------------------------------------------------- # Pipeline executor # --------------------------------------------------------------------------- @@ -1234,6 +1266,9 @@ def run_pipeline( pipeline = models.create_pipeline( conn, task_id, project_id, route_type, steps, ) + # Save PID so watchdog can detect dead subprocesses (KIN-099) + models.update_pipeline(conn, pipeline["id"], pid=os.getpid()) + pipeline["pid"] = os.getpid() models.update_task(conn, task_id, status="in_progress") results = [] @@ -1248,6 +1283,19 @@ def run_pipeline( model = step.get("model", "sonnet") brief = step.get("brief") + # Check parent process is still alive (KIN-099 watchdog) + if not dry_run and pipeline: + if _check_parent_alive(conn, pipeline, task_id, project_id): + return { + "success": False, + "error": "parent_process_died", + "steps_completed": i, + "total_cost": total_cost, + "total_tokens": total_tokens, + "total_duration": total_duration, + "results": results, + } + # Worktree isolation: opt-in per project, for write-capable roles _WORKTREE_ROLES = {"backend_dev", "frontend_dev", "debugger"} worktree_path = None diff --git a/agents/specialists.yaml b/agents/specialists.yaml index e3c49eb..453361b 100644 --- a/agents/specialists.yaml +++ b/agents/specialists.yaml @@ -332,3 +332,11 @@ routes: dept_marketing: steps: [marketing_head] description: "Marketing task routed through department head" + + dept_infra: + steps: [infra_head] + description: "Infrastructure task routed through department head" + + dept_research: + steps: [research_head] + description: "Research task routed through department head" diff --git a/core/db.py b/core/db.py index 0f7a522..2272be7 100644 --- a/core/db.py +++ b/core/db.py @@ -619,6 +619,9 @@ def _migrate(conn: sqlite3.Connection): if "department" not in pipeline_cols: conn.execute("ALTER TABLE pipelines ADD COLUMN department TEXT") conn.commit() + if "pid" not in pipeline_cols: + conn.execute("ALTER TABLE pipelines ADD COLUMN pid INTEGER") + conn.commit() # Create department_handoffs table (KIN-098) if "department_handoffs" not in existing_tables: diff --git a/core/models.py b/core/models.py index fb85f4a..43d8155 100644 --- a/core/models.py +++ b/core/models.py @@ -496,6 +496,7 @@ def update_pipeline( total_cost_usd: float | None = None, total_tokens: int | None = None, total_duration_seconds: int | None = None, + pid: int | None = None, ) -> dict: """Update pipeline status and stats.""" fields: dict[str, Any] = {} @@ -509,6 +510,8 @@ def update_pipeline( fields["total_tokens"] = total_tokens if total_duration_seconds is not None: fields["total_duration_seconds"] = total_duration_seconds + if pid is not None: + fields["pid"] = pid if fields: sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [id] @@ -520,6 +523,14 @@ def update_pipeline( return _row_to_dict(row) +def get_running_pipelines_with_pid(conn: sqlite3.Connection) -> list[dict]: + """Return all running pipelines that have a known PID (used by watchdog).""" + rows = conn.execute( + "SELECT id, task_id, pid FROM pipelines WHERE status = 'running' AND pid IS NOT NULL" + ).fetchall() + return _rows_to_list(rows) + + # --------------------------------------------------------------------------- # Support # --------------------------------------------------------------------------- diff --git a/core/watchdog.py b/core/watchdog.py new file mode 100644 index 0000000..7426f8b --- /dev/null +++ b/core/watchdog.py @@ -0,0 +1,77 @@ +""" +Kin pipeline watchdog — background thread that detects dead subprocesses. + +Every `interval` seconds: check all running pipelines with a known PID. +If the PID is dead (ProcessLookupError / ESRCH), mark pipeline as failed +and task as blocked with a descriptive reason. +""" + +import logging +import os +import sqlite3 +import threading +import time +from pathlib import Path + +from core import models +from core.db import init_db + +_logger = logging.getLogger("kin.watchdog") + + +def _check_dead_pipelines(db_path: Path) -> None: + """Single watchdog pass: open a fresh connection, scan running pipelines.""" + try: + conn = sqlite3.connect(str(db_path), check_same_thread=False) + conn.row_factory = sqlite3.Row + try: + running = models.get_running_pipelines_with_pid(conn) + except Exception as exc: + # Table may not exist yet on very first startup + _logger.debug("Watchdog: could not query pipelines (%s)", exc) + conn.close() + return + + for row in running: + pid = row["pid"] + pipeline_id = row["id"] + task_id = row["task_id"] + try: + os.kill(pid, 0) # signal 0 = existence check + except ProcessLookupError: + reason = f"Process died unexpectedly (PID {pid})" + _logger.warning( + "Watchdog: pipeline %s PID %s is dead — marking blocked (%s)", + pipeline_id, pid, task_id, + ) + try: + models.update_pipeline(conn, pipeline_id, status="failed") + models.update_task(conn, task_id, status="blocked", blocked_reason=reason) + except Exception as upd_exc: + _logger.error("Watchdog: failed to update pipeline/task: %s", upd_exc) + except PermissionError: + # Process exists but we can't signal it (e.g. different user) — skip + pass + conn.close() + except Exception as exc: + _logger.error("Watchdog pass failed: %s", exc) + + +def _watchdog_loop(db_path: Path, interval: int) -> None: + """Daemon thread body: sleep then check, forever.""" + _logger.info("Watchdog started (interval=%ds, db=%s)", interval, db_path) + while True: + time.sleep(interval) + _check_dead_pipelines(db_path) + + +def start_watchdog(db_path: Path, interval: int = 30) -> None: + """Start the background watchdog thread (daemon=True, so it dies with the process).""" + t = threading.Thread( + target=_watchdog_loop, + args=(db_path, interval), + daemon=True, + name="kin-watchdog", + ) + t.start() + _logger.info("Watchdog thread launched (pid=%d)", os.getpid()) diff --git a/tests/test_api.py b/tests/test_api.py index f4d9924..47714e3 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -2174,3 +2174,28 @@ def test_get_projects_includes_test_command(client): p1 = next((p for p in projects if p["id"] == "p1"), None) assert p1 is not None assert p1["test_command"] == "cargo test" + + +def test_patch_project_test_command_db_value_verified(client): + """KIN-ARCH-008 decision #318: PATCH test_command сохраняет значение и в response, и в БД.""" + r = client.patch("/api/projects/p1", json={"test_command": "pytest --tb=short"}) + assert r.status_code == 200 + assert r.json()["test_command"] == "pytest --tb=short" + + from core.db import init_db + conn = init_db(api_module.DB_PATH) + row = conn.execute("SELECT test_command FROM projects WHERE id = 'p1'").fetchone() + conn.close() + assert row[0] == "pytest --tb=short" + + +def test_patch_project_test_command_null_returns_400(client): + """KIN-ARCH-008: PATCH с test_command=null (и без других полей) → 400 'Nothing to update'. + + null трактуется как «поле не передано»: has_any=False → 400. + Это документирует текущее поведение: нет способа сбросить test_command через PATCH null. + """ + client.patch("/api/projects/p1", json={"test_command": "npm test"}) + r = client.patch("/api/projects/p1", json={"test_command": None}) + assert r.status_code == 400 + assert "Nothing to update" in r.json()["detail"] diff --git a/tests/test_kin_091_regression.py b/tests/test_kin_091_regression.py index e2b9f8a..47df75f 100644 --- a/tests/test_kin_091_regression.py +++ b/tests/test_kin_091_regression.py @@ -213,6 +213,22 @@ class TestRunProjectTests: assert result["success"] is False assert result["returncode"] == 127 + def test_run_project_tests_empty_string_returns_failure(self): + """KIN-ARCH-008 AC#7: пустая строка test_command возвращает returncode -1, success=False.""" + from agents.runner import _run_project_tests + result = _run_project_tests("/fake/path", test_command="") + assert result["success"] is False + assert result["returncode"] == -1 + assert "Empty test_command" in result["output"] + + def test_returncode_127_output_contains_not_found(self): + """KIN-ARCH-008 AC#7: при returncode 127 вывод содержит 'not found' для диагностики.""" + from agents.runner import _run_project_tests + with patch("agents.runner.subprocess.run", side_effect=FileNotFoundError), \ + patch("agents.runner.shutil.which", return_value=None): + result = _run_project_tests("/fake/path", test_command="badcmd") + assert "not found" in result["output"].lower() + def _mock_success(output="done"): m = MagicMock() @@ -359,6 +375,30 @@ class TestAutoTestInPipeline: called_test_command = mock_tests.call_args[0][1] assert called_test_command == "npm test" + @patch("agents.runner._run_autocommit") + @patch("agents.runner._run_project_tests") + @patch("agents.runner.subprocess.run") + def test_auto_test_returncode_127_blocks_task( + self, mock_run, mock_tests, mock_autocommit, conn + ): + """KIN-ARCH-008 AC#7: returncode 127 (команда не найдена) исчерпывает попытки и блокирует задачу.""" + from agents.runner import run_pipeline + from core import models + import os + + mock_run.return_value = _mock_success() + # Команда не найдена — всегда 127, always fails + mock_tests.return_value = {"success": False, "output": "badcmd not found in PATH", "returncode": 127} + models.update_project(conn, "vdol", auto_test_enabled=True, test_command="badcmd") + + with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "1"}): + steps = [{"role": "backend_dev", "brief": "implement"}] + result = run_pipeline(conn, "VDOL-001", steps) + + assert result["success"] is False + task = models.get_task(conn, "VDOL-001") + assert task["status"] == "blocked" + # --------------------------------------------------------------------------- # (3) Spec-driven workflow route diff --git a/tests/test_watchdog.py b/tests/test_watchdog.py new file mode 100644 index 0000000..eb03a71 --- /dev/null +++ b/tests/test_watchdog.py @@ -0,0 +1,233 @@ +"""Tests for pipeline watchdog (KIN-099).""" + +import json +import os +import pytest +from unittest.mock import patch, MagicMock + +from core.db import init_db +from core import models +from core.watchdog import _check_dead_pipelines + + +@pytest.fixture +def conn(): + c = init_db(db_path=":memory:") + # Seed project + task used by run_pipeline tests + models.create_project(c, "vdol", "ВДОЛЬ", "~/projects/vdolipoperek", + tech_stack=["vue3"]) + models.create_task(c, "VDOL-001", "vdol", "Fix bug", + brief={"route_type": "debug"}) + yield c + c.close() + + +@pytest.fixture +def project_and_task(conn): + p = models.create_project(conn, "proj1", "Test Project", "/tmp/test") + t = models.create_task(conn, "T-001", "proj1", "Test task", priority=3) + return p, t + + +def _make_running_pipeline(conn, task_id, project_id, pid=None): + pipeline = models.create_pipeline(conn, task_id, project_id, "custom", []) + models.update_pipeline(conn, pipeline["id"], status="running", pid=pid) + return pipeline + + +# --------------------------------------------------------------------------- +# get_running_pipelines_with_pid +# --------------------------------------------------------------------------- + +def test_get_running_pipelines_returns_only_with_pid(conn, project_and_task): + _, task = project_and_task + tid = task["id"] + + _make_running_pipeline(conn, tid, "proj1", pid=12345) + _make_running_pipeline(conn, tid, "proj1", pid=None) # no pid — should be excluded + + rows = models.get_running_pipelines_with_pid(conn) + assert len(rows) == 1 + assert rows[0]["pid"] == 12345 + + +def test_get_running_pipelines_excludes_completed(conn, project_and_task): + _, task = project_and_task + tid = task["id"] + + pipeline = models.create_pipeline(conn, tid, "proj1", "custom", []) + # completed pipeline with pid — should NOT appear + models.update_pipeline(conn, pipeline["id"], status="completed", pid=99999) + + rows = models.get_running_pipelines_with_pid(conn) + assert rows == [] + + +# --------------------------------------------------------------------------- +# _check_dead_pipelines (watchdog logic via tmp db file) +# --------------------------------------------------------------------------- + +def _db_with_running_pipeline(tmp_path, pid): + """Create a temp DB file with one running pipeline and return (db_path, task_id).""" + db_path = tmp_path / "kin.db" + conn = init_db(db_path=str(db_path)) + models.create_project(conn, "p1", "Proj", "/tmp/p") + task = models.create_task(conn, "T-W01", "p1", "watchdog task", priority=1) + tid = task["id"] + pipeline = models.create_pipeline(conn, tid, "p1", "custom", []) + models.update_pipeline(conn, pipeline["id"], status="running", pid=pid) + conn.close() + return db_path, tid, pipeline["id"] + + +def test_watchdog_alive_pid_no_change(tmp_path): + """Живой PID не должен менять статус задачи.""" + live_pid = os.getpid() # Our own PID — definitely alive + db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, live_pid) + + _check_dead_pipelines(db_path) + + conn = init_db(db_path=str(db_path)) + task = models.get_task(conn, task_id) + pipeline_row = conn.execute("SELECT status FROM pipelines WHERE id=?", (pipeline_id,)).fetchone() + conn.close() + + assert task["status"] != "blocked" + assert pipeline_row["status"] == "running" + + +def test_watchdog_dead_pid_blocks_task(tmp_path): + """Мёртвый PID → pipeline=failed, task=blocked с blocked_reason.""" + dead_pid = 2**31 - 1 # Guaranteed non-existent PID + db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, dead_pid) + + # Ensure the PID is actually dead + with pytest.raises((ProcessLookupError, OSError)): + os.kill(dead_pid, 0) + + _check_dead_pipelines(db_path) + + conn = init_db(db_path=str(db_path)) + task = models.get_task(conn, task_id) + pipeline_row = conn.execute("SELECT status FROM pipelines WHERE id=?", (pipeline_id,)).fetchone() + conn.close() + + assert task["status"] == "blocked" + assert str(dead_pid) in (task.get("blocked_reason") or "") + assert pipeline_row["status"] == "failed" + + +def test_watchdog_pipeline_without_pid_skipped(tmp_path): + """Pipeline без pid не трогается watchdog'ом.""" + db_path = tmp_path / "kin.db" + conn = init_db(db_path=str(db_path)) + models.create_project(conn, "p2", "Proj2", "/tmp/p2") + task = models.create_task(conn, "T-W02", "p2", "no-pid task", priority=1) + tid = task["id"] + pipeline = models.create_pipeline(conn, tid, "p2", "custom", []) + models.update_pipeline(conn, pipeline["id"], status="running") # pid=None + conn.close() + + _check_dead_pipelines(db_path) + + conn = init_db(db_path=str(db_path)) + task = models.get_task(conn, tid) + conn.close() + + assert task["status"] != "blocked" + + +# --------------------------------------------------------------------------- +# DB migration (KIN-099): pid column exists and is nullable +# --------------------------------------------------------------------------- + +def test_db_migration_pid_column_exists(conn): + """После init_db поле pid должно присутствовать в таблице pipelines.""" + cols = {row[1] for row in conn.execute("PRAGMA table_info(pipelines)").fetchall()} + assert "pid" in cols + + +def test_db_migration_pid_column_nullable(conn, project_and_task): + """pid nullable: INSERT pipeline без pid должен работать.""" + _, task = project_and_task + # create_pipeline не передаёт pid — должно вставиться без ошибок + pipeline = models.create_pipeline(conn, task["id"], "proj1", "custom", []) + row = conn.execute("SELECT pid FROM pipelines WHERE id=?", (pipeline["id"],)).fetchone() + assert row["pid"] is None + + +# --------------------------------------------------------------------------- +# PID saved after run_pipeline() +# --------------------------------------------------------------------------- + +def _make_mock_claude_success(): + mock = MagicMock() + mock.stdout = json.dumps({ + "result": "done", + "usage": {"total_tokens": 100}, + "cost_usd": 0.001, + }) + mock.stderr = "" + mock.returncode = 0 + return mock + + +def test_run_pipeline_saves_pid(conn): + """run_pipeline() сохраняет os.getpid() в поле pid таблицы pipelines.""" + from agents.runner import run_pipeline + + steps = [{"role": "tester", "model": "haiku", "brief": "test brief"}] + + with patch("agents.runner.subprocess.run") as mock_run, \ + patch("agents.runner.check_claude_auth"): + mock_run.return_value = _make_mock_claude_success() + run_pipeline(conn, "VDOL-001", steps) + + # Проверяем, что хоть один pipeline записан с pid = os.getpid() + row = conn.execute( + "SELECT pid FROM pipelines WHERE task_id='VDOL-001' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert row is not None + assert row["pid"] == os.getpid() + + +# --------------------------------------------------------------------------- +# Parent process check per pipeline step +# --------------------------------------------------------------------------- + +def test_run_pipeline_aborts_when_parent_dies(conn): + """Если parent process мёртв — pipeline завершается с failed, task blocked.""" + import errno as _errno + from agents.runner import run_pipeline + + steps = [{"role": "tester", "model": "haiku", "brief": "test brief"}] + + dead_ppid = 2**31 - 1 # guaranteed non-existent + + def _fake_kill(pid, sig): + if pid == dead_ppid and sig == 0: + # Must carry errno=ESRCH so _check_parent_alive recognises it + raise OSError(_errno.ESRCH, os.strerror(_errno.ESRCH)) + # Other PIDs (e.g. our own) — allow silently + return None + + with patch("agents.runner.os.getppid", return_value=dead_ppid), \ + patch("agents.runner.os.kill", side_effect=_fake_kill), \ + patch("agents.runner.check_claude_auth"), \ + patch("agents.runner.subprocess.run") as mock_run: + mock_run.return_value = _make_mock_claude_success() + result = run_pipeline(conn, "VDOL-001", steps) + + assert result["success"] is False + assert result.get("error") == "parent_process_died" + + # task должна быть blocked с упоминанием dead_ppid + task = models.get_task(conn, "VDOL-001") + assert task["status"] == "blocked" + assert str(dead_ppid) in (task.get("blocked_reason") or "") + + # pipeline должен быть failed + row = conn.execute( + "SELECT status FROM pipelines WHERE task_id='VDOL-001' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert row["status"] == "failed" diff --git a/web/api.py b/web/api.py index e331e65..4ee3e8c 100644 --- a/web/api.py +++ b/web/api.py @@ -87,6 +87,10 @@ _check_git_available() app = FastAPI(title="Kin API", version="0.1.0") +# Start pipeline watchdog (KIN-099): detects dead subprocess PIDs every 30s +from core.watchdog import start_watchdog as _start_watchdog +_start_watchdog(DB_PATH) + app.add_middleware( CORSMiddleware, allow_origins=["*"],