""" Kin pipeline watchdog — background thread that detects dead subprocesses. Every `interval` seconds: check all running pipelines with a known PID. If the PID is dead (ProcessLookupError / ESRCH), mark pipeline as failed and task as blocked with a descriptive reason. """ import errno import logging import os import threading import time from pathlib import Path from core import models from core.db import get_connection, init_db _logger = logging.getLogger("kin.watchdog") _watchdog_started = False def _check_dead_pipelines(db_path: Path) -> None: """Single watchdog pass: open a fresh connection, scan running pipelines.""" conn = get_connection(db_path) try: try: running = models.get_running_pipelines_with_pid(conn) except Exception as exc: # Table may not exist yet on very first startup _logger.debug("Watchdog: could not query pipelines (%s)", exc) return for row in running: pid = row["pid"] pipeline_id = row["id"] task_id = row["task_id"] try: os.kill(pid, 0) # signal 0 = existence check except OSError as e: if e.errno == errno.ESRCH: reason = f"Process died unexpectedly (PID {pid})" _logger.warning( "Watchdog: pipeline %s PID %s is dead — marking blocked (%s)", pipeline_id, pid, task_id, ) try: models.update_pipeline(conn, pipeline_id, status="failed") models.update_task(conn, task_id, status="blocked", blocked_reason=reason) except Exception as upd_exc: _logger.error("Watchdog: failed to update pipeline/task: %s", upd_exc) # else: PermissionError (EACCES) — process exists but we can't signal it, skip # Cleanup stale pipelines with no PID (zombie entries) try: stale = conn.execute( "SELECT id, task_id FROM pipelines WHERE status = 'running' AND pid IS NULL " "AND created_at < datetime('now', '-2 hours')" ).fetchall() for row in stale: _logger.warning( "Watchdog: stale pipeline %s (no PID) — marking failed (%s)", row["id"], row["task_id"], ) models.update_pipeline(conn, row["id"], status="failed") models.update_task( conn, row["task_id"], status="blocked", blocked_reason="Stale pipeline (no PID recorded)", ) except Exception as stale_exc: _logger.error("Watchdog: stale cleanup failed: %s", stale_exc) except Exception as exc: _logger.error("Watchdog pass failed: %s", exc) finally: conn.close() def _watchdog_loop(db_path: Path, interval: int) -> None: """Daemon thread body: sleep then check, forever.""" _logger.info("Watchdog started (interval=%ds, db=%s)", interval, db_path) while True: time.sleep(interval) _check_dead_pipelines(db_path) def start_watchdog(db_path: Path, interval: int = 30) -> None: """Start the background watchdog thread (daemon=True, so it dies with the process).""" global _watchdog_started if _watchdog_started: return _watchdog_started = True t = threading.Thread( target=_watchdog_loop, args=(db_path, interval), daemon=True, name="kin-watchdog", ) t.start() _logger.info("Watchdog thread launched (pid=%d)", os.getpid())