diff --git a/DESIGN.md b/DESIGN.md index 3e60a99..8d7aff2 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -1280,39 +1280,6 @@ Real-time через SSE (Server-Sent Events) — runner пишет лог → A --- -## ЧАСТЬ 5: Критические ограничения реализации - -### 5.1 Запрет блокирующего time.sleep в pipeline execution thread - -**Корень бага KIN-145.** Pipeline агентов выполняется в отдельном потоке (ThreadPoolExecutor или threading.Thread). Этот поток является единственным для всего пайплайна задачи. Любой блокирующий вызов внутри него блокирует весь runner-процесс на время сна. - -**Механизм каскадного падения:** -1. `runner.py` вызывает `merge_worktree(worktree_path, p_path, max_retries=3, retry_delay_s=15)` (старый баг) -2. При git-конфликте `merge_worktree` вызывает `time.sleep(15)` трижды → 45 секунд блокировки -3. Родительский web-сервер (FastAPI) теряет ответ от воркера -4. `_check_parent_alive()` во всех других пайплайнах видит `ESRCH` (процесс недоступен) или таймаут -5. Все in-flight пайплайны помечаются как `failed` → массовое падение всех задач во всех проектах - -**Почему retry для git-конфликтов бессмысленен:** -Git merge conflict — детерминированная ошибка. Конфликт возник потому что два ветки расходятся в одном месте файла. Повторная попытка через 15 секунд не изменит содержимое файлов и снова завершится конфликтом. Retry тут не помогает, только блокирует. - -**Допустимые альтернативы в зависимости от контекста:** -- `async`-задержка (`asyncio.sleep`) — если код работает в async-окружении -- отдельный поток (`threading.Thread`) — если нужен polling или retry с задержкой -- отказ от retry — для детерминированных ошибок (git merge conflict, validation error) -- `Event.wait(timeout)` — если нужно ждать внешнего события без CPU spin - -**Правило:** -> `time.sleep()` в `agents/` ЗАПРЕЩЁН без исключений. -> В `core/` разрешён только в файлах с явным daemon-thread или retry-контекстом: -> — `core/watchdog.py` (daemon-поток, отдельный от pipeline) -> — `core/worktree.py` (retry-delay, вызывается только с явными kwargs, не из runner.py) - -**Static regression guard:** `tests/test_kin_fix_025_regression.py` сканирует `agents/` и `core/` -на наличие `time.sleep` — любой новый вызов вне allowlist ломает тест. - ---- - ## Заметки **Архитектура:** Изоляция контекста через процессы. Decisions = внешняя память PM. PM тупой/памятливый, workers умные/забывчивые. context-builder фильтрует по роли. ~22 роли = полная софтверная компания. diff --git a/core/watchdog.py b/core/watchdog.py index 660cc51..bf406f2 100644 --- a/core/watchdog.py +++ b/core/watchdog.py @@ -51,25 +51,6 @@ def _check_dead_pipelines(db_path: Path) -> None: except Exception as upd_exc: _logger.error("Watchdog: failed to update pipeline/task: %s", upd_exc) # else: PermissionError (EACCES) — process exists but we can't signal it, skip - - # Cleanup stale pipelines with no PID (zombie entries) - try: - stale = conn.execute( - "SELECT id, task_id FROM pipelines WHERE status = 'running' AND pid IS NULL " - "AND created_at < datetime('now', '-2 hours')" - ).fetchall() - for row in stale: - _logger.warning( - "Watchdog: stale pipeline %s (no PID) — marking failed (%s)", - row["id"], row["task_id"], - ) - models.update_pipeline(conn, row["id"], status="failed") - models.update_task( - conn, row["task_id"], status="blocked", - blocked_reason="Stale pipeline (no PID recorded)", - ) - except Exception as stale_exc: - _logger.error("Watchdog: stale cleanup failed: %s", stale_exc) except Exception as exc: _logger.error("Watchdog pass failed: %s", exc) finally: diff --git a/tests/test_kin_fix_025_regression.py b/tests/test_kin_fix_025_regression.py deleted file mode 100644 index c5f8791..0000000 --- a/tests/test_kin_fix_025_regression.py +++ /dev/null @@ -1,143 +0,0 @@ -"""Regression tests for KIN-FIX-025 — запрет time.sleep в pipeline execution thread. - -Корень бага KIN-145: blocking sleep в pipeline-треде останавливает весь runner-процесс. -При краше родителя через _check_parent_alive роняет все in-flight pipeline. - -Правило: - - agents/ — time.sleep ЗАПРЕЩЁН без исключений - - core/ — time.sleep разрешён только в явно задокументированных файлах (ALLOWLIST) - -ALLOWLIST (core-файлы с законным time.sleep): - - core/watchdog.py — daemon-поток, работает независимо от pipeline, sleep-цикл штатный - - core/worktree.py — retry-delay, вызывается только с explicit max_retries kwargs, - runner.py НЕ передаёт эти kwargs (guard: test_KIN-145_regression.py) - -Тест сканирует все .py файлы в core/ и agents/, исключая test_*.py и *.pyc. -Падает при нахождении time.sleep вне allowlist, выводя путь и строку нарушения. -""" - -import re -from pathlib import Path - -import pytest - - -# ───────────────────────────────────────────────────────────────────────────── -# Конфигурация -# ───────────────────────────────────────────────────────────────────────────── - -PROJECT_ROOT = Path(__file__).parent.parent - -# Файлы core/, в которых time.sleep допустим (с задокументированной причиной) -CORE_SLEEP_ALLOWLIST: set[str] = { - "core/watchdog.py", # daemon-поток: sleep-цикл мониторинга, не pipeline - "core/worktree.py", # retry-delay: вызывается с explicit kwargs, НЕ из runner.py -} - -SLEEP_PATTERN = re.compile(r"\btime\.sleep\s*\(") - - -def _find_sleep_violations(directory: Path, allowlist: set[str]) -> list[str]: - """Ищет time.sleep в .py файлах директории, возвращает список нарушений.""" - violations = [] - for py_file in sorted(directory.rglob("*.py")): - # Исключаем тест-файлы - if py_file.name.startswith("test_"): - continue - # Нормализуем путь относительно проекта для сравнения с allowlist - rel_path = py_file.relative_to(PROJECT_ROOT) - rel_str = str(rel_path).replace("\\", "/") - if rel_str in allowlist: - continue - # Сканируем строки файла - try: - lines = py_file.read_text(encoding="utf-8").splitlines() - except (OSError, UnicodeDecodeError): - continue - for lineno, line in enumerate(lines, start=1): - if SLEEP_PATTERN.search(line): - violations.append(f"{rel_str}:{lineno}: {line.strip()}") - return violations - - -# ───────────────────────────────────────────────────────────────────────────── -# Тест 1: agents/ — time.sleep запрещён полностью -# ───────────────────────────────────────────────────────────────────────────── - -class TestNoSleepInAgents: - def test_no_time_sleep_in_agents(self): - """KIN-FIX-025: agents/ не должен содержать time.sleep. - - Blocking sleep в pipeline-треде → каскадное падение через _check_parent_alive. - Допустимые альтернативы: asyncio.sleep, threading.Event.wait, отдельный поток. - """ - agents_dir = PROJECT_ROOT / "agents" - violations = _find_sleep_violations(agents_dir, allowlist=set()) - - assert not violations, ( - "НАРУШЕНИЕ KIN-FIX-025: time.sleep найден в agents/ (pipeline thread).\n" - "Blocking sleep в pipeline-треде → каскадное падение всех in-flight задач.\n" - "Используй asyncio.sleep / threading.Event.wait / отдельный поток.\n" - "Нарушения:\n" + "\n".join(f" {v}" for v in violations) - ) - - -# ───────────────────────────────────────────────────────────────────────────── -# Тест 2: core/ — time.sleep запрещён вне allowlist -# ───────────────────────────────────────────────────────────────────────────── - -class TestNoSleepInCoreOutsideAllowlist: - def test_no_time_sleep_in_core_outside_allowlist(self): - """KIN-FIX-025: core/ не должен содержать time.sleep вне CORE_SLEEP_ALLOWLIST. - - Разрешённые исключения (CORE_SLEEP_ALLOWLIST): - - core/watchdog.py — daemon-поток, не pipeline - - core/worktree.py — retry-delay с explicit kwargs - - Любой новый time.sleep в core/ требует явного добавления в allowlist - с документированным обоснованием. - """ - core_dir = PROJECT_ROOT / "core" - violations = _find_sleep_violations(core_dir, allowlist=CORE_SLEEP_ALLOWLIST) - - assert not violations, ( - "НАРУШЕНИЕ KIN-FIX-025: time.sleep найден в core/ вне CORE_SLEEP_ALLOWLIST.\n" - "Если sleep необходим — добавь файл в CORE_SLEEP_ALLOWLIST в этом тест-файле\n" - "с задокументированным обоснованием (daemon-поток или retry с explicit kwargs).\n" - "Нарушения:\n" + "\n".join(f" {v}" for v in violations) - ) - - def test_allowlisted_files_still_exist(self): - """KIN-FIX-025: файлы из CORE_SLEEP_ALLOWLIST должны существовать. - - Если файл удалён — убери его из allowlist, чтобы allowlist не гнил. - """ - missing = [] - for rel_path in CORE_SLEEP_ALLOWLIST: - full_path = PROJECT_ROOT / rel_path - if not full_path.exists(): - missing.append(rel_path) - - assert not missing, ( - "Файлы из CORE_SLEEP_ALLOWLIST не найдены — удали их из allowlist:\n" - + "\n".join(f" {p}" for p in missing) - ) - - def test_allowlisted_files_actually_contain_sleep(self): - """KIN-FIX-025: файлы из CORE_SLEEP_ALLOWLIST должны реально содержать time.sleep. - - Если sleep убрали — убери файл из allowlist, чтобы allowlist не стал dead weight. - """ - stale = [] - for rel_path in CORE_SLEEP_ALLOWLIST: - full_path = PROJECT_ROOT / rel_path - if not full_path.exists(): - continue # покрывается тестом выше - content = full_path.read_text(encoding="utf-8") - if not SLEEP_PATTERN.search(content): - stale.append(rel_path) - - assert not stale, ( - "Файлы из CORE_SLEEP_ALLOWLIST не содержат time.sleep — удали их из allowlist:\n" - + "\n".join(f" {p}" for p in stale) - ) diff --git a/web/api.py b/web/api.py index 3136a38..53c481b 100644 --- a/web/api.py +++ b/web/api.py @@ -6,7 +6,6 @@ Run: uvicorn web.api:app --reload --port 8420 import glob as _glob import logging import mimetypes -import os import shutil import sqlite3 import subprocess @@ -108,30 +107,16 @@ def get_conn(): return init_db(DB_PATH) -_MAX_CONCURRENT_PIPELINES = 5 +def _launch_pipeline_subprocess(task_id: str) -> None: + """Spawn `cli.main run {task_id}` in a detached background subprocess. - -def _spawn_pipeline(task_id: str, extra_log: str = "") -> subprocess.Popen | None: - """Spawn pipeline subprocess with process isolation and concurrency limit. - - Uses start_new_session=True so pipeline survives API restarts. - Returns Popen or None on failure/limit. Never raises. + Used by auto-trigger (label 'auto') and revise endpoint. + Never raises — subprocess errors are logged only. """ - conn = get_conn() - count = conn.execute( - "SELECT COUNT(*) FROM pipelines WHERE status = 'running'" - ).fetchone()[0] - conn.close() - if count >= _MAX_CONCURRENT_PIPELINES: - _logger.warning( - "Concurrency limit (%d/%d): skipping %s", - count, _MAX_CONCURRENT_PIPELINES, task_id, - ) - return None - + import os kin_root = Path(__file__).parent.parent - cmd = [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), - "run", task_id, "--allow-write"] + cmd = [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), "run", task_id] + cmd.append("--allow-write") env = os.environ.copy() if 'SSH_AUTH_SOCK' not in env: _socks = _glob.glob('/private/tmp/com.apple.launchd.*/Listeners') @@ -140,21 +125,16 @@ def _spawn_pipeline(task_id: str, extra_log: str = "") -> subprocess.Popen | Non env["KIN_NONINTERACTIVE"] = "1" try: proc = subprocess.Popen( - cmd, cwd=str(kin_root), - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, env=env, - start_new_session=True, + cmd, + cwd=str(kin_root), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + env=env, ) - _logger.info("Pipeline spawned for %s%s, pid=%d", task_id, extra_log, proc.pid) - return proc + _logger.info("Auto-triggered pipeline for %s, pid=%d", task_id, proc.pid) except Exception as exc: - _logger.warning("Failed to spawn pipeline for %s: %s", task_id, exc) - return None - - -def _launch_pipeline_subprocess(task_id: str) -> None: - """Backward-compat wrapper for auto-trigger and revise.""" - _spawn_pipeline(task_id) + _logger.warning("Failed to launch pipeline for %s: %s", task_id, exc) # --------------------------------------------------------------------------- @@ -641,9 +621,28 @@ def start_project_phase(project_id: str): models.update_task(conn, task_id, status="in_progress") conn.close() - proc = _spawn_pipeline(task_id, extra_log=f" (phase {active_phase['id']})") - if proc is None: - raise HTTPException(429, "Concurrency limit reached — try again later") + kin_root = Path(__file__).parent.parent + cmd = [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), + "run", task_id] + cmd.append("--allow-write") # always required: subprocess runs non-interactively (stdin=DEVNULL) + + import os + env = os.environ.copy() + env["KIN_NONINTERACTIVE"] = "1" + + try: + proc = subprocess.Popen( + cmd, + cwd=str(kin_root), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + env=env, + ) + _logger.info("Phase agent started for task %s (phase %d), pid=%d", + task_id, active_phase["id"], proc.pid) + except Exception as e: + raise HTTPException(500, f"Failed to start phase agent: {e}") return JSONResponse( {"status": "started", "phase_id": active_phase["id"], "task_id": task_id}, @@ -1126,10 +1125,29 @@ def run_task(task_id: str): # Set task to in_progress immediately so UI updates models.update_task(conn, task_id, status="in_progress") conn.close() + # Launch kin run in background subprocess + kin_root = Path(__file__).parent.parent + cmd = [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), + "run", task_id] + cmd.append("--allow-write") # always required: subprocess runs non-interactively (stdin=DEVNULL) - proc = _spawn_pipeline(task_id) - if proc is None: - raise HTTPException(429, "Concurrency limit reached — try again later") + import os + env = os.environ.copy() + env["KIN_NONINTERACTIVE"] = "1" + + try: + proc = subprocess.Popen( + cmd, + cwd=str(kin_root), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + env=env, + ) + import logging + logging.getLogger("kin").info(f"Pipeline started for {task_id}, pid={proc.pid}") + except Exception as e: + raise HTTPException(500, f"Failed to start pipeline: {e}") return JSONResponse({"status": "started", "task_id": task_id}, status_code=202) @@ -1398,7 +1416,23 @@ def _trigger_sysadmin_scan(conn, project_id: str, env: dict) -> str: ) models.update_task(conn, task_id, status="in_progress") - _spawn_pipeline(task_id, extra_log=" (sysadmin scan)") + kin_root = Path(__file__).parent.parent + cmd = [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), "run", task_id] + cmd.append("--allow-write") + import os as _os + env_vars = _os.environ.copy() + env_vars["KIN_NONINTERACTIVE"] = "1" + try: + subprocess.Popen( + cmd, + cwd=str(kin_root), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + env=env_vars, + ) + except Exception as e: + _logger.warning("Failed to start sysadmin scan for %s: %s", task_id, e) return task_id @@ -1774,7 +1808,22 @@ def send_chat_message(project_id: str, body: ChatMessageIn): ) task = t - _spawn_pipeline(task_id, extra_log=" (chat)") + import os as _os + env_vars = _os.environ.copy() + env_vars["KIN_NONINTERACTIVE"] = "1" + kin_root = Path(__file__).parent.parent + try: + subprocess.Popen( + [sys.executable, "-m", "cli.main", "--db", str(DB_PATH), + "run", task_id, "--allow-write"], + cwd=str(kin_root), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + env=env_vars, + ) + except Exception as e: + _logger.warning("Failed to start pipeline for chat task %s: %s", task_id, e) assistant_content = f"Создал задачу {task_id}: {title}" assistant_msg = models.add_chat_message( diff --git a/web/frontend/src/views/ProjectView.vue b/web/frontend/src/views/ProjectView.vue index 09110c1..00c896b 100644 --- a/web/frontend/src/views/ProjectView.vue +++ b/web/frontend/src/views/ProjectView.vue @@ -175,8 +175,8 @@ function phaseStatusColor(s: string) { } // Tab groups -const PRIMARY_TABS = ['tasks', 'kanban', 'decisions'] as const -const MORE_TABS = ['phases', 'modules', 'environments', 'links', 'settings'] as const +const PRIMARY_TABS = ['tasks', 'kanban', 'phases', 'decisions', 'modules', 'links'] as const +const MORE_TABS = ['environments', 'settings'] as const function tabLabel(tab: string): string { const labels: Record = { @@ -1193,7 +1193,16 @@ async function addDecision() { class="px-1.5 py-0.5 text-xs text-gray-600 hover:text-red-400 rounded">✕
- + + +
- -
-
- -
- - - - -
-
+ + + +