kin: auto-commit after pipeline
This commit is contained in:
parent
a9d3086139
commit
7ee520e18e
5 changed files with 597 additions and 26 deletions
|
|
@ -1,13 +1,15 @@
|
|||
"""Tests for pipeline watchdog (KIN-099)."""
|
||||
"""Tests for pipeline watchdog (KIN-099, KIN-OBS-015)."""
|
||||
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from core.db import init_db
|
||||
from core.db import init_db, get_connection
|
||||
from core import models
|
||||
from core.watchdog import _check_dead_pipelines
|
||||
from core.watchdog import _check_dead_pipelines, start_watchdog
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
@ -231,3 +233,127 @@ def test_run_pipeline_aborts_when_parent_dies(conn):
|
|||
"SELECT status FROM pipelines WHERE task_id='VDOL-001' ORDER BY id DESC LIMIT 1"
|
||||
).fetchone()
|
||||
assert row["status"] == "failed"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# KIN-OBS-015: start_watchdog() double-start guard
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_start_watchdog_sets_started_flag(tmp_path):
|
||||
"""start_watchdog() устанавливает _watchdog_started = True."""
|
||||
import core.watchdog as wd
|
||||
wd._watchdog_started = False
|
||||
|
||||
db_path = tmp_path / "kin_flag.db"
|
||||
init_db(db_path=str(db_path))
|
||||
|
||||
with patch("core.watchdog.threading.Thread") as mock_thread_cls:
|
||||
mock_thread = MagicMock()
|
||||
mock_thread_cls.return_value = mock_thread
|
||||
|
||||
start_watchdog(db_path, interval=9999)
|
||||
|
||||
assert wd._watchdog_started is True
|
||||
|
||||
wd._watchdog_started = False # restore
|
||||
|
||||
|
||||
def test_start_watchdog_double_call_no_second_thread(tmp_path):
|
||||
"""Повторный вызов start_watchdog() не создаёт второй поток."""
|
||||
import core.watchdog as wd
|
||||
wd._watchdog_started = False
|
||||
|
||||
db_path = tmp_path / "kin_double.db"
|
||||
init_db(db_path=str(db_path))
|
||||
|
||||
with patch("core.watchdog.threading.Thread") as mock_thread_cls:
|
||||
mock_thread = MagicMock()
|
||||
mock_thread_cls.return_value = mock_thread
|
||||
|
||||
start_watchdog(db_path, interval=9999) # first call
|
||||
start_watchdog(db_path, interval=9999) # second call — guard должен заблокировать
|
||||
|
||||
# Thread создан и запущен ровно один раз
|
||||
assert mock_thread_cls.call_count == 1
|
||||
assert mock_thread.start.call_count == 1
|
||||
|
||||
wd._watchdog_started = False # restore
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# KIN-OBS-015: _check_dead_pipelines использует get_connection()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_check_dead_pipelines_calls_get_connection(tmp_path):
|
||||
"""_check_dead_pipelines вызывает get_connection() — WAL и foreign_keys PRAGMA применяются."""
|
||||
db_path = tmp_path / "kin_gc.db"
|
||||
real_conn = init_db(db_path=str(db_path))
|
||||
real_conn.close()
|
||||
|
||||
with patch("core.watchdog.get_connection", wraps=get_connection) as mock_gc:
|
||||
_check_dead_pipelines(db_path)
|
||||
mock_gc.assert_called_once_with(db_path)
|
||||
|
||||
|
||||
def test_get_connection_applies_wal_and_foreign_keys(tmp_path):
|
||||
"""get_connection() применяет PRAGMA journal_mode=WAL и foreign_keys=ON."""
|
||||
db_path = tmp_path / "kin_pragma.db"
|
||||
conn = get_connection(db_path)
|
||||
|
||||
jm = conn.execute("PRAGMA journal_mode").fetchone()[0]
|
||||
fk = conn.execute("PRAGMA foreign_keys").fetchone()[0]
|
||||
conn.close()
|
||||
|
||||
assert jm == "wal"
|
||||
assert fk == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# KIN-OBS-015: OSError+errno.ESRCH и другие errno — mock-based
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_check_dead_pipelines_esrch_via_mock(tmp_path):
|
||||
"""OSError с errno=ESRCH перехватывается как мёртвый процесс и блокирует задачу."""
|
||||
fake_pid = 54321
|
||||
db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, fake_pid)
|
||||
|
||||
def _fake_kill(pid, sig):
|
||||
if pid == fake_pid and sig == 0:
|
||||
raise OSError(errno.ESRCH, "No such process")
|
||||
|
||||
with patch("core.watchdog.os.kill", side_effect=_fake_kill):
|
||||
_check_dead_pipelines(db_path)
|
||||
|
||||
conn = init_db(db_path=str(db_path))
|
||||
task = models.get_task(conn, task_id)
|
||||
pipeline_row = conn.execute(
|
||||
"SELECT status FROM pipelines WHERE id=?", (pipeline_id,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
|
||||
assert task["status"] == "blocked"
|
||||
assert str(fake_pid) in (task.get("blocked_reason") or "")
|
||||
assert pipeline_row["status"] == "failed"
|
||||
|
||||
|
||||
def test_check_dead_pipelines_permission_error_ignored(tmp_path):
|
||||
"""OSError с errno=EACCES (процесс жив, нет прав) — статус задачи не меняется."""
|
||||
fake_pid = 54322
|
||||
db_path, task_id, pipeline_id = _db_with_running_pipeline(tmp_path, fake_pid)
|
||||
|
||||
def _fake_kill(pid, sig):
|
||||
if pid == fake_pid and sig == 0:
|
||||
raise OSError(errno.EACCES, "Operation not permitted")
|
||||
|
||||
with patch("core.watchdog.os.kill", side_effect=_fake_kill):
|
||||
_check_dead_pipelines(db_path)
|
||||
|
||||
conn = init_db(db_path=str(db_path))
|
||||
task = models.get_task(conn, task_id)
|
||||
pipeline_row = conn.execute(
|
||||
"SELECT status FROM pipelines WHERE id=?", (pipeline_id,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
|
||||
assert task["status"] != "blocked"
|
||||
assert pipeline_row["status"] == "running"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue