"""Tests for pipeline watchdog (KIN-099, KIN-OBS-015, KIN-OBS-019).""" import errno import inspect import json import os import threading import pytest from unittest.mock import patch, MagicMock from core.db import init_db, get_connection from core import models from core.watchdog import _check_dead_pipelines, start_watchdog @pytest.fixture def conn(): c = init_db(db_path=":memory:") # Seed project + task used by run_pipeline tests models.create_project(c, "vdol", "ВДОЛЬ", "~/projects/vdolipoperek", tech_stack=["vue3"]) models.create_task(c, "VDOL-001", "vdol", "Fix bug", brief={"route_type": "debug"}) yield c c.close() @pytest.fixture def project_and_task(conn): p = models.create_project(conn, "proj1", "Test Project", "/tmp/test") t = models.create_task(conn, "T-001", "proj1", "Test task", priority=3) return p, t def _make_running_pipeline(conn, task_id, project_id, pid=None): pipeline = models.create_pipeline(conn, task_id, project_id, "custom", []) models.update_pipeline(conn, pipeline["id"], status="running", pid=pid) return pipeline # --------------------------------------------------------------------------- # get_running_pipelines_with_pid # --------------------------------------------------------------------------- def test_get_running_pipelines_returns_only_with_pid(conn, project_and_task): _, task = project_and_task tid = task["id"] _make_running_pipeline(conn, tid, "proj1", pid=12345) _make_running_pipeline(conn, tid, "proj1", pid=None) # no pid — should be excluded rows = models.get_running_pipelines_with_pid(conn) assert len(rows) == 1 assert rows[0]["pid"] == 12345 def test_get_running_pipelines_excludes_completed(conn, project_and_task): _, task = project_and_task tid = task["id"] pipeline = models.create_pipeline(conn, tid, "proj1", "custom", []) # completed pipeline with pid — should NOT appear models.update_pipeline(conn, pipeline["id"], status="completed", pid=99999) rows = models.get_running_pipelines_with_pid(conn) assert rows == [] # 
# ---------------------------------------------------------------------------
# _check_dead_pipelines (watchdog logic via tmp db file)
# ---------------------------------------------------------------------------
def _db_with_running_pipeline(tmp_path, pid):
    """Create a temp DB file with one running pipeline.

    Returns:
        Tuple ``(db_path, task_id, pipeline_id)``.  (The previous docstring
        claimed a 2-tuple, but the function has always returned three values.)
    """
    db_path = tmp_path / "kin.db"
    conn = init_db(db_path=str(db_path))
    models.create_project(conn, "p1", "Proj", "/tmp/p")
    task = models.create_task(conn, "T-W01", "p1", "watchdog task", priority=1)
    tid = task["id"]
    pipeline = models.create_pipeline(conn, tid, "p1", "custom", [])
    models.update_pipeline(conn, pipeline["id"], status="running", pid=pid)
    conn.close()
    return db_path, tid, pipeline["id"]


def test_watchdog_alive_pid_no_change(tmp_path):
    """A live PID must not change the task status."""
    live_pid = os.getpid()  # Our own PID — definitely alive
    db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, live_pid)

    _check_dead_pipelines(db_path)

    conn = init_db(db_path=str(db_path))
    task = models.get_task(conn, task_id)
    pipeline_row = conn.execute(
        "SELECT status FROM pipelines WHERE id=?", (pipeline_id,)
    ).fetchone()
    conn.close()
    assert task["status"] != "blocked"
    assert pipeline_row["status"] == "running"


def test_watchdog_dead_pid_blocks_task(tmp_path):
    """Dead PID → pipeline=failed, task=blocked with blocked_reason."""
    dead_pid = 2**31 - 1  # Guaranteed non-existent PID
    db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, dead_pid)

    # Ensure the PID is actually dead.  ProcessLookupError is a subclass of
    # OSError, so catching OSError alone covers both (the old tuple form
    # (ProcessLookupError, OSError) was redundant).
    with pytest.raises(OSError):
        os.kill(dead_pid, 0)

    _check_dead_pipelines(db_path)

    conn = init_db(db_path=str(db_path))
    task = models.get_task(conn, task_id)
    pipeline_row = conn.execute(
        "SELECT status FROM pipelines WHERE id=?", (pipeline_id,)
    ).fetchone()
    conn.close()
    assert task["status"] == "blocked"
    assert str(dead_pid) in (task.get("blocked_reason") or "")
    assert pipeline_row["status"] == "failed"


def test_watchdog_pipeline_without_pid_skipped(tmp_path):
    """A pipeline without a pid is left untouched by the watchdog."""
    db_path = tmp_path / "kin.db"
    conn = init_db(db_path=str(db_path))
    models.create_project(conn, "p2", "Proj2", "/tmp/p2")
    task = models.create_task(conn, "T-W02", "p2", "no-pid task", priority=1)
    tid = task["id"]
    pipeline = models.create_pipeline(conn, tid, "p2", "custom", [])
    models.update_pipeline(conn, pipeline["id"], status="running")  # pid=None
    conn.close()

    _check_dead_pipelines(db_path)

    conn = init_db(db_path=str(db_path))
    task = models.get_task(conn, tid)
    conn.close()
    assert task["status"] != "blocked"
# ---------------------------------------------------------------------------
# DB migration (KIN-099): pid column exists and is nullable
# ---------------------------------------------------------------------------
def test_db_migration_pid_column_exists(conn):
    """After init_db the pipelines table must contain the pid column."""
    table_info = conn.execute("PRAGMA table_info(pipelines)").fetchall()
    column_names = {row[1] for row in table_info}
    assert "pid" in column_names


def test_db_migration_pid_column_nullable(conn, project_and_task):
    """pid is nullable: inserting a pipeline without a pid must work."""
    _, task = project_and_task
    # create_pipeline passes no pid — the insert must still succeed
    pipeline = models.create_pipeline(conn, task["id"], "proj1", "custom", [])
    row = conn.execute(
        "SELECT pid FROM pipelines WHERE id=?", (pipeline["id"],)
    ).fetchone()
    assert row["pid"] is None


# ---------------------------------------------------------------------------
# PID saved after run_pipeline()
# ---------------------------------------------------------------------------
def _make_mock_claude_success():
    """Build a MagicMock mimicking a successful claude subprocess result."""
    completed = MagicMock()
    completed.stdout = json.dumps(
        {
            "result": "done",
            "usage": {"total_tokens": 100},
            "cost_usd": 0.001,
        }
    )
    completed.stderr = ""
    completed.returncode = 0
    return completed


def test_run_pipeline_saves_pid(conn):
    """run_pipeline() stores os.getpid() in the pid column of pipelines."""
    from agents.runner import run_pipeline

    steps = [{"role": "tester", "model": "haiku", "brief": "test brief"}]
    with patch("agents.runner.subprocess.run") as mock_run, \
            patch("agents.runner.check_claude_auth"):
        mock_run.return_value = _make_mock_claude_success()
        run_pipeline(conn, "VDOL-001", steps)

    # The most recent pipeline for the task must carry our own PID.
    row = conn.execute(
        "SELECT pid FROM pipelines WHERE task_id='VDOL-001' ORDER BY id DESC LIMIT 1"
    ).fetchone()
    assert row is not None
    assert row["pid"] == os.getpid()
# ---------------------------------------------------------------------------
# Parent process check per pipeline step
# ---------------------------------------------------------------------------
def test_run_pipeline_aborts_when_parent_dies(conn):
    """If the parent process is dead — pipeline ends failed, task is blocked."""
    import errno as _errno
    from agents.runner import run_pipeline

    steps = [{"role": "tester", "model": "haiku", "brief": "test brief"}]
    dead_ppid = 2**31 - 1  # guaranteed non-existent

    def _fake_kill(pid, sig):
        # Must carry errno=ESRCH so _check_parent_alive recognises it;
        # other PIDs (e.g. our own) are allowed silently.
        if pid == dead_ppid and sig == 0:
            raise OSError(_errno.ESRCH, os.strerror(_errno.ESRCH))
        return None

    with patch("agents.runner.os.getppid", return_value=dead_ppid), \
            patch("agents.runner.os.kill", side_effect=_fake_kill), \
            patch("agents.runner.check_claude_auth"), \
            patch("agents.runner.subprocess.run") as mock_run:
        mock_run.return_value = _make_mock_claude_success()
        result = run_pipeline(conn, "VDOL-001", steps)

    assert result["success"] is False
    assert result.get("error") == "parent_process_died"

    # The task must be blocked with a reason mentioning dead_ppid.
    task = models.get_task(conn, "VDOL-001")
    assert task["status"] == "blocked"
    assert str(dead_ppid) in (task.get("blocked_reason") or "")

    # The pipeline itself must be failed.
    latest = conn.execute(
        "SELECT status FROM pipelines WHERE task_id='VDOL-001' ORDER BY id DESC LIMIT 1"
    ).fetchone()
    assert latest["status"] == "failed"
# ---------------------------------------------------------------------------
# KIN-OBS-015: start_watchdog() double-start guard
# ---------------------------------------------------------------------------
def test_start_watchdog_sets_started_flag(tmp_path):
    """start_watchdog() sets _watchdog_started = True."""
    import core.watchdog as wd

    wd._watchdog_started = False
    db_path = tmp_path / "kin_flag.db"
    init_db(db_path=str(db_path))
    try:
        with patch("core.watchdog.threading.Thread") as mock_thread_cls:
            mock_thread = MagicMock()
            mock_thread_cls.return_value = mock_thread
            start_watchdog(db_path, interval=9999)
        assert wd._watchdog_started is True
    finally:
        # Restore even when the assertion fails; previously a failing assert
        # leaked _watchdog_started=True into every later test in the session.
        wd._watchdog_started = False


def test_start_watchdog_double_call_no_second_thread(tmp_path):
    """A repeated start_watchdog() call must not create a second thread."""
    import core.watchdog as wd

    wd._watchdog_started = False
    db_path = tmp_path / "kin_double.db"
    init_db(db_path=str(db_path))
    try:
        with patch("core.watchdog.threading.Thread") as mock_thread_cls:
            mock_thread = MagicMock()
            mock_thread_cls.return_value = mock_thread
            start_watchdog(db_path, interval=9999)  # first call
            start_watchdog(db_path, interval=9999)  # second call — guard must block it

        # Thread created and started exactly once.
        assert mock_thread_cls.call_count == 1
        assert mock_thread.start.call_count == 1
    finally:
        # Same leak protection as above: restore the flag unconditionally.
        wd._watchdog_started = False
# ---------------------------------------------------------------------------
# KIN-OBS-015: _check_dead_pipelines uses get_connection()
# ---------------------------------------------------------------------------
def test_check_dead_pipelines_calls_get_connection(tmp_path):
    """_check_dead_pipelines goes through get_connection() — so the WAL and
    foreign_keys PRAGMAs are applied."""
    db_path = tmp_path / "kin_gc.db"
    real_conn = init_db(db_path=str(db_path))
    real_conn.close()

    with patch("core.watchdog.get_connection", wraps=get_connection) as mock_gc:
        _check_dead_pipelines(db_path)

    mock_gc.assert_called_once_with(db_path)


def test_get_connection_applies_wal_and_foreign_keys(tmp_path):
    """get_connection() applies PRAGMA journal_mode=WAL and foreign_keys=ON."""
    db_path = tmp_path / "kin_pragma.db"
    connection = get_connection(db_path)
    journal_mode = connection.execute("PRAGMA journal_mode").fetchone()[0]
    foreign_keys = connection.execute("PRAGMA foreign_keys").fetchone()[0]
    connection.close()
    assert journal_mode == "wal"
    assert foreign_keys == 1


# ---------------------------------------------------------------------------
# KIN-OBS-015: OSError+errno.ESRCH and other errnos — mock-based
# ---------------------------------------------------------------------------
def test_check_dead_pipelines_esrch_via_mock(tmp_path):
    """OSError with errno=ESRCH counts as a dead process and blocks the task."""
    fake_pid = 54321
    db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, fake_pid)

    def _fake_kill(pid, sig):
        if pid == fake_pid and sig == 0:
            raise OSError(errno.ESRCH, "No such process")

    with patch("core.watchdog.os.kill", side_effect=_fake_kill):
        _check_dead_pipelines(db_path)

    conn = init_db(db_path=str(db_path))
    task = models.get_task(conn, task_id)
    pipeline_row = conn.execute(
        "SELECT status FROM pipelines WHERE id=?", (pipeline_id,)
    ).fetchone()
    conn.close()
    assert task["status"] == "blocked"
    assert str(fake_pid) in (task.get("blocked_reason") or "")
    assert pipeline_row["status"] == "failed"
def test_check_dead_pipelines_permission_error_ignored(tmp_path):
    """OSError with errno=EACCES (process alive, no rights) — task status unchanged."""
    fake_pid = 54322
    db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, fake_pid)

    def _fake_kill(pid, sig):
        if pid == fake_pid and sig == 0:
            raise OSError(errno.EACCES, "Operation not permitted")

    with patch("core.watchdog.os.kill", side_effect=_fake_kill):
        _check_dead_pipelines(db_path)

    conn = init_db(db_path=str(db_path))
    task = models.get_task(conn, task_id)
    pipeline_row = conn.execute(
        "SELECT status FROM pipelines WHERE id=?", (pipeline_id,)
    ).fetchone()
    conn.close()
    assert task["status"] != "blocked"
    assert pipeline_row["status"] == "running"


# ---------------------------------------------------------------------------
# _check_parent_alive: EACCES and ProcessLookupError (KIN-099, decisions #341/#358)
# ---------------------------------------------------------------------------
def test_check_parent_alive_eacces_does_not_abort(conn):
    """EACCES on os.kill(ppid, 0) means the parent is alive (no rights) —
    the pipeline keeps running (decision #358)."""
    import errno as _errno
    from agents.runner import _check_parent_alive

    pipeline = models.create_pipeline(conn, "VDOL-001", "vdol", "custom", [])
    eacces_ppid = 99990
    with patch("agents.runner.os.getppid", return_value=eacces_ppid), \
            patch("agents.runner.os.kill",
                  side_effect=OSError(_errno.EACCES, "Operation not permitted")):
        result = _check_parent_alive(conn, pipeline, "VDOL-001", "vdol")

    assert result is False
    task = models.get_task(conn, "VDOL-001")
    assert task["status"] != "blocked"


def test_check_parent_alive_process_lookup_error_aborts(conn):
    """ProcessLookupError (errno=ESRCH) from os.kill aborts the pipeline
    (decision #341)."""
    import errno as _errno
    from agents.runner import _check_parent_alive

    pipeline = models.create_pipeline(conn, "VDOL-001", "vdol", "custom", [])
    dead_ppid = 99991
    with patch("agents.runner.os.getppid", return_value=dead_ppid), \
            patch("agents.runner.os.kill",
                  side_effect=ProcessLookupError(_errno.ESRCH, "No such process")):
        result = _check_parent_alive(conn, pipeline, "VDOL-001", "vdol")

    assert result is True
    task = models.get_task(conn, "VDOL-001")
    assert task["status"] == "blocked"
    assert str(dead_ppid) in (task.get("blocked_reason") or "")
# ---------------------------------------------------------------------------
# KIN-OBS-019: regression tests via inspect.getsource() — decision #367
# ---------------------------------------------------------------------------
def test_watchdog_source_has_no_sqlite3_connect():
    """Regression: watchdog must not call sqlite3.connect() directly (decision #367)."""
    import core.watchdog as wd

    source = inspect.getsource(wd)
    assert "sqlite3.connect" not in source, (
        "REGRESSION: core/watchdog.py содержит sqlite3.connect() — "
        "должен использовать get_connection() из core.db"
    )


def test_watchdog_source_has_no_explicit_process_lookup_error():
    """Regression: watchdog must not catch ProcessLookupError explicitly
    (decision #357).

    Dead-process handling is unified via OSError + errno.ESRCH.  Checked
    through the AST: no except handler may mention ProcessLookupError.
    """
    import ast
    import core.watchdog as wd

    tree = ast.parse(inspect.getsource(wd))
    # Flag any ast.Name "ProcessLookupError" appearing inside the exception
    # type expression of an except handler (covers tuples like (A, B) too).
    flagged = any(
        isinstance(name_node, ast.Name) and name_node.id == "ProcessLookupError"
        for node in ast.walk(tree)
        if isinstance(node, ast.ExceptHandler) and node.type is not None
        for name_node in ast.walk(node.type)
    )
    assert not flagged, (
        "REGRESSION: core/watchdog.py перехватывает ProcessLookupError явно — "
        "должен использовать OSError + errno.ESRCH"
    )
""" import ast import core.watchdog as wd source = inspect.getsource(wd) tree = ast.parse(source) process_lookup_in_except = False for node in ast.walk(tree): if isinstance(node, ast.ExceptHandler) and node.type is not None: for name_node in ast.walk(node.type): if isinstance(name_node, ast.Name) and name_node.id == "ProcessLookupError": process_lookup_in_except = True break assert not process_lookup_in_except, ( "REGRESSION: core/watchdog.py перехватывает ProcessLookupError явно — " "должен использовать OSError + errno.ESRCH" ) def test_check_dead_pipelines_kill_succeeds_no_change(tmp_path): """os.kill успешно (нет исключения) → процесс жив, статус задачи не меняется.""" fake_pid = 54323 db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, fake_pid) # os.kill returns None (no exception) — process is alive with patch("core.watchdog.os.kill", return_value=None): _check_dead_pipelines(db_path) conn = init_db(db_path=str(db_path)) task = models.get_task(conn, task_id) pipeline_row = conn.execute( "SELECT status FROM pipelines WHERE id=?", (pipeline_id,) ).fetchone() conn.close() assert task["status"] != "blocked" assert pipeline_row["status"] == "running"