diff --git a/agents/runner.py b/agents/runner.py index 471b683..0db3819 100644 --- a/agents/runner.py +++ b/agents/runner.py @@ -45,12 +45,6 @@ def _build_claude_env() -> dict: Also resolves ~/.nvm/versions/node/*/bin globs that launchctl may not expand. """ env = os.environ.copy() - - # Propagate canonical DB path to agent subprocesses so they don't fall back - # to module-relative path (which breaks in worktrees — KIN-129). - if "KIN_DB_PATH" not in env: - env["KIN_DB_PATH"] = str(Path.home() / ".kin" / "kin.db") - existing = env.get("PATH", "").split(":") extra = list(_EXTRA_PATH_DIRS) diff --git a/cli/main.py b/cli/main.py index 6b29e65..d00c9e7 100644 --- a/cli/main.py +++ b/cli/main.py @@ -616,98 +616,83 @@ def run_task(ctx, task_id, dry_run, allow_write): is_noninteractive = os.environ.get("KIN_NONINTERACTIVE") == "1" click.echo(f"Task: {task['id']} — {task['title']}") - # Step 1a: Check for pre-built steps from revise endpoint (e.g. with analyst injection). - # Decision #866: steps built in web/api.py are saved as pending_steps and consumed here. - pending_steps = task.get("pending_steps") - if pending_steps: - pipeline_steps = pending_steps - models.update_task(conn, task_id, pending_steps=None) - click.echo(f"Using pre-built pipeline ({len(pipeline_steps)} steps, skipping PM)...") - pm_result = None - pm_started_at = pm_ended_at = None - if is_noninteractive: - click.echo("\n[non-interactive] Auto-executing pipeline...") - elif not click.confirm("\nExecute pipeline?"): - click.echo("Aborted.") - return - else: - # Step 1b: PM decomposes - click.echo("Running PM to decompose task...") - pm_started_at = datetime.now(timezone.utc).isoformat() - pm_result = run_agent( - conn, "pm", task_id, project_id, - model="sonnet", dry_run=dry_run, - allow_write=allow_write, noninteractive=is_noninteractive, + # Step 1: PM decomposes + click.echo("Running PM to decompose task...") + pm_started_at = datetime.now(timezone.utc).isoformat() + pm_result = run_agent( + conn, "pm", task_id, project_id, + model="sonnet", dry_run=dry_run, + allow_write=allow_write, noninteractive=is_noninteractive, + ) + pm_ended_at = datetime.now(timezone.utc).isoformat() + + if dry_run: + click.echo("\n--- PM Prompt (dry-run) ---") + click.echo(pm_result.get("prompt", "")[:2000]) + click.echo("\n(Dry-run: PM would produce a pipeline JSON)") + return + + if not pm_result["success"]: + click.echo(f"PM failed: {pm_result.get('output', 'unknown error')}", err=True) + raise SystemExit(1) + + # Parse PM output for pipeline + output = pm_result.get("output") + if isinstance(output, str): + try: + output = json.loads(output) + except json.JSONDecodeError: + click.echo(f"PM returned non-JSON output:\n{output[:500]}", err=True) + raise SystemExit(1) + + if not isinstance(output, dict) or "pipeline" not in output: + click.echo(f"PM output missing 'pipeline' key:\n{json.dumps(output, indent=2)[:500]}", err=True) + raise SystemExit(1) + + pipeline_steps = output["pipeline"] + if not isinstance(pipeline_steps, list) or not pipeline_steps: + click.echo( + f"PM returned empty or invalid pipeline: {pipeline_steps!r}", err=True ) - pm_ended_at = datetime.now(timezone.utc).isoformat() + raise SystemExit(1) + analysis = output.get("analysis", "") - if dry_run: - click.echo("\n--- PM Prompt (dry-run) ---") - click.echo(pm_result.get("prompt", "")[:2000]) - click.echo("\n(Dry-run: PM would produce a pipeline JSON)") - return + # Save completion_mode from PM output to task (only if neither task nor project has explicit mode) + task_current = models.get_task(conn, task_id) + update_fields = {} + project = models.get_project(conn, project_id) + project_mode = project.get("execution_mode") if project else None + if not task_current.get("execution_mode") and not project_mode: + pm_completion_mode = models.validate_completion_mode( + output.get("completion_mode", "review") + ) + update_fields["execution_mode"] = pm_completion_mode + import logging + logging.getLogger("kin").info( + "PM set completion_mode=%s for task %s", pm_completion_mode, task_id + ) - if not pm_result["success"]: - click.echo(f"PM failed: {pm_result.get('output', 'unknown error')}", err=True) - raise SystemExit(1) + # Save category from PM output (only if task has no category yet) + if not task_current.get("category"): + pm_category = output.get("category") + if pm_category and isinstance(pm_category, str): + pm_category = pm_category.upper() + if pm_category in models.TASK_CATEGORIES: + update_fields["category"] = pm_category - # Parse PM output for pipeline - output = pm_result.get("output") - if isinstance(output, str): - try: - output = json.loads(output) - except json.JSONDecodeError: - click.echo(f"PM returned non-JSON output:\n{output[:500]}", err=True) - raise SystemExit(1) + if update_fields: + models.update_task(conn, task_id, **update_fields) - if not isinstance(output, dict) or "pipeline" not in output: - click.echo(f"PM output missing 'pipeline' key:\n{json.dumps(output, indent=2)[:500]}", err=True) - raise SystemExit(1) + click.echo(f"\nAnalysis: {analysis}") + click.echo(f"Pipeline ({len(pipeline_steps)} steps):") + for i, step in enumerate(pipeline_steps, 1): + click.echo(f" {i}. {step['role']} ({step.get('model', 'sonnet')}): {step.get('brief', '')}") - pipeline_steps = output["pipeline"] - if not isinstance(pipeline_steps, list) or not pipeline_steps: - click.echo( - f"PM returned empty or invalid pipeline: {pipeline_steps!r}", err=True - ) - raise SystemExit(1) - analysis = output.get("analysis", "") - - # Save completion_mode from PM output to task (only if neither task nor project has explicit mode) - task_current = models.get_task(conn, task_id) - update_fields = {} - project = models.get_project(conn, project_id) - project_mode = project.get("execution_mode") if project else None - if not task_current.get("execution_mode") and not project_mode: - pm_completion_mode = models.validate_completion_mode( - output.get("completion_mode", "review") - ) - update_fields["execution_mode"] = pm_completion_mode - import logging - logging.getLogger("kin").info( - "PM set completion_mode=%s for task %s", pm_completion_mode, task_id - ) - - # Save category from PM output (only if task has no category yet) - if not task_current.get("category"): - pm_category = output.get("category") - if pm_category and isinstance(pm_category, str): - pm_category = pm_category.upper() - if pm_category in models.TASK_CATEGORIES: - update_fields["category"] = pm_category - - if update_fields: - models.update_task(conn, task_id, **update_fields) - - click.echo(f"\nAnalysis: {analysis}") - click.echo(f"Pipeline ({len(pipeline_steps)} steps):") - for i, step in enumerate(pipeline_steps, 1): - click.echo(f" {i}. {step['role']} ({step.get('model', 'sonnet')}): {step.get('brief', '')}") - - if is_noninteractive: - click.echo("\n[non-interactive] Auto-executing pipeline...") - elif not click.confirm("\nExecute pipeline?"): - click.echo("Aborted.") - return + if is_noninteractive: + click.echo("\n[non-interactive] Auto-executing pipeline...") + elif not click.confirm("\nExecute pipeline?"): + click.echo("Aborted.") + return # Step 2: Execute pipeline click.echo("\nExecuting pipeline...") diff --git a/core/db.py b/core/db.py index 5d8e47b..51c41b6 100644 --- a/core/db.py +++ b/core/db.py @@ -3,11 +3,10 @@ Kin — SQLite database schema and connection management. All tables from DESIGN.md section 3.5 State Management. """ -import os import sqlite3 from pathlib import Path -DB_PATH = Path(os.environ.get("KIN_DB_PATH") or (Path.home() / ".kin" / "kin.db")) +DB_PATH = Path(__file__).parent.parent / "kin.db" SCHEMA = """ -- Проекты (центральный реестр) @@ -66,7 +65,6 @@ CREATE TABLE IF NOT EXISTS tasks ( revise_comment TEXT, revise_count INTEGER DEFAULT 0, revise_target_role TEXT DEFAULT NULL, - pending_steps JSON DEFAULT NULL, labels JSON, category TEXT DEFAULT NULL, telegram_sent BOOLEAN DEFAULT 0, @@ -402,10 +400,6 @@ def _migrate(conn: sqlite3.Connection): conn.execute("ALTER TABLE tasks ADD COLUMN revise_target_role TEXT DEFAULT NULL") conn.commit() - if "pending_steps" not in task_cols: - conn.execute("ALTER TABLE tasks ADD COLUMN pending_steps JSON DEFAULT NULL") - conn.commit() - if "obsidian_vault_path" not in proj_cols: conn.execute("ALTER TABLE projects ADD COLUMN obsidian_vault_path TEXT") conn.commit() diff --git a/core/models.py b/core/models.py index d4694ce..2553c47 100644 --- a/core/models.py +++ b/core/models.py @@ -37,7 +37,6 @@ _JSON_COLUMNS: frozenset[str] = frozenset({ "tech_stack", "brief", "spec", "review", "test_result", "security_result", "labels", "smoke_test_result", - "pending_steps", "tags", "dependencies", "steps", @@ -381,7 +380,7 @@ def update_task(conn: sqlite3.Connection, id: str, **fields) -> dict: """ if not fields: return get_task(conn, id) - json_cols = ("brief", "spec", "review", "test_result", "security_result", "labels", "smoke_test_result", "pending_steps") + json_cols = ("brief", "spec", "review", "test_result", "security_result", "labels", "smoke_test_result") for key in json_cols: if key in fields: fields[key] = _json_encode(fields[key]) diff --git a/tests/test_kin_arch_023_regression.py b/tests/test_kin_arch_023_regression.py deleted file mode 100644 index 6dae2df..0000000 --- a/tests/test_kin_arch_023_regression.py +++ /dev/null @@ -1,228 +0,0 @@ -"""Regression tests for KIN-ARCH-023 — Analyst injection into actual executed pipeline. - -Two concerns: -1. At revise_count >= 2, analyst MUST be the first step in pending_steps saved to DB - (not just in the response body). The subprocess picks steps from pending_steps, - so a test that only checks the response body misses the real execution path. -2. The condition simplification at api.py ~1056 is semantically correct: - `steps and (not steps or steps[0].get('role') != 'analyst')` - is equivalent to: - `steps and steps[0].get('role') != 'analyst'` - because when `steps` is truthy, `not steps` is always False. -""" - -import pytest -from unittest.mock import patch - -from core.db import init_db -from core import models - - -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - -@pytest.fixture -def api_client(tmp_path): - """FastAPI TestClient with isolated DB (same pattern as test_kin_128_regression.py).""" - import web.api as api_module - api_module.DB_PATH = tmp_path / "test.db" - from web.api import app - from fastapi.testclient import TestClient - client = TestClient(app) - client.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"}) - client.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"}) - return client - - -@pytest.fixture -def api_client_with_conn(tmp_path): - """Returns (TestClient, db_path) so tests can query DB directly.""" - import web.api as api_module - db_path = tmp_path / "test.db" - api_module.DB_PATH = db_path - from web.api import app - from fastapi.testclient import TestClient - client = TestClient(app) - client.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"}) - client.post("/api/tasks", json={"project_id": "p1", "title": "Fix bug"}) - return client, db_path - - -# --------------------------------------------------------------------------- -# 1. Analyst actually persisted to pending_steps in DB (subprocess-visible path) -# --------------------------------------------------------------------------- - -class TestAnalystInjectedIntoPendingSteps: - """The subprocess reads steps from pending_steps in DB, not from the HTTP response. - These tests verify the DB state, not just the response body. - """ - - def test_second_revise_saves_analyst_first_in_pending_steps(self, api_client_with_conn): - """revise_count=2: pending_steps[0].role == 'analyst' in DB after revise call.""" - client, db_path = api_client_with_conn - steps = [{"role": "backend_dev", "model": "sonnet"}] - - with patch("web.api._launch_pipeline_subprocess"): - client.post("/api/tasks/P1-001/revise", json={"comment": "r1", "steps": steps}) - client.post("/api/tasks/P1-001/revise", json={"comment": "r2", "steps": steps}) - - conn = init_db(str(db_path)) - task = models.get_task(conn, "P1-001") - conn.close() - - pending = task.get("pending_steps") or [] - assert pending, "pending_steps не должен быть пустым после 2-й ревизии" - assert pending[0].get("role") == "analyst", ( - f"KIN-ARCH-023: analyst должен быть первым в pending_steps (subprocess читает именно их), " - f"получили: {pending[0].get('role')}" - ) - - def test_first_revise_does_not_add_analyst_to_pending_steps(self, api_client_with_conn): - """revise_count=1: analyst НЕ добавляется в pending_steps.""" - client, db_path = api_client_with_conn - steps = [{"role": "backend_dev", "model": "sonnet"}] - - with patch("web.api._launch_pipeline_subprocess"): - client.post("/api/tasks/P1-001/revise", json={"comment": "r1", "steps": steps}) - - conn = init_db(str(db_path)) - task = models.get_task(conn, "P1-001") - conn.close() - - pending = task.get("pending_steps") or [] - assert not pending or pending[0].get("role") != "analyst", ( - "analyst не должен быть первым шагом в pending_steps при revise_count=1" - ) - - def test_third_revise_saves_analyst_first_in_pending_steps(self, api_client_with_conn): - """revise_count=3: analyst также является первым в pending_steps.""" - client, db_path = api_client_with_conn - steps = [{"role": "frontend_dev", "model": "sonnet"}] - - with patch("web.api._launch_pipeline_subprocess"): - for comment in ("r1", "r2", "r3"): - client.post("/api/tasks/P1-001/revise", json={"comment": comment, "steps": steps}) - - conn = init_db(str(db_path)) - task = models.get_task(conn, "P1-001") - conn.close() - - pending = task.get("pending_steps") or [] - assert pending, "pending_steps не должен быть пустым после 3-й ревизии" - assert pending[0].get("role") == "analyst", ( - f"revise_count=3: первым в pending_steps должен быть analyst, получили: {pending[0].get('role')}" - ) - - def test_pending_steps_length_increases_by_one_after_analyst_injection(self, api_client_with_conn): - """После инжекции analyst длина pending_steps на 1 больше исходного списка шагов.""" - client, db_path = api_client_with_conn - original_steps = [ - {"role": "backend_dev", "model": "sonnet"}, - {"role": "reviewer", "model": "sonnet"}, - ] - - with patch("web.api._launch_pipeline_subprocess"): - client.post("/api/tasks/P1-001/revise", json={"comment": "r1", "steps": original_steps}) - client.post("/api/tasks/P1-001/revise", json={"comment": "r2", "steps": original_steps}) - - conn = init_db(str(db_path)) - task = models.get_task(conn, "P1-001") - conn.close() - - pending = task.get("pending_steps") or [] - assert len(pending) == len(original_steps) + 1, ( - f"Ожидали {len(original_steps) + 1} шагов (с analyst), получили {len(pending)}" - ) - - def test_analyst_not_duplicated_in_pending_steps_if_already_first(self, api_client_with_conn): - """Если analyst уже первый шаг — pending_steps не содержит дублей analyst.""" - client, db_path = api_client_with_conn - steps = [{"role": "analyst", "model": "sonnet"}, {"role": "backend_dev", "model": "sonnet"}] - - with patch("web.api._launch_pipeline_subprocess"): - client.post("/api/tasks/P1-001/revise", json={"comment": "r1", "steps": steps}) - client.post("/api/tasks/P1-001/revise", json={"comment": "r2", "steps": steps}) - - conn = init_db(str(db_path)) - task = models.get_task(conn, "P1-001") - conn.close() - - pending = task.get("pending_steps") or [] - analyst_count = sum(1 for s in pending if s.get("role") == "analyst") - assert analyst_count == 1, ( - f"analyst не должен дублироваться в pending_steps, нашли {analyst_count} вхождений" - ) - - -# --------------------------------------------------------------------------- -# 2. Condition simplification semantics (api.py ~1056) -# --------------------------------------------------------------------------- - -def _original_condition(steps): - """Original (verbose) condition from before the simplification.""" - return steps and (not steps or steps[0].get("role") != "analyst") - - -def _simplified_condition(steps): - """Simplified condition after KIN-ARCH-023 fix.""" - return steps and steps[0].get("role") != "analyst" - - -class TestConditionSimplificationEquivalence: - """Verify that the simplified condition produces identical results to the original - across all meaningful input variants. This guards against future refactors - accidentally diverging the two forms. - """ - - def test_empty_list_both_return_falsy(self): - """steps=[] → оба варианта возвращают falsy (инжекция не происходит).""" - assert not _original_condition([]) - assert not _simplified_condition([]) - - def test_none_both_return_falsy(self): - """steps=None → оба варианта возвращают falsy.""" - assert not _original_condition(None) - assert not _simplified_condition(None) - - def test_analyst_first_both_return_falsy(self): - """Если analyst уже первый — оба варианта возвращают falsy (инжекция пропускается).""" - steps = [{"role": "analyst"}, {"role": "backend_dev"}] - assert not _original_condition(steps) - assert not _simplified_condition(steps) - - def test_non_analyst_first_both_return_truthy(self): - """Если первый шаг не analyst — оба варианта truthy (инжекция происходит).""" - steps = [{"role": "backend_dev"}, {"role": "reviewer"}] - assert _original_condition(steps) - assert _simplified_condition(steps) - - def test_single_step_non_analyst_both_truthy(self): - """Одиночный шаг не-analyst → оба truthy.""" - steps = [{"role": "debugger"}] - assert _original_condition(steps) - assert _simplified_condition(steps) - - def test_step_without_role_key_both_truthy(self): - """Шаг без ключа 'role' → steps[0].get('role') возвращает None ≠ 'analyst' → truthy.""" - steps = [{"model": "sonnet"}] - assert _original_condition(steps) - assert _simplified_condition(steps) - - @pytest.mark.parametrize("steps", [ - [], - None, - [{"role": "analyst"}], - [{"role": "backend_dev"}], - [{"role": "reviewer"}, {"role": "backend_dev"}], - [{"role": "analyst"}, {"role": "backend_dev"}], - [{"model": "sonnet"}], - ]) - def test_parametrized_equivalence(self, steps): - """Для всех входных данных упрощённое условие идентично исходному.""" - original = bool(_original_condition(steps)) - simplified = bool(_simplified_condition(steps)) - assert original == simplified, ( - f"Условия расходятся для steps={steps}: " - f"original={original}, simplified={simplified}" - ) diff --git a/web/api.py b/web/api.py index 53c481b..9e2aa9b 100644 --- a/web/api.py +++ b/web/api.py @@ -1053,7 +1053,7 @@ def revise_task(task_id: str, body: TaskRevise): # KIN-128: On 2nd+ revision, inject analyst as first step for fresh perspective. # Guard: skip if analyst is already the first step (idempotent), or if steps is None. - if revise_count >= 2 and steps and steps[0].get("role") != "analyst": + if revise_count >= 2 and steps and (not steps or steps[0].get("role") != "analyst"): analyst_step = { "role": "analyst", "model": "sonnet", @@ -1065,11 +1065,6 @@ def revise_task(task_id: str, body: TaskRevise): } steps = [analyst_step] + list(steps) - # Persist computed steps so the subprocess can use them (avoids PM re-planning). - # Decision #866: modified pipeline steps must be saved to DB before subprocess launch. - if steps: - models.update_task(conn, task_id, pending_steps=steps) - conn.close() # Launch pipeline in background subprocess