kin/tests/test_kin_092_watch_ps.py
2026-03-17 18:29:32 +02:00

465 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests for KIN-092: cli/watch.py — kin watch and kin ps commands.
Покрывает:
- _format_elapsed: секунды, минуты, часы, ноль, отрицательное, невалидное
- _parse_total_steps: list, JSON-строка, None, невалидный JSON, не-list JSON
- get_pipeline_for_watch: только top-level, самый свежий, None если нет
- get_current_agent_log: последний лог, фильтрация по дате пайплайна
- get_all_running_pipelines: только running, PID=NULL, current_agent
- _render_ps: пустой список, PID=NULL → '-', обрезка title
- _render_watch: нет пайплайна, PID=NULL → '-', terminal status, output lines
- cmd_watch: несуществующий task_id → понятная ошибка, sleep не вызывается
"""
import pytest
from datetime import datetime, timedelta, timezone
from unittest.mock import patch
from core.db import init_db
from core import models
from cli.watch import (
_format_elapsed,
_parse_total_steps,
_render_watch,
_render_ps,
cmd_watch,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
"""Fresh in-memory DB for each test."""
c = init_db(db_path=":memory:")
yield c
c.close()
@pytest.fixture
def project_and_task(conn):
"""Create a project and a task; return (project, task)."""
proj = models.create_project(conn, "p1", "Project One", "/path/one", tech_stack=["python"])
task = models.create_task(conn, "KIN-092-T1", "p1", "Watch task")
return proj, task
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _call_format_elapsed(dt_iso: str, elapsed_seconds: int) -> str:
"""Call _format_elapsed with datetime.now(timezone.utc) frozen to (started + elapsed_seconds)."""
normalized = dt_iso.replace(' ', 'T') if ' ' in dt_iso else dt_iso
started = datetime.fromisoformat(normalized)
frozen_now = started + timedelta(seconds=elapsed_seconds)
with patch("cli.watch.datetime") as mock_dt:
mock_dt.fromisoformat = datetime.fromisoformat
# New code uses datetime.now(timezone.utc).replace(tzinfo=None)
mock_dt.now.return_value = frozen_now.replace(tzinfo=timezone.utc)
return _format_elapsed(dt_iso)
# ---------------------------------------------------------------------------
# _format_elapsed
# ---------------------------------------------------------------------------
def test_format_elapsed_seconds():
assert _call_format_elapsed("2026-03-17 12:00:00", 45) == "45s"
def test_format_elapsed_exactly_one_minute():
assert _call_format_elapsed("2026-03-17 12:00:00", 60) == "1m 0s"
def test_format_elapsed_minutes_and_seconds():
assert _call_format_elapsed("2026-03-17 12:00:00", 90) == "1m 30s"
def test_format_elapsed_hours():
assert _call_format_elapsed("2026-03-17 12:00:00", 3661) == "1h 1m"
def test_format_elapsed_zero():
"""Нулевое время → '0s'."""
assert _call_format_elapsed("2026-03-17 12:00:00", 0) == "0s"
def test_format_elapsed_negative_clipped_to_zero():
"""Отрицательный elapsed (будущая дата) → '0s', не падает."""
assert _call_format_elapsed("2026-03-17 12:00:00", -10) == "0s"
def test_format_elapsed_sqlite_space_format():
"""SQLite хранит 'YYYY-MM-DD HH:MM:SS' через пробел — должно парситься."""
assert _call_format_elapsed("2026-03-17 10:30:00", 65) == "1m 5s"
def test_format_elapsed_invalid_string():
assert _format_elapsed("not-a-date") == "?"
def test_format_elapsed_none():
assert _format_elapsed(None) == "?"
# ---------------------------------------------------------------------------
# _parse_total_steps
# ---------------------------------------------------------------------------
def test_parse_total_steps_list():
assert _parse_total_steps(["pm", "dev", "qa"]) == 3
def test_parse_total_steps_empty_list():
assert _parse_total_steps([]) == 0
def test_parse_total_steps_json_string():
assert _parse_total_steps('["pm", "dev", "qa"]') == 3
def test_parse_total_steps_json_string_empty_array():
assert _parse_total_steps('[]') == 0
def test_parse_total_steps_none():
assert _parse_total_steps(None) == "?"
def test_parse_total_steps_invalid_json():
assert _parse_total_steps("not-json") == "?"
def test_parse_total_steps_json_non_list():
"""JSON-объект (не массив) → '?'."""
assert _parse_total_steps('{"key": "value"}') == "?"
# ---------------------------------------------------------------------------
# SQL: get_pipeline_for_watch
# ---------------------------------------------------------------------------
def test_get_pipeline_for_watch_returns_none_when_no_pipeline(conn, project_and_task):
_, task = project_and_task
result = models.get_pipeline_for_watch(conn, task["id"])
assert result is None
def test_get_pipeline_for_watch_returns_top_level_pipeline(conn, project_and_task):
_, task = project_and_task
pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm", "dev"])
result = models.get_pipeline_for_watch(conn, task["id"])
assert result is not None
assert result["id"] == pipeline["id"]
def test_get_pipeline_for_watch_excludes_sub_pipelines(conn, project_and_task):
"""Саб-пайплайны (parent_pipeline_id IS NOT NULL) должны игнорироваться."""
_, task = project_and_task
parent = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm"])
# Создаём саб-пайплайн вручную
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, steps, status, parent_pipeline_id)"
" VALUES (?, ?, ?, ?, ?, ?)",
(task["id"], "p1", "sub", "[]", "running", parent["id"]),
)
conn.commit()
result = models.get_pipeline_for_watch(conn, task["id"])
assert result["id"] == parent["id"]
assert result.get("parent_pipeline_id") is None
def test_get_pipeline_for_watch_returns_most_recent(conn, project_and_task):
"""Если пайплайнов несколько — возвращает самый свежий."""
_, task = project_and_task
p1 = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm"])
p2 = models.create_pipeline(conn, task["id"], "p1", "standard", ["pm", "dev"])
result = models.get_pipeline_for_watch(conn, task["id"])
assert result["id"] == p2["id"]
# ---------------------------------------------------------------------------
# SQL: get_current_agent_log
# ---------------------------------------------------------------------------
def test_get_current_agent_log_returns_none_when_no_logs(conn, project_and_task):
_, task = project_and_task
pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", [])
result = models.get_current_agent_log(conn, task["id"], pipeline["created_at"])
assert result is None
def test_get_current_agent_log_returns_most_recent(conn, project_and_task):
_, task = project_and_task
pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", [])
models.log_agent_run(conn, "p1", "pm", "plan", task_id=task["id"], output_summary="first run")
models.log_agent_run(conn, "p1", "dev", "code", task_id=task["id"], output_summary="second run")
result = models.get_current_agent_log(conn, task["id"], pipeline["created_at"])
assert result["agent_role"] == "dev"
assert result["output_summary"] == "second run"
# ---------------------------------------------------------------------------
# SQL: get_all_running_pipelines
# ---------------------------------------------------------------------------
def test_get_all_running_pipelines_empty_when_no_pipelines(conn):
assert models.get_all_running_pipelines(conn) == []
def test_get_all_running_pipelines_returns_only_running(conn, project_and_task):
"""completed и failed пайплайны не должны попадать в kin ps."""
_, task = project_and_task
running = models.create_pipeline(conn, task["id"], "p1", "standard", [])
completed = models.create_pipeline(conn, task["id"], "p1", "standard", [])
models.update_pipeline(conn, completed["id"], status="completed")
failed = models.create_pipeline(conn, task["id"], "p1", "standard", [])
models.update_pipeline(conn, failed["id"], status="failed")
result = models.get_all_running_pipelines(conn)
ids = [r["id"] for r in result]
assert running["id"] in ids
assert completed["id"] not in ids
assert failed["id"] not in ids
def test_get_all_running_pipelines_pid_null_is_none(conn, project_and_task):
"""PID=NULL → решение #374: отображается как '-' (pid=None в dict)."""
_, task = project_and_task
models.create_pipeline(conn, task["id"], "p1", "standard", [])
result = models.get_all_running_pipelines(conn)
assert len(result) == 1
assert result[0]["pid"] is None
def test_get_all_running_pipelines_includes_current_agent(conn, project_and_task):
"""current_agent = последний agent_role из agent_logs."""
_, task = project_and_task
pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", [])
models.log_agent_run(conn, "p1", "pm", "plan", task_id=task["id"])
models.log_agent_run(conn, "p1", "dev", "code", task_id=task["id"])
result = models.get_all_running_pipelines(conn)
assert result[0]["current_agent"] == "dev"
def test_get_all_running_pipelines_no_logs_current_agent_is_none(conn, project_and_task):
_, task = project_and_task
models.create_pipeline(conn, task["id"], "p1", "standard", [])
result = models.get_all_running_pipelines(conn)
assert result[0]["current_agent"] is None
def test_get_all_running_pipelines_includes_project_and_task_info(conn, project_and_task):
proj, task = project_and_task
models.create_pipeline(conn, task["id"], "p1", "standard", [])
result = models.get_all_running_pipelines(conn)
assert result[0]["project_name"] == proj["name"]
assert result[0]["title"] == task["title"]
# ---------------------------------------------------------------------------
# _render_ps (stdout)
# ---------------------------------------------------------------------------
def test_render_ps_empty_list(capsys):
_render_ps([])
out = capsys.readouterr().out
assert "No running pipelines." in out
def test_render_ps_suggests_watch_command_when_empty(capsys):
_render_ps([])
out = capsys.readouterr().out
assert "kin watch" in out
def test_render_ps_pid_null_displayed_as_dash(capsys):
"""Decision #374: PID=NULL → '-'."""
rows = [{
"id": 1, "task_id": "KIN-001", "title": "Short title",
"project_name": "MyProject", "pid": None,
"created_at": "2026-03-17 12:00:00", "current_agent": "dev",
"parent_pipeline_id": None,
}]
with patch("cli.watch._format_elapsed", return_value="5s"):
_render_ps(rows)
out = capsys.readouterr().out
# PID column should contain dash, not None or empty
lines = [l for l in out.splitlines() if "Short title" in l or "#1" in l]
assert any("-" in l for l in lines)
def test_render_ps_title_truncated_at_20_chars(capsys):
long_title = "A" * 25
rows = [{
"id": 1, "task_id": "KIN-001", "title": long_title,
"project_name": "P", "pid": 1234,
"created_at": "2026-03-17 12:00:00", "current_agent": None,
"parent_pipeline_id": None,
}]
with patch("cli.watch._format_elapsed", return_value="1s"):
_render_ps(rows)
out = capsys.readouterr().out
assert "A" * 20 + "" in out
assert long_title not in out
def test_render_ps_with_pid_shows_pid(capsys):
rows = [{
"id": 2, "task_id": "KIN-002", "title": "Fix bug",
"project_name": "Proj", "pid": 9999,
"created_at": "2026-03-17 11:00:00", "current_agent": "pm",
"parent_pipeline_id": None,
}]
with patch("cli.watch._format_elapsed", return_value="10m"):
_render_ps(rows)
out = capsys.readouterr().out
assert "9999" in out
def test_render_ps_shows_running_count(capsys):
rows = [
{"id": 1, "task_id": "KIN-001", "title": "T1", "project_name": "P",
"pid": None, "created_at": "2026-03-17 12:00:00",
"current_agent": None, "parent_pipeline_id": None},
{"id": 2, "task_id": "KIN-002", "title": "T2", "project_name": "P",
"pid": 42, "created_at": "2026-03-17 12:01:00",
"current_agent": "dev", "parent_pipeline_id": None},
]
with patch("cli.watch._format_elapsed", return_value="1s"):
_render_ps(rows)
out = capsys.readouterr().out
assert "2" in out
# ---------------------------------------------------------------------------
# _render_watch (stdout)
# ---------------------------------------------------------------------------
def test_render_watch_no_pipeline(capsys):
task = {"id": "KIN-001", "title": "Test task", "status": "pending"}
_render_watch(task, None, None, 0, "?")
out = capsys.readouterr().out
assert "No pipeline started yet." in out
def test_render_watch_pipeline_null_pid_shows_dash(capsys):
"""PID=NULL → 'PID: -'."""
task = {"id": "KIN-001", "title": "Test task", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": None,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="30s"):
_render_watch(task, pipeline, None, 0, 3)
out = capsys.readouterr().out
assert "PID: -" in out
def test_render_watch_pipeline_with_pid(capsys):
task = {"id": "KIN-001", "title": "Active task", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": 5678,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="10s"):
_render_watch(task, pipeline, None, 0, 2)
out = capsys.readouterr().out
assert "5678" in out
def test_render_watch_terminal_status_completed(capsys):
task = {"id": "KIN-001", "title": "Done task", "status": "done"}
pipeline = {
"id": 1, "status": "completed", "pid": 123,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="1m"):
_render_watch(task, pipeline, None, 2, 3)
out = capsys.readouterr().out
assert "[Pipeline completed. Exiting.]" in out
def test_render_watch_terminal_status_failed(capsys):
task = {"id": "KIN-001", "title": "Failed task", "status": "failed"}
pipeline = {
"id": 1, "status": "failed", "pid": 123,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="2m"):
_render_watch(task, pipeline, None, 1, 3)
out = capsys.readouterr().out
assert "[Pipeline failed. Exiting.]" in out
def test_render_watch_running_shows_update_hint(capsys):
task = {"id": "KIN-001", "title": "Active task", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": 42,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="10s"):
_render_watch(task, pipeline, None, 0, 2)
out = capsys.readouterr().out
assert "[Updating every 5s. Ctrl+C to stop]" in out
def test_render_watch_truncates_long_title(capsys):
long_title = "B" * 55
task = {"id": "KIN-001", "title": long_title, "status": "pending"}
_render_watch(task, None, None, 0, "?")
out = capsys.readouterr().out
assert "" in out
assert long_title not in out
def test_render_watch_shows_last_15_output_lines(capsys):
"""Output truncated к последним 15 строкам (аналог tail -f)."""
task = {"id": "KIN-001", "title": "T", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": 1,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
lines = [f"line{i}" for i in range(20)]
log = {"agent_role": "dev", "output_summary": "\n".join(lines)}
with patch("cli.watch._format_elapsed", return_value="1s"):
_render_watch(task, pipeline, log, 1, 3)
out = capsys.readouterr().out
assert "line19" in out # последняя строка — должна быть
assert "line5" in out # line5 = 6-я с начала, входит в последние 15
assert "line4" not in out # line4 = 5-я с начала, отсекается
def test_render_watch_waiting_message_when_no_log(capsys):
"""Нет логов агента → 'Waiting for first agent...'."""
task = {"id": "KIN-001", "title": "New task", "status": "pending"}
pipeline = {
"id": 1, "status": "running", "pid": None,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="0s"):
_render_watch(task, pipeline, None, 0, "?")
out = capsys.readouterr().out
assert "Waiting for first agent" in out
# ---------------------------------------------------------------------------
# cmd_watch: nonexistent task_id
# ---------------------------------------------------------------------------
def test_cmd_watch_nonexistent_task_prints_error(conn, capsys):
cmd_watch(conn, "KIN-NONEXISTENT")
out = capsys.readouterr().out
assert "KIN-NONEXISTENT" in out
assert "not found" in out.lower()
def test_cmd_watch_nonexistent_task_does_not_sleep(conn):
"""cmd_watch должен вернуться немедленно, без вызова time.sleep."""
with patch("cli.watch.time.sleep") as mock_sleep:
cmd_watch(conn, "KIN-NONEXISTENT")
mock_sleep.assert_not_called()