kin/cli/watch.py

178 lines
5.7 KiB
Python
Raw Normal View History

2026-03-17 17:26:31 +02:00
"""
Kin CLI: `kin watch <task_id>` and `kin ps` commands.
kin watch: polling monitor, updates every 5s (like tail -f for a pipeline).
kin ps: one-shot list of all running pipelines with PID and current step.
"""
import json
import time
from datetime import datetime
from core import models
def _clear_screen() -> None:
    """Erase the terminal and park the cursor in the top-left corner."""
    erase_display = '\033[2J'  # ANSI: clear the whole screen
    cursor_home = '\033[H'     # ANSI: move the cursor to row 1, column 1
    # No trailing newline, and flush so the clear takes effect before the next render.
    print(erase_display + cursor_home, end='', flush=True)
def _format_elapsed(dt_iso: str) -> str:
    """Return a compact "time since" string for an ISO-like timestamp.

    Args:
        dt_iso: Timestamp such as ``"YYYY-MM-DD HH:MM:SS"`` (the space-separated
            form SQLite's CURRENT_TIMESTAMP produces) or any string accepted by
            ``datetime.fromisoformat``. A timestamp without a UTC offset is
            interpreted as UTC; one with an offset is honoured as-is.

    Returns:
        ``"<h>h <m>m"`` when at least one hour has passed, ``"<m>m <s>s"`` when
        at least one minute has passed, otherwise ``"<s>s"``. Timestamps in the
        future are clamped to ``"0s"``. ``"?"`` is returned when the value
        cannot be parsed (including ``None`` or a non-string).
    """
    # Function-scope import: the module imports only `datetime` at file level.
    from datetime import timezone

    try:
        # Normalise the space separator to the ISO 'T' form before parsing.
        started = datetime.fromisoformat(dt_iso.replace(' ', 'T'))
        if started.tzinfo is None:
            # Naive values are treated as UTC so they can be compared with an
            # aware "now" instead of relying on the deprecated utcnow().
            started = started.replace(tzinfo=timezone.utc)
        delta = datetime.now(timezone.utc) - started
    except (AttributeError, TypeError, ValueError, OverflowError):
        return "?"

    # Clock skew can put the start slightly in the future; never show negatives.
    total_secs = max(0, int(delta.total_seconds()))
    minutes, secs = divmod(total_secs, 60)
    hours, minutes = divmod(minutes, 60)

    if hours:
        return f"{hours}h {minutes}m"
    if minutes:
        return f"{minutes}m {secs}s"
    return f"{secs}s"
def _parse_total_steps(steps) -> int | str:
"""Return total planned steps from pipeline.steps (may be list or raw JSON string)."""
if steps is None:
return '?'
if isinstance(steps, list):
return len(steps)
# Fallback: raw JSON string (legacy or unexpected path)
if isinstance(steps, str):
try:
parsed = json.loads(steps)
if isinstance(parsed, list):
return len(parsed)
except (json.JSONDecodeError, ValueError):
pass
return '?'
def _render_watch(
task: dict,
pipeline: dict | None,
log: dict | None,
step_num: int,
total_steps: int | str,
) -> None:
"""Render the watch screen to stdout."""
sep = '' * 60
title = task['title'] or ''
title_short = (title[:50] + '') if len(title) > 50 else title
print(f"kin watch {task['id']}")
print(sep)
print(f"Task: {task['id']} {title_short}")
print(f"Status: {task['status']}")
if pipeline:
pid_str = str(pipeline['pid']) if pipeline.get('pid') else '-'
print(f"Pipeline: #{pipeline['id']} [{pipeline['status']}] PID: {pid_str}")
elapsed = _format_elapsed(pipeline['created_at'])
print(f"Elapsed: {elapsed} | Agents run: {step_num} / Total planned: {total_steps}")
print()
current = log['agent_role'] if log else 'Waiting for first agent...'
print(f"Current agent: {current}")
else:
print()
print("No pipeline started yet.")
print(sep)
if log and log.get('output_summary'):
lines = (log['output_summary'] or '').splitlines()
for line in lines[-15:]:
print(line)
elif pipeline:
print("Waiting for first agent...")
print(sep)
terminal_statuses = ('completed', 'failed', 'cancelled')
if pipeline and pipeline['status'] in terminal_statuses:
print(f"[Pipeline {pipeline['status']}. Exiting.]")
else:
print("[Updating every 5s. Ctrl+C to stop]")
def _render_ps(rows: list[dict]) -> None:
    """Render the running pipelines table to stdout.

    Args:
        rows: One dict per running pipeline. Reads ``'id'`` (required) and the
            optional keys ``'title'``, ``'project_name'``, ``'pid'``,
            ``'created_at'``, ``'current_agent'`` and ``'parent_pipeline_id'``;
            missing or falsy optional values are shown as ``'-'`` (an empty
            title is shown as an empty cell).
    """
    if not rows:
        print("No running pipelines.")
        print("\nUse 'kin watch <task_id>' to monitor.")
        return

    print(f"Running pipelines: {len(rows)}\n")
    headers = ["ID", "TASK", "PROJECT", "PID", "STARTED", "AGENT", "PARENT"]

    formatted_rows = []
    for r in rows:
        title = r.get('title') or ''
        # Long titles are cut to 20 characters plus a one-character ellipsis
        # (written as an escape so the glyph survives re-encoding).
        title_short = (title[:20] + '\u2026') if len(title) > 20 else title
        pid_str = str(r['pid']) if r.get('pid') else '-'
        started = (_format_elapsed(r['created_at']) + ' ago') if r.get('created_at') else '-'
        agent = r.get('current_agent') or '-'
        parent = str(r['parent_pipeline_id']) if r.get('parent_pipeline_id') else '-'
        formatted_rows.append([
            f"#{r['id']}", title_short, r.get('project_name') or '-',
            pid_str, started, agent, parent,
        ])

    # Each column is as wide as its longest cell, header included.
    col_widths = [
        max(len(str(cell)) for cell in column)
        for column in zip(headers, *formatted_rows)
    ]
    fmt = ' '.join(f'{{:<{w}}}' for w in col_widths)

    print(fmt.format(*headers))
    print(fmt.format(*('-' * w for w in col_widths)))
    for row in formatted_rows:
        print(fmt.format(*row))
    print("\nUse 'kin watch <task_id>' to monitor.")
def cmd_watch(conn, task_id: str) -> None:
    """Polling monitor for a task. Updates every 5s. Exits on terminal pipeline state.

    Args:
        conn: Database connection handed to the ``models`` helpers and used
            directly (``conn.execute``) to count agent log rows.
        task_id: Identifier of the task to watch.

    Prints a "not found" message and returns if the task does not exist.
    Ctrl+C (KeyboardInterrupt) ends the loop with a short message.
    """
    task = models.get_task(conn, task_id)
    if not task:
        print(f"Task '{task_id}' not found.")
        return

    terminal_statuses = ('completed', 'failed', 'cancelled')
    try:
        while True:
            pipeline = models.get_pipeline_for_watch(conn, task_id)
            if pipeline:
                log = models.get_current_agent_log(conn, task_id, pipeline['created_at'])
                # Agents run so far = log rows written since this pipeline started.
                step_num = conn.execute(
                    'SELECT COUNT(*) FROM agent_logs WHERE task_id = ? AND created_at >= ?',
                    (task_id, pipeline['created_at']),
                ).fetchone()[0]
                total = _parse_total_steps(pipeline.get('steps'))
            else:
                log, step_num, total = None, 0, '?'

            # Clear only once the data is ready so the screen is not left blank
            # while the queries run.
            _clear_screen()
            _render_watch(task, pipeline, log, step_num, total)

            if pipeline and pipeline['status'] in terminal_statuses:
                break
            time.sleep(5)

            # Re-read the task so the displayed status does not go stale between
            # polls (assumes get_task returns a point-in-time record; confirm).
            # Keep the last known record if the task has disappeared meanwhile.
            task = models.get_task(conn, task_id) or task
    except KeyboardInterrupt:
        print('\nExiting watch mode.')
def cmd_ps(conn) -> None:
    """Print a one-shot table of every pipeline that is currently running."""
    _render_ps(models.get_all_running_pipelines(conn))