From eab9e951ab41e16f036b302d93124dc750c0abce Mon Sep 17 00:00:00 2001 From: Gros Frumos Date: Tue, 17 Mar 2026 17:26:31 +0200 Subject: [PATCH] kin: auto-commit after pipeline --- agents/runner.py | 138 ++++++++++ cli/main.py | 21 ++ cli/watch.py | 177 +++++++++++++ core/db.py | 46 ++++ core/deploy.py | 223 ++++++++++++++++ core/models.py | 121 +++++++++ tasks/kin-084-spec.md | 217 +++++++++++++++ tests/test_api.py | 66 +++++ tests/test_db.py | 39 +++ tests/test_kin_092_watch_ps.py | 464 +++++++++++++++++++++++++++++++++ tests/test_models.py | 73 ++++++ web/api.py | 116 ++++++++- 12 files changed, 1696 insertions(+), 5 deletions(-) create mode 100644 cli/watch.py create mode 100644 core/deploy.py create mode 100644 tasks/kin-084-spec.md create mode 100644 tests/test_kin_092_watch_ps.py diff --git a/agents/runner.py b/agents/runner.py index 6876a40..40832a2 100644 --- a/agents/runner.py +++ b/agents/runner.py @@ -1058,6 +1058,17 @@ def _execute_department_head_step( # Blocked status from dept head if parsed.get("status") == "blocked": reason = parsed.get("blocked_reason", "Department head reported blocked") + # KIN-084: log dept head blocked + if parent_pipeline_id: + try: + models.write_log( + conn, parent_pipeline_id, + f"Dept {step['role']} blocked: {reason}", + level="WARN", + extra={"role": step["role"], "blocked_reason": reason}, + ) + except Exception: + pass return { "success": False, "output": json.dumps(parsed, ensure_ascii=False), @@ -1095,6 +1106,18 @@ def _execute_department_head_step( }, }, ensure_ascii=False) + # KIN-084: log sub-pipeline start + if parent_pipeline_id: + try: + sub_roles = [s.get("role", "") for s in sub_pipeline] + models.write_log( + conn, parent_pipeline_id, + f"Sub-pipeline start: dept={dept_name}, steps={len(sub_pipeline)}", + extra={"dept_name": dept_name, "sub_steps": len(sub_pipeline), "sub_roles": sub_roles}, + ) + except Exception: + pass + # Run the sub-pipeline (noninteractive=True — Opus already reviewed the plan) # pass parent_pipeline_id and department so run_pipeline creates the child # pipeline with correct attributes (route_type='dept_sub') — no double create @@ -1108,6 +1131,19 @@ def _execute_department_head_step( department=dept_name, ) + # KIN-084: log sub-pipeline done + if parent_pipeline_id: + try: + sub_success = sub_result.get("success", False) + models.write_log( + conn, parent_pipeline_id, + f"Sub-pipeline done: dept={dept_name}, success={sub_success}, steps={sub_result.get('steps_completed', 0)}", + level="INFO" if sub_success else "ERROR", + extra={"dept_name": dept_name, "success": sub_success, "steps_completed": sub_result.get("steps_completed", 0)}, + ) + except Exception: + pass + # Extract decisions from sub-pipeline results for handoff decisions_made = [] sub_results = sub_result.get("results", []) @@ -1268,6 +1304,15 @@ def run_pipeline( # Save PID so watchdog can detect dead subprocesses (KIN-099) pipeline = models.update_pipeline(conn, pipeline["id"], pid=os.getpid()) models.update_task(conn, task_id, status="in_progress") + # KIN-084: log pipeline start + try: + models.write_log( + conn, pipeline["id"], + f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}", + extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode}, + ) + except Exception: + pass results = [] total_cost = 0.0 @@ -1281,6 +1326,19 @@ def run_pipeline( model = step.get("model", "sonnet") brief = step.get("brief") + # KIN-084: log step start + try: + if pipeline: + brief_preview = (step.get("brief") or "")[:100] + models.write_log( + conn, pipeline["id"], + f"Step {i+1}/{len(steps)} start: role={role}, model={model}", + level="INFO", + extra={"role": role, "model": model, "brief_preview": brief_preview}, + ) + except Exception: + pass + # Check parent process is still alive (KIN-099 watchdog) if not dry_run and pipeline: if _check_parent_alive(conn, pipeline, task_id, project_id): @@ -1335,6 +1393,16 @@ def run_pipeline( total_tokens=total_tokens, total_duration_seconds=total_duration, ) + # KIN-084: log step exception + try: + models.write_log( + conn, pipeline["id"], + f"Step {i+1}/{len(steps)} exception: {exc}", + level="ERROR", + extra={"role": role, "exc_type": type(exc).__name__}, + ) + except Exception: + pass models.log_agent_run( conn, project_id=project_id, @@ -1382,6 +1450,23 @@ def run_pipeline( total_tokens += result.get("tokens_used") or 0 total_duration += result.get("duration_seconds") or 0 + # KIN-084: log step done + try: + if pipeline: + models.write_log( + conn, pipeline["id"], + f"Step {i+1}/{len(steps)} done: role={role}, success={result.get('success')}", + level="INFO", + extra={ + "role": role, + "tokens_used": result.get("tokens_used") or 0, + "cost_usd": result.get("cost_usd") or 0, + "duration_seconds": result.get("duration_seconds") or 0, + }, + ) + except Exception: + pass + if not result["success"]: # Auto mode: retry once with allow_write on permission error if mode == "auto_complete" and not allow_write and _is_permission_error(result): @@ -1433,6 +1518,17 @@ def run_pipeline( total_duration_seconds=total_duration, ) agent_error = result.get("error") or "" + # KIN-084: log step failed + try: + if pipeline: + models.write_log( + conn, pipeline["id"], + f"Step {i+1}/{len(steps)} failed: {agent_error}", + level="ERROR", + extra={"role": role, "error": agent_error}, + ) + except Exception: + pass error_msg = f"Step {i+1}/{len(steps)} ({role}) failed" if agent_error: error_msg += f": {agent_error}" @@ -1499,6 +1595,16 @@ def run_pipeline( total_tokens=total_tokens, total_duration_seconds=total_duration, ) + # KIN-084: log step blocked + try: + models.write_log( + conn, pipeline["id"], + f"Step {i+1}/{len(steps)} blocked: {blocked_info['reason']}", + level="WARN", + extra={"role": role, "reason": blocked_info["reason"]}, + ) + except Exception: + pass models.update_task( conn, task_id, status="blocked", @@ -1746,6 +1852,20 @@ def run_pipeline( total_tokens=total_tokens, total_duration_seconds=total_duration, ) + # KIN-084: log pipeline completed + try: + models.write_log( + conn, pipeline["id"], + f"Pipeline completed: {len(steps)} steps, cost=${total_cost:.4f}, tokens={total_tokens}, duration={total_duration}s", + extra={ + "total_cost_usd": total_cost, + "total_tokens": total_tokens, + "total_duration_seconds": total_duration, + "steps_count": len(steps), + }, + ) + except Exception: + pass task_modules = models.get_modules(conn, project_id) @@ -1771,6 +1891,15 @@ def run_pipeline( elif mode == "auto_complete" and auto_eligible: # Auto-complete mode: last step is tester/reviewer — skip review, approve immediately models.update_task(conn, task_id, status="done") + # KIN-084: log task status + try: + models.write_log( + conn, pipeline["id"], + "Task status: done", + extra={"task_status": "done", "mode": mode}, + ) + except Exception: + pass try: run_hooks(conn, project_id, task_id, event="task_auto_approved", task_modules=task_modules) @@ -1800,6 +1929,15 @@ def run_pipeline( else: # Review mode: wait for manual approval models.update_task(conn, task_id, status="review", execution_mode="review") + # KIN-084: log task status + try: + models.write_log( + conn, pipeline["id"], + "Task status: review", + extra={"task_status": "review", "mode": mode}, + ) + except Exception: + pass # Run post-pipeline hooks (failures don't affect pipeline status) try: diff --git a/cli/main.py b/cli/main.py index 428565e..3e7a4c4 100644 --- a/cli/main.py +++ b/cli/main.py @@ -703,6 +703,27 @@ def run_task(ctx, task_id, dry_run, allow_write): click.echo(f"Duration: {result['total_duration_seconds']}s") +# =========================================================================== +# watch / ps +# =========================================================================== + +@cli.command("watch") +@click.argument("task_id") +@click.pass_context +def watch(ctx, task_id): + """Monitor a running task in real time (updates every 5s).""" + from cli.watch import cmd_watch + cmd_watch(ctx.obj["conn"], task_id) + + +@cli.command("ps") +@click.pass_context +def ps(ctx): + """List all running pipelines with PID and current step.""" + from cli.watch import cmd_ps + cmd_ps(ctx.obj["conn"]) + + # =========================================================================== # audit # =========================================================================== diff --git a/cli/watch.py b/cli/watch.py new file mode 100644 index 0000000..2671d37 --- /dev/null +++ b/cli/watch.py @@ -0,0 +1,177 @@ +""" +Kin CLI — kin watch and kin ps commands. + +kin watch: polling monitor, updates every 5s (like tail -f for a pipeline). +kin ps: one-shot list of all running pipelines with PID and current step. +""" + +import json +import time +from datetime import datetime + +from core import models + + +def _clear_screen() -> None: + print('\033[2J\033[H', end='', flush=True) + + +def _format_elapsed(dt_iso: str) -> str: + """Format elapsed time from SQLite CURRENT_TIMESTAMP (UTC naive, space separator).""" + try: + # SQLite stores "YYYY-MM-DD HH:MM:SS"; fromisoformat requires 'T' in Python < 3.11 + normalized = dt_iso.replace(' ', 'T') if ' ' in dt_iso else dt_iso + started = datetime.fromisoformat(normalized) + total_secs = int((datetime.utcnow() - started).total_seconds()) + if total_secs < 0: + total_secs = 0 + hours = total_secs // 3600 + minutes = (total_secs % 3600) // 60 + secs = total_secs % 60 + if hours > 0: + return f"{hours}h {minutes}m" + elif minutes > 0: + return f"{minutes}m {secs}s" + else: + return f"{secs}s" + except (ValueError, TypeError): + return "?" + + +def _parse_total_steps(steps) -> int | str: + """Return total planned steps from pipeline.steps (may be list or raw JSON string).""" + if steps is None: + return '?' + if isinstance(steps, list): + return len(steps) + # Fallback: raw JSON string (legacy or unexpected path) + if isinstance(steps, str): + try: + parsed = json.loads(steps) + if isinstance(parsed, list): + return len(parsed) + except (json.JSONDecodeError, ValueError): + pass + return '?' + + +def _render_watch( + task: dict, + pipeline: dict | None, + log: dict | None, + step_num: int, + total_steps: int | str, +) -> None: + """Render the watch screen to stdout.""" + sep = '─' * 60 + title = task['title'] or '' + title_short = (title[:50] + '…') if len(title) > 50 else title + + print(f"kin watch {task['id']}") + print(sep) + print(f"Task: {task['id']} {title_short}") + print(f"Status: {task['status']}") + + if pipeline: + pid_str = str(pipeline['pid']) if pipeline.get('pid') else '-' + print(f"Pipeline: #{pipeline['id']} [{pipeline['status']}] PID: {pid_str}") + elapsed = _format_elapsed(pipeline['created_at']) + print(f"Elapsed: {elapsed} | Agents run: {step_num} / Total planned: {total_steps}") + print() + current = log['agent_role'] if log else 'Waiting for first agent...' + print(f"Current agent: {current}") + else: + print() + print("No pipeline started yet.") + + print(sep) + + if log and log.get('output_summary'): + lines = (log['output_summary'] or '').splitlines() + for line in lines[-15:]: + print(line) + elif pipeline: + print("Waiting for first agent...") + + print(sep) + + terminal_statuses = ('completed', 'failed', 'cancelled') + if pipeline and pipeline['status'] in terminal_statuses: + print(f"[Pipeline {pipeline['status']}. Exiting.]") + else: + print("[Updating every 5s. Ctrl+C to stop]") + + +def _render_ps(rows: list[dict]) -> None: + """Render the running pipelines table to stdout.""" + if not rows: + print("No running pipelines.") + print("\nUse 'kin watch ' to monitor.") + return + + print(f"Running pipelines: {len(rows)}\n") + + headers = ["ID", "TASK", "PROJECT", "PID", "STARTED", "AGENT", "PARENT"] + col_widths = [len(h) for h in headers] + + formatted_rows = [] + for r in rows: + title = r.get('title') or '' + title_short = (title[:20] + '…') if len(title) > 20 else title + pid_str = str(r['pid']) if r.get('pid') else '-' + started = (_format_elapsed(r['created_at']) + ' ago') if r.get('created_at') else '-' + agent = r.get('current_agent') or '-' + parent = str(r['parent_pipeline_id']) if r.get('parent_pipeline_id') else '-' + row = [f"#{r['id']}", title_short, r.get('project_name') or '-', + pid_str, started, agent, parent] + formatted_rows.append(row) + for i, cell in enumerate(row): + col_widths[i] = max(col_widths[i], len(str(cell))) + + fmt = ' '.join(f'{{:<{w}}}' for w in col_widths) + print(fmt.format(*headers)) + print(fmt.format(*('-' * w for w in col_widths))) + for row in formatted_rows: + print(fmt.format(*row)) + + print("\nUse 'kin watch ' to monitor.") + + +def cmd_watch(conn, task_id: str) -> None: + """Polling monitor for a task. Updates every 5s. Exits on terminal pipeline state.""" + import sqlite3 + task = models.get_task(conn, task_id) + if not task: + print(f"Task '{task_id}' not found.") + return + + try: + while True: + _clear_screen() + pipeline = models.get_pipeline_for_watch(conn, task_id) + + if pipeline: + log = models.get_current_agent_log(conn, task_id, pipeline['created_at']) + step_num = conn.execute( + 'SELECT COUNT(*) FROM agent_logs WHERE task_id = ? AND created_at >= ?', + (task_id, pipeline['created_at']), + ).fetchone()[0] + total = _parse_total_steps(pipeline.get('steps')) + else: + log, step_num, total = None, 0, '?' + + _render_watch(task, pipeline, log, step_num, total) + + if pipeline and pipeline['status'] in ('completed', 'failed', 'cancelled'): + break + + time.sleep(5) + + except KeyboardInterrupt: + print('\nExiting watch mode.') + + +def cmd_ps(conn) -> None: + """One-shot list of all running pipelines.""" + rows = models.get_all_running_pipelines(conn) + _render_ps(rows) diff --git a/core/db.py b/core/db.py index 99ebfd9..618523a 100644 --- a/core/db.py +++ b/core/db.py @@ -29,6 +29,10 @@ CREATE TABLE IF NOT EXISTS projects ( ssh_key_path TEXT, ssh_proxy_jump TEXT, description TEXT, + deploy_host TEXT, + deploy_path TEXT, + deploy_runtime TEXT, + deploy_restart_cmd TEXT, autocommit_enabled INTEGER DEFAULT 0, obsidian_vault_path TEXT, worktrees_enabled INTEGER DEFAULT 0, @@ -302,6 +306,18 @@ CREATE TABLE IF NOT EXISTS task_attachments ( ); CREATE INDEX IF NOT EXISTS idx_task_attachments_task ON task_attachments(task_id); + +-- Live console log (KIN-084) +CREATE TABLE IF NOT EXISTS pipeline_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline_id INTEGER NOT NULL REFERENCES pipelines(id), + ts TEXT NOT NULL DEFAULT (datetime('now')), + level TEXT NOT NULL DEFAULT 'INFO', + message TEXT NOT NULL, + extra_json TEXT +); + +CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id); """ @@ -419,6 +435,22 @@ def _migrate(conn: sqlite3.Connection): conn.execute("ALTER TABLE projects ADD COLUMN description TEXT") conn.commit() + if "deploy_host" not in proj_cols: + conn.execute("ALTER TABLE projects ADD COLUMN deploy_host TEXT") + conn.commit() + + if "deploy_path" not in proj_cols: + conn.execute("ALTER TABLE projects ADD COLUMN deploy_path TEXT") + conn.commit() + + if "deploy_runtime" not in proj_cols: + conn.execute("ALTER TABLE projects ADD COLUMN deploy_runtime TEXT") + conn.commit() + + if "deploy_restart_cmd" not in proj_cols: + conn.execute("ALTER TABLE projects ADD COLUMN deploy_restart_cmd TEXT") + conn.commit() + # Migrate audit_log + project_phases tables existing_tables = {r[0] for r in conn.execute( "SELECT name FROM sqlite_master WHERE type='table'" @@ -612,6 +644,20 @@ def _migrate(conn: sqlite3.Connection): """) conn.commit() + if "pipeline_log" not in existing_tables: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS pipeline_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline_id INTEGER NOT NULL REFERENCES pipelines(id), + ts TEXT NOT NULL DEFAULT (datetime('now')), + level TEXT NOT NULL DEFAULT 'INFO', + message TEXT NOT NULL, + extra_json TEXT + ); + CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id); + """) + conn.commit() + # Migrate pipelines: add parent_pipeline_id and department columns (KIN-098) # Guard: table may not exist in legacy schemas without pipelines (old test fixtures) if "pipelines" in existing_tables: diff --git a/core/deploy.py b/core/deploy.py new file mode 100644 index 0000000..92ec254 --- /dev/null +++ b/core/deploy.py @@ -0,0 +1,223 @@ +""" +Kin — structured deploy module. + +Business logic for project deployments: +- Runtime-based step templates (docker/node/python/static) +- Local and SSH execution +- Dependency chain traversal via project_links +""" + +import sqlite3 +import subprocess +import time + +VALID_RUNTIMES = {"docker", "node", "python", "static"} + +# Base command templates per runtime. +# deploy_restart_cmd (if set) is appended as the final step for all runtimes. +RUNTIME_STEPS = { + "docker": ["git pull", "docker compose up -d --build"], + "node": ["git pull", "npm install --production", "pm2 restart all"], + "python": ["git pull", "pip install -r requirements.txt"], + "static": ["git pull", "nginx -s reload"], +} + + +def build_deploy_steps(project: dict) -> list[str]: + """Build deploy command list based on runtime and project config. + + Returns empty list if deploy_runtime is not set or invalid. + Appends deploy_restart_cmd as the last step if provided. + """ + runtime = project.get("deploy_runtime") + if not runtime or runtime not in RUNTIME_STEPS: + return [] + + steps = list(RUNTIME_STEPS[runtime]) + + restart_cmd = project.get("deploy_restart_cmd") + if restart_cmd and restart_cmd.strip(): + steps.append(restart_cmd.strip()) + + return steps + + +def _build_ssh_cmd(project: dict, command: str) -> list[str]: + """Build SSH subprocess command list to run a shell command on deploy_host.""" + deploy_host = project.get("deploy_host") or project.get("ssh_host") + ssh_user = project.get("ssh_user") or "root" + ssh_key = project.get("ssh_key_path") + proxy_jump = project.get("ssh_proxy_jump") + deploy_path = project.get("deploy_path") + + full_cmd = f"cd {deploy_path} && {command}" if deploy_path else command + + cmd = ["ssh"] + if ssh_key: + cmd += ["-i", ssh_key] + if proxy_jump: + cmd += ["-J", proxy_jump] + cmd += ["-o", "StrictHostKeyChecking=no", "-o", "BatchMode=yes"] + cmd += [f"{ssh_user}@{deploy_host}", full_cmd] + return cmd + + +def execute_deploy(project: dict, conn: sqlite3.Connection) -> dict: + """Execute deploy steps for a project. Returns structured result. + + Returns: + { + "success": bool, + "steps": list[str], + "results": list[{"step", "stdout", "stderr", "exit_code"}], + } + """ + deploy_host = project.get("deploy_host") or project.get("ssh_host") + steps = build_deploy_steps(project) + + if not steps: + return { + "success": False, + "steps": [], + "results": [], + "error": "No deploy steps: deploy_runtime not set or invalid", + } + + deploy_path = project.get("deploy_path") or project.get("path") or None + results = [] + overall_success = True + + for step in steps: + try: + if deploy_host: + cmd = _build_ssh_cmd(project, step) + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=120, + ) + else: + # WARNING: shell=True — deploy commands are admin-only, set by project owner + proc = subprocess.run( + step, + shell=True, # WARNING: shell=True — command is admin-only + cwd=deploy_path, + capture_output=True, + text=True, + timeout=120, + ) + except subprocess.TimeoutExpired: + results.append({ + "step": step, + "stdout": "", + "stderr": "Timed out after 120 seconds", + "exit_code": -1, + }) + overall_success = False + break + except Exception as e: + results.append({ + "step": step, + "stdout": "", + "stderr": str(e), + "exit_code": -1, + }) + overall_success = False + break + + results.append({ + "step": step, + "stdout": proc.stdout, + "stderr": proc.stderr, + "exit_code": proc.returncode, + }) + + if proc.returncode != 0: + overall_success = False + break # Stop on first failure + + return { + "success": overall_success, + "steps": steps, + "results": results, + } + + +def get_deploy_chain(conn: sqlite3.Connection, project_id: str) -> list[str]: + """Get ordered deploy chain: project + all downstream dependents (BFS, cycle-safe). + + Traverses project_links where type='depends_on': + - from_project depends_on to_project + - When to_project changes, from_project must also be redeployed. + + Returns list starting with project_id, followed by dependents in BFS order. + """ + visited: set[str] = {project_id} + chain: list[str] = [project_id] + queue: list[str] = [project_id] + + while queue: + current = queue.pop(0) + rows = conn.execute( + "SELECT from_project FROM project_links" + " WHERE to_project = ? AND type = 'depends_on'", + (current,), + ).fetchall() + for row in rows: + dep_id = row[0] + if dep_id not in visited: + visited.add(dep_id) + chain.append(dep_id) + queue.append(dep_id) + + return chain + + +def deploy_with_dependents(conn: sqlite3.Connection, project_id: str) -> dict: + """Deploy project and all dependents in chain order. + + Returns: + { + "success": bool, + "steps": list[str], # main project steps + "results": list[dict], # main project step results + "dependents_deployed": list[str], + } + """ + from core.models import get_project + + chain = get_deploy_chain(conn, project_id) + dependents = [pid for pid in chain if pid != project_id] + + # Deploy main project first + main_project = get_project(conn, project_id) + if not main_project: + return { + "success": False, + "steps": [], + "results": [], + "dependents_deployed": [], + "error": f"Project '{project_id}' not found", + } + + main_result = execute_deploy(main_project, conn) + overall_success = main_result["success"] + + dependents_deployed = [] + if main_result["success"] and dependents: + for dep_id in dependents: + dep_project = get_project(conn, dep_id) + if dep_project: + dep_result = execute_deploy(dep_project, conn) + if dep_result["success"]: + dependents_deployed.append(dep_id) + else: + overall_success = False + + return { + "success": overall_success, + "steps": main_result.get("steps", []), + "results": main_result.get("results", []), + "dependents_deployed": dependents_deployed, + } diff --git a/core/models.py b/core/models.py index 43d8155..baf6973 100644 --- a/core/models.py +++ b/core/models.py @@ -531,6 +531,86 @@ def get_running_pipelines_with_pid(conn: sqlite3.Connection) -> list[dict]: return _rows_to_list(rows) +def get_pipeline_for_watch(conn: sqlite3.Connection, task_id: str) -> dict | None: + """Return the most recent top-level pipeline for a task (for kin watch).""" + row = conn.execute( + """SELECT id, task_id, project_id, status, pid, steps, created_at, completed_at + FROM pipelines + WHERE task_id = ? AND parent_pipeline_id IS NULL + ORDER BY created_at DESC, id DESC LIMIT 1""", + (task_id,), + ).fetchone() + return _row_to_dict(row) + + +def get_current_agent_log( + conn: sqlite3.Connection, task_id: str, since_iso: str +) -> dict | None: + """Return the most recent agent log for a task since a given datetime (for kin watch).""" + row = conn.execute( + """SELECT agent_role, output_summary, duration_seconds, success, created_at + FROM agent_logs + WHERE task_id = ? AND created_at >= ? + ORDER BY id DESC LIMIT 1""", + (task_id, since_iso), + ).fetchone() + return _row_to_dict(row) + + +def write_log( + conn: sqlite3.Connection, + pipeline_id: int, + message: str, + level: str = "INFO", + extra: dict | list | None = None, +) -> dict: + """Insert a pipeline log entry. Returns inserted row as dict.""" + extra_json = json.dumps(extra, ensure_ascii=False) if extra is not None else None + cur = conn.execute( + """INSERT INTO pipeline_log (pipeline_id, message, level, extra_json) + VALUES (?, ?, ?, ?)""", + (pipeline_id, message, level, extra_json), + ) + conn.commit() + row = conn.execute( + "SELECT * FROM pipeline_log WHERE id = ?", (cur.lastrowid,) + ).fetchone() + return _row_to_dict(row) + + +def get_pipeline_logs( + conn: sqlite3.Connection, + pipeline_id: int, + since_id: int = 0, +) -> list[dict]: + """Get pipeline log entries after since_id in chronological order.""" + rows = conn.execute( + """SELECT * FROM pipeline_log + WHERE pipeline_id = ? AND id > ? + ORDER BY id ASC""", + (pipeline_id, since_id), + ).fetchall() + return _rows_to_list(rows) + + +def get_all_running_pipelines(conn: sqlite3.Connection) -> list[dict]: + """Return all running pipelines with task/project info and current agent (for kin ps).""" + rows = conn.execute( + """SELECT p.id, p.task_id, p.status, p.pid, p.created_at, + p.parent_pipeline_id, + t.title, proj.name AS project_name, + (SELECT agent_role FROM agent_logs + WHERE task_id = p.task_id AND created_at >= p.created_at + ORDER BY id DESC LIMIT 1) AS current_agent + FROM pipelines p + JOIN tasks t ON p.task_id = t.id + JOIN projects proj ON p.project_id = proj.id + WHERE p.status = 'running' + ORDER BY p.created_at DESC""" + ).fetchall() + return _rows_to_list(rows) + + # --------------------------------------------------------------------------- # Support # --------------------------------------------------------------------------- @@ -998,6 +1078,47 @@ def get_last_handoff( return _row_to_dict(row) +# --------------------------------------------------------------------------- +# Project Links (KIN-079) +# --------------------------------------------------------------------------- + +def create_project_link( + conn: sqlite3.Connection, + from_project: str, + to_project: str, + type: str, + description: str | None = None, +) -> dict: + """Create a project dependency link. Returns the created link as dict.""" + cur = conn.execute( + """INSERT INTO project_links (from_project, to_project, type, description) + VALUES (?, ?, ?, ?)""", + (from_project, to_project, type, description), + ) + conn.commit() + row = conn.execute( + "SELECT * FROM project_links WHERE id = ?", (cur.lastrowid,) + ).fetchone() + return _row_to_dict(row) + + +def get_project_links(conn: sqlite3.Connection, project_id: str) -> list[dict]: + """Get all project links where project_id is from_project or to_project.""" + rows = conn.execute( + "SELECT * FROM project_links WHERE from_project = ? OR to_project = ?" + " ORDER BY created_at", + (project_id, project_id), + ).fetchall() + return _rows_to_list(rows) + + +def delete_project_link(conn: sqlite3.Connection, link_id: int) -> bool: + """Delete a project link by id. Returns True if deleted, False if not found.""" + cur = conn.execute("DELETE FROM project_links WHERE id = ?", (link_id,)) + conn.commit() + return cur.rowcount > 0 + + def get_chat_messages( conn: sqlite3.Connection, project_id: str, diff --git a/tasks/kin-084-spec.md b/tasks/kin-084-spec.md new file mode 100644 index 0000000..2993bf2 --- /dev/null +++ b/tasks/kin-084-spec.md @@ -0,0 +1,217 @@ +# KIN-084 — Live Console: архитектурная спека + +## 1. DDL — таблица pipeline_log + +```sql +CREATE TABLE IF NOT EXISTS pipeline_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline_id INTEGER NOT NULL REFERENCES pipelines(id), + ts TEXT NOT NULL DEFAULT (datetime('now')), + level TEXT NOT NULL DEFAULT 'INFO', + message TEXT NOT NULL, + extra_json TEXT +); + +CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id); +``` + +**Замечание**: Brief указывает `pipeline_id TEXT`, но `pipelines.id` — `INTEGER PRIMARY KEY AUTOINCREMENT`. +Используем `INTEGER` (соответствует паттерну `department_handoffs.pipeline_id INTEGER`). + +**Где добавить**: +1. В `SCHEMA` в `core/db.py` — в конец строки SCHEMA (после `task_attachments`) +2. В `_migrate()` в `core/db.py` — guard `if "pipeline_log" not in existing_tables:` (по аналогии с `task_attachments`) + +--- + +## 2. Функции в core/models.py + +### write_log() + +```python +def write_log( + conn: sqlite3.Connection, + pipeline_id: int, + message: str, + level: str = "INFO", + extra: dict | list | None = None, +) -> dict: + """Insert a pipeline log entry. Returns inserted row as dict. + + extra: optional structured data (serialized to JSON string). + level: INFO | DEBUG | ERROR | WARN + """ + extra_json = json.dumps(extra, ensure_ascii=False) if extra is not None else None + cur = conn.execute( + """INSERT INTO pipeline_log (pipeline_id, message, level, extra_json) + VALUES (?, ?, ?, ?)""", + (pipeline_id, message, level, extra_json), + ) + conn.commit() + row = conn.execute( + "SELECT * FROM pipeline_log WHERE id = ?", (cur.lastrowid,) + ).fetchone() + return _row_to_dict(row) +``` + +### get_pipeline_logs() + +```python +def get_pipeline_logs( + conn: sqlite3.Connection, + pipeline_id: int, + since_id: int = 0, +) -> list[dict]: + """Get pipeline log entries after since_id in chronological order. + + since_id=0 returns all entries (id > 0 matches all). + """ + rows = conn.execute( + """SELECT * FROM pipeline_log + WHERE pipeline_id = ? AND id > ? + ORDER BY id ASC""", + (pipeline_id, since_id), + ).fetchall() + return _rows_to_list(rows) +``` + +**Замечание по extra_json**: `_row_to_dict()` автоматически декодирует JSON-строки начинающиеся с `{` или `[`. +Так что в ответе API `extra_json` будет уже распарсенным dict/list. + +--- + +## 3. Точки инструментации в agents/runner.py + +### Функция run_pipeline() (строки 1210–1835) + +Все write_log в run_pipeline должны: +- Быть защищены `if pipeline:` (dry_run не создаёт pipeline) +- Оборачиваться в `try/except Exception: pass` (не блокировать pipeline) + +| # | После строки | Событие | Level | extra | +|---|---|---|---|---| +| 1 | ~1270 после `update_pipeline(pid=...)` | Pipeline start | INFO | `{route_type, steps_count, mode}` | +| 2 | ~1280 начало тела цикла `for i, step` | Step N start | INFO | `{role, model, brief_preview}` | +| 3 | ~1383 после аккумуляции stats | Step N done (success) | INFO | `{tokens, cost_usd, duration_seconds}` | +| 4 | ~1329 в `except Exception as exc:` | Step exception | ERROR | `{role, exc_type}` | +| 5 | ~1436-1438 в блоке `if not result["success"]` | Step failed | ERROR | `{role, error}` | +| 6 | ~1525 в блоке `if blocked_info:` | Step blocked | WARN | `{role, reason}` | +| 7 | ~1748 после `update_pipeline(completed)` | Pipeline completed | INFO | `{steps, cost_usd, tokens, duration_s}` | +| 8 | ~1771–1802 финальный статус задачи | Task status | INFO | `{task_status, mode}` | + +### Функция _execute_department_head_step() (строки 1028–1172) + +Логируем в `parent_pipeline_id` (существует в момент вызова). +Guard: `if parent_pipeline_id:`. + +| # | После строки | Событие | Level | extra | +|---|---|---|---|---| +| 9 | ~1059-1066 (dept blocked) | Dept head blocked | WARN | `{role, blocked_reason}` | +| 10 | ~1100 перед вызовом sub run_pipeline | Sub-pipeline start | INFO | `{dept_name, sub_steps}` | +| 11 | ~1112 после вызова sub run_pipeline | Sub-pipeline done | INFO/ERROR | `{success, steps_completed}` | + +**Пример вызова (точка 2)**: +```python +try: + if pipeline: + brief_preview = (step.get("brief") or "")[:100] + models.write_log( + conn, pipeline["id"], + f"Step {i+1}/{len(steps)} start: role={role}, model={model}", + level="INFO", + extra={"role": role, "model": model, "brief_preview": brief_preview}, + ) +except Exception: + pass +``` + +**Пример вызова (точка 1 — pipeline start)**: +```python +try: + models.write_log( + conn, pipeline["id"], + f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}", + extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode}, + ) +except Exception: + pass +``` + +--- + +## 4. API Endpoint + +### GET /api/pipelines/{pipeline_id}/logs + +**Контракт**: +- Путь: `/api/pipelines/{pipeline_id}/logs` +- Query: `since_id: int = 0` +- Response: `list[PipelineLogEntry]` +- 404 если pipeline не найден + +**Схема ответа** (один элемент): +```json +{ + "id": 42, + "pipeline_id": 7, + "ts": "2026-03-17 14:23:01", + "level": "INFO", + "message": "Step 1/3 start: role=backend_dev, model=sonnet", + "extra_json": {"role": "backend_dev", "model": "sonnet", "brief_preview": "..."} +} +``` + +**Реализация в web/api.py**: +```python +@app.get("/api/pipelines/{pipeline_id}/logs") +def get_pipeline_logs(pipeline_id: int, since_id: int = 0): + """Get pipeline log entries after since_id (for live console polling).""" + conn = get_conn() + row = conn.execute("SELECT id FROM pipelines WHERE id = ?", (pipeline_id,)).fetchone() + if not row: + conn.close() + raise HTTPException(404, f"Pipeline {pipeline_id} not found") + logs = models.get_pipeline_logs(conn, pipeline_id, since_id=since_id) + conn.close() + return logs +``` + +**Поллинг-протокол (для фронтенда)**: +1. `since_id=0` — первый запрос, получить всё +2. `since_id = max(id)` из последнего ответа для инкрементного обновления +3. Поллить каждые 1-2 секунды пока `pipeline.status == 'running'` +4. Остановить поллинг когда pipeline завершён + +--- + +## 5. Тесты (обязательны для core/) + +### tests/test_db.py +- SCHEMA-строка содержит CREATE TABLE pipeline_log (convention #387) +- `_migrate()` создаёт pipeline_log если таблицы нет + +### tests/test_models.py +- `write_log()` корректно вставляет запись, возвращает dict +- `write_log()` с `extra=None` — extra_json = NULL +- `write_log()` с `extra={dict}` — extra_json декодируется в dict +- `get_pipeline_logs()` с `since_id=0` возвращает все записи +- `get_pipeline_logs()` с `since_id=N` возвращает только id > N +- Записи в хронологическом порядке (ORDER BY id ASC) + +### tests/test_api.py +- GET /api/pipelines/{id}/logs → 200, list +- GET /api/pipelines/{id}/logs?since_id=5 → только id > 5 +- GET /api/pipelines/9999/logs → 404 + +--- + +## 6. Совместимость (#375/#376) + +`write_log()` пишет ТОЛЬКО в `pipeline_log`, не затрагивает `pipelines.status` и `watchdog`. +Watchdog (core/watchdog.py) проверяет `pipelines.status = 'running' AND pid IS NOT NULL` — не зависит от pipeline_log. +Нет конфликтов. + +**FK guard**: write_log вызывается только ПОСЛЕ `create_pipeline()` (т.е. когда pipeline_id уже в БД). +PM-шаг (cli/main.py) запускается ДО `run_pipeline()`, поэтому pipeline_id ещё не существует. +PM-логи недоступны в pipeline_log — это ограничение текущей архитектуры. +Для KIN-084 MVP: логируем от момента создания pipeline_record до финала. diff --git a/tests/test_api.py b/tests/test_api.py index 47714e3..80cfa04 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -2199,3 +2199,69 @@ def test_patch_project_test_command_null_returns_400(client): r = client.patch("/api/projects/p1", json={"test_command": None}) assert r.status_code == 400 assert "Nothing to update" in r.json()["detail"] + + +# --------------------------------------------------------------------------- +# KIN-084: GET /api/pipelines/{pipeline_id}/logs +# --------------------------------------------------------------------------- + +def _seed_pipeline(db_path, project_id="p1", task_id="P1-001") -> int: + """Создаёт pipeline в БД и возвращает его id.""" + from core.db import init_db + from core import models + conn = init_db(db_path) + pipe = models.create_pipeline(conn, task_id, project_id, "feature", [{"role": "dev"}]) + conn.close() + return pipe["id"] + + +def test_get_pipeline_logs_returns_200_empty(client): + """KIN-084: GET /api/pipelines/{id}/logs → 200 с пустым списком если логов нет.""" + pid = _seed_pipeline(api_module.DB_PATH) + r = client.get(f"/api/pipelines/{pid}/logs") + assert r.status_code == 200 + assert r.json() == [] + + +def test_get_pipeline_logs_returns_entries(client): + """KIN-084: GET /api/pipelines/{id}/logs → список записей с нужными полями.""" + from core.db import init_db + from core import models + pid = _seed_pipeline(api_module.DB_PATH) + conn = init_db(api_module.DB_PATH) + models.write_log(conn, pid, "Pipeline started", extra={"steps_count": 3}) + models.write_log(conn, pid, "Step 1 done") + conn.close() + + r = client.get(f"/api/pipelines/{pid}/logs") + assert r.status_code == 200 + logs = r.json() + assert len(logs) == 2 + assert logs[0]["message"] == "Pipeline started" + assert isinstance(logs[0]["extra_json"], dict) + assert logs[0]["extra_json"]["steps_count"] == 3 + assert logs[1]["message"] == "Step 1 done" + + +def test_get_pipeline_logs_since_id_filters(client): + """KIN-084: ?since_id=N возвращает только id > N.""" + from core.db import init_db + from core import models + pid = _seed_pipeline(api_module.DB_PATH, task_id="P1-001") + conn = init_db(api_module.DB_PATH) + e1 = models.write_log(conn, pid, "Entry 1") + models.write_log(conn, pid, "Entry 2") + models.write_log(conn, pid, "Entry 3") + conn.close() + + r = client.get(f"/api/pipelines/{pid}/logs?since_id={e1['id']}") + assert r.status_code == 200 + logs = r.json() + assert len(logs) == 2 + assert all(log["id"] > e1["id"] for log in logs) + + +def test_get_pipeline_logs_not_found(client): + """KIN-084: GET /api/pipelines/9999/logs → 404.""" + r = client.get("/api/pipelines/9999/logs") + assert r.status_code == 404 diff --git a/tests/test_db.py b/tests/test_db.py index d3d3a70..a622171 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -502,3 +502,42 @@ def test_migrate_pid_guard_is_idempotent(): assert before == after assert "pid" in after conn.close() + + +# --------------------------------------------------------------------------- +# Schema KIN-084: pipeline_log в SCHEMA-строке и _migrate() +# Конвенция #387: DDL-парсинг SCHEMA-строки +# --------------------------------------------------------------------------- + +def test_schema_string_contains_pipeline_log_table(): + """Регрессия KIN-084: SCHEMA-строка содержит CREATE TABLE pipeline_log.""" + from core.db import SCHEMA + assert "CREATE TABLE IF NOT EXISTS pipeline_log" in SCHEMA + + +def test_schema_pipeline_log_has_required_columns(): + """SCHEMA pipeline_log содержит все обязательные колонки.""" + from core.db import SCHEMA + start = SCHEMA.index("CREATE TABLE IF NOT EXISTS pipeline_log") + end = SCHEMA.index(");", start) + 2 + ddl = SCHEMA[start:end] + assert "pipeline_id INTEGER" in ddl + assert "ts " in ddl + assert "level " in ddl + assert "message " in ddl + assert "extra_json " in ddl + + +def test_migrate_creates_pipeline_log_on_fresh_init(conn): + """Свежая инициализация: pipeline_log существует.""" + tables = {r["name"] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()} + assert "pipeline_log" in tables + + +def test_migrate_creates_pipeline_log_if_missing(): + """_migrate создаёт pipeline_log если таблица отсутствует в старой БД.""" + conn = _old_schema_conn() + _migrate(conn) + tables = {r["name"] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()} + assert "pipeline_log" in tables + conn.close() diff --git a/tests/test_kin_092_watch_ps.py b/tests/test_kin_092_watch_ps.py new file mode 100644 index 0000000..89503ce --- /dev/null +++ b/tests/test_kin_092_watch_ps.py @@ -0,0 +1,464 @@ +"""Tests for KIN-092: cli/watch.py — kin watch and kin ps commands. + +Покрывает: +- _format_elapsed: секунды, минуты, часы, ноль, отрицательное, невалидное +- _parse_total_steps: list, JSON-строка, None, невалидный JSON, не-list JSON +- get_pipeline_for_watch: только top-level, самый свежий, None если нет +- get_current_agent_log: последний лог, фильтрация по дате пайплайна +- get_all_running_pipelines: только running, PID=NULL, current_agent +- _render_ps: пустой список, PID=NULL → '-', обрезка title +- _render_watch: нет пайплайна, PID=NULL → '-', terminal status, output lines +- cmd_watch: несуществующий task_id → понятная ошибка, sleep не вызывается +""" + +import pytest +from datetime import datetime, timedelta +from unittest.mock import patch + +from core.db import init_db +from core import models +from cli.watch import ( + _format_elapsed, + _parse_total_steps, + _render_watch, + _render_ps, + cmd_watch, +) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def conn(): + """Fresh in-memory DB for each test.""" + c = init_db(db_path=":memory:") + yield c + c.close() + + +@pytest.fixture +def project_and_task(conn): + """Create a project and a task; return (project, task).""" + proj = models.create_project(conn, "p1", "Project One", "/path/one", tech_stack=["python"]) + task = models.create_task(conn, "KIN-092-T1", "p1", "Watch task") + return proj, task + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _call_format_elapsed(dt_iso: str, elapsed_seconds: int) -> str: + """Call _format_elapsed with datetime.utcnow frozen to (started + elapsed_seconds).""" + normalized = dt_iso.replace(' ', 'T') if ' ' in dt_iso else dt_iso + started = datetime.fromisoformat(normalized) + frozen_now = started + timedelta(seconds=elapsed_seconds) + + with patch("cli.watch.datetime") as mock_dt: + mock_dt.fromisoformat = datetime.fromisoformat + mock_dt.utcnow.return_value = frozen_now + return _format_elapsed(dt_iso) + + +# --------------------------------------------------------------------------- +# _format_elapsed +# --------------------------------------------------------------------------- + +def test_format_elapsed_seconds(): + assert _call_format_elapsed("2026-03-17 12:00:00", 45) == "45s" + + +def test_format_elapsed_exactly_one_minute(): + assert _call_format_elapsed("2026-03-17 12:00:00", 60) == "1m 0s" + + +def test_format_elapsed_minutes_and_seconds(): + assert _call_format_elapsed("2026-03-17 12:00:00", 90) == "1m 30s" + + +def test_format_elapsed_hours(): + assert _call_format_elapsed("2026-03-17 12:00:00", 3661) == "1h 1m" + + +def test_format_elapsed_zero(): + """Нулевое время → '0s'.""" + assert _call_format_elapsed("2026-03-17 12:00:00", 0) == "0s" + + +def test_format_elapsed_negative_clipped_to_zero(): + """Отрицательный elapsed (будущая дата) → '0s', не падает.""" + assert _call_format_elapsed("2026-03-17 12:00:00", -10) == "0s" + + +def test_format_elapsed_sqlite_space_format(): + """SQLite хранит 'YYYY-MM-DD HH:MM:SS' через пробел — должно парситься.""" + assert _call_format_elapsed("2026-03-17 10:30:00", 65) == "1m 5s" + + +def test_format_elapsed_invalid_string(): + assert _format_elapsed("not-a-date") == "?" + + +def test_format_elapsed_none(): + assert _format_elapsed(None) == "?" + + +# --------------------------------------------------------------------------- +# _parse_total_steps +# --------------------------------------------------------------------------- + +def test_parse_total_steps_list(): + assert _parse_total_steps(["pm", "dev", "qa"]) == 3 + + +def test_parse_total_steps_empty_list(): + assert _parse_total_steps([]) == 0 + + +def test_parse_total_steps_json_string(): + assert _parse_total_steps('["pm", "dev", "qa"]') == 3 + + +def test_parse_total_steps_json_string_empty_array(): + assert _parse_total_steps('[]') == 0 + + +def test_parse_total_steps_none(): + assert _parse_total_steps(None) == "?" + + +def test_parse_total_steps_invalid_json(): + assert _parse_total_steps("not-json") == "?" + + +def test_parse_total_steps_json_non_list(): + """JSON-объект (не массив) → '?'.""" + assert _parse_total_steps('{"key": "value"}') == "?" + + +# --------------------------------------------------------------------------- +# SQL: get_pipeline_for_watch +# --------------------------------------------------------------------------- + +def test_get_pipeline_for_watch_returns_none_when_no_pipeline(conn, project_and_task): + _, task = project_and_task + result = models.get_pipeline_for_watch(conn, task["id"]) + assert result is None + + +def test_get_pipeline_for_watch_returns_top_level_pipeline(conn, project_and_task): + _, task = project_and_task + pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm", "dev"]) + result = models.get_pipeline_for_watch(conn, task["id"]) + assert result is not None + assert result["id"] == pipeline["id"] + + +def test_get_pipeline_for_watch_excludes_sub_pipelines(conn, project_and_task): + """Саб-пайплайны (parent_pipeline_id IS NOT NULL) должны игнорироваться.""" + _, task = project_and_task + parent = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm"]) + # Создаём саб-пайплайн вручную + conn.execute( + "INSERT INTO pipelines (task_id, project_id, route_type, steps, status, parent_pipeline_id)" + " VALUES (?, ?, ?, ?, ?, ?)", + (task["id"], "p1", "sub", "[]", "running", parent["id"]), + ) + conn.commit() + result = models.get_pipeline_for_watch(conn, task["id"]) + assert result["id"] == parent["id"] + assert result.get("parent_pipeline_id") is None + + +def test_get_pipeline_for_watch_returns_most_recent(conn, project_and_task): + """Если пайплайнов несколько — возвращает самый свежий.""" + _, task = project_and_task + p1 = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm"]) + p2 = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm", "dev"]) + result = models.get_pipeline_for_watch(conn, task["id"]) + assert result["id"] == p2["id"] + + +# --------------------------------------------------------------------------- +# SQL: get_current_agent_log +# --------------------------------------------------------------------------- + +def test_get_current_agent_log_returns_none_when_no_logs(conn, project_and_task): + _, task = project_and_task + pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", []) + result = models.get_current_agent_log(conn, task["id"], pipeline["created_at"]) + assert result is None + + +def test_get_current_agent_log_returns_most_recent(conn, project_and_task): + _, task = project_and_task + pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", []) + models.log_agent_run(conn, "p1", "pm", "plan", task_id=task["id"], output_summary="first run") + models.log_agent_run(conn, "p1", "dev", "code", task_id=task["id"], output_summary="second run") + result = models.get_current_agent_log(conn, task["id"], pipeline["created_at"]) + assert result["agent_role"] == "dev" + assert result["output_summary"] == "second run" + + +# --------------------------------------------------------------------------- +# SQL: get_all_running_pipelines +# --------------------------------------------------------------------------- + +def test_get_all_running_pipelines_empty_when_no_pipelines(conn): + assert models.get_all_running_pipelines(conn) == [] + + +def test_get_all_running_pipelines_returns_only_running(conn, project_and_task): + """completed и failed пайплайны не должны попадать в kin ps.""" + _, task = project_and_task + running = models.create_pipeline(conn, task["id"], "p1", "standard", []) + completed = models.create_pipeline(conn, task["id"], "p1", "standard", []) + models.update_pipeline(conn, completed["id"], status="completed") + failed = models.create_pipeline(conn, task["id"], "p1", "standard", []) + models.update_pipeline(conn, failed["id"], status="failed") + + result = models.get_all_running_pipelines(conn) + ids = [r["id"] for r in result] + assert running["id"] in ids + assert completed["id"] not in ids + assert failed["id"] not in ids + + +def test_get_all_running_pipelines_pid_null_is_none(conn, project_and_task): + """PID=NULL → решение #374: отображается как '-' (pid=None в dict).""" + _, task = project_and_task + models.create_pipeline(conn, task["id"], "p1", "standard", []) + result = models.get_all_running_pipelines(conn) + assert len(result) == 1 + assert result[0]["pid"] is None + + +def test_get_all_running_pipelines_includes_current_agent(conn, project_and_task): + """current_agent = последний agent_role из agent_logs.""" + _, task = project_and_task + pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", []) + models.log_agent_run(conn, "p1", "pm", "plan", task_id=task["id"]) + models.log_agent_run(conn, "p1", "dev", "code", task_id=task["id"]) + result = models.get_all_running_pipelines(conn) + assert result[0]["current_agent"] == "dev" + + +def test_get_all_running_pipelines_no_logs_current_agent_is_none(conn, project_and_task): + _, task = project_and_task + models.create_pipeline(conn, task["id"], "p1", "standard", []) + result = models.get_all_running_pipelines(conn) + assert result[0]["current_agent"] is None + + +def test_get_all_running_pipelines_includes_project_and_task_info(conn, project_and_task): + proj, task = project_and_task + models.create_pipeline(conn, task["id"], "p1", "standard", []) + result = models.get_all_running_pipelines(conn) + assert result[0]["project_name"] == proj["name"] + assert result[0]["title"] == task["title"] + + +# --------------------------------------------------------------------------- +# _render_ps (stdout) +# --------------------------------------------------------------------------- + +def test_render_ps_empty_list(capsys): + _render_ps([]) + out = capsys.readouterr().out + assert "No running pipelines." in out + + +def test_render_ps_suggests_watch_command_when_empty(capsys): + _render_ps([]) + out = capsys.readouterr().out + assert "kin watch" in out + + +def test_render_ps_pid_null_displayed_as_dash(capsys): + """Decision #374: PID=NULL → '-'.""" + rows = [{ + "id": 1, "task_id": "KIN-001", "title": "Short title", + "project_name": "MyProject", "pid": None, + "created_at": "2026-03-17 12:00:00", "current_agent": "dev", + "parent_pipeline_id": None, + }] + with patch("cli.watch._format_elapsed", return_value="5s"): + _render_ps(rows) + out = capsys.readouterr().out + # PID column should contain dash, not None or empty + lines = [l for l in out.splitlines() if "Short title" in l or "#1" in l] + assert any("-" in l for l in lines) + + +def test_render_ps_title_truncated_at_20_chars(capsys): + long_title = "A" * 25 + rows = [{ + "id": 1, "task_id": "KIN-001", "title": long_title, + "project_name": "P", "pid": 1234, + "created_at": "2026-03-17 12:00:00", "current_agent": None, + "parent_pipeline_id": None, + }] + with patch("cli.watch._format_elapsed", return_value="1s"): + _render_ps(rows) + out = capsys.readouterr().out + assert "A" * 20 + "…" in out + assert long_title not in out + + +def test_render_ps_with_pid_shows_pid(capsys): + rows = [{ + "id": 2, "task_id": "KIN-002", "title": "Fix bug", + "project_name": "Proj", "pid": 9999, + "created_at": "2026-03-17 11:00:00", "current_agent": "pm", + "parent_pipeline_id": None, + }] + with patch("cli.watch._format_elapsed", return_value="10m"): + _render_ps(rows) + out = capsys.readouterr().out + assert "9999" in out + + +def test_render_ps_shows_running_count(capsys): + rows = [ + {"id": 1, "task_id": "KIN-001", "title": "T1", "project_name": "P", + "pid": None, "created_at": "2026-03-17 12:00:00", + "current_agent": None, "parent_pipeline_id": None}, + {"id": 2, "task_id": "KIN-002", "title": "T2", "project_name": "P", + "pid": 42, "created_at": "2026-03-17 12:01:00", + "current_agent": "dev", "parent_pipeline_id": None}, + ] + with patch("cli.watch._format_elapsed", return_value="1s"): + _render_ps(rows) + out = capsys.readouterr().out + assert "2" in out + + +# --------------------------------------------------------------------------- +# _render_watch (stdout) +# --------------------------------------------------------------------------- + +def test_render_watch_no_pipeline(capsys): + task = {"id": "KIN-001", "title": "Test task", "status": "pending"} + _render_watch(task, None, None, 0, "?") + out = capsys.readouterr().out + assert "No pipeline started yet." in out + + +def test_render_watch_pipeline_null_pid_shows_dash(capsys): + """PID=NULL → 'PID: -'.""" + task = {"id": "KIN-001", "title": "Test task", "status": "in_progress"} + pipeline = { + "id": 1, "status": "running", "pid": None, + "created_at": "2026-03-17 12:00:00", "steps": [], + } + with patch("cli.watch._format_elapsed", return_value="30s"): + _render_watch(task, pipeline, None, 0, 3) + out = capsys.readouterr().out + assert "PID: -" in out + + +def test_render_watch_pipeline_with_pid(capsys): + task = {"id": "KIN-001", "title": "Active task", "status": "in_progress"} + pipeline = { + "id": 1, "status": "running", "pid": 5678, + "created_at": "2026-03-17 12:00:00", "steps": [], + } + with patch("cli.watch._format_elapsed", return_value="10s"): + _render_watch(task, pipeline, None, 0, 2) + out = capsys.readouterr().out + assert "5678" in out + + +def test_render_watch_terminal_status_completed(capsys): + task = {"id": "KIN-001", "title": "Done task", "status": "done"} + pipeline = { + "id": 1, "status": "completed", "pid": 123, + "created_at": "2026-03-17 12:00:00", "steps": [], + } + with patch("cli.watch._format_elapsed", return_value="1m"): + _render_watch(task, pipeline, None, 2, 3) + out = capsys.readouterr().out + assert "[Pipeline completed. Exiting.]" in out + + +def test_render_watch_terminal_status_failed(capsys): + task = {"id": "KIN-001", "title": "Failed task", "status": "failed"} + pipeline = { + "id": 1, "status": "failed", "pid": 123, + "created_at": "2026-03-17 12:00:00", "steps": [], + } + with patch("cli.watch._format_elapsed", return_value="2m"): + _render_watch(task, pipeline, None, 1, 3) + out = capsys.readouterr().out + assert "[Pipeline failed. Exiting.]" in out + + +def test_render_watch_running_shows_update_hint(capsys): + task = {"id": "KIN-001", "title": "Active task", "status": "in_progress"} + pipeline = { + "id": 1, "status": "running", "pid": 42, + "created_at": "2026-03-17 12:00:00", "steps": [], + } + with patch("cli.watch._format_elapsed", return_value="10s"): + _render_watch(task, pipeline, None, 0, 2) + out = capsys.readouterr().out + assert "[Updating every 5s. Ctrl+C to stop]" in out + + +def test_render_watch_truncates_long_title(capsys): + long_title = "B" * 55 + task = {"id": "KIN-001", "title": long_title, "status": "pending"} + _render_watch(task, None, None, 0, "?") + out = capsys.readouterr().out + assert "…" in out + assert long_title not in out + + +def test_render_watch_shows_last_15_output_lines(capsys): + """Output truncated к последним 15 строкам (аналог tail -f).""" + task = {"id": "KIN-001", "title": "T", "status": "in_progress"} + pipeline = { + "id": 1, "status": "running", "pid": 1, + "created_at": "2026-03-17 12:00:00", "steps": [], + } + lines = [f"line{i}" for i in range(20)] + log = {"agent_role": "dev", "output_summary": "\n".join(lines)} + with patch("cli.watch._format_elapsed", return_value="1s"): + _render_watch(task, pipeline, log, 1, 3) + out = capsys.readouterr().out + assert "line19" in out # последняя строка — должна быть + assert "line5" in out # line5 = 6-я с начала, входит в последние 15 + assert "line4" not in out # line4 = 5-я с начала, отсекается + + +def test_render_watch_waiting_message_when_no_log(capsys): + """Нет логов агента → 'Waiting for first agent...'.""" + task = {"id": "KIN-001", "title": "New task", "status": "pending"} + pipeline = { + "id": 1, "status": "running", "pid": None, + "created_at": "2026-03-17 12:00:00", "steps": [], + } + with patch("cli.watch._format_elapsed", return_value="0s"): + _render_watch(task, pipeline, None, 0, "?") + out = capsys.readouterr().out + assert "Waiting for first agent" in out + + +# --------------------------------------------------------------------------- +# cmd_watch: nonexistent task_id +# --------------------------------------------------------------------------- + +def test_cmd_watch_nonexistent_task_prints_error(conn, capsys): + cmd_watch(conn, "KIN-NONEXISTENT") + out = capsys.readouterr().out + assert "KIN-NONEXISTENT" in out + assert "not found" in out.lower() + + +def test_cmd_watch_nonexistent_task_does_not_sleep(conn): + """cmd_watch должен вернуться немедленно, без вызова time.sleep.""" + with patch("cli.watch.time.sleep") as mock_sleep: + cmd_watch(conn, "KIN-NONEXISTENT") + mock_sleep.assert_not_called() diff --git a/tests/test_models.py b/tests/test_models.py index d32ea46..b7aeec4 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -757,3 +757,76 @@ def test_test_command_can_be_set(conn): models.create_project(conn, "prj_tc2", "TC Project 2", "/tmp/tc2") updated = models.update_project(conn, "prj_tc2", test_command="pytest -v --tb=short") assert updated["test_command"] == "pytest -v --tb=short" + + +# --------------------------------------------------------------------------- +# KIN-084: write_log() и get_pipeline_logs() +# --------------------------------------------------------------------------- + +@pytest.fixture +def pipeline_conn(conn): + """Возвращает (conn, pipeline_id) для тестов write_log / get_pipeline_logs.""" + models.create_project(conn, "plog_proj", "Log Project", "/tmp/plog") + models.create_task(conn, "PLOG-001", "plog_proj", "Log Task") + pipe = models.create_pipeline(conn, "PLOG-001", "plog_proj", "feature", [{"role": "dev"}]) + return conn, pipe["id"] + + +def test_write_log_returns_dict(pipeline_conn): + """KIN-084: write_log возвращает dict с id, pipeline_id, message, level.""" + db, pid = pipeline_conn + entry = models.write_log(db, pid, "Pipeline started") + assert isinstance(entry, dict) + assert entry["id"] is not None + assert entry["pipeline_id"] == pid + assert entry["message"] == "Pipeline started" + assert entry["level"] == "INFO" + assert entry["ts"] is not None + + +def test_write_log_extra_none_gives_null(pipeline_conn): + """KIN-084: write_log без extra → extra_json=None.""" + db, pid = pipeline_conn + entry = models.write_log(db, pid, "No extra", extra=None) + assert entry["extra_json"] is None + + +def test_write_log_extra_dict_decoded(pipeline_conn): + """KIN-084: write_log с extra=dict → extra_json декодируется в dict.""" + db, pid = pipeline_conn + entry = models.write_log(db, pid, "With extra", extra={"role": "dev", "model": "sonnet"}) + assert isinstance(entry["extra_json"], dict) + assert entry["extra_json"]["role"] == "dev" + assert entry["extra_json"]["model"] == "sonnet" + + +def test_get_pipeline_logs_since_id_zero_returns_all(pipeline_conn): + """KIN-084: get_pipeline_logs(since_id=0) возвращает все записи.""" + db, pid = pipeline_conn + models.write_log(db, pid, "Entry 1") + models.write_log(db, pid, "Entry 2") + models.write_log(db, pid, "Entry 3") + logs = models.get_pipeline_logs(db, pid, since_id=0) + assert len(logs) == 3 + + +def test_get_pipeline_logs_since_id_filters(pipeline_conn): + """KIN-084: get_pipeline_logs(since_id=N) возвращает только id > N.""" + db, pid = pipeline_conn + e1 = models.write_log(db, pid, "Entry 1") + models.write_log(db, pid, "Entry 2") + models.write_log(db, pid, "Entry 3") + logs = models.get_pipeline_logs(db, pid, since_id=e1["id"]) + assert len(logs) == 2 + assert all(log["id"] > e1["id"] for log in logs) + + +def test_get_pipeline_logs_ordered_asc(pipeline_conn): + """KIN-084: get_pipeline_logs возвращает записи в хронологическом порядке.""" + db, pid = pipeline_conn + models.write_log(db, pid, "First") + models.write_log(db, pid, "Second") + models.write_log(db, pid, "Third") + logs = models.get_pipeline_logs(db, pid) + ids = [log["id"] for log in logs] + assert ids == sorted(ids) diff --git a/web/api.py b/web/api.py index 689d832..cce912b 100644 --- a/web/api.py +++ b/web/api.py @@ -226,12 +226,19 @@ class ProjectCreate(BaseModel): return self +VALID_DEPLOY_RUNTIMES = {"docker", "node", "python", "static"} + + class ProjectPatch(BaseModel): execution_mode: str | None = None autocommit_enabled: bool | None = None auto_test_enabled: bool | None = None obsidian_vault_path: str | None = None deploy_command: str | None = None + deploy_host: str | None = None + deploy_path: str | None = None + deploy_runtime: str | None = None + deploy_restart_cmd: str | None = None test_command: str | None = None project_type: str | None = None ssh_host: str | None = None @@ -246,6 +253,8 @@ def patch_project(project_id: str, body: ProjectPatch): body.execution_mode, body.autocommit_enabled is not None, body.auto_test_enabled is not None, body.obsidian_vault_path, body.deploy_command is not None, + body.deploy_host is not None, body.deploy_path is not None, + body.deploy_runtime is not None, body.deploy_restart_cmd is not None, body.test_command is not None, body.project_type, body.ssh_host is not None, body.ssh_user is not None, body.ssh_key_path is not None, @@ -257,6 +266,8 @@ def patch_project(project_id: str, body: ProjectPatch): raise HTTPException(400, f"Invalid execution_mode '{body.execution_mode}'. Must be one of: {', '.join(VALID_EXECUTION_MODES)}") if body.project_type is not None and body.project_type not in VALID_PROJECT_TYPES: raise HTTPException(400, f"Invalid project_type '{body.project_type}'. Must be one of: {', '.join(VALID_PROJECT_TYPES)}") + if body.deploy_runtime is not None and body.deploy_runtime != "" and body.deploy_runtime not in VALID_DEPLOY_RUNTIMES: + raise HTTPException(400, f"Invalid deploy_runtime '{body.deploy_runtime}'. Must be one of: {', '.join(sorted(VALID_DEPLOY_RUNTIMES))}") conn = get_conn() p = models.get_project(conn, project_id) if not p: @@ -274,6 +285,14 @@ def patch_project(project_id: str, body: ProjectPatch): if body.deploy_command is not None: # Empty string = sentinel for clearing (decision #68) fields["deploy_command"] = None if body.deploy_command == "" else body.deploy_command + if body.deploy_host is not None: + fields["deploy_host"] = None if body.deploy_host == "" else body.deploy_host + if body.deploy_path is not None: + fields["deploy_path"] = None if body.deploy_path == "" else body.deploy_path + if body.deploy_runtime is not None: + fields["deploy_runtime"] = None if body.deploy_runtime == "" else body.deploy_runtime + if body.deploy_restart_cmd is not None: + fields["deploy_restart_cmd"] = None if body.deploy_restart_cmd == "" else body.deploy_restart_cmd if body.test_command is not None: fields["test_command"] = body.test_command if body.project_type is not None: @@ -325,19 +344,42 @@ def sync_obsidian_endpoint(project_id: str): @app.post("/api/projects/{project_id}/deploy") def deploy_project(project_id: str): - """Execute deploy_command for a project. Returns stdout/stderr/exit_code. + """Deploy a project using structured runtime steps or legacy deploy_command. - # WARNING: shell=True — deploy_command is admin-only, set in Settings by the project owner. + New-style (deploy_runtime set): uses core/deploy.py — structured steps, + cascades to dependent projects via project_links. + Legacy fallback (only deploy_command set): runs deploy_command via shell. + + WARNING: shell=True — deploy commands are admin-only, set in Settings by the project owner. """ import time + from core.deploy import deploy_with_dependents + conn = get_conn() p = models.get_project(conn, project_id) - conn.close() if not p: + conn.close() raise HTTPException(404, f"Project '{project_id}' not found") + + deploy_runtime = p.get("deploy_runtime") deploy_command = p.get("deploy_command") - if not deploy_command: - raise HTTPException(400, "deploy_command not set for this project") + + if not deploy_runtime and not deploy_command: + conn.close() + raise HTTPException(400, "Neither deploy_runtime nor deploy_command is set for this project") + + if deploy_runtime: + # New structured deploy with dependency cascade + try: + result = deploy_with_dependents(conn, project_id) + except Exception as e: + conn.close() + raise HTTPException(500, f"Deploy failed: {e}") + conn.close() + return result + + # Legacy fallback: run deploy_command via shell + conn.close() cwd = p.get("path") or None start = time.monotonic() try: @@ -386,6 +428,57 @@ def create_project(body: ProjectCreate): return p +# --------------------------------------------------------------------------- +# Project Links (KIN-079) +# --------------------------------------------------------------------------- + +class ProjectLinkCreate(BaseModel): + from_project: str + to_project: str + type: str + description: str | None = None + + +@app.post("/api/project-links") +def create_project_link(body: ProjectLinkCreate): + """Create a project dependency link.""" + conn = get_conn() + if not models.get_project(conn, body.from_project): + conn.close() + raise HTTPException(404, f"Project '{body.from_project}' not found") + if not models.get_project(conn, body.to_project): + conn.close() + raise HTTPException(404, f"Project '{body.to_project}' not found") + link = models.create_project_link( + conn, body.from_project, body.to_project, body.type, body.description + ) + conn.close() + return link + + +@app.get("/api/projects/{project_id}/links") +def get_project_links(project_id: str): + """Get all project links where project is from or to.""" + conn = get_conn() + if not models.get_project(conn, project_id): + conn.close() + raise HTTPException(404, f"Project '{project_id}' not found") + links = models.get_project_links(conn, project_id) + conn.close() + return links + + +@app.delete("/api/project-links/{link_id}", status_code=204) +def delete_project_link(link_id: int): + """Delete a project link by id.""" + conn = get_conn() + deleted = models.delete_project_link(conn, link_id) + conn.close() + if not deleted: + raise HTTPException(404, f"Link {link_id} not found") + return Response(status_code=204) + + # --------------------------------------------------------------------------- # Phases (KIN-059) # --------------------------------------------------------------------------- @@ -670,6 +763,19 @@ def patch_task(task_id: str, body: TaskPatch): return t +@app.get("/api/pipelines/{pipeline_id}/logs") +def get_pipeline_logs(pipeline_id: int, since_id: int = 0): + """Get pipeline log entries after since_id (for live console polling).""" + conn = get_conn() + row = conn.execute("SELECT id FROM pipelines WHERE id = ?", (pipeline_id,)).fetchone() + if not row: + conn.close() + raise HTTPException(404, f"Pipeline {pipeline_id} not found") + logs = models.get_pipeline_logs(conn, pipeline_id, since_id=since_id) + conn.close() + return logs + + @app.get("/api/tasks/{task_id}/pipeline") def get_task_pipeline(task_id: str): """Get agent_logs for a task (pipeline steps)."""