2026-03-17 17:26:31 +02:00
|
|
|
"""
|
|
|
|
|
Kin CLI — kin watch <task_id> and kin ps commands.
|
|
|
|
|
|
|
|
|
|
kin watch: polling monitor, updates every 5s (like tail -f for a pipeline).
|
|
|
|
|
kin ps: one-shot list of all running pipelines with PID and current step.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import time
|
2026-03-17 18:26:28 +02:00
|
|
|
from datetime import datetime, timezone
|
2026-03-17 17:26:31 +02:00
|
|
|
|
|
|
|
|
from core import models
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _clear_screen() -> None:
|
|
|
|
|
print('\033[2J\033[H', end='', flush=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _format_elapsed(dt_iso: str) -> str:
|
|
|
|
|
"""Format elapsed time from SQLite CURRENT_TIMESTAMP (UTC naive, space separator)."""
|
|
|
|
|
try:
|
|
|
|
|
# SQLite stores "YYYY-MM-DD HH:MM:SS"; fromisoformat requires 'T' in Python < 3.11
|
|
|
|
|
normalized = dt_iso.replace(' ', 'T') if ' ' in dt_iso else dt_iso
|
|
|
|
|
started = datetime.fromisoformat(normalized)
|
2026-03-17 18:26:28 +02:00
|
|
|
total_secs = int((datetime.now(timezone.utc).replace(tzinfo=None) - started).total_seconds())
|
2026-03-17 17:26:31 +02:00
|
|
|
if total_secs < 0:
|
|
|
|
|
total_secs = 0
|
|
|
|
|
hours = total_secs // 3600
|
|
|
|
|
minutes = (total_secs % 3600) // 60
|
|
|
|
|
secs = total_secs % 60
|
|
|
|
|
if hours > 0:
|
|
|
|
|
return f"{hours}h {minutes}m"
|
|
|
|
|
elif minutes > 0:
|
|
|
|
|
return f"{minutes}m {secs}s"
|
|
|
|
|
else:
|
|
|
|
|
return f"{secs}s"
|
|
|
|
|
except (ValueError, TypeError):
|
|
|
|
|
return "?"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_total_steps(steps) -> int | str:
|
|
|
|
|
"""Return total planned steps from pipeline.steps (may be list or raw JSON string)."""
|
|
|
|
|
if steps is None:
|
|
|
|
|
return '?'
|
|
|
|
|
if isinstance(steps, list):
|
|
|
|
|
return len(steps)
|
|
|
|
|
# Fallback: raw JSON string (legacy or unexpected path)
|
|
|
|
|
if isinstance(steps, str):
|
|
|
|
|
try:
|
|
|
|
|
parsed = json.loads(steps)
|
|
|
|
|
if isinstance(parsed, list):
|
|
|
|
|
return len(parsed)
|
|
|
|
|
except (json.JSONDecodeError, ValueError):
|
|
|
|
|
pass
|
|
|
|
|
return '?'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _render_watch(
|
|
|
|
|
task: dict,
|
|
|
|
|
pipeline: dict | None,
|
|
|
|
|
log: dict | None,
|
|
|
|
|
step_num: int,
|
|
|
|
|
total_steps: int | str,
|
|
|
|
|
) -> None:
|
|
|
|
|
"""Render the watch screen to stdout."""
|
|
|
|
|
sep = '─' * 60
|
|
|
|
|
title = task['title'] or ''
|
|
|
|
|
title_short = (title[:50] + '…') if len(title) > 50 else title
|
|
|
|
|
|
|
|
|
|
print(f"kin watch {task['id']}")
|
|
|
|
|
print(sep)
|
|
|
|
|
print(f"Task: {task['id']} {title_short}")
|
|
|
|
|
print(f"Status: {task['status']}")
|
|
|
|
|
|
|
|
|
|
if pipeline:
|
|
|
|
|
pid_str = str(pipeline['pid']) if pipeline.get('pid') else '-'
|
|
|
|
|
print(f"Pipeline: #{pipeline['id']} [{pipeline['status']}] PID: {pid_str}")
|
|
|
|
|
elapsed = _format_elapsed(pipeline['created_at'])
|
|
|
|
|
print(f"Elapsed: {elapsed} | Agents run: {step_num} / Total planned: {total_steps}")
|
|
|
|
|
print()
|
|
|
|
|
current = log['agent_role'] if log else 'Waiting for first agent...'
|
|
|
|
|
print(f"Current agent: {current}")
|
|
|
|
|
else:
|
|
|
|
|
print()
|
|
|
|
|
print("No pipeline started yet.")
|
|
|
|
|
|
|
|
|
|
print(sep)
|
|
|
|
|
|
|
|
|
|
if log and log.get('output_summary'):
|
|
|
|
|
lines = (log['output_summary'] or '').splitlines()
|
|
|
|
|
for line in lines[-15:]:
|
|
|
|
|
print(line)
|
|
|
|
|
elif pipeline:
|
2026-03-17 18:26:43 +02:00
|
|
|
print("No output yet.")
|
2026-03-17 17:26:31 +02:00
|
|
|
|
|
|
|
|
print(sep)
|
|
|
|
|
|
|
|
|
|
terminal_statuses = ('completed', 'failed', 'cancelled')
|
|
|
|
|
if pipeline and pipeline['status'] in terminal_statuses:
|
|
|
|
|
print(f"[Pipeline {pipeline['status']}. Exiting.]")
|
|
|
|
|
else:
|
|
|
|
|
print("[Updating every 5s. Ctrl+C to stop]")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _render_ps(rows: list[dict]) -> None:
|
|
|
|
|
"""Render the running pipelines table to stdout."""
|
|
|
|
|
if not rows:
|
|
|
|
|
print("No running pipelines.")
|
|
|
|
|
print("\nUse 'kin watch <task_id>' to monitor.")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
print(f"Running pipelines: {len(rows)}\n")
|
|
|
|
|
|
|
|
|
|
headers = ["ID", "TASK", "PROJECT", "PID", "STARTED", "AGENT", "PARENT"]
|
|
|
|
|
col_widths = [len(h) for h in headers]
|
|
|
|
|
|
|
|
|
|
formatted_rows = []
|
|
|
|
|
for r in rows:
|
|
|
|
|
title = r.get('title') or ''
|
|
|
|
|
title_short = (title[:20] + '…') if len(title) > 20 else title
|
|
|
|
|
pid_str = str(r['pid']) if r.get('pid') else '-'
|
|
|
|
|
started = (_format_elapsed(r['created_at']) + ' ago') if r.get('created_at') else '-'
|
|
|
|
|
agent = r.get('current_agent') or '-'
|
|
|
|
|
parent = str(r['parent_pipeline_id']) if r.get('parent_pipeline_id') else '-'
|
|
|
|
|
row = [f"#{r['id']}", title_short, r.get('project_name') or '-',
|
|
|
|
|
pid_str, started, agent, parent]
|
|
|
|
|
formatted_rows.append(row)
|
|
|
|
|
for i, cell in enumerate(row):
|
|
|
|
|
col_widths[i] = max(col_widths[i], len(str(cell)))
|
|
|
|
|
|
|
|
|
|
fmt = ' '.join(f'{{:<{w}}}' for w in col_widths)
|
|
|
|
|
print(fmt.format(*headers))
|
|
|
|
|
print(fmt.format(*('-' * w for w in col_widths)))
|
|
|
|
|
for row in formatted_rows:
|
|
|
|
|
print(fmt.format(*row))
|
|
|
|
|
|
|
|
|
|
print("\nUse 'kin watch <task_id>' to monitor.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def cmd_watch(conn, task_id: str) -> None:
|
|
|
|
|
"""Polling monitor for a task. Updates every 5s. Exits on terminal pipeline state."""
|
|
|
|
|
task = models.get_task(conn, task_id)
|
|
|
|
|
if not task:
|
|
|
|
|
print(f"Task '{task_id}' not found.")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
while True:
|
2026-03-17 18:26:43 +02:00
|
|
|
task = models.get_task(conn, task_id)
|
2026-03-17 17:26:31 +02:00
|
|
|
_clear_screen()
|
|
|
|
|
pipeline = models.get_pipeline_for_watch(conn, task_id)
|
|
|
|
|
|
|
|
|
|
if pipeline:
|
|
|
|
|
log = models.get_current_agent_log(conn, task_id, pipeline['created_at'])
|
2026-03-17 18:23:04 +02:00
|
|
|
step_num = models.count_agent_logs_since(conn, task_id, pipeline['created_at'])
|
2026-03-17 17:26:31 +02:00
|
|
|
total = _parse_total_steps(pipeline.get('steps'))
|
|
|
|
|
else:
|
|
|
|
|
log, step_num, total = None, 0, '?'
|
|
|
|
|
|
|
|
|
|
_render_watch(task, pipeline, log, step_num, total)
|
|
|
|
|
|
|
|
|
|
if pipeline and pipeline['status'] in ('completed', 'failed', 'cancelled'):
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
print('\nExiting watch mode.')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def cmd_ps(conn) -> None:
|
|
|
|
|
"""One-shot list of all running pipelines."""
|
|
|
|
|
rows = models.get_all_running_pipelines(conn)
|
|
|
|
|
_render_ps(rows)
|