"""Tests for pipeline watchdog (KIN-099).""" import json import os import pytest from unittest.mock import patch, MagicMock from core.db import init_db from core import models from core.watchdog import _check_dead_pipelines @pytest.fixture def conn(): c = init_db(db_path=":memory:") # Seed project + task used by run_pipeline tests models.create_project(c, "vdol", "ВДОЛЬ", "~/projects/vdolipoperek", tech_stack=["vue3"]) models.create_task(c, "VDOL-001", "vdol", "Fix bug", brief={"route_type": "debug"}) yield c c.close() @pytest.fixture def project_and_task(conn): p = models.create_project(conn, "proj1", "Test Project", "/tmp/test") t = models.create_task(conn, "T-001", "proj1", "Test task", priority=3) return p, t def _make_running_pipeline(conn, task_id, project_id, pid=None): pipeline = models.create_pipeline(conn, task_id, project_id, "custom", []) models.update_pipeline(conn, pipeline["id"], status="running", pid=pid) return pipeline # --------------------------------------------------------------------------- # get_running_pipelines_with_pid # --------------------------------------------------------------------------- def test_get_running_pipelines_returns_only_with_pid(conn, project_and_task): _, task = project_and_task tid = task["id"] _make_running_pipeline(conn, tid, "proj1", pid=12345) _make_running_pipeline(conn, tid, "proj1", pid=None) # no pid — should be excluded rows = models.get_running_pipelines_with_pid(conn) assert len(rows) == 1 assert rows[0]["pid"] == 12345 def test_get_running_pipelines_excludes_completed(conn, project_and_task): _, task = project_and_task tid = task["id"] pipeline = models.create_pipeline(conn, tid, "proj1", "custom", []) # completed pipeline with pid — should NOT appear models.update_pipeline(conn, pipeline["id"], status="completed", pid=99999) rows = models.get_running_pipelines_with_pid(conn) assert rows == [] # --------------------------------------------------------------------------- # _check_dead_pipelines (watchdog logic via tmp db file) # --------------------------------------------------------------------------- def _db_with_running_pipeline(tmp_path, pid): """Create a temp DB file with one running pipeline and return (db_path, task_id).""" db_path = tmp_path / "kin.db" conn = init_db(db_path=str(db_path)) models.create_project(conn, "p1", "Proj", "/tmp/p") task = models.create_task(conn, "T-W01", "p1", "watchdog task", priority=1) tid = task["id"] pipeline = models.create_pipeline(conn, tid, "p1", "custom", []) models.update_pipeline(conn, pipeline["id"], status="running", pid=pid) conn.close() return db_path, tid, pipeline["id"] def test_watchdog_alive_pid_no_change(tmp_path): """Живой PID не должен менять статус задачи.""" live_pid = os.getpid() # Our own PID — definitely alive db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, live_pid) _check_dead_pipelines(db_path) conn = init_db(db_path=str(db_path)) task = models.get_task(conn, task_id) pipeline_row = conn.execute("SELECT status FROM pipelines WHERE id=?", (pipeline_id,)).fetchone() conn.close() assert task["status"] != "blocked" assert pipeline_row["status"] == "running" def test_watchdog_dead_pid_blocks_task(tmp_path): """Мёртвый PID → pipeline=failed, task=blocked с blocked_reason.""" dead_pid = 2**31 - 1 # Guaranteed non-existent PID db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, dead_pid) # Ensure the PID is actually dead with pytest.raises((ProcessLookupError, OSError)): os.kill(dead_pid, 0) _check_dead_pipelines(db_path) conn = init_db(db_path=str(db_path)) task = models.get_task(conn, task_id) pipeline_row = conn.execute("SELECT status FROM pipelines WHERE id=?", (pipeline_id,)).fetchone() conn.close() assert task["status"] == "blocked" assert str(dead_pid) in (task.get("blocked_reason") or "") assert pipeline_row["status"] == "failed" def test_watchdog_pipeline_without_pid_skipped(tmp_path): """Pipeline без pid не трогается watchdog'ом.""" db_path = tmp_path / "kin.db" conn = init_db(db_path=str(db_path)) models.create_project(conn, "p2", "Proj2", "/tmp/p2") task = models.create_task(conn, "T-W02", "p2", "no-pid task", priority=1) tid = task["id"] pipeline = models.create_pipeline(conn, tid, "p2", "custom", []) models.update_pipeline(conn, pipeline["id"], status="running") # pid=None conn.close() _check_dead_pipelines(db_path) conn = init_db(db_path=str(db_path)) task = models.get_task(conn, tid) conn.close() assert task["status"] != "blocked" # --------------------------------------------------------------------------- # DB migration (KIN-099): pid column exists and is nullable # --------------------------------------------------------------------------- def test_db_migration_pid_column_exists(conn): """После init_db поле pid должно присутствовать в таблице pipelines.""" cols = {row[1] for row in conn.execute("PRAGMA table_info(pipelines)").fetchall()} assert "pid" in cols def test_db_migration_pid_column_nullable(conn, project_and_task): """pid nullable: INSERT pipeline без pid должен работать.""" _, task = project_and_task # create_pipeline не передаёт pid — должно вставиться без ошибок pipeline = models.create_pipeline(conn, task["id"], "proj1", "custom", []) row = conn.execute("SELECT pid FROM pipelines WHERE id=?", (pipeline["id"],)).fetchone() assert row["pid"] is None # --------------------------------------------------------------------------- # PID saved after run_pipeline() # --------------------------------------------------------------------------- def _make_mock_claude_success(): mock = MagicMock() mock.stdout = json.dumps({ "result": "done", "usage": {"total_tokens": 100}, "cost_usd": 0.001, }) mock.stderr = "" mock.returncode = 0 return mock def test_run_pipeline_saves_pid(conn): """run_pipeline() сохраняет os.getpid() в поле pid таблицы pipelines.""" from agents.runner import run_pipeline steps = [{"role": "tester", "model": "haiku", "brief": "test brief"}] with patch("agents.runner.subprocess.run") as mock_run, \ patch("agents.runner.check_claude_auth"): mock_run.return_value = _make_mock_claude_success() run_pipeline(conn, "VDOL-001", steps) # Проверяем, что хоть один pipeline записан с pid = os.getpid() row = conn.execute( "SELECT pid FROM pipelines WHERE task_id='VDOL-001' ORDER BY id DESC LIMIT 1" ).fetchone() assert row is not None assert row["pid"] == os.getpid() # --------------------------------------------------------------------------- # Parent process check per pipeline step # --------------------------------------------------------------------------- def test_run_pipeline_aborts_when_parent_dies(conn): """Если parent process мёртв — pipeline завершается с failed, task blocked.""" import errno as _errno from agents.runner import run_pipeline steps = [{"role": "tester", "model": "haiku", "brief": "test brief"}] dead_ppid = 2**31 - 1 # guaranteed non-existent def _fake_kill(pid, sig): if pid == dead_ppid and sig == 0: # Must carry errno=ESRCH so _check_parent_alive recognises it raise OSError(_errno.ESRCH, os.strerror(_errno.ESRCH)) # Other PIDs (e.g. our own) — allow silently return None with patch("agents.runner.os.getppid", return_value=dead_ppid), \ patch("agents.runner.os.kill", side_effect=_fake_kill), \ patch("agents.runner.check_claude_auth"), \ patch("agents.runner.subprocess.run") as mock_run: mock_run.return_value = _make_mock_claude_success() result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is False assert result.get("error") == "parent_process_died" # task должна быть blocked с упоминанием dead_ppid task = models.get_task(conn, "VDOL-001") assert task["status"] == "blocked" assert str(dead_ppid) in (task.get("blocked_reason") or "") # pipeline должен быть failed row = conn.execute( "SELECT status FROM pipelines WHERE task_id='VDOL-001' ORDER BY id DESC LIMIT 1" ).fetchone() assert row["status"] == "failed"