kin: auto-commit after pipeline

This commit is contained in:
Gros Frumos 2026-03-17 17:26:31 +02:00
parent 17d7806838
commit eab9e951ab
12 changed files with 1696 additions and 5 deletions

View file

@ -1058,6 +1058,17 @@ def _execute_department_head_step(
# Blocked status from dept head
if parsed.get("status") == "blocked":
reason = parsed.get("blocked_reason", "Department head reported blocked")
# KIN-084: log dept head blocked
if parent_pipeline_id:
try:
models.write_log(
conn, parent_pipeline_id,
f"Dept {step['role']} blocked: {reason}",
level="WARN",
extra={"role": step["role"], "blocked_reason": reason},
)
except Exception:
pass
return {
"success": False,
"output": json.dumps(parsed, ensure_ascii=False),
@ -1095,6 +1106,18 @@ def _execute_department_head_step(
},
}, ensure_ascii=False)
# KIN-084: log sub-pipeline start
if parent_pipeline_id:
try:
sub_roles = [s.get("role", "") for s in sub_pipeline]
models.write_log(
conn, parent_pipeline_id,
f"Sub-pipeline start: dept={dept_name}, steps={len(sub_pipeline)}",
extra={"dept_name": dept_name, "sub_steps": len(sub_pipeline), "sub_roles": sub_roles},
)
except Exception:
pass
# Run the sub-pipeline (noninteractive=True — Opus already reviewed the plan)
# pass parent_pipeline_id and department so run_pipeline creates the child
# pipeline with correct attributes (route_type='dept_sub') — no double create
@ -1108,6 +1131,19 @@ def _execute_department_head_step(
department=dept_name,
)
# KIN-084: log sub-pipeline done
if parent_pipeline_id:
try:
sub_success = sub_result.get("success", False)
models.write_log(
conn, parent_pipeline_id,
f"Sub-pipeline done: dept={dept_name}, success={sub_success}, steps={sub_result.get('steps_completed', 0)}",
level="INFO" if sub_success else "ERROR",
extra={"dept_name": dept_name, "success": sub_success, "steps_completed": sub_result.get("steps_completed", 0)},
)
except Exception:
pass
# Extract decisions from sub-pipeline results for handoff
decisions_made = []
sub_results = sub_result.get("results", [])
@ -1268,6 +1304,15 @@ def run_pipeline(
# Save PID so watchdog can detect dead subprocesses (KIN-099)
pipeline = models.update_pipeline(conn, pipeline["id"], pid=os.getpid())
models.update_task(conn, task_id, status="in_progress")
# KIN-084: log pipeline start
try:
models.write_log(
conn, pipeline["id"],
f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}",
extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode},
)
except Exception:
pass
results = []
total_cost = 0.0
@ -1281,6 +1326,19 @@ def run_pipeline(
model = step.get("model", "sonnet")
brief = step.get("brief")
# KIN-084: log step start
try:
if pipeline:
brief_preview = (step.get("brief") or "")[:100]
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} start: role={role}, model={model}",
level="INFO",
extra={"role": role, "model": model, "brief_preview": brief_preview},
)
except Exception:
pass
# Check parent process is still alive (KIN-099 watchdog)
if not dry_run and pipeline:
if _check_parent_alive(conn, pipeline, task_id, project_id):
@ -1335,6 +1393,16 @@ def run_pipeline(
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
# KIN-084: log step exception
try:
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} exception: {exc}",
level="ERROR",
extra={"role": role, "exc_type": type(exc).__name__},
)
except Exception:
pass
models.log_agent_run(
conn,
project_id=project_id,
@ -1382,6 +1450,23 @@ def run_pipeline(
total_tokens += result.get("tokens_used") or 0
total_duration += result.get("duration_seconds") or 0
# KIN-084: log step done
try:
if pipeline:
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} done: role={role}, success={result.get('success')}",
level="INFO",
extra={
"role": role,
"tokens_used": result.get("tokens_used") or 0,
"cost_usd": result.get("cost_usd") or 0,
"duration_seconds": result.get("duration_seconds") or 0,
},
)
except Exception:
pass
if not result["success"]:
# Auto mode: retry once with allow_write on permission error
if mode == "auto_complete" and not allow_write and _is_permission_error(result):
@ -1433,6 +1518,17 @@ def run_pipeline(
total_duration_seconds=total_duration,
)
agent_error = result.get("error") or ""
# KIN-084: log step failed
try:
if pipeline:
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} failed: {agent_error}",
level="ERROR",
extra={"role": role, "error": agent_error},
)
except Exception:
pass
error_msg = f"Step {i+1}/{len(steps)} ({role}) failed"
if agent_error:
error_msg += f": {agent_error}"
@ -1499,6 +1595,16 @@ def run_pipeline(
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
# KIN-084: log step blocked
try:
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} blocked: {blocked_info['reason']}",
level="WARN",
extra={"role": role, "reason": blocked_info["reason"]},
)
except Exception:
pass
models.update_task(
conn, task_id,
status="blocked",
@ -1746,6 +1852,20 @@ def run_pipeline(
total_tokens=total_tokens,
total_duration_seconds=total_duration,
)
# KIN-084: log pipeline completed
try:
models.write_log(
conn, pipeline["id"],
f"Pipeline completed: {len(steps)} steps, cost=${total_cost:.4f}, tokens={total_tokens}, duration={total_duration}s",
extra={
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"steps_count": len(steps),
},
)
except Exception:
pass
task_modules = models.get_modules(conn, project_id)
@ -1771,6 +1891,15 @@ def run_pipeline(
elif mode == "auto_complete" and auto_eligible:
# Auto-complete mode: last step is tester/reviewer — skip review, approve immediately
models.update_task(conn, task_id, status="done")
# KIN-084: log task status
try:
models.write_log(
conn, pipeline["id"],
"Task status: done",
extra={"task_status": "done", "mode": mode},
)
except Exception:
pass
try:
run_hooks(conn, project_id, task_id,
event="task_auto_approved", task_modules=task_modules)
@ -1800,6 +1929,15 @@ def run_pipeline(
else:
# Review mode: wait for manual approval
models.update_task(conn, task_id, status="review", execution_mode="review")
# KIN-084: log task status
try:
models.write_log(
conn, pipeline["id"],
"Task status: review",
extra={"task_status": "review", "mode": mode},
)
except Exception:
pass
# Run post-pipeline hooks (failures don't affect pipeline status)
try:

View file

@ -703,6 +703,27 @@ def run_task(ctx, task_id, dry_run, allow_write):
click.echo(f"Duration: {result['total_duration_seconds']}s")
# ===========================================================================
# watch / ps
# ===========================================================================
@cli.command("watch")
@click.argument("task_id")
@click.pass_context
def watch(ctx, task_id):
"""Monitor a running task in real time (updates every 5s)."""
from cli.watch import cmd_watch
cmd_watch(ctx.obj["conn"], task_id)
@cli.command("ps")
@click.pass_context
def ps(ctx):
"""List all running pipelines with PID and current step."""
from cli.watch import cmd_ps
cmd_ps(ctx.obj["conn"])
# ===========================================================================
# audit
# ===========================================================================

177
cli/watch.py Normal file
View file

@ -0,0 +1,177 @@
"""
Kin CLI kin watch <task_id> and kin ps commands.
kin watch: polling monitor, updates every 5s (like tail -f for a pipeline).
kin ps: one-shot list of all running pipelines with PID and current step.
"""
import json
import time
from datetime import datetime
from core import models
def _clear_screen() -> None:
print('\033[2J\033[H', end='', flush=True)
def _format_elapsed(dt_iso: str) -> str:
"""Format elapsed time from SQLite CURRENT_TIMESTAMP (UTC naive, space separator)."""
try:
# SQLite stores "YYYY-MM-DD HH:MM:SS"; fromisoformat requires 'T' in Python < 3.11
normalized = dt_iso.replace(' ', 'T') if ' ' in dt_iso else dt_iso
started = datetime.fromisoformat(normalized)
total_secs = int((datetime.utcnow() - started).total_seconds())
if total_secs < 0:
total_secs = 0
hours = total_secs // 3600
minutes = (total_secs % 3600) // 60
secs = total_secs % 60
if hours > 0:
return f"{hours}h {minutes}m"
elif minutes > 0:
return f"{minutes}m {secs}s"
else:
return f"{secs}s"
except (ValueError, TypeError):
return "?"
def _parse_total_steps(steps) -> int | str:
"""Return total planned steps from pipeline.steps (may be list or raw JSON string)."""
if steps is None:
return '?'
if isinstance(steps, list):
return len(steps)
# Fallback: raw JSON string (legacy or unexpected path)
if isinstance(steps, str):
try:
parsed = json.loads(steps)
if isinstance(parsed, list):
return len(parsed)
except (json.JSONDecodeError, ValueError):
pass
return '?'
def _render_watch(
task: dict,
pipeline: dict | None,
log: dict | None,
step_num: int,
total_steps: int | str,
) -> None:
"""Render the watch screen to stdout."""
sep = '' * 60
title = task['title'] or ''
title_short = (title[:50] + '') if len(title) > 50 else title
print(f"kin watch {task['id']}")
print(sep)
print(f"Task: {task['id']} {title_short}")
print(f"Status: {task['status']}")
if pipeline:
pid_str = str(pipeline['pid']) if pipeline.get('pid') else '-'
print(f"Pipeline: #{pipeline['id']} [{pipeline['status']}] PID: {pid_str}")
elapsed = _format_elapsed(pipeline['created_at'])
print(f"Elapsed: {elapsed} | Agents run: {step_num} / Total planned: {total_steps}")
print()
current = log['agent_role'] if log else 'Waiting for first agent...'
print(f"Current agent: {current}")
else:
print()
print("No pipeline started yet.")
print(sep)
if log and log.get('output_summary'):
lines = (log['output_summary'] or '').splitlines()
for line in lines[-15:]:
print(line)
elif pipeline:
print("Waiting for first agent...")
print(sep)
terminal_statuses = ('completed', 'failed', 'cancelled')
if pipeline and pipeline['status'] in terminal_statuses:
print(f"[Pipeline {pipeline['status']}. Exiting.]")
else:
print("[Updating every 5s. Ctrl+C to stop]")
def _render_ps(rows: list[dict]) -> None:
"""Render the running pipelines table to stdout."""
if not rows:
print("No running pipelines.")
print("\nUse 'kin watch <task_id>' to monitor.")
return
print(f"Running pipelines: {len(rows)}\n")
headers = ["ID", "TASK", "PROJECT", "PID", "STARTED", "AGENT", "PARENT"]
col_widths = [len(h) for h in headers]
formatted_rows = []
for r in rows:
title = r.get('title') or ''
title_short = (title[:20] + '') if len(title) > 20 else title
pid_str = str(r['pid']) if r.get('pid') else '-'
started = (_format_elapsed(r['created_at']) + ' ago') if r.get('created_at') else '-'
agent = r.get('current_agent') or '-'
parent = str(r['parent_pipeline_id']) if r.get('parent_pipeline_id') else '-'
row = [f"#{r['id']}", title_short, r.get('project_name') or '-',
pid_str, started, agent, parent]
formatted_rows.append(row)
for i, cell in enumerate(row):
col_widths[i] = max(col_widths[i], len(str(cell)))
fmt = ' '.join(f'{{:<{w}}}' for w in col_widths)
print(fmt.format(*headers))
print(fmt.format(*('-' * w for w in col_widths)))
for row in formatted_rows:
print(fmt.format(*row))
print("\nUse 'kin watch <task_id>' to monitor.")
def cmd_watch(conn, task_id: str) -> None:
"""Polling monitor for a task. Updates every 5s. Exits on terminal pipeline state."""
import sqlite3
task = models.get_task(conn, task_id)
if not task:
print(f"Task '{task_id}' not found.")
return
try:
while True:
_clear_screen()
pipeline = models.get_pipeline_for_watch(conn, task_id)
if pipeline:
log = models.get_current_agent_log(conn, task_id, pipeline['created_at'])
step_num = conn.execute(
'SELECT COUNT(*) FROM agent_logs WHERE task_id = ? AND created_at >= ?',
(task_id, pipeline['created_at']),
).fetchone()[0]
total = _parse_total_steps(pipeline.get('steps'))
else:
log, step_num, total = None, 0, '?'
_render_watch(task, pipeline, log, step_num, total)
if pipeline and pipeline['status'] in ('completed', 'failed', 'cancelled'):
break
time.sleep(5)
except KeyboardInterrupt:
print('\nExiting watch mode.')
def cmd_ps(conn) -> None:
"""One-shot list of all running pipelines."""
rows = models.get_all_running_pipelines(conn)
_render_ps(rows)

View file

@ -29,6 +29,10 @@ CREATE TABLE IF NOT EXISTS projects (
ssh_key_path TEXT,
ssh_proxy_jump TEXT,
description TEXT,
deploy_host TEXT,
deploy_path TEXT,
deploy_runtime TEXT,
deploy_restart_cmd TEXT,
autocommit_enabled INTEGER DEFAULT 0,
obsidian_vault_path TEXT,
worktrees_enabled INTEGER DEFAULT 0,
@ -302,6 +306,18 @@ CREATE TABLE IF NOT EXISTS task_attachments (
);
CREATE INDEX IF NOT EXISTS idx_task_attachments_task ON task_attachments(task_id);
-- Live console log (KIN-084)
CREATE TABLE IF NOT EXISTS pipeline_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id INTEGER NOT NULL REFERENCES pipelines(id),
ts TEXT NOT NULL DEFAULT (datetime('now')),
level TEXT NOT NULL DEFAULT 'INFO',
message TEXT NOT NULL,
extra_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id);
"""
@ -419,6 +435,22 @@ def _migrate(conn: sqlite3.Connection):
conn.execute("ALTER TABLE projects ADD COLUMN description TEXT")
conn.commit()
if "deploy_host" not in proj_cols:
conn.execute("ALTER TABLE projects ADD COLUMN deploy_host TEXT")
conn.commit()
if "deploy_path" not in proj_cols:
conn.execute("ALTER TABLE projects ADD COLUMN deploy_path TEXT")
conn.commit()
if "deploy_runtime" not in proj_cols:
conn.execute("ALTER TABLE projects ADD COLUMN deploy_runtime TEXT")
conn.commit()
if "deploy_restart_cmd" not in proj_cols:
conn.execute("ALTER TABLE projects ADD COLUMN deploy_restart_cmd TEXT")
conn.commit()
# Migrate audit_log + project_phases tables
existing_tables = {r[0] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
@ -612,6 +644,20 @@ def _migrate(conn: sqlite3.Connection):
""")
conn.commit()
if "pipeline_log" not in existing_tables:
conn.executescript("""
CREATE TABLE IF NOT EXISTS pipeline_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id INTEGER NOT NULL REFERENCES pipelines(id),
ts TEXT NOT NULL DEFAULT (datetime('now')),
level TEXT NOT NULL DEFAULT 'INFO',
message TEXT NOT NULL,
extra_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id);
""")
conn.commit()
# Migrate pipelines: add parent_pipeline_id and department columns (KIN-098)
# Guard: table may not exist in legacy schemas without pipelines (old test fixtures)
if "pipelines" in existing_tables:

223
core/deploy.py Normal file
View file

@ -0,0 +1,223 @@
"""
Kin structured deploy module.
Business logic for project deployments:
- Runtime-based step templates (docker/node/python/static)
- Local and SSH execution
- Dependency chain traversal via project_links
"""
import sqlite3
import subprocess
import time
VALID_RUNTIMES = {"docker", "node", "python", "static"}
# Base command templates per runtime.
# deploy_restart_cmd (if set) is appended as the final step for all runtimes.
RUNTIME_STEPS = {
"docker": ["git pull", "docker compose up -d --build"],
"node": ["git pull", "npm install --production", "pm2 restart all"],
"python": ["git pull", "pip install -r requirements.txt"],
"static": ["git pull", "nginx -s reload"],
}
def build_deploy_steps(project: dict) -> list[str]:
"""Build deploy command list based on runtime and project config.
Returns empty list if deploy_runtime is not set or invalid.
Appends deploy_restart_cmd as the last step if provided.
"""
runtime = project.get("deploy_runtime")
if not runtime or runtime not in RUNTIME_STEPS:
return []
steps = list(RUNTIME_STEPS[runtime])
restart_cmd = project.get("deploy_restart_cmd")
if restart_cmd and restart_cmd.strip():
steps.append(restart_cmd.strip())
return steps
def _build_ssh_cmd(project: dict, command: str) -> list[str]:
"""Build SSH subprocess command list to run a shell command on deploy_host."""
deploy_host = project.get("deploy_host") or project.get("ssh_host")
ssh_user = project.get("ssh_user") or "root"
ssh_key = project.get("ssh_key_path")
proxy_jump = project.get("ssh_proxy_jump")
deploy_path = project.get("deploy_path")
full_cmd = f"cd {deploy_path} && {command}" if deploy_path else command
cmd = ["ssh"]
if ssh_key:
cmd += ["-i", ssh_key]
if proxy_jump:
cmd += ["-J", proxy_jump]
cmd += ["-o", "StrictHostKeyChecking=no", "-o", "BatchMode=yes"]
cmd += [f"{ssh_user}@{deploy_host}", full_cmd]
return cmd
def execute_deploy(project: dict, conn: sqlite3.Connection) -> dict:
"""Execute deploy steps for a project. Returns structured result.
Returns:
{
"success": bool,
"steps": list[str],
"results": list[{"step", "stdout", "stderr", "exit_code"}],
}
"""
deploy_host = project.get("deploy_host") or project.get("ssh_host")
steps = build_deploy_steps(project)
if not steps:
return {
"success": False,
"steps": [],
"results": [],
"error": "No deploy steps: deploy_runtime not set or invalid",
}
deploy_path = project.get("deploy_path") or project.get("path") or None
results = []
overall_success = True
for step in steps:
try:
if deploy_host:
cmd = _build_ssh_cmd(project, step)
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=120,
)
else:
# WARNING: shell=True — deploy commands are admin-only, set by project owner
proc = subprocess.run(
step,
shell=True, # WARNING: shell=True — command is admin-only
cwd=deploy_path,
capture_output=True,
text=True,
timeout=120,
)
except subprocess.TimeoutExpired:
results.append({
"step": step,
"stdout": "",
"stderr": "Timed out after 120 seconds",
"exit_code": -1,
})
overall_success = False
break
except Exception as e:
results.append({
"step": step,
"stdout": "",
"stderr": str(e),
"exit_code": -1,
})
overall_success = False
break
results.append({
"step": step,
"stdout": proc.stdout,
"stderr": proc.stderr,
"exit_code": proc.returncode,
})
if proc.returncode != 0:
overall_success = False
break # Stop on first failure
return {
"success": overall_success,
"steps": steps,
"results": results,
}
def get_deploy_chain(conn: sqlite3.Connection, project_id: str) -> list[str]:
"""Get ordered deploy chain: project + all downstream dependents (BFS, cycle-safe).
Traverses project_links where type='depends_on':
- from_project depends_on to_project
- When to_project changes, from_project must also be redeployed.
Returns list starting with project_id, followed by dependents in BFS order.
"""
visited: set[str] = {project_id}
chain: list[str] = [project_id]
queue: list[str] = [project_id]
while queue:
current = queue.pop(0)
rows = conn.execute(
"SELECT from_project FROM project_links"
" WHERE to_project = ? AND type = 'depends_on'",
(current,),
).fetchall()
for row in rows:
dep_id = row[0]
if dep_id not in visited:
visited.add(dep_id)
chain.append(dep_id)
queue.append(dep_id)
return chain
def deploy_with_dependents(conn: sqlite3.Connection, project_id: str) -> dict:
"""Deploy project and all dependents in chain order.
Returns:
{
"success": bool,
"steps": list[str], # main project steps
"results": list[dict], # main project step results
"dependents_deployed": list[str],
}
"""
from core.models import get_project
chain = get_deploy_chain(conn, project_id)
dependents = [pid for pid in chain if pid != project_id]
# Deploy main project first
main_project = get_project(conn, project_id)
if not main_project:
return {
"success": False,
"steps": [],
"results": [],
"dependents_deployed": [],
"error": f"Project '{project_id}' not found",
}
main_result = execute_deploy(main_project, conn)
overall_success = main_result["success"]
dependents_deployed = []
if main_result["success"] and dependents:
for dep_id in dependents:
dep_project = get_project(conn, dep_id)
if dep_project:
dep_result = execute_deploy(dep_project, conn)
if dep_result["success"]:
dependents_deployed.append(dep_id)
else:
overall_success = False
return {
"success": overall_success,
"steps": main_result.get("steps", []),
"results": main_result.get("results", []),
"dependents_deployed": dependents_deployed,
}

View file

@ -531,6 +531,86 @@ def get_running_pipelines_with_pid(conn: sqlite3.Connection) -> list[dict]:
return _rows_to_list(rows)
def get_pipeline_for_watch(conn: sqlite3.Connection, task_id: str) -> dict | None:
"""Return the most recent top-level pipeline for a task (for kin watch)."""
row = conn.execute(
"""SELECT id, task_id, project_id, status, pid, steps, created_at, completed_at
FROM pipelines
WHERE task_id = ? AND parent_pipeline_id IS NULL
ORDER BY created_at DESC, id DESC LIMIT 1""",
(task_id,),
).fetchone()
return _row_to_dict(row)
def get_current_agent_log(
conn: sqlite3.Connection, task_id: str, since_iso: str
) -> dict | None:
"""Return the most recent agent log for a task since a given datetime (for kin watch)."""
row = conn.execute(
"""SELECT agent_role, output_summary, duration_seconds, success, created_at
FROM agent_logs
WHERE task_id = ? AND created_at >= ?
ORDER BY id DESC LIMIT 1""",
(task_id, since_iso),
).fetchone()
return _row_to_dict(row)
def write_log(
conn: sqlite3.Connection,
pipeline_id: int,
message: str,
level: str = "INFO",
extra: dict | list | None = None,
) -> dict:
"""Insert a pipeline log entry. Returns inserted row as dict."""
extra_json = json.dumps(extra, ensure_ascii=False) if extra is not None else None
cur = conn.execute(
"""INSERT INTO pipeline_log (pipeline_id, message, level, extra_json)
VALUES (?, ?, ?, ?)""",
(pipeline_id, message, level, extra_json),
)
conn.commit()
row = conn.execute(
"SELECT * FROM pipeline_log WHERE id = ?", (cur.lastrowid,)
).fetchone()
return _row_to_dict(row)
def get_pipeline_logs(
conn: sqlite3.Connection,
pipeline_id: int,
since_id: int = 0,
) -> list[dict]:
"""Get pipeline log entries after since_id in chronological order."""
rows = conn.execute(
"""SELECT * FROM pipeline_log
WHERE pipeline_id = ? AND id > ?
ORDER BY id ASC""",
(pipeline_id, since_id),
).fetchall()
return _rows_to_list(rows)
def get_all_running_pipelines(conn: sqlite3.Connection) -> list[dict]:
"""Return all running pipelines with task/project info and current agent (for kin ps)."""
rows = conn.execute(
"""SELECT p.id, p.task_id, p.status, p.pid, p.created_at,
p.parent_pipeline_id,
t.title, proj.name AS project_name,
(SELECT agent_role FROM agent_logs
WHERE task_id = p.task_id AND created_at >= p.created_at
ORDER BY id DESC LIMIT 1) AS current_agent
FROM pipelines p
JOIN tasks t ON p.task_id = t.id
JOIN projects proj ON p.project_id = proj.id
WHERE p.status = 'running'
ORDER BY p.created_at DESC"""
).fetchall()
return _rows_to_list(rows)
# ---------------------------------------------------------------------------
# Support
# ---------------------------------------------------------------------------
@ -998,6 +1078,47 @@ def get_last_handoff(
return _row_to_dict(row)
# ---------------------------------------------------------------------------
# Project Links (KIN-079)
# ---------------------------------------------------------------------------
def create_project_link(
conn: sqlite3.Connection,
from_project: str,
to_project: str,
type: str,
description: str | None = None,
) -> dict:
"""Create a project dependency link. Returns the created link as dict."""
cur = conn.execute(
"""INSERT INTO project_links (from_project, to_project, type, description)
VALUES (?, ?, ?, ?)""",
(from_project, to_project, type, description),
)
conn.commit()
row = conn.execute(
"SELECT * FROM project_links WHERE id = ?", (cur.lastrowid,)
).fetchone()
return _row_to_dict(row)
def get_project_links(conn: sqlite3.Connection, project_id: str) -> list[dict]:
"""Get all project links where project_id is from_project or to_project."""
rows = conn.execute(
"SELECT * FROM project_links WHERE from_project = ? OR to_project = ?"
" ORDER BY created_at",
(project_id, project_id),
).fetchall()
return _rows_to_list(rows)
def delete_project_link(conn: sqlite3.Connection, link_id: int) -> bool:
"""Delete a project link by id. Returns True if deleted, False if not found."""
cur = conn.execute("DELETE FROM project_links WHERE id = ?", (link_id,))
conn.commit()
return cur.rowcount > 0
def get_chat_messages(
conn: sqlite3.Connection,
project_id: str,

217
tasks/kin-084-spec.md Normal file
View file

@ -0,0 +1,217 @@
# KIN-084 — Live Console: архитектурная спека
## 1. DDL — таблица pipeline_log
```sql
CREATE TABLE IF NOT EXISTS pipeline_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id INTEGER NOT NULL REFERENCES pipelines(id),
ts TEXT NOT NULL DEFAULT (datetime('now')),
level TEXT NOT NULL DEFAULT 'INFO',
message TEXT NOT NULL,
extra_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id);
```
**Замечание**: Brief указывает `pipeline_id TEXT`, но `pipelines.id``INTEGER PRIMARY KEY AUTOINCREMENT`.
Используем `INTEGER` (соответствует паттерну `department_handoffs.pipeline_id INTEGER`).
**Где добавить**:
1. В `SCHEMA` в `core/db.py` — в конец строки SCHEMA (после `task_attachments`)
2. В `_migrate()` в `core/db.py` — guard `if "pipeline_log" not in existing_tables:` (по аналогии с `task_attachments`)
---
## 2. Функции в core/models.py
### write_log()
```python
def write_log(
conn: sqlite3.Connection,
pipeline_id: int,
message: str,
level: str = "INFO",
extra: dict | list | None = None,
) -> dict:
"""Insert a pipeline log entry. Returns inserted row as dict.
extra: optional structured data (serialized to JSON string).
level: INFO | DEBUG | ERROR | WARN
"""
extra_json = json.dumps(extra, ensure_ascii=False) if extra is not None else None
cur = conn.execute(
"""INSERT INTO pipeline_log (pipeline_id, message, level, extra_json)
VALUES (?, ?, ?, ?)""",
(pipeline_id, message, level, extra_json),
)
conn.commit()
row = conn.execute(
"SELECT * FROM pipeline_log WHERE id = ?", (cur.lastrowid,)
).fetchone()
return _row_to_dict(row)
```
### get_pipeline_logs()
```python
def get_pipeline_logs(
conn: sqlite3.Connection,
pipeline_id: int,
since_id: int = 0,
) -> list[dict]:
"""Get pipeline log entries after since_id in chronological order.
since_id=0 returns all entries (id > 0 matches all).
"""
rows = conn.execute(
"""SELECT * FROM pipeline_log
WHERE pipeline_id = ? AND id > ?
ORDER BY id ASC""",
(pipeline_id, since_id),
).fetchall()
return _rows_to_list(rows)
```
**Замечание по extra_json**: `_row_to_dict()` автоматически декодирует JSON-строки начинающиеся с `{` или `[`.
Так что в ответе API `extra_json` будет уже распарсенным dict/list.
---
## 3. Точки инструментации в agents/runner.py
### Функция run_pipeline() (строки 12101835)
Все write_log в run_pipeline должны:
- Быть защищены `if pipeline:` (dry_run не создаёт pipeline)
- Оборачиваться в `try/except Exception: pass` (не блокировать pipeline)
| # | После строки | Событие | Level | extra |
|---|---|---|---|---|
| 1 | ~1270 после `update_pipeline(pid=...)` | Pipeline start | INFO | `{route_type, steps_count, mode}` |
| 2 | ~1280 начало тела цикла `for i, step` | Step N start | INFO | `{role, model, brief_preview}` |
| 3 | ~1383 после аккумуляции stats | Step N done (success) | INFO | `{tokens, cost_usd, duration_seconds}` |
| 4 | ~1329 в `except Exception as exc:` | Step exception | ERROR | `{role, exc_type}` |
| 5 | ~1436-1438 в блоке `if not result["success"]` | Step failed | ERROR | `{role, error}` |
| 6 | ~1525 в блоке `if blocked_info:` | Step blocked | WARN | `{role, reason}` |
| 7 | ~1748 после `update_pipeline(completed)` | Pipeline completed | INFO | `{steps, cost_usd, tokens, duration_s}` |
| 8 | ~17711802 финальный статус задачи | Task status | INFO | `{task_status, mode}` |
### Функция _execute_department_head_step() (строки 10281172)
Логируем в `parent_pipeline_id` (существует в момент вызова).
Guard: `if parent_pipeline_id:`.
| # | После строки | Событие | Level | extra |
|---|---|---|---|---|
| 9 | ~1059-1066 (dept blocked) | Dept head blocked | WARN | `{role, blocked_reason}` |
| 10 | ~1100 перед вызовом sub run_pipeline | Sub-pipeline start | INFO | `{dept_name, sub_steps}` |
| 11 | ~1112 после вызова sub run_pipeline | Sub-pipeline done | INFO/ERROR | `{success, steps_completed}` |
**Пример вызова (точка 2)**:
```python
try:
if pipeline:
brief_preview = (step.get("brief") or "")[:100]
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} start: role={role}, model={model}",
level="INFO",
extra={"role": role, "model": model, "brief_preview": brief_preview},
)
except Exception:
pass
```
**Пример вызова (точка 1 — pipeline start)**:
```python
try:
models.write_log(
conn, pipeline["id"],
f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}",
extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode},
)
except Exception:
pass
```
---
## 4. API Endpoint
### GET /api/pipelines/{pipeline_id}/logs
**Контракт**:
- Путь: `/api/pipelines/{pipeline_id}/logs`
- Query: `since_id: int = 0`
- Response: `list[PipelineLogEntry]`
- 404 если pipeline не найден
**Схема ответа** (один элемент):
```json
{
"id": 42,
"pipeline_id": 7,
"ts": "2026-03-17 14:23:01",
"level": "INFO",
"message": "Step 1/3 start: role=backend_dev, model=sonnet",
"extra_json": {"role": "backend_dev", "model": "sonnet", "brief_preview": "..."}
}
```
**Реализация в web/api.py**:
```python
@app.get("/api/pipelines/{pipeline_id}/logs")
def get_pipeline_logs(pipeline_id: int, since_id: int = 0):
"""Get pipeline log entries after since_id (for live console polling)."""
conn = get_conn()
row = conn.execute("SELECT id FROM pipelines WHERE id = ?", (pipeline_id,)).fetchone()
if not row:
conn.close()
raise HTTPException(404, f"Pipeline {pipeline_id} not found")
logs = models.get_pipeline_logs(conn, pipeline_id, since_id=since_id)
conn.close()
return logs
```
**Поллинг-протокол (для фронтенда)**:
1. `since_id=0` — первый запрос, получить всё
2. `since_id = max(id)` из последнего ответа для инкрементного обновления
3. Поллить каждые 1-2 секунды пока `pipeline.status == 'running'`
4. Остановить поллинг когда pipeline завершён
---
## 5. Тесты (обязательны для core/)
### tests/test_db.py
- SCHEMA-строка содержит CREATE TABLE pipeline_log (convention #387)
- `_migrate()` создаёт pipeline_log если таблицы нет
### tests/test_models.py
- `write_log()` корректно вставляет запись, возвращает dict
- `write_log()` с `extra=None` — extra_json = NULL
- `write_log()` с `extra={dict}` — extra_json декодируется в dict
- `get_pipeline_logs()` с `since_id=0` возвращает все записи
- `get_pipeline_logs()` с `since_id=N` возвращает только id > N
- Записи в хронологическом порядке (ORDER BY id ASC)
### tests/test_api.py
- GET /api/pipelines/{id}/logs → 200, list
- GET /api/pipelines/{id}/logs?since_id=5 → только id > 5
- GET /api/pipelines/9999/logs → 404
---
## 6. Совместимость (#375/#376)
`write_log()` пишет ТОЛЬКО в `pipeline_log`, не затрагивает `pipelines.status` и `watchdog`.
Watchdog (core/watchdog.py) проверяет `pipelines.status = 'running' AND pid IS NOT NULL` — не зависит от pipeline_log.
Нет конфликтов.
**FK guard**: write_log вызывается только ПОСЛЕ `create_pipeline()` (т.е. когда pipeline_id уже в БД).
PM-шаг (cli/main.py) запускается ДО `run_pipeline()`, поэтому pipeline_id ещё не существует.
PM-логи недоступны в pipeline_log — это ограничение текущей архитектуры.
Для KIN-084 MVP: логируем от момента создания pipeline_record до финала.

View file

@ -2199,3 +2199,69 @@ def test_patch_project_test_command_null_returns_400(client):
r = client.patch("/api/projects/p1", json={"test_command": None})
assert r.status_code == 400
assert "Nothing to update" in r.json()["detail"]
# ---------------------------------------------------------------------------
# KIN-084: GET /api/pipelines/{pipeline_id}/logs
# ---------------------------------------------------------------------------
def _seed_pipeline(db_path, project_id="p1", task_id="P1-001") -> int:
"""Создаёт pipeline в БД и возвращает его id."""
from core.db import init_db
from core import models
conn = init_db(db_path)
pipe = models.create_pipeline(conn, task_id, project_id, "feature", [{"role": "dev"}])
conn.close()
return pipe["id"]
def test_get_pipeline_logs_returns_200_empty(client):
"""KIN-084: GET /api/pipelines/{id}/logs → 200 с пустым списком если логов нет."""
pid = _seed_pipeline(api_module.DB_PATH)
r = client.get(f"/api/pipelines/{pid}/logs")
assert r.status_code == 200
assert r.json() == []
def test_get_pipeline_logs_returns_entries(client):
"""KIN-084: GET /api/pipelines/{id}/logs → список записей с нужными полями."""
from core.db import init_db
from core import models
pid = _seed_pipeline(api_module.DB_PATH)
conn = init_db(api_module.DB_PATH)
models.write_log(conn, pid, "Pipeline started", extra={"steps_count": 3})
models.write_log(conn, pid, "Step 1 done")
conn.close()
r = client.get(f"/api/pipelines/{pid}/logs")
assert r.status_code == 200
logs = r.json()
assert len(logs) == 2
assert logs[0]["message"] == "Pipeline started"
assert isinstance(logs[0]["extra_json"], dict)
assert logs[0]["extra_json"]["steps_count"] == 3
assert logs[1]["message"] == "Step 1 done"
def test_get_pipeline_logs_since_id_filters(client):
"""KIN-084: ?since_id=N возвращает только id > N."""
from core.db import init_db
from core import models
pid = _seed_pipeline(api_module.DB_PATH, task_id="P1-001")
conn = init_db(api_module.DB_PATH)
e1 = models.write_log(conn, pid, "Entry 1")
models.write_log(conn, pid, "Entry 2")
models.write_log(conn, pid, "Entry 3")
conn.close()
r = client.get(f"/api/pipelines/{pid}/logs?since_id={e1['id']}")
assert r.status_code == 200
logs = r.json()
assert len(logs) == 2
assert all(log["id"] > e1["id"] for log in logs)
def test_get_pipeline_logs_not_found(client):
"""KIN-084: GET /api/pipelines/9999/logs → 404."""
r = client.get("/api/pipelines/9999/logs")
assert r.status_code == 404

View file

@ -502,3 +502,42 @@ def test_migrate_pid_guard_is_idempotent():
assert before == after
assert "pid" in after
conn.close()
# ---------------------------------------------------------------------------
# Schema KIN-084: pipeline_log в SCHEMA-строке и _migrate()
# Конвенция #387: DDL-парсинг SCHEMA-строки
# ---------------------------------------------------------------------------
def test_schema_string_contains_pipeline_log_table():
"""Регрессия KIN-084: SCHEMA-строка содержит CREATE TABLE pipeline_log."""
from core.db import SCHEMA
assert "CREATE TABLE IF NOT EXISTS pipeline_log" in SCHEMA
def test_schema_pipeline_log_has_required_columns():
"""SCHEMA pipeline_log содержит все обязательные колонки."""
from core.db import SCHEMA
start = SCHEMA.index("CREATE TABLE IF NOT EXISTS pipeline_log")
end = SCHEMA.index(");", start) + 2
ddl = SCHEMA[start:end]
assert "pipeline_id INTEGER" in ddl
assert "ts " in ddl
assert "level " in ddl
assert "message " in ddl
assert "extra_json " in ddl
def test_migrate_creates_pipeline_log_on_fresh_init(conn):
"""Свежая инициализация: pipeline_log существует."""
tables = {r["name"] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
assert "pipeline_log" in tables
def test_migrate_creates_pipeline_log_if_missing():
"""_migrate создаёт pipeline_log если таблица отсутствует в старой БД."""
conn = _old_schema_conn()
_migrate(conn)
tables = {r["name"] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
assert "pipeline_log" in tables
conn.close()

View file

@ -0,0 +1,464 @@
"""Tests for KIN-092: cli/watch.py — kin watch and kin ps commands.
Покрывает:
- _format_elapsed: секунды, минуты, часы, ноль, отрицательное, невалидное
- _parse_total_steps: list, JSON-строка, None, невалидный JSON, не-list JSON
- get_pipeline_for_watch: только top-level, самый свежий, None если нет
- get_current_agent_log: последний лог, фильтрация по дате пайплайна
- get_all_running_pipelines: только running, PID=NULL, current_agent
- _render_ps: пустой список, PID=NULL '-', обрезка title
- _render_watch: нет пайплайна, PID=NULL '-', terminal status, output lines
- cmd_watch: несуществующий task_id понятная ошибка, sleep не вызывается
"""
import pytest
from datetime import datetime, timedelta
from unittest.mock import patch
from core.db import init_db
from core import models
from cli.watch import (
_format_elapsed,
_parse_total_steps,
_render_watch,
_render_ps,
cmd_watch,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
"""Fresh in-memory DB for each test."""
c = init_db(db_path=":memory:")
yield c
c.close()
@pytest.fixture
def project_and_task(conn):
"""Create a project and a task; return (project, task)."""
proj = models.create_project(conn, "p1", "Project One", "/path/one", tech_stack=["python"])
task = models.create_task(conn, "KIN-092-T1", "p1", "Watch task")
return proj, task
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _call_format_elapsed(dt_iso: str, elapsed_seconds: int) -> str:
"""Call _format_elapsed with datetime.utcnow frozen to (started + elapsed_seconds)."""
normalized = dt_iso.replace(' ', 'T') if ' ' in dt_iso else dt_iso
started = datetime.fromisoformat(normalized)
frozen_now = started + timedelta(seconds=elapsed_seconds)
with patch("cli.watch.datetime") as mock_dt:
mock_dt.fromisoformat = datetime.fromisoformat
mock_dt.utcnow.return_value = frozen_now
return _format_elapsed(dt_iso)
# ---------------------------------------------------------------------------
# _format_elapsed
# ---------------------------------------------------------------------------
def test_format_elapsed_seconds():
assert _call_format_elapsed("2026-03-17 12:00:00", 45) == "45s"
def test_format_elapsed_exactly_one_minute():
assert _call_format_elapsed("2026-03-17 12:00:00", 60) == "1m 0s"
def test_format_elapsed_minutes_and_seconds():
assert _call_format_elapsed("2026-03-17 12:00:00", 90) == "1m 30s"
def test_format_elapsed_hours():
assert _call_format_elapsed("2026-03-17 12:00:00", 3661) == "1h 1m"
def test_format_elapsed_zero():
"""Нулевое время → '0s'."""
assert _call_format_elapsed("2026-03-17 12:00:00", 0) == "0s"
def test_format_elapsed_negative_clipped_to_zero():
"""Отрицательный elapsed (будущая дата) → '0s', не падает."""
assert _call_format_elapsed("2026-03-17 12:00:00", -10) == "0s"
def test_format_elapsed_sqlite_space_format():
"""SQLite хранит 'YYYY-MM-DD HH:MM:SS' через пробел — должно парситься."""
assert _call_format_elapsed("2026-03-17 10:30:00", 65) == "1m 5s"
def test_format_elapsed_invalid_string():
assert _format_elapsed("not-a-date") == "?"
def test_format_elapsed_none():
assert _format_elapsed(None) == "?"
# ---------------------------------------------------------------------------
# _parse_total_steps
# ---------------------------------------------------------------------------
def test_parse_total_steps_list():
assert _parse_total_steps(["pm", "dev", "qa"]) == 3
def test_parse_total_steps_empty_list():
assert _parse_total_steps([]) == 0
def test_parse_total_steps_json_string():
assert _parse_total_steps('["pm", "dev", "qa"]') == 3
def test_parse_total_steps_json_string_empty_array():
assert _parse_total_steps('[]') == 0
def test_parse_total_steps_none():
assert _parse_total_steps(None) == "?"
def test_parse_total_steps_invalid_json():
assert _parse_total_steps("not-json") == "?"
def test_parse_total_steps_json_non_list():
"""JSON-объект (не массив) → '?'."""
assert _parse_total_steps('{"key": "value"}') == "?"
# ---------------------------------------------------------------------------
# SQL: get_pipeline_for_watch
# ---------------------------------------------------------------------------
def test_get_pipeline_for_watch_returns_none_when_no_pipeline(conn, project_and_task):
_, task = project_and_task
result = models.get_pipeline_for_watch(conn, task["id"])
assert result is None
def test_get_pipeline_for_watch_returns_top_level_pipeline(conn, project_and_task):
_, task = project_and_task
pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm", "dev"])
result = models.get_pipeline_for_watch(conn, task["id"])
assert result is not None
assert result["id"] == pipeline["id"]
def test_get_pipeline_for_watch_excludes_sub_pipelines(conn, project_and_task):
"""Саб-пайплайны (parent_pipeline_id IS NOT NULL) должны игнорироваться."""
_, task = project_and_task
parent = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm"])
# Создаём саб-пайплайн вручную
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, steps, status, parent_pipeline_id)"
" VALUES (?, ?, ?, ?, ?, ?)",
(task["id"], "p1", "sub", "[]", "running", parent["id"]),
)
conn.commit()
result = models.get_pipeline_for_watch(conn, task["id"])
assert result["id"] == parent["id"]
assert result.get("parent_pipeline_id") is None
def test_get_pipeline_for_watch_returns_most_recent(conn, project_and_task):
"""Если пайплайнов несколько — возвращает самый свежий."""
_, task = project_and_task
p1 = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm"])
p2 = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm", "dev"])
result = models.get_pipeline_for_watch(conn, task["id"])
assert result["id"] == p2["id"]
# ---------------------------------------------------------------------------
# SQL: get_current_agent_log
# ---------------------------------------------------------------------------
def test_get_current_agent_log_returns_none_when_no_logs(conn, project_and_task):
_, task = project_and_task
pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", [])
result = models.get_current_agent_log(conn, task["id"], pipeline["created_at"])
assert result is None
def test_get_current_agent_log_returns_most_recent(conn, project_and_task):
_, task = project_and_task
pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", [])
models.log_agent_run(conn, "p1", "pm", "plan", task_id=task["id"], output_summary="first run")
models.log_agent_run(conn, "p1", "dev", "code", task_id=task["id"], output_summary="second run")
result = models.get_current_agent_log(conn, task["id"], pipeline["created_at"])
assert result["agent_role"] == "dev"
assert result["output_summary"] == "second run"
# ---------------------------------------------------------------------------
# SQL: get_all_running_pipelines
# ---------------------------------------------------------------------------
def test_get_all_running_pipelines_empty_when_no_pipelines(conn):
assert models.get_all_running_pipelines(conn) == []
def test_get_all_running_pipelines_returns_only_running(conn, project_and_task):
"""completed и failed пайплайны не должны попадать в kin ps."""
_, task = project_and_task
running = models.create_pipeline(conn, task["id"], "p1", "standard", [])
completed = models.create_pipeline(conn, task["id"], "p1", "standard", [])
models.update_pipeline(conn, completed["id"], status="completed")
failed = models.create_pipeline(conn, task["id"], "p1", "standard", [])
models.update_pipeline(conn, failed["id"], status="failed")
result = models.get_all_running_pipelines(conn)
ids = [r["id"] for r in result]
assert running["id"] in ids
assert completed["id"] not in ids
assert failed["id"] not in ids
def test_get_all_running_pipelines_pid_null_is_none(conn, project_and_task):
"""PID=NULL → решение #374: отображается как '-' (pid=None в dict)."""
_, task = project_and_task
models.create_pipeline(conn, task["id"], "p1", "standard", [])
result = models.get_all_running_pipelines(conn)
assert len(result) == 1
assert result[0]["pid"] is None
def test_get_all_running_pipelines_includes_current_agent(conn, project_and_task):
"""current_agent = последний agent_role из agent_logs."""
_, task = project_and_task
pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", [])
models.log_agent_run(conn, "p1", "pm", "plan", task_id=task["id"])
models.log_agent_run(conn, "p1", "dev", "code", task_id=task["id"])
result = models.get_all_running_pipelines(conn)
assert result[0]["current_agent"] == "dev"
def test_get_all_running_pipelines_no_logs_current_agent_is_none(conn, project_and_task):
_, task = project_and_task
models.create_pipeline(conn, task["id"], "p1", "standard", [])
result = models.get_all_running_pipelines(conn)
assert result[0]["current_agent"] is None
def test_get_all_running_pipelines_includes_project_and_task_info(conn, project_and_task):
proj, task = project_and_task
models.create_pipeline(conn, task["id"], "p1", "standard", [])
result = models.get_all_running_pipelines(conn)
assert result[0]["project_name"] == proj["name"]
assert result[0]["title"] == task["title"]
# ---------------------------------------------------------------------------
# _render_ps (stdout)
# ---------------------------------------------------------------------------
def test_render_ps_empty_list(capsys):
_render_ps([])
out = capsys.readouterr().out
assert "No running pipelines." in out
def test_render_ps_suggests_watch_command_when_empty(capsys):
_render_ps([])
out = capsys.readouterr().out
assert "kin watch" in out
def test_render_ps_pid_null_displayed_as_dash(capsys):
"""Decision #374: PID=NULL → '-'."""
rows = [{
"id": 1, "task_id": "KIN-001", "title": "Short title",
"project_name": "MyProject", "pid": None,
"created_at": "2026-03-17 12:00:00", "current_agent": "dev",
"parent_pipeline_id": None,
}]
with patch("cli.watch._format_elapsed", return_value="5s"):
_render_ps(rows)
out = capsys.readouterr().out
# PID column should contain dash, not None or empty
lines = [l for l in out.splitlines() if "Short title" in l or "#1" in l]
assert any("-" in l for l in lines)
def test_render_ps_title_truncated_at_20_chars(capsys):
long_title = "A" * 25
rows = [{
"id": 1, "task_id": "KIN-001", "title": long_title,
"project_name": "P", "pid": 1234,
"created_at": "2026-03-17 12:00:00", "current_agent": None,
"parent_pipeline_id": None,
}]
with patch("cli.watch._format_elapsed", return_value="1s"):
_render_ps(rows)
out = capsys.readouterr().out
assert "A" * 20 + "" in out
assert long_title not in out
def test_render_ps_with_pid_shows_pid(capsys):
rows = [{
"id": 2, "task_id": "KIN-002", "title": "Fix bug",
"project_name": "Proj", "pid": 9999,
"created_at": "2026-03-17 11:00:00", "current_agent": "pm",
"parent_pipeline_id": None,
}]
with patch("cli.watch._format_elapsed", return_value="10m"):
_render_ps(rows)
out = capsys.readouterr().out
assert "9999" in out
def test_render_ps_shows_running_count(capsys):
rows = [
{"id": 1, "task_id": "KIN-001", "title": "T1", "project_name": "P",
"pid": None, "created_at": "2026-03-17 12:00:00",
"current_agent": None, "parent_pipeline_id": None},
{"id": 2, "task_id": "KIN-002", "title": "T2", "project_name": "P",
"pid": 42, "created_at": "2026-03-17 12:01:00",
"current_agent": "dev", "parent_pipeline_id": None},
]
with patch("cli.watch._format_elapsed", return_value="1s"):
_render_ps(rows)
out = capsys.readouterr().out
assert "2" in out
# ---------------------------------------------------------------------------
# _render_watch (stdout)
# ---------------------------------------------------------------------------
def test_render_watch_no_pipeline(capsys):
task = {"id": "KIN-001", "title": "Test task", "status": "pending"}
_render_watch(task, None, None, 0, "?")
out = capsys.readouterr().out
assert "No pipeline started yet." in out
def test_render_watch_pipeline_null_pid_shows_dash(capsys):
"""PID=NULL → 'PID: -'."""
task = {"id": "KIN-001", "title": "Test task", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": None,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="30s"):
_render_watch(task, pipeline, None, 0, 3)
out = capsys.readouterr().out
assert "PID: -" in out
def test_render_watch_pipeline_with_pid(capsys):
task = {"id": "KIN-001", "title": "Active task", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": 5678,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="10s"):
_render_watch(task, pipeline, None, 0, 2)
out = capsys.readouterr().out
assert "5678" in out
def test_render_watch_terminal_status_completed(capsys):
task = {"id": "KIN-001", "title": "Done task", "status": "done"}
pipeline = {
"id": 1, "status": "completed", "pid": 123,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="1m"):
_render_watch(task, pipeline, None, 2, 3)
out = capsys.readouterr().out
assert "[Pipeline completed. Exiting.]" in out
def test_render_watch_terminal_status_failed(capsys):
task = {"id": "KIN-001", "title": "Failed task", "status": "failed"}
pipeline = {
"id": 1, "status": "failed", "pid": 123,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="2m"):
_render_watch(task, pipeline, None, 1, 3)
out = capsys.readouterr().out
assert "[Pipeline failed. Exiting.]" in out
def test_render_watch_running_shows_update_hint(capsys):
task = {"id": "KIN-001", "title": "Active task", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": 42,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="10s"):
_render_watch(task, pipeline, None, 0, 2)
out = capsys.readouterr().out
assert "[Updating every 5s. Ctrl+C to stop]" in out
def test_render_watch_truncates_long_title(capsys):
long_title = "B" * 55
task = {"id": "KIN-001", "title": long_title, "status": "pending"}
_render_watch(task, None, None, 0, "?")
out = capsys.readouterr().out
assert "" in out
assert long_title not in out
def test_render_watch_shows_last_15_output_lines(capsys):
"""Output truncated к последним 15 строкам (аналог tail -f)."""
task = {"id": "KIN-001", "title": "T", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": 1,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
lines = [f"line{i}" for i in range(20)]
log = {"agent_role": "dev", "output_summary": "\n".join(lines)}
with patch("cli.watch._format_elapsed", return_value="1s"):
_render_watch(task, pipeline, log, 1, 3)
out = capsys.readouterr().out
assert "line19" in out # последняя строка — должна быть
assert "line5" in out # line5 = 6-я с начала, входит в последние 15
assert "line4" not in out # line4 = 5-я с начала, отсекается
def test_render_watch_waiting_message_when_no_log(capsys):
"""Нет логов агента → 'Waiting for first agent...'."""
task = {"id": "KIN-001", "title": "New task", "status": "pending"}
pipeline = {
"id": 1, "status": "running", "pid": None,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="0s"):
_render_watch(task, pipeline, None, 0, "?")
out = capsys.readouterr().out
assert "Waiting for first agent" in out
# ---------------------------------------------------------------------------
# cmd_watch: nonexistent task_id
# ---------------------------------------------------------------------------
def test_cmd_watch_nonexistent_task_prints_error(conn, capsys):
cmd_watch(conn, "KIN-NONEXISTENT")
out = capsys.readouterr().out
assert "KIN-NONEXISTENT" in out
assert "not found" in out.lower()
def test_cmd_watch_nonexistent_task_does_not_sleep(conn):
"""cmd_watch должен вернуться немедленно, без вызова time.sleep."""
with patch("cli.watch.time.sleep") as mock_sleep:
cmd_watch(conn, "KIN-NONEXISTENT")
mock_sleep.assert_not_called()

View file

@ -757,3 +757,76 @@ def test_test_command_can_be_set(conn):
models.create_project(conn, "prj_tc2", "TC Project 2", "/tmp/tc2")
updated = models.update_project(conn, "prj_tc2", test_command="pytest -v --tb=short")
assert updated["test_command"] == "pytest -v --tb=short"
# ---------------------------------------------------------------------------
# KIN-084: write_log() и get_pipeline_logs()
# ---------------------------------------------------------------------------
@pytest.fixture
def pipeline_conn(conn):
"""Возвращает (conn, pipeline_id) для тестов write_log / get_pipeline_logs."""
models.create_project(conn, "plog_proj", "Log Project", "/tmp/plog")
models.create_task(conn, "PLOG-001", "plog_proj", "Log Task")
pipe = models.create_pipeline(conn, "PLOG-001", "plog_proj", "feature", [{"role": "dev"}])
return conn, pipe["id"]
def test_write_log_returns_dict(pipeline_conn):
"""KIN-084: write_log возвращает dict с id, pipeline_id, message, level."""
db, pid = pipeline_conn
entry = models.write_log(db, pid, "Pipeline started")
assert isinstance(entry, dict)
assert entry["id"] is not None
assert entry["pipeline_id"] == pid
assert entry["message"] == "Pipeline started"
assert entry["level"] == "INFO"
assert entry["ts"] is not None
def test_write_log_extra_none_gives_null(pipeline_conn):
"""KIN-084: write_log без extra → extra_json=None."""
db, pid = pipeline_conn
entry = models.write_log(db, pid, "No extra", extra=None)
assert entry["extra_json"] is None
def test_write_log_extra_dict_decoded(pipeline_conn):
"""KIN-084: write_log с extra=dict → extra_json декодируется в dict."""
db, pid = pipeline_conn
entry = models.write_log(db, pid, "With extra", extra={"role": "dev", "model": "sonnet"})
assert isinstance(entry["extra_json"], dict)
assert entry["extra_json"]["role"] == "dev"
assert entry["extra_json"]["model"] == "sonnet"
def test_get_pipeline_logs_since_id_zero_returns_all(pipeline_conn):
"""KIN-084: get_pipeline_logs(since_id=0) возвращает все записи."""
db, pid = pipeline_conn
models.write_log(db, pid, "Entry 1")
models.write_log(db, pid, "Entry 2")
models.write_log(db, pid, "Entry 3")
logs = models.get_pipeline_logs(db, pid, since_id=0)
assert len(logs) == 3
def test_get_pipeline_logs_since_id_filters(pipeline_conn):
"""KIN-084: get_pipeline_logs(since_id=N) возвращает только id > N."""
db, pid = pipeline_conn
e1 = models.write_log(db, pid, "Entry 1")
models.write_log(db, pid, "Entry 2")
models.write_log(db, pid, "Entry 3")
logs = models.get_pipeline_logs(db, pid, since_id=e1["id"])
assert len(logs) == 2
assert all(log["id"] > e1["id"] for log in logs)
def test_get_pipeline_logs_ordered_asc(pipeline_conn):
"""KIN-084: get_pipeline_logs возвращает записи в хронологическом порядке."""
db, pid = pipeline_conn
models.write_log(db, pid, "First")
models.write_log(db, pid, "Second")
models.write_log(db, pid, "Third")
logs = models.get_pipeline_logs(db, pid)
ids = [log["id"] for log in logs]
assert ids == sorted(ids)

View file

@ -226,12 +226,19 @@ class ProjectCreate(BaseModel):
return self
VALID_DEPLOY_RUNTIMES = {"docker", "node", "python", "static"}
class ProjectPatch(BaseModel):
execution_mode: str | None = None
autocommit_enabled: bool | None = None
auto_test_enabled: bool | None = None
obsidian_vault_path: str | None = None
deploy_command: str | None = None
deploy_host: str | None = None
deploy_path: str | None = None
deploy_runtime: str | None = None
deploy_restart_cmd: str | None = None
test_command: str | None = None
project_type: str | None = None
ssh_host: str | None = None
@ -246,6 +253,8 @@ def patch_project(project_id: str, body: ProjectPatch):
body.execution_mode, body.autocommit_enabled is not None,
body.auto_test_enabled is not None,
body.obsidian_vault_path, body.deploy_command is not None,
body.deploy_host is not None, body.deploy_path is not None,
body.deploy_runtime is not None, body.deploy_restart_cmd is not None,
body.test_command is not None,
body.project_type, body.ssh_host is not None,
body.ssh_user is not None, body.ssh_key_path is not None,
@ -257,6 +266,8 @@ def patch_project(project_id: str, body: ProjectPatch):
raise HTTPException(400, f"Invalid execution_mode '{body.execution_mode}'. Must be one of: {', '.join(VALID_EXECUTION_MODES)}")
if body.project_type is not None and body.project_type not in VALID_PROJECT_TYPES:
raise HTTPException(400, f"Invalid project_type '{body.project_type}'. Must be one of: {', '.join(VALID_PROJECT_TYPES)}")
if body.deploy_runtime is not None and body.deploy_runtime != "" and body.deploy_runtime not in VALID_DEPLOY_RUNTIMES:
raise HTTPException(400, f"Invalid deploy_runtime '{body.deploy_runtime}'. Must be one of: {', '.join(sorted(VALID_DEPLOY_RUNTIMES))}")
conn = get_conn()
p = models.get_project(conn, project_id)
if not p:
@ -274,6 +285,14 @@ def patch_project(project_id: str, body: ProjectPatch):
if body.deploy_command is not None:
# Empty string = sentinel for clearing (decision #68)
fields["deploy_command"] = None if body.deploy_command == "" else body.deploy_command
if body.deploy_host is not None:
fields["deploy_host"] = None if body.deploy_host == "" else body.deploy_host
if body.deploy_path is not None:
fields["deploy_path"] = None if body.deploy_path == "" else body.deploy_path
if body.deploy_runtime is not None:
fields["deploy_runtime"] = None if body.deploy_runtime == "" else body.deploy_runtime
if body.deploy_restart_cmd is not None:
fields["deploy_restart_cmd"] = None if body.deploy_restart_cmd == "" else body.deploy_restart_cmd
if body.test_command is not None:
fields["test_command"] = body.test_command
if body.project_type is not None:
@ -325,19 +344,42 @@ def sync_obsidian_endpoint(project_id: str):
@app.post("/api/projects/{project_id}/deploy")
def deploy_project(project_id: str):
"""Execute deploy_command for a project. Returns stdout/stderr/exit_code.
"""Deploy a project using structured runtime steps or legacy deploy_command.
# WARNING: shell=True — deploy_command is admin-only, set in Settings by the project owner.
New-style (deploy_runtime set): uses core/deploy.py structured steps,
cascades to dependent projects via project_links.
Legacy fallback (only deploy_command set): runs deploy_command via shell.
WARNING: shell=True deploy commands are admin-only, set in Settings by the project owner.
"""
import time
from core.deploy import deploy_with_dependents
conn = get_conn()
p = models.get_project(conn, project_id)
conn.close()
if not p:
conn.close()
raise HTTPException(404, f"Project '{project_id}' not found")
deploy_runtime = p.get("deploy_runtime")
deploy_command = p.get("deploy_command")
if not deploy_command:
raise HTTPException(400, "deploy_command not set for this project")
if not deploy_runtime and not deploy_command:
conn.close()
raise HTTPException(400, "Neither deploy_runtime nor deploy_command is set for this project")
if deploy_runtime:
# New structured deploy with dependency cascade
try:
result = deploy_with_dependents(conn, project_id)
except Exception as e:
conn.close()
raise HTTPException(500, f"Deploy failed: {e}")
conn.close()
return result
# Legacy fallback: run deploy_command via shell
conn.close()
cwd = p.get("path") or None
start = time.monotonic()
try:
@ -386,6 +428,57 @@ def create_project(body: ProjectCreate):
return p
# ---------------------------------------------------------------------------
# Project Links (KIN-079)
# ---------------------------------------------------------------------------
class ProjectLinkCreate(BaseModel):
from_project: str
to_project: str
type: str
description: str | None = None
@app.post("/api/project-links")
def create_project_link(body: ProjectLinkCreate):
"""Create a project dependency link."""
conn = get_conn()
if not models.get_project(conn, body.from_project):
conn.close()
raise HTTPException(404, f"Project '{body.from_project}' not found")
if not models.get_project(conn, body.to_project):
conn.close()
raise HTTPException(404, f"Project '{body.to_project}' not found")
link = models.create_project_link(
conn, body.from_project, body.to_project, body.type, body.description
)
conn.close()
return link
@app.get("/api/projects/{project_id}/links")
def get_project_links(project_id: str):
"""Get all project links where project is from or to."""
conn = get_conn()
if not models.get_project(conn, project_id):
conn.close()
raise HTTPException(404, f"Project '{project_id}' not found")
links = models.get_project_links(conn, project_id)
conn.close()
return links
@app.delete("/api/project-links/{link_id}", status_code=204)
def delete_project_link(link_id: int):
"""Delete a project link by id."""
conn = get_conn()
deleted = models.delete_project_link(conn, link_id)
conn.close()
if not deleted:
raise HTTPException(404, f"Link {link_id} not found")
return Response(status_code=204)
# ---------------------------------------------------------------------------
# Phases (KIN-059)
# ---------------------------------------------------------------------------
@ -670,6 +763,19 @@ def patch_task(task_id: str, body: TaskPatch):
return t
@app.get("/api/pipelines/{pipeline_id}/logs")
def get_pipeline_logs(pipeline_id: int, since_id: int = 0):
"""Get pipeline log entries after since_id (for live console polling)."""
conn = get_conn()
row = conn.execute("SELECT id FROM pipelines WHERE id = ?", (pipeline_id,)).fetchone()
if not row:
conn.close()
raise HTTPException(404, f"Pipeline {pipeline_id} not found")
logs = models.get_pipeline_logs(conn, pipeline_id, since_id=since_id)
conn.close()
return logs
@app.get("/api/tasks/{task_id}/pipeline")
def get_task_pipeline(task_id: str):
"""Get agent_logs for a task (pipeline steps)."""