""" Kin pipeline watchdog — background thread that detects dead subprocesses. Every `interval` seconds: check all running pipelines with a known PID. If the PID is dead (ProcessLookupError / ESRCH), mark pipeline as failed and task as blocked with a descriptive reason. """ import logging import os import sqlite3 import threading import time from pathlib import Path from core import models from core.db import init_db _logger = logging.getLogger("kin.watchdog") def _check_dead_pipelines(db_path: Path) -> None: """Single watchdog pass: open a fresh connection, scan running pipelines.""" try: conn = sqlite3.connect(str(db_path), check_same_thread=False) conn.row_factory = sqlite3.Row try: running = models.get_running_pipelines_with_pid(conn) except Exception as exc: # Table may not exist yet on very first startup _logger.debug("Watchdog: could not query pipelines (%s)", exc) conn.close() return for row in running: pid = row["pid"] pipeline_id = row["id"] task_id = row["task_id"] try: os.kill(pid, 0) # signal 0 = existence check except ProcessLookupError: reason = f"Process died unexpectedly (PID {pid})" _logger.warning( "Watchdog: pipeline %s PID %s is dead — marking blocked (%s)", pipeline_id, pid, task_id, ) try: models.update_pipeline(conn, pipeline_id, status="failed") models.update_task(conn, task_id, status="blocked", blocked_reason=reason) except Exception as upd_exc: _logger.error("Watchdog: failed to update pipeline/task: %s", upd_exc) except PermissionError: # Process exists but we can't signal it (e.g. different user) — skip pass conn.close() except Exception as exc: _logger.error("Watchdog pass failed: %s", exc) def _watchdog_loop(db_path: Path, interval: int) -> None: """Daemon thread body: sleep then check, forever.""" _logger.info("Watchdog started (interval=%ds, db=%s)", interval, db_path) while True: time.sleep(interval) _check_dead_pipelines(db_path) def start_watchdog(db_path: Path, interval: int = 30) -> None: """Start the background watchdog thread (daemon=True, so it dies with the process).""" t = threading.Thread( target=_watchdog_loop, args=(db_path, interval), daemon=True, name="kin-watchdog", ) t.start() _logger.info("Watchdog thread launched (pid=%d)", os.getpid())