kin/tests/test_kin_092_watch_ps.py

465 lines
18 KiB
Python
Raw Normal View History

2026-03-17 17:26:31 +02:00
"""Tests for KIN-092: cli/watch.py — kin watch and kin ps commands.
Покрывает:
- _format_elapsed: секунды, минуты, часы, ноль, отрицательное, невалидное
- _parse_total_steps: list, JSON-строка, None, невалидный JSON, не-list JSON
- get_pipeline_for_watch: только top-level, самый свежий, None если нет
- get_current_agent_log: последний лог, фильтрация по дате пайплайна
- get_all_running_pipelines: только running, PID=NULL, current_agent
- _render_ps: пустой список, PID=NULL '-', обрезка title
- _render_watch: нет пайплайна, PID=NULL '-', terminal status, output lines
- cmd_watch: несуществующий task_id понятная ошибка, sleep не вызывается
"""
import pytest
from datetime import datetime, timedelta
from unittest.mock import patch
from core.db import init_db
from core import models
from cli.watch import (
_format_elapsed,
_parse_total_steps,
_render_watch,
_render_ps,
cmd_watch,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
    """Yield a fresh ":memory:" database connection and close it after the test."""
    db = init_db(db_path=":memory:")
    yield db
    db.close()
@pytest.fixture
def project_and_task(conn):
    """Create project "p1" and task "KIN-092-T1"; return them as (project, task)."""
    project = models.create_project(
        conn, "p1", "Project One", "/path/one", tech_stack=["python"]
    )
    task = models.create_task(conn, "KIN-092-T1", "p1", "Watch task")
    return project, task
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _call_format_elapsed(dt_iso: str, elapsed_seconds: int) -> str:
    """Return _format_elapsed(dt_iso) with the clock seen by cli.watch pinned.

    ``cli.watch.datetime`` is patched so that ``utcnow()`` returns the parsed
    start time plus ``elapsed_seconds``; ``fromisoformat`` stays the real one.
    """
    # A string without spaces passes through replace() unchanged, so no
    # separate check for the space separator is needed.
    start = datetime.fromisoformat(dt_iso.replace(" ", "T"))
    pinned_now = start + timedelta(seconds=elapsed_seconds)
    with patch("cli.watch.datetime") as fake_datetime:
        fake_datetime.fromisoformat = datetime.fromisoformat
        fake_datetime.utcnow.return_value = pinned_now
        return _format_elapsed(dt_iso)
# ---------------------------------------------------------------------------
# _format_elapsed
# ---------------------------------------------------------------------------
def test_format_elapsed_seconds():
    """45 elapsed seconds are rendered as '45s'."""
    rendered = _call_format_elapsed("2026-03-17 12:00:00", 45)
    assert rendered == "45s"
def test_format_elapsed_exactly_one_minute():
    """Exactly 60 elapsed seconds are rendered as '1m 0s'."""
    rendered = _call_format_elapsed("2026-03-17 12:00:00", 60)
    assert rendered == "1m 0s"
def test_format_elapsed_minutes_and_seconds():
    """90 elapsed seconds are rendered as '1m 30s'."""
    rendered = _call_format_elapsed("2026-03-17 12:00:00", 90)
    assert rendered == "1m 30s"
def test_format_elapsed_hours():
    """3661 elapsed seconds are rendered as '1h 1m'."""
    rendered = _call_format_elapsed("2026-03-17 12:00:00", 3661)
    assert rendered == "1h 1m"
def test_format_elapsed_zero():
    """Zero elapsed time is rendered as '0s'."""
    rendered = _call_format_elapsed("2026-03-17 12:00:00", 0)
    assert rendered == "0s"
def test_format_elapsed_negative_clipped_to_zero():
    """A negative elapsed time (start in the future) gives '0s' and does not raise."""
    rendered = _call_format_elapsed("2026-03-17 12:00:00", -10)
    assert rendered == "0s"
def test_format_elapsed_sqlite_space_format():
    """The space-separated 'YYYY-MM-DD HH:MM:SS' form stored by SQLite must parse."""
    rendered = _call_format_elapsed("2026-03-17 10:30:00", 65)
    assert rendered == "1m 5s"
def test_format_elapsed_invalid_string():
    """A string that is not a date is rendered as '?'."""
    rendered = _format_elapsed("not-a-date")
    assert rendered == "?"
def test_format_elapsed_none():
    """None instead of a timestamp is rendered as '?'."""
    rendered = _format_elapsed(None)
    assert rendered == "?"
# ---------------------------------------------------------------------------
# _parse_total_steps
# ---------------------------------------------------------------------------
def test_parse_total_steps_list():
    """A three-element list yields a step count of 3."""
    steps = ["pm", "dev", "qa"]
    assert _parse_total_steps(steps) == 3
def test_parse_total_steps_empty_list():
    """An empty list yields a step count of 0."""
    total = _parse_total_steps([])
    assert total == 0
def test_parse_total_steps_json_string():
    """A JSON array string with three items yields a step count of 3."""
    total = _parse_total_steps('["pm", "dev", "qa"]')
    assert total == 3
def test_parse_total_steps_json_string_empty_array():
    """The JSON string '[]' yields a step count of 0."""
    total = _parse_total_steps('[]')
    assert total == 0
def test_parse_total_steps_none():
    """None yields the '?' placeholder."""
    total = _parse_total_steps(None)
    assert total == "?"
def test_parse_total_steps_invalid_json():
    """A string that is not valid JSON yields the '?' placeholder."""
    total = _parse_total_steps("not-json")
    assert total == "?"
def test_parse_total_steps_json_non_list():
    """A JSON object (not an array) yields the '?' placeholder."""
    total = _parse_total_steps('{"key": "value"}')
    assert total == "?"
# ---------------------------------------------------------------------------
# SQL: get_pipeline_for_watch
# ---------------------------------------------------------------------------
def test_get_pipeline_for_watch_returns_none_when_no_pipeline(conn, project_and_task):
    """A task with no pipelines gives None."""
    task = project_and_task[1]
    assert models.get_pipeline_for_watch(conn, task["id"]) is None
def test_get_pipeline_for_watch_returns_top_level_pipeline(conn, project_and_task):
    """The single pipeline created for the task is the one returned."""
    task = project_and_task[1]
    created = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm", "dev"])
    found = models.get_pipeline_for_watch(conn, task["id"])
    assert found is not None
    assert found["id"] == created["id"]
def test_get_pipeline_for_watch_excludes_sub_pipelines(conn, project_and_task):
    """Sub-pipelines (parent_pipeline_id IS NOT NULL) must be ignored."""
    task = project_and_task[1]
    top_level = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm"])
    # Insert the sub-pipeline by hand so that parent_pipeline_id can be set.
    insert_sql = (
        "INSERT INTO pipelines (task_id, project_id, route_type, steps, status, parent_pipeline_id)"
        " VALUES (?, ?, ?, ?, ?, ?)"
    )
    sub_row = (task["id"], "p1", "sub", "[]", "running", top_level["id"])
    conn.execute(insert_sql, sub_row)
    conn.commit()
    found = models.get_pipeline_for_watch(conn, task["id"])
    assert found["id"] == top_level["id"]
    assert found.get("parent_pipeline_id") is None
def test_get_pipeline_for_watch_returns_most_recent(conn, project_and_task):
    """With several pipelines for one task, the most recent one is returned."""
    task = project_and_task[1]
    # The older pipeline only has to exist; its row is not inspected.
    models.create_pipeline(conn, task["id"], "p1", "standard", ["pm"])
    newest = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm", "dev"])
    found = models.get_pipeline_for_watch(conn, task["id"])
    assert found["id"] == newest["id"]
# ---------------------------------------------------------------------------
# SQL: get_current_agent_log
# ---------------------------------------------------------------------------
def test_get_current_agent_log_returns_none_when_no_logs(conn, project_and_task):
    """A pipeline without any agent log entries gives None."""
    task = project_and_task[1]
    created = models.create_pipeline(conn, task["id"], "p1", "standard", [])
    latest = models.get_current_agent_log(conn, task["id"], created["created_at"])
    assert latest is None
def test_get_current_agent_log_returns_most_recent(conn, project_and_task):
    """After two logged runs, the second ("dev") entry is the one returned."""
    task = project_and_task[1]
    task_id = task["id"]
    created = models.create_pipeline(conn, task_id, "p1", "standard", [])
    models.log_agent_run(conn, "p1", "pm", "plan", task_id=task_id, output_summary="first run")
    models.log_agent_run(conn, "p1", "dev", "code", task_id=task_id, output_summary="second run")
    latest = models.get_current_agent_log(conn, task_id, created["created_at"])
    assert latest["agent_role"] == "dev"
    assert latest["output_summary"] == "second run"
# ---------------------------------------------------------------------------
# SQL: get_all_running_pipelines
# ---------------------------------------------------------------------------
def test_get_all_running_pipelines_empty_when_no_pipelines(conn):
    """A database without pipelines gives an empty list."""
    running = models.get_all_running_pipelines(conn)
    assert running == []
def test_get_all_running_pipelines_returns_only_running(conn, project_and_task):
    """Completed and failed pipelines must not show up in kin ps."""
    task = project_and_task[1]
    task_id = task["id"]

    still_running = models.create_pipeline(conn, task_id, "p1", "standard", [])

    finished = models.create_pipeline(conn, task_id, "p1", "standard", [])
    models.update_pipeline(conn, finished["id"], status="completed")

    broken = models.create_pipeline(conn, task_id, "p1", "standard", [])
    models.update_pipeline(conn, broken["id"], status="failed")

    listed_ids = [row["id"] for row in models.get_all_running_pipelines(conn)]
    assert still_running["id"] in listed_ids
    assert finished["id"] not in listed_ids
    assert broken["id"] not in listed_ids
def test_get_all_running_pipelines_pid_null_is_none(conn, project_and_task):
    """PID=NULL (decision #374: displayed as '-') comes back as pid=None in the dict."""
    task = project_and_task[1]
    models.create_pipeline(conn, task["id"], "p1", "standard", [])
    rows = models.get_all_running_pipelines(conn)
    assert len(rows) == 1
    only_row = rows[0]
    assert only_row["pid"] is None
def test_get_all_running_pipelines_includes_current_agent(conn, project_and_task):
    """current_agent is the agent_role of the latest agent_logs entry."""
    task = project_and_task[1]
    task_id = task["id"]
    # The created pipeline row is not needed here, only its existence.
    models.create_pipeline(conn, task_id, "p1", "standard", [])
    models.log_agent_run(conn, "p1", "pm", "plan", task_id=task_id)
    models.log_agent_run(conn, "p1", "dev", "code", task_id=task_id)
    rows = models.get_all_running_pipelines(conn)
    assert rows[0]["current_agent"] == "dev"
def test_get_all_running_pipelines_no_logs_current_agent_is_none(conn, project_and_task):
    """Without any logged agent runs, current_agent is None."""
    task = project_and_task[1]
    models.create_pipeline(conn, task["id"], "p1", "standard", [])
    first_row = models.get_all_running_pipelines(conn)[0]
    assert first_row["current_agent"] is None
def test_get_all_running_pipelines_includes_project_and_task_info(conn, project_and_task):
    """Each row carries the project's name and the task's title."""
    project, task = project_and_task
    models.create_pipeline(conn, task["id"], "p1", "standard", [])
    first_row = models.get_all_running_pipelines(conn)[0]
    assert first_row["project_name"] == project["name"]
    assert first_row["title"] == task["title"]
# ---------------------------------------------------------------------------
# _render_ps (stdout)
# ---------------------------------------------------------------------------
def test_render_ps_empty_list(capsys):
    """An empty row list prints the 'No running pipelines.' message."""
    _render_ps([])
    captured = capsys.readouterr()
    assert "No running pipelines." in captured.out
def test_render_ps_suggests_watch_command_when_empty(capsys):
    """With no rows, the output mentions the 'kin watch' command."""
    _render_ps([])
    captured = capsys.readouterr()
    assert "kin watch" in captured.out
def test_render_ps_pid_null_displayed_as_dash(capsys):
    """Decision #374: PID=NULL → '-'."""
    row = {
        "id": 1,
        "task_id": "KIN-001",
        "title": "Short title",
        "project_name": "MyProject",
        "pid": None,
        "created_at": "2026-03-17 12:00:00",
        "current_agent": "dev",
        "parent_pipeline_id": None,
    }
    with patch("cli.watch._format_elapsed", return_value="5s"):
        _render_ps([row])
    printed = capsys.readouterr().out
    # PID column should contain dash, not None or empty.
    # NOTE(review): the task id "KIN-001" itself contains "-", so this check
    # may pass on the task id alone — confirm against the rendered layout.
    candidate_lines = [
        line
        for line in printed.splitlines()
        if "Short title" in line or "#1" in line
    ]
    assert any("-" in line for line in candidate_lines)
def test_render_ps_title_truncated_at_20_chars(capsys):
    """A 25-character title is cut to 20 characters followed by a truncation marker."""
    long_title = "A" * 25
    rows = [{
        "id": 1, "task_id": "KIN-001", "title": long_title,
        "project_name": "P", "pid": 1234,
        "created_at": "2026-03-17 12:00:00", "current_agent": None,
        "parent_pipeline_id": None,
    }]
    # _format_elapsed is stubbed so the output does not depend on the clock.
    with patch("cli.watch._format_elapsed", return_value="1s"):
        _render_ps(rows)
    out = capsys.readouterr().out
    # The previous assertion appended an empty string to the 20-char prefix, so
    # the truncation marker was never verified.
    # NOTE(review): the marker character appears to have been lost from the
    # literal; both the Unicode ellipsis and three ASCII dots are accepted here —
    # confirm which one cli.watch._render_ps emits and pin it.
    truncated = "A" * 20
    assert any((truncated + marker) in out for marker in ("…", "..."))
    assert long_title not in out
def test_render_ps_with_pid_shows_pid(capsys):
    """A row with pid 9999 prints '9999' somewhere in the output."""
    row = {
        "id": 2,
        "task_id": "KIN-002",
        "title": "Fix bug",
        "project_name": "Proj",
        "pid": 9999,
        "created_at": "2026-03-17 11:00:00",
        "current_agent": "pm",
        "parent_pipeline_id": None,
    }
    with patch("cli.watch._format_elapsed", return_value="10m"):
        _render_ps([row])
    printed = capsys.readouterr().out
    assert "9999" in printed
def test_render_ps_shows_running_count(capsys):
    """With two rows, the digit '2' appears in the output."""
    first = {
        "id": 1, "task_id": "KIN-001", "title": "T1", "project_name": "P",
        "pid": None, "created_at": "2026-03-17 12:00:00",
        "current_agent": None, "parent_pipeline_id": None,
    }
    second = {
        "id": 2, "task_id": "KIN-002", "title": "T2", "project_name": "P",
        "pid": 42, "created_at": "2026-03-17 12:01:00",
        "current_agent": "dev", "parent_pipeline_id": None,
    }
    with patch("cli.watch._format_elapsed", return_value="1s"):
        _render_ps([first, second])
    printed = capsys.readouterr().out
    # NOTE(review): "2" also occurs in the row data ("KIN-002", "T2", pid 42),
    # so this does not isolate the count — confirm the intended check.
    assert "2" in printed
# ---------------------------------------------------------------------------
# _render_watch (stdout)
# ---------------------------------------------------------------------------
def test_render_watch_no_pipeline(capsys):
    """With pipeline=None the output says 'No pipeline started yet.'."""
    pending_task = {"id": "KIN-001", "title": "Test task", "status": "pending"}
    _render_watch(pending_task, None, None, 0, "?")
    printed = capsys.readouterr().out
    assert "No pipeline started yet." in printed
def test_render_watch_pipeline_null_pid_shows_dash(capsys):
    """PID=NULL → 'PID: -'."""
    task = {"id": "KIN-001", "title": "Test task", "status": "in_progress"}
    pipeline = {
        "id": 1,
        "status": "running",
        "pid": None,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="30s"):
        _render_watch(task, pipeline, None, 0, 3)
    printed = capsys.readouterr().out
    assert "PID: -" in printed
def test_render_watch_pipeline_with_pid(capsys):
    """A pipeline with pid 5678 prints '5678' somewhere in the output."""
    task = {"id": "KIN-001", "title": "Active task", "status": "in_progress"}
    pipeline = {
        "id": 1,
        "status": "running",
        "pid": 5678,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="10s"):
        _render_watch(task, pipeline, None, 0, 2)
    printed = capsys.readouterr().out
    assert "5678" in printed
def test_render_watch_terminal_status_completed(capsys):
    """A pipeline with status 'completed' prints the completed-and-exiting notice."""
    task = {"id": "KIN-001", "title": "Done task", "status": "done"}
    pipeline = {
        "id": 1,
        "status": "completed",
        "pid": 123,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="1m"):
        _render_watch(task, pipeline, None, 2, 3)
    printed = capsys.readouterr().out
    assert "[Pipeline completed. Exiting.]" in printed
def test_render_watch_terminal_status_failed(capsys):
    """A pipeline with status 'failed' prints the failed-and-exiting notice."""
    task = {"id": "KIN-001", "title": "Failed task", "status": "failed"}
    pipeline = {
        "id": 1,
        "status": "failed",
        "pid": 123,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="2m"):
        _render_watch(task, pipeline, None, 1, 3)
    printed = capsys.readouterr().out
    assert "[Pipeline failed. Exiting.]" in printed
def test_render_watch_running_shows_update_hint(capsys):
    """A running pipeline prints the refresh-interval / Ctrl+C hint."""
    task = {"id": "KIN-001", "title": "Active task", "status": "in_progress"}
    pipeline = {
        "id": 1,
        "status": "running",
        "pid": 42,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="10s"):
        _render_watch(task, pipeline, None, 0, 2)
    printed = capsys.readouterr().out
    assert "[Updating every 5s. Ctrl+C to stop]" in printed
def test_render_watch_truncates_long_title(capsys):
    """A 55-character title is not printed in full; a truncation marker is shown."""
    long_title = "B" * 55
    task = {"id": "KIN-001", "title": long_title, "status": "pending"}
    _render_watch(task, None, None, 0, "?")
    out = capsys.readouterr().out
    # The previous check was `"" in out`, which is true for every string and
    # therefore verified nothing.
    # NOTE(review): the marker character appears to have been lost from the
    # literal; both the Unicode ellipsis and three ASCII dots are accepted here —
    # confirm which one cli.watch._render_watch emits and pin it.
    assert any(marker in out for marker in ("…", "..."))
    assert long_title not in out
def test_render_watch_shows_last_15_output_lines(capsys):
    """Output is cut down to its last 15 lines (like tail -f)."""
    task = {"id": "KIN-001", "title": "T", "status": "in_progress"}
    pipeline = {
        "id": 1,
        "status": "running",
        "pid": 1,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    # Twenty lines, line0 .. line19, joined into one summary string.
    summary = "\n".join(f"line{i}" for i in range(20))
    log = {"agent_role": "dev", "output_summary": summary}
    with patch("cli.watch._format_elapsed", return_value="1s"):
        _render_watch(task, pipeline, log, 1, 3)
    printed = capsys.readouterr().out
    assert "line19" in printed  # the very last line must be present
    assert "line5" in printed  # line5 is the 6th from the start, inside the last 15
    assert "line4" not in printed  # line4 is the 5th from the start, cut off
def test_render_watch_waiting_message_when_no_log(capsys):
    """No agent log yet → 'Waiting for first agent...'."""
    task = {"id": "KIN-001", "title": "New task", "status": "pending"}
    pipeline = {
        "id": 1,
        "status": "running",
        "pid": None,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="0s"):
        _render_watch(task, pipeline, None, 0, "?")
    printed = capsys.readouterr().out
    assert "Waiting for first agent" in printed
# ---------------------------------------------------------------------------
# cmd_watch: nonexistent task_id
# ---------------------------------------------------------------------------
def test_cmd_watch_nonexistent_task_prints_error(conn, capsys):
    """An unknown task id is echoed back together with a 'not found' message."""
    missing_id = "KIN-NONEXISTENT"
    cmd_watch(conn, missing_id)
    printed = capsys.readouterr().out
    assert missing_id in printed
    assert "not found" in printed.lower()
def test_cmd_watch_nonexistent_task_does_not_sleep(conn):
    """cmd_watch must return right away, without ever calling time.sleep."""
    with patch("cli.watch.time.sleep") as fake_sleep:
        cmd_watch(conn, "KIN-NONEXISTENT")
    fake_sleep.assert_not_called()