kin/tests/test_watchdog.py
2026-03-17 15:59:43 +02:00

233 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests for pipeline watchdog (KIN-099)."""
import json
import os
import pytest
from unittest.mock import patch, MagicMock
from core.db import init_db
from core import models
from core.watchdog import _check_dead_pipelines
@pytest.fixture
def conn():
    """Yield an in-memory DB pre-seeded for the run_pipeline tests, closing it afterwards."""
    db = init_db(db_path=":memory:")
    # The run_pipeline tests below rely on project "vdol" and task "VDOL-001".
    models.create_project(
        db,
        "vdol",
        "ВДОЛЬ",
        "~/projects/vdolipoperek",
        tech_stack=["vue3"],
    )
    models.create_task(
        db,
        "VDOL-001",
        "vdol",
        "Fix bug",
        brief={"route_type": "debug"},
    )
    yield db
    db.close()
@pytest.fixture
def project_and_task(conn):
    """Create project "proj1" with task "T-001" in *conn* and return ``(project, task)``."""
    project = models.create_project(conn, "proj1", "Test Project", "/tmp/test")
    task = models.create_task(conn, "T-001", "proj1", "Test task", priority=3)
    return (project, task)
def _make_running_pipeline(conn, task_id, project_id, pid=None):
    """Create a step-less "custom" pipeline and switch it to ``status="running"``.

    Pass ``pid=None`` to create a running pipeline that has no PID.
    Returns the dict produced by ``models.create_pipeline``. It is not re-read
    after the update, so it may not reflect the new status/pid.
    """
    created = models.create_pipeline(conn, task_id, project_id, "custom", [])
    models.update_pipeline(conn, created["id"], status="running", pid=pid)
    return created
# ---------------------------------------------------------------------------
# get_running_pipelines_with_pid
# ---------------------------------------------------------------------------
def test_get_running_pipelines_returns_only_with_pid(conn, project_and_task):
    """Only running pipelines that carry a PID are returned."""
    task_id = project_and_task[1]["id"]
    _make_running_pipeline(conn, task_id, "proj1", pid=12345)
    # Running but PID-less: must be filtered out.
    _make_running_pipeline(conn, task_id, "proj1", pid=None)

    rows = models.get_running_pipelines_with_pid(conn)

    assert len(rows) == 1
    assert rows[0]["pid"] == 12345
def test_get_running_pipelines_excludes_completed(conn, project_and_task):
    """A completed pipeline is not returned, even though it still has a PID."""
    _, task = project_and_task
    pipeline_id = models.create_pipeline(conn, task["id"], "proj1", "custom", [])["id"]
    models.update_pipeline(conn, pipeline_id, status="completed", pid=99999)

    assert models.get_running_pipelines_with_pid(conn) == []
# ---------------------------------------------------------------------------
# _check_dead_pipelines (watchdog logic via tmp db file)
# ---------------------------------------------------------------------------
def _db_with_running_pipeline(tmp_path, pid):
    """Create an on-disk test DB containing exactly one running pipeline.

    Seeds project "p1" and task "T-W01", then a "custom" pipeline set to
    ``status="running"`` with the given *pid*. The connection is closed
    before returning, so the caller can point the watchdog at the file.

    Args:
        tmp_path: directory (pytest's ``tmp_path``) in which ``kin.db`` is created.
        pid: PID to store on the pipeline.

    Returns:
        ``(db_path, task_id, pipeline_id)``: the path of the DB file, the
        seeded task's id and the pipeline's id.
    """
    db_path = tmp_path / "kin.db"
    conn = init_db(db_path=str(db_path))
    try:
        models.create_project(conn, "p1", "Proj", "/tmp/p")
        task = models.create_task(conn, "T-W01", "p1", "watchdog task", priority=1)
        task_id = task["id"]
        pipeline = models.create_pipeline(conn, task_id, "p1", "custom", [])
        models.update_pipeline(conn, pipeline["id"], status="running", pid=pid)
    finally:
        # Close even when seeding fails, so the DB handle is not leaked.
        conn.close()
    return db_path, task_id, pipeline["id"]
def test_watchdog_alive_pid_no_change(tmp_path):
    """A live PID must leave both the task status and the pipeline status unchanged."""
    # Our own PID is certainly alive while this test runs.
    db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, os.getpid())

    _check_dead_pipelines(db_path)

    check_conn = init_db(db_path=str(db_path))
    task = models.get_task(check_conn, task_id)
    row = check_conn.execute(
        "SELECT status FROM pipelines WHERE id=?", (pipeline_id,)
    ).fetchone()
    check_conn.close()

    assert task["status"] != "blocked"
    assert row["status"] == "running"
def test_watchdog_dead_pid_blocks_task(tmp_path):
    """Dead PID -> pipeline becomes ``failed`` and the task ``blocked`` with a blocked_reason."""
    # 2**31 - 1 is the largest value a 32-bit pid_t can hold and lies far above
    # typical pid_max settings, so it should not belong to any live process.
    dead_pid = 2**31 - 1
    db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, dead_pid)

    # Sanity check: only ESRCH (ProcessLookupError) proves the PID is dead.
    # Accepting any OSError would also let PermissionError (EPERM) through,
    # and EPERM means the process exists but belongs to another user.
    with pytest.raises(ProcessLookupError):
        os.kill(dead_pid, 0)

    _check_dead_pipelines(db_path)

    conn = init_db(db_path=str(db_path))
    task = models.get_task(conn, task_id)
    pipeline_row = conn.execute(
        "SELECT status FROM pipelines WHERE id=?", (pipeline_id,)
    ).fetchone()
    conn.close()

    assert task["status"] == "blocked"
    # The reason must name the dead PID.
    assert str(dead_pid) in (task.get("blocked_reason") or "")
    assert pipeline_row["status"] == "failed"
def test_watchdog_pipeline_without_pid_skipped(tmp_path):
    """The watchdog leaves a running pipeline that has no pid alone.

    Both sides are checked: the task is not blocked, and the pipeline
    itself is still ``running``.
    """
    db_path = tmp_path / "kin.db"
    conn = init_db(db_path=str(db_path))
    models.create_project(conn, "p2", "Proj2", "/tmp/p2")
    task = models.create_task(conn, "T-W02", "p2", "no-pid task", priority=1)
    tid = task["id"]
    pipeline = models.create_pipeline(conn, tid, "p2", "custom", [])
    models.update_pipeline(conn, pipeline["id"], status="running")  # pid=None
    conn.close()

    _check_dead_pipelines(db_path)

    conn = init_db(db_path=str(db_path))
    task = models.get_task(conn, tid)
    pipeline_row = conn.execute(
        "SELECT status FROM pipelines WHERE id=?", (pipeline["id"],)
    ).fetchone()
    conn.close()

    assert task["status"] != "blocked"
    # The PID-less pipeline must not have been failed or otherwise changed.
    assert pipeline_row["status"] == "running"
# ---------------------------------------------------------------------------
# DB migration (KIN-099): pid column exists and is nullable
# ---------------------------------------------------------------------------
def test_db_migration_pid_column_exists(conn):
    """After init_db, the pipelines table must contain a ``pid`` column."""
    # PRAGMA table_info yields one row per column; index 1 holds the column name.
    column_names = set()
    for info in conn.execute("PRAGMA table_info(pipelines)").fetchall():
        column_names.add(info[1])
    assert "pid" in column_names
def test_db_migration_pid_column_nullable(conn, project_and_task):
    """pid is nullable: inserting a pipeline without a pid must succeed."""
    task = project_and_task[1]
    # create_pipeline does not pass a pid, so the row must be stored without one.
    new_id = models.create_pipeline(conn, task["id"], "proj1", "custom", [])["id"]
    stored = conn.execute("SELECT pid FROM pipelines WHERE id=?", (new_id,)).fetchone()
    assert stored["pid"] is None
# ---------------------------------------------------------------------------
# PID saved after run_pipeline()
# ---------------------------------------------------------------------------
def _make_mock_claude_success():
    """Build a fake successful result for the patched ``agents.runner.subprocess.run``.

    The mock has a JSON ``stdout`` payload (result, usage, cost), an empty
    ``stderr`` and a ``returncode`` of 0.
    """
    payload = {
        "result": "done",
        "usage": {"total_tokens": 100},
        "cost_usd": 0.001,
    }
    fake = MagicMock()
    fake.stdout = json.dumps(payload)
    fake.stderr = ""
    fake.returncode = 0
    return fake
def test_run_pipeline_saves_pid(conn):
    """run_pipeline() records os.getpid() in the pid column of the pipelines table."""
    from agents.runner import run_pipeline

    steps = [{"role": "tester", "model": "haiku", "brief": "test brief"}]
    with patch("agents.runner.subprocess.run") as fake_run, \
            patch("agents.runner.check_claude_auth"):
        fake_run.return_value = _make_mock_claude_success()
        run_pipeline(conn, "VDOL-001", steps)

    # The most recently created pipeline for the task must carry our PID.
    latest = conn.execute(
        "SELECT pid FROM pipelines WHERE task_id='VDOL-001' ORDER BY id DESC LIMIT 1"
    ).fetchone()
    assert latest is not None
    assert latest["pid"] == os.getpid()
# ---------------------------------------------------------------------------
# Parent process check per pipeline step
# ---------------------------------------------------------------------------
def test_run_pipeline_aborts_when_parent_dies(conn):
    """If the parent process is dead, the pipeline ends as failed and the task is blocked."""
    import errno
    from agents.runner import run_pipeline

    steps = [{"role": "tester", "model": "haiku", "brief": "test brief"}]
    dead_ppid = 2**31 - 1  # far beyond any real PID

    def _fake_kill(pid, sig):
        # Signal 0 to the fake parent reports "no such process". The error
        # must carry errno=ESRCH so _check_parent_alive recognises it.
        if (pid, sig) == (dead_ppid, 0):
            raise OSError(errno.ESRCH, os.strerror(errno.ESRCH))
        # Any other PID/signal (e.g. our own) is accepted silently.
        return None

    with patch("agents.runner.os.getppid", return_value=dead_ppid), \
            patch("agents.runner.os.kill", side_effect=_fake_kill), \
            patch("agents.runner.check_claude_auth"), \
            patch("agents.runner.subprocess.run") as fake_run:
        fake_run.return_value = _make_mock_claude_success()
        outcome = run_pipeline(conn, "VDOL-001", steps)

    assert outcome["success"] is False
    assert outcome.get("error") == "parent_process_died"

    # The task must be blocked, with a reason that mentions the dead parent PID.
    task = models.get_task(conn, "VDOL-001")
    assert task["status"] == "blocked"
    assert str(dead_ppid) in (task.get("blocked_reason") or "")

    # The latest pipeline for the task must be marked failed.
    last_pipeline = conn.execute(
        "SELECT status FROM pipelines WHERE task_id='VDOL-001' ORDER BY id DESC LIMIT 1"
    ).fetchone()
    assert last_pipeline["status"] == "failed"