78 lines
2.7 KiB
Python
78 lines
2.7 KiB
Python
|
|
"""
|
||
|
|
Kin pipeline watchdog — background thread that detects dead subprocesses.
|
||
|
|
|
||
|
|
Every `interval` seconds, check each running pipeline that has a recorded PID.
If that PID no longer exists (ProcessLookupError / ESRCH), mark the pipeline
as failed and its task as blocked with a descriptive reason.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import logging
|
||
|
|
import os
|
||
|
|
import sqlite3
|
||
|
|
import threading
|
||
|
|
import time
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
from core import models
|
||
|
|
from core.db import init_db
|
||
|
|
|
||
|
|
# Logger shared by the watchdog functions in this module.
_logger = logging.getLogger("kin.watchdog")
|
||
|
|
|
||
|
|
|
||
|
|
def _check_dead_pipelines(db_path: Path) -> None:
    """Run a single watchdog pass over the running pipelines.

    Opens a fresh SQLite connection to *db_path* (rows as ``sqlite3.Row``),
    fetches the running pipelines that have a recorded PID via
    ``models.get_running_pipelines_with_pid`` and probes each one with
    ``_check_pipeline_row``.

    The pass is best-effort: errors are logged, never raised, so the
    background loop keeps running. The connection is closed on every exit
    path, and a failure on one row does not stop the remaining rows from
    being checked.

    Args:
        db_path: Path to the Kin SQLite database file.
    """
    try:
        conn = sqlite3.connect(str(db_path), check_same_thread=False)
        try:
            conn.row_factory = sqlite3.Row
            try:
                running = models.get_running_pipelines_with_pid(conn)
            except Exception as exc:
                # Table may not exist yet on very first startup
                _logger.debug("Watchdog: could not query pipelines (%s)", exc)
                return

            for row in running:
                try:
                    _check_pipeline_row(conn, row)
                except Exception as exc:
                    # Isolate per-row failures (e.g. a malformed PID value)
                    # so one bad row cannot abort the whole pass.
                    _logger.error("Watchdog: failed to check pipeline row: %s", exc)
        finally:
            # Always release the connection, even if a row or update raised.
            conn.close()
    except Exception as exc:
        _logger.error("Watchdog pass failed: %s", exc)


def _check_pipeline_row(conn: sqlite3.Connection, row: sqlite3.Row) -> None:
    """Probe one pipeline's PID; if the process is gone, mark it failed and its task blocked.

    Reads ``pid``, ``id`` and ``task_id`` from *row*. Errors while updating the
    database are logged and swallowed. Other errors (e.g. a non-integer PID)
    propagate to the caller.

    NOTE: ``os.kill(pid, 0)`` is a POSIX existence check. On Windows, Python's
    ``os.kill`` terminates the target for any signal other than the CTRL
    events, so this probe must not run there.
    NOTE(review): an exited-but-unreaped child of *this* process (a zombie)
    still answers the probe, so it is only detected once reaped. Confirm
    whether pipelines are spawned by the same process that runs the watchdog.
    """
    pid = row["pid"]
    pipeline_id = row["id"]
    task_id = row["task_id"]

    if pid <= 0:
        # kill(0, ...) probes our own process group and kill(-1, ...) every
        # process we may signal, so such a PID would always look "alive".
        _logger.warning(
            "Watchdog: pipeline %s has invalid PID %s — skipping",
            pipeline_id, pid,
        )
        return

    try:
        os.kill(pid, 0)  # signal 0 = existence check; nothing is delivered
    except ProcessLookupError:
        pass  # ESRCH: the process is gone — handled below
    except PermissionError:
        # Process exists but we can't signal it (e.g. different user) — skip
        return
    else:
        return  # process is still alive

    reason = f"Process died unexpectedly (PID {pid})"
    _logger.warning(
        "Watchdog: pipeline %s PID %s is dead — marking blocked (%s)",
        pipeline_id, pid, task_id,  # the parenthesised value is the task id
    )
    try:
        models.update_pipeline(conn, pipeline_id, status="failed")
        models.update_task(conn, task_id, status="blocked", blocked_reason=reason)
    except Exception as upd_exc:
        _logger.error("Watchdog: failed to update pipeline/task: %s", upd_exc)
|
||
|
|
|
||
|
|
|
||
|
|
def _watchdog_loop(db_path: Path, interval: int) -> None:
    """Serve as the watchdog thread's target; never returns.

    Logs a start-up line, then repeatedly sleeps for *interval* seconds and
    performs one ``_check_dead_pipelines`` pass against *db_path*. The first
    pass therefore happens only after one full interval has elapsed.

    Args:
        db_path: Path to the Kin SQLite database to monitor.
        interval: Seconds to wait before each pass.
    """
    _logger.info("Watchdog started (interval=%ds, db=%s)", interval, db_path)
    while True:
        # Wait first, then scan: no pass runs at the instant of start-up.
        time.sleep(interval)
        _check_dead_pipelines(db_path)
|
||
|
|
|
||
|
|
|
||
|
|
def start_watchdog(db_path: Path, interval: int = 30) -> threading.Thread:
    """Start the background watchdog thread and return it.

    The thread is named ``"kin-watchdog"`` and created with ``daemon=True``,
    so it does not keep the interpreter alive at shutdown. Every call starts
    a new thread; there is no guard against starting more than one.

    Args:
        db_path: Path to the Kin SQLite database to monitor.
        interval: Seconds to sleep between passes; must be positive.

    Returns:
        The started ``threading.Thread`` (callers may ignore it).

    Raises:
        ValueError: If *interval* is not positive. Validating here surfaces
            the mistake in the caller. Otherwise ``time.sleep`` would raise
            inside the thread for a negative value (silently killing it), or
            the loop would spin without pause for zero.
    """
    if interval <= 0:
        raise ValueError(f"watchdog interval must be positive, got {interval!r}")
    t = threading.Thread(
        target=_watchdog_loop,
        args=(db_path, interval),
        daemon=True,
        name="kin-watchdog",
    )
    t.start()
    _logger.info("Watchdog thread launched (pid=%d)", os.getpid())
    return t
|