From 2be94f0c68a2fb2b3ee4815fe0051b946762b44c Mon Sep 17 00:00:00 2001 From: Gros Frumos Date: Wed, 18 Mar 2026 22:27:06 +0200 Subject: [PATCH] kin: KIN-ARCH-023-debugger --- cli/main.py | 149 +++++++++++++++++++++++++++---------------------- core/db.py | 5 ++ core/models.py | 3 +- web/api.py | 5 ++ 4 files changed, 94 insertions(+), 68 deletions(-) diff --git a/cli/main.py b/cli/main.py index d00c9e7..6b29e65 100644 --- a/cli/main.py +++ b/cli/main.py @@ -616,83 +616,98 @@ def run_task(ctx, task_id, dry_run, allow_write): is_noninteractive = os.environ.get("KIN_NONINTERACTIVE") == "1" click.echo(f"Task: {task['id']} — {task['title']}") - # Step 1: PM decomposes - click.echo("Running PM to decompose task...") - pm_started_at = datetime.now(timezone.utc).isoformat() - pm_result = run_agent( - conn, "pm", task_id, project_id, - model="sonnet", dry_run=dry_run, - allow_write=allow_write, noninteractive=is_noninteractive, - ) - pm_ended_at = datetime.now(timezone.utc).isoformat() + # Step 1a: Check for pre-built steps from revise endpoint (e.g. with analyst injection). + # Decision #866: steps built in web/api.py are saved as pending_steps and consumed here. + pending_steps = task.get("pending_steps") + if pending_steps: + pipeline_steps = pending_steps + models.update_task(conn, task_id, pending_steps=None) + click.echo(f"Using pre-built pipeline ({len(pipeline_steps)} steps, skipping PM)...") + pm_result = None + pm_started_at = pm_ended_at = None + if is_noninteractive: + click.echo("\n[non-interactive] Auto-executing pipeline...") + elif not click.confirm("\nExecute pipeline?"): + click.echo("Aborted.") + return + else: + # Step 1b: PM decomposes + click.echo("Running PM to decompose task...") + pm_started_at = datetime.now(timezone.utc).isoformat() + pm_result = run_agent( + conn, "pm", task_id, project_id, + model="sonnet", dry_run=dry_run, + allow_write=allow_write, noninteractive=is_noninteractive, + ) + pm_ended_at = datetime.now(timezone.utc).isoformat() - if dry_run: - click.echo("\n--- PM Prompt (dry-run) ---") - click.echo(pm_result.get("prompt", "")[:2000]) - click.echo("\n(Dry-run: PM would produce a pipeline JSON)") - return + if dry_run: + click.echo("\n--- PM Prompt (dry-run) ---") + click.echo(pm_result.get("prompt", "")[:2000]) + click.echo("\n(Dry-run: PM would produce a pipeline JSON)") + return - if not pm_result["success"]: - click.echo(f"PM failed: {pm_result.get('output', 'unknown error')}", err=True) - raise SystemExit(1) - - # Parse PM output for pipeline - output = pm_result.get("output") - if isinstance(output, str): - try: - output = json.loads(output) - except json.JSONDecodeError: - click.echo(f"PM returned non-JSON output:\n{output[:500]}", err=True) + if not pm_result["success"]: + click.echo(f"PM failed: {pm_result.get('output', 'unknown error')}", err=True) raise SystemExit(1) - if not isinstance(output, dict) or "pipeline" not in output: - click.echo(f"PM output missing 'pipeline' key:\n{json.dumps(output, indent=2)[:500]}", err=True) - raise SystemExit(1) + # Parse PM output for pipeline + output = pm_result.get("output") + if isinstance(output, str): + try: + output = json.loads(output) + except json.JSONDecodeError: + click.echo(f"PM returned non-JSON output:\n{output[:500]}", err=True) + raise SystemExit(1) - pipeline_steps = output["pipeline"] - if not isinstance(pipeline_steps, list) or not pipeline_steps: - click.echo( - f"PM returned empty or invalid pipeline: {pipeline_steps!r}", err=True - ) - raise SystemExit(1) - analysis = output.get("analysis", "") + if not isinstance(output, dict) or "pipeline" not in output: + click.echo(f"PM output missing 'pipeline' key:\n{json.dumps(output, indent=2)[:500]}", err=True) + raise SystemExit(1) - # Save completion_mode from PM output to task (only if neither task nor project has explicit mode) - task_current = models.get_task(conn, task_id) - update_fields = {} - project = models.get_project(conn, project_id) - project_mode = project.get("execution_mode") if project else None - if not task_current.get("execution_mode") and not project_mode: - pm_completion_mode = models.validate_completion_mode( - output.get("completion_mode", "review") - ) - update_fields["execution_mode"] = pm_completion_mode - import logging - logging.getLogger("kin").info( - "PM set completion_mode=%s for task %s", pm_completion_mode, task_id - ) + pipeline_steps = output["pipeline"] + if not isinstance(pipeline_steps, list) or not pipeline_steps: + click.echo( + f"PM returned empty or invalid pipeline: {pipeline_steps!r}", err=True + ) + raise SystemExit(1) + analysis = output.get("analysis", "") - # Save category from PM output (only if task has no category yet) - if not task_current.get("category"): - pm_category = output.get("category") - if pm_category and isinstance(pm_category, str): - pm_category = pm_category.upper() - if pm_category in models.TASK_CATEGORIES: - update_fields["category"] = pm_category + # Save completion_mode from PM output to task (only if neither task nor project has explicit mode) + task_current = models.get_task(conn, task_id) + update_fields = {} + project = models.get_project(conn, project_id) + project_mode = project.get("execution_mode") if project else None + if not task_current.get("execution_mode") and not project_mode: + pm_completion_mode = models.validate_completion_mode( + output.get("completion_mode", "review") + ) + update_fields["execution_mode"] = pm_completion_mode + import logging + logging.getLogger("kin").info( + "PM set completion_mode=%s for task %s", pm_completion_mode, task_id + ) - if update_fields: - models.update_task(conn, task_id, **update_fields) + # Save category from PM output (only if task has no category yet) + if not task_current.get("category"): + pm_category = output.get("category") + if pm_category and isinstance(pm_category, str): + pm_category = pm_category.upper() + if pm_category in models.TASK_CATEGORIES: + update_fields["category"] = pm_category - click.echo(f"\nAnalysis: {analysis}") - click.echo(f"Pipeline ({len(pipeline_steps)} steps):") - for i, step in enumerate(pipeline_steps, 1): - click.echo(f" {i}. {step['role']} ({step.get('model', 'sonnet')}): {step.get('brief', '')}") + if update_fields: + models.update_task(conn, task_id, **update_fields) - if is_noninteractive: - click.echo("\n[non-interactive] Auto-executing pipeline...") - elif not click.confirm("\nExecute pipeline?"): - click.echo("Aborted.") - return + click.echo(f"\nAnalysis: {analysis}") + click.echo(f"Pipeline ({len(pipeline_steps)} steps):") + for i, step in enumerate(pipeline_steps, 1): + click.echo(f" {i}. {step['role']} ({step.get('model', 'sonnet')}): {step.get('brief', '')}") + + if is_noninteractive: + click.echo("\n[non-interactive] Auto-executing pipeline...") + elif not click.confirm("\nExecute pipeline?"): + click.echo("Aborted.") + return # Step 2: Execute pipeline click.echo("\nExecuting pipeline...") diff --git a/core/db.py b/core/db.py index 51c41b6..7b27293 100644 --- a/core/db.py +++ b/core/db.py @@ -65,6 +65,7 @@ CREATE TABLE IF NOT EXISTS tasks ( revise_comment TEXT, revise_count INTEGER DEFAULT 0, revise_target_role TEXT DEFAULT NULL, + pending_steps JSON DEFAULT NULL, labels JSON, category TEXT DEFAULT NULL, telegram_sent BOOLEAN DEFAULT 0, @@ -400,6 +401,10 @@ def _migrate(conn: sqlite3.Connection): conn.execute("ALTER TABLE tasks ADD COLUMN revise_target_role TEXT DEFAULT NULL") conn.commit() + if "pending_steps" not in task_cols: + conn.execute("ALTER TABLE tasks ADD COLUMN pending_steps JSON DEFAULT NULL") + conn.commit() + if "obsidian_vault_path" not in proj_cols: conn.execute("ALTER TABLE projects ADD COLUMN obsidian_vault_path TEXT") conn.commit() diff --git a/core/models.py b/core/models.py index 2553c47..d4694ce 100644 --- a/core/models.py +++ b/core/models.py @@ -37,6 +37,7 @@ _JSON_COLUMNS: frozenset[str] = frozenset({ "tech_stack", "brief", "spec", "review", "test_result", "security_result", "labels", "smoke_test_result", + "pending_steps", "tags", "dependencies", "steps", @@ -380,7 +381,7 @@ def update_task(conn: sqlite3.Connection, id: str, **fields) -> dict: """ if not fields: return get_task(conn, id) - json_cols = ("brief", "spec", "review", "test_result", "security_result", "labels", "smoke_test_result") + json_cols = ("brief", "spec", "review", "test_result", "security_result", "labels", "smoke_test_result", "pending_steps") for key in json_cols: if key in fields: fields[key] = _json_encode(fields[key]) diff --git a/web/api.py b/web/api.py index 9e2aa9b..52c6fae 100644 --- a/web/api.py +++ b/web/api.py @@ -1065,6 +1065,11 @@ def revise_task(task_id: str, body: TaskRevise): } steps = [analyst_step] + list(steps) + # Persist computed steps so the subprocess can use them (avoids PM re-planning). + # Decision #866: modified pipeline steps must be saved to DB before subprocess launch. + if steps: + models.update_task(conn, task_id, pending_steps=steps) + conn.close() # Launch pipeline in background subprocess