diff --git a/core/watchdog.py b/core/watchdog.py index bf406f2..660cc51 100644 --- a/core/watchdog.py +++ b/core/watchdog.py @@ -51,6 +51,25 @@ def _check_dead_pipelines(db_path: Path) -> None: except Exception as upd_exc: _logger.error("Watchdog: failed to update pipeline/task: %s", upd_exc) # else: PermissionError (EACCES) — process exists but we can't signal it, skip + + # Cleanup stale pipelines with no PID (zombie entries) + try: + stale = conn.execute( + "SELECT id, task_id FROM pipelines WHERE status = 'running' AND pid IS NULL " + "AND created_at < datetime('now', '-2 hours')" + ).fetchall() + for row in stale: + _logger.warning( + "Watchdog: stale pipeline %s (no PID) — marking failed (%s)", + row["id"], row["task_id"], + ) + models.update_pipeline(conn, row["id"], status="failed") + models.update_task( + conn, row["task_id"], status="blocked", + blocked_reason="Stale pipeline (no PID recorded)", + ) + except Exception as stale_exc: + _logger.error("Watchdog: stale cleanup failed: %s", stale_exc) except Exception as exc: _logger.error("Watchdog pass failed: %s", exc) finally: diff --git a/web/api.py b/web/api.py index 53c481b..3136a38 100644 --- a/web/api.py +++ b/web/api.py @@ -6,6 +6,7 @@ Run: uvicorn web.api:app --reload --port 8420 import glob as _glob import logging import mimetypes +import os import shutil import sqlite3 import subprocess @@ -107,16 +108,30 @@ def get_conn(): return init_db(DB_PATH) -def _launch_pipeline_subprocess(task_id: str) -> None: - """Spawn `cli.main run {task_id}` in a detached background subprocess. +_MAX_CONCURRENT_PIPELINES = 5 - Used by auto-trigger (label 'auto') and revise endpoint. - Never raises — subprocess errors are logged only. + +def _spawn_pipeline(task_id: str, extra_log: str = "") -> subprocess.Popen | None: + """Spawn pipeline subprocess with process isolation and concurrency limit. + + Uses start_new_session=True so pipeline survives API restarts. + Returns Popen or None on failure/limit. Never raises. """ - import os + conn = get_conn() + count = conn.execute( + "SELECT COUNT(*) FROM pipelines WHERE status = 'running'" + ).fetchone()[0] + conn.close() + if count >= _MAX_CONCURRENT_PIPELINES: + _logger.warning( + "Concurrency limit (%d/%d): skipping %s", + count, _MAX_CONCURRENT_PIPELINES, task_id, + ) + return None + kin_root = Path(__file__).parent.parent - cmd = [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), "run", task_id] - cmd.append("--allow-write") + cmd = [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), + "run", task_id, "--allow-write"] env = os.environ.copy() if 'SSH_AUTH_SOCK' not in env: _socks = _glob.glob('/private/tmp/com.apple.launchd.*/Listeners') @@ -125,16 +140,21 @@ def _launch_pipeline_subprocess(task_id: str) -> None: env["KIN_NONINTERACTIVE"] = "1" try: proc = subprocess.Popen( - cmd, - cwd=str(kin_root), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - env=env, + cmd, cwd=str(kin_root), + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, env=env, + start_new_session=True, ) - _logger.info("Auto-triggered pipeline for %s, pid=%d", task_id, proc.pid) + _logger.info("Pipeline spawned for %s%s, pid=%d", task_id, extra_log, proc.pid) + return proc except Exception as exc: - _logger.warning("Failed to launch pipeline for %s: %s", task_id, exc) + _logger.warning("Failed to spawn pipeline for %s: %s", task_id, exc) + return None + + +def _launch_pipeline_subprocess(task_id: str) -> None: + """Backward-compat wrapper for auto-trigger and revise.""" + _spawn_pipeline(task_id) # --------------------------------------------------------------------------- @@ -621,28 +641,9 @@ def start_project_phase(project_id: str): models.update_task(conn, task_id, status="in_progress") conn.close() - kin_root = Path(__file__).parent.parent - cmd = [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), - "run", task_id] - cmd.append("--allow-write") # always required: subprocess runs non-interactively (stdin=DEVNULL) - - import os - env = os.environ.copy() - env["KIN_NONINTERACTIVE"] = "1" - - try: - proc = subprocess.Popen( - cmd, - cwd=str(kin_root), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - env=env, - ) - _logger.info("Phase agent started for task %s (phase %d), pid=%d", - task_id, active_phase["id"], proc.pid) - except Exception as e: - raise HTTPException(500, f"Failed to start phase agent: {e}") + proc = _spawn_pipeline(task_id, extra_log=f" (phase {active_phase['id']})") + if proc is None: + raise HTTPException(429, "Concurrency limit reached — try again later") return JSONResponse( {"status": "started", "phase_id": active_phase["id"], "task_id": task_id}, @@ -1125,29 +1126,10 @@ def run_task(task_id: str): # Set task to in_progress immediately so UI updates models.update_task(conn, task_id, status="in_progress") conn.close() - # Launch kin run in background subprocess - kin_root = Path(__file__).parent.parent - cmd = [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), - "run", task_id] - cmd.append("--allow-write") # always required: subprocess runs non-interactively (stdin=DEVNULL) - import os - env = os.environ.copy() - env["KIN_NONINTERACTIVE"] = "1" - - try: - proc = subprocess.Popen( - cmd, - cwd=str(kin_root), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - env=env, - ) - import logging - logging.getLogger("kin").info(f"Pipeline started for {task_id}, pid={proc.pid}") - except Exception as e: - raise HTTPException(500, f"Failed to start pipeline: {e}") + proc = _spawn_pipeline(task_id) + if proc is None: + raise HTTPException(429, "Concurrency limit reached — try again later") return JSONResponse({"status": "started", "task_id": task_id}, status_code=202) @@ -1416,23 +1398,7 @@ def _trigger_sysadmin_scan(conn, project_id: str, env: dict) -> str: ) models.update_task(conn, task_id, status="in_progress") - kin_root = Path(__file__).parent.parent - cmd = [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), "run", task_id] - cmd.append("--allow-write") - import os as _os - env_vars = _os.environ.copy() - env_vars["KIN_NONINTERACTIVE"] = "1" - try: - subprocess.Popen( - cmd, - cwd=str(kin_root), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - env=env_vars, - ) - except Exception as e: - _logger.warning("Failed to start sysadmin scan for %s: %s", task_id, e) + _spawn_pipeline(task_id, extra_log=" (sysadmin scan)") return task_id @@ -1808,22 +1774,7 @@ def send_chat_message(project_id: str, body: ChatMessageIn): ) task = t - import os as _os - env_vars = _os.environ.copy() - env_vars["KIN_NONINTERACTIVE"] = "1" - kin_root = Path(__file__).parent.parent - try: - subprocess.Popen( - [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), - "run", task_id, "--allow-write"], - cwd=str(kin_root), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - env=env_vars, - ) - except Exception as e: - _logger.warning("Failed to start pipeline for chat task %s: %s", task_id, e) + _spawn_pipeline(task_id, extra_log=" (chat)") assistant_content = f"Создал задачу {task_id}: {title}" assistant_msg = models.add_chat_message( diff --git a/web/frontend/src/views/ProjectView.vue b/web/frontend/src/views/ProjectView.vue index 00c896b..09110c1 100644 --- a/web/frontend/src/views/ProjectView.vue +++ b/web/frontend/src/views/ProjectView.vue @@ -175,8 +175,8 @@ function phaseStatusColor(s: string) { } // Tab groups -const PRIMARY_TABS = ['tasks', 'kanban', 'phases', 'decisions', 'modules', 'links'] as const -const MORE_TABS = ['environments', 'settings'] as const +const PRIMARY_TABS = ['tasks', 'kanban', 'decisions'] as const +const MORE_TABS = ['phases', 'modules', 'environments', 'links', 'settings'] as const function tabLabel(tab: string): string { const labels: Record = { @@ -1193,16 +1193,7 @@ async function addDecision() { class="px-1.5 py-0.5 text-xs text-gray-600 hover:text-red-400 rounded">✕
- - - +
- - - - + +
+
+ +
+ + + + +
+