"""Tests for KIN-092: cli/watch.py — kin watch and kin ps commands. Покрывает: - _format_elapsed: секунды, минуты, часы, ноль, отрицательное, невалидное - _parse_total_steps: list, JSON-строка, None, невалидный JSON, не-list JSON - get_pipeline_for_watch: только top-level, самый свежий, None если нет - get_current_agent_log: последний лог, фильтрация по дате пайплайна - get_all_running_pipelines: только running, PID=NULL, current_agent - _render_ps: пустой список, PID=NULL → '-', обрезка title - _render_watch: нет пайплайна, PID=NULL → '-', terminal status, output lines - cmd_watch: несуществующий task_id → понятная ошибка, sleep не вызывается """ import pytest from datetime import datetime, timedelta from unittest.mock import patch from core.db import init_db from core import models from cli.watch import ( _format_elapsed, _parse_total_steps, _render_watch, _render_ps, cmd_watch, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def conn(): """Fresh in-memory DB for each test.""" c = init_db(db_path=":memory:") yield c c.close() @pytest.fixture def project_and_task(conn): """Create a project and a task; return (project, task).""" proj = models.create_project(conn, "p1", "Project One", "/path/one", tech_stack=["python"]) task = models.create_task(conn, "KIN-092-T1", "p1", "Watch task") return proj, task # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _call_format_elapsed(dt_iso: str, elapsed_seconds: int) -> str: """Call _format_elapsed with datetime.utcnow frozen to (started + elapsed_seconds).""" normalized = dt_iso.replace(' ', 'T') if ' ' in dt_iso else dt_iso started = datetime.fromisoformat(normalized) frozen_now = started + timedelta(seconds=elapsed_seconds) with patch("cli.watch.datetime") as mock_dt: 
mock_dt.fromisoformat = datetime.fromisoformat mock_dt.utcnow.return_value = frozen_now return _format_elapsed(dt_iso) # --------------------------------------------------------------------------- # _format_elapsed # --------------------------------------------------------------------------- def test_format_elapsed_seconds(): assert _call_format_elapsed("2026-03-17 12:00:00", 45) == "45s" def test_format_elapsed_exactly_one_minute(): assert _call_format_elapsed("2026-03-17 12:00:00", 60) == "1m 0s" def test_format_elapsed_minutes_and_seconds(): assert _call_format_elapsed("2026-03-17 12:00:00", 90) == "1m 30s" def test_format_elapsed_hours(): assert _call_format_elapsed("2026-03-17 12:00:00", 3661) == "1h 1m" def test_format_elapsed_zero(): """Нулевое время → '0s'.""" assert _call_format_elapsed("2026-03-17 12:00:00", 0) == "0s" def test_format_elapsed_negative_clipped_to_zero(): """Отрицательный elapsed (будущая дата) → '0s', не падает.""" assert _call_format_elapsed("2026-03-17 12:00:00", -10) == "0s" def test_format_elapsed_sqlite_space_format(): """SQLite хранит 'YYYY-MM-DD HH:MM:SS' через пробел — должно парситься.""" assert _call_format_elapsed("2026-03-17 10:30:00", 65) == "1m 5s" def test_format_elapsed_invalid_string(): assert _format_elapsed("not-a-date") == "?" def test_format_elapsed_none(): assert _format_elapsed(None) == "?" # --------------------------------------------------------------------------- # _parse_total_steps # --------------------------------------------------------------------------- def test_parse_total_steps_list(): assert _parse_total_steps(["pm", "dev", "qa"]) == 3 def test_parse_total_steps_empty_list(): assert _parse_total_steps([]) == 0 def test_parse_total_steps_json_string(): assert _parse_total_steps('["pm", "dev", "qa"]') == 3 def test_parse_total_steps_json_string_empty_array(): assert _parse_total_steps('[]') == 0 def test_parse_total_steps_none(): assert _parse_total_steps(None) == "?" 
def test_parse_total_steps_invalid_json():
    total = _parse_total_steps("not-json")
    assert total == "?"


def test_parse_total_steps_json_non_list():
    """A JSON object (not an array) → '?'."""
    total = _parse_total_steps('{"key": "value"}')
    assert total == "?"


# ---------------------------------------------------------------------------
# SQL: get_pipeline_for_watch
# ---------------------------------------------------------------------------

def test_get_pipeline_for_watch_returns_none_when_no_pipeline(conn, project_and_task):
    task_id = project_and_task[1]["id"]
    assert models.get_pipeline_for_watch(conn, task_id) is None


def test_get_pipeline_for_watch_returns_top_level_pipeline(conn, project_and_task):
    task_id = project_and_task[1]["id"]
    created = models.create_pipeline(conn, task_id, "p1", "standard", ["pm", "dev"])

    found = models.get_pipeline_for_watch(conn, task_id)

    assert found is not None
    assert found["id"] == created["id"]


def test_get_pipeline_for_watch_excludes_sub_pipelines(conn, project_and_task):
    """Sub-pipelines (parent_pipeline_id IS NOT NULL) must be ignored."""
    task_id = project_and_task[1]["id"]
    top_level = models.create_pipeline(conn, task_id, "p1", "standard", ["pm"])

    # Insert the sub-pipeline by hand, pointing at the top-level one.
    insert_sql = (
        "INSERT INTO pipelines (task_id, project_id, route_type, steps, status, parent_pipeline_id)"
        " VALUES (?, ?, ?, ?, ?, ?)"
    )
    sub_row = (task_id, "p1", "sub", "[]", "running", top_level["id"])
    conn.execute(insert_sql, sub_row)
    conn.commit()

    found = models.get_pipeline_for_watch(conn, task_id)

    assert found["id"] == top_level["id"]
    assert found.get("parent_pipeline_id") is None


def test_get_pipeline_for_watch_returns_most_recent(conn, project_and_task):
    """With several pipelines for a task, the most recent one is returned."""
    task_id = project_and_task[1]["id"]
    # The older pipeline only needs to exist; its row is not inspected.
    models.create_pipeline(conn, task_id, "p1", "standard", ["pm"])
    newest = models.create_pipeline(conn, task_id, "p1", "standard", ["pm", "dev"])

    found = models.get_pipeline_for_watch(conn, task_id)

    assert found["id"] == newest["id"]


# ---------------------------------------------------------------------------
# SQL: get_current_agent_log
# ---------------------------------------------------------------------------

def test_get_current_agent_log_returns_none_when_no_logs(conn, project_and_task):
    task_id = project_and_task[1]["id"]
    created = models.create_pipeline(conn, task_id, "p1", "standard", [])

    latest = models.get_current_agent_log(conn, task_id, created["created_at"])

    assert latest is None


def test_get_current_agent_log_returns_most_recent(conn, project_and_task):
    task_id = project_and_task[1]["id"]
    created = models.create_pipeline(conn, task_id, "p1", "standard", [])
    models.log_agent_run(
        conn, "p1", "pm", "plan", task_id=task_id, output_summary="first run"
    )
    models.log_agent_run(
        conn, "p1", "dev", "code", task_id=task_id, output_summary="second run"
    )

    latest = models.get_current_agent_log(conn, task_id, created["created_at"])

    assert latest["agent_role"] == "dev"
    assert latest["output_summary"] == "second run"


# ---------------------------------------------------------------------------
# SQL: get_all_running_pipelines
# ---------------------------------------------------------------------------

def test_get_all_running_pipelines_empty_when_no_pipelines(conn):
    rows = models.get_all_running_pipelines(conn)
    assert rows == []


def test_get_all_running_pipelines_returns_only_running(conn, project_and_task):
    """Completed and failed pipelines must not show up in kin ps."""
    task_id = project_and_task[1]["id"]
    still_running = models.create_pipeline(conn, task_id, "p1", "standard", [])
    finished = models.create_pipeline(conn, task_id, "p1", "standard", [])
    models.update_pipeline(conn, finished["id"], status="completed")
    broken = models.create_pipeline(conn, task_id, "p1", "standard", [])
    models.update_pipeline(conn, broken["id"], status="failed")

    listed_ids = {row["id"] for row in models.get_all_running_pipelines(conn)}

    assert still_running["id"] in listed_ids
    assert finished["id"] not in listed_ids
    assert broken["id"] not in listed_ids
def test_get_all_running_pipelines_pid_null_is_none(conn, project_and_task):
    """PID=NULL comes back as pid=None in the row dict (decision #374: shown as '-')."""
    _, task = project_and_task
    models.create_pipeline(conn, task["id"], "p1", "standard", [])

    rows = models.get_all_running_pipelines(conn)

    assert len(rows) == 1
    assert rows[0]["pid"] is None


def test_get_all_running_pipelines_includes_current_agent(conn, project_and_task):
    """current_agent is the agent_role of the latest agent_logs entry."""
    _, task = project_and_task
    models.create_pipeline(conn, task["id"], "p1", "standard", [])
    models.log_agent_run(conn, "p1", "pm", "plan", task_id=task["id"])
    models.log_agent_run(conn, "p1", "dev", "code", task_id=task["id"])

    rows = models.get_all_running_pipelines(conn)

    assert rows[0]["current_agent"] == "dev"


def test_get_all_running_pipelines_no_logs_current_agent_is_none(conn, project_and_task):
    """Without any agent logs, current_agent is None."""
    _, task = project_and_task
    models.create_pipeline(conn, task["id"], "p1", "standard", [])

    rows = models.get_all_running_pipelines(conn)

    assert rows[0]["current_agent"] is None


def test_get_all_running_pipelines_includes_project_and_task_info(conn, project_and_task):
    """Each row carries the project name and the task title."""
    proj, task = project_and_task
    models.create_pipeline(conn, task["id"], "p1", "standard", [])

    rows = models.get_all_running_pipelines(conn)

    assert rows[0]["project_name"] == proj["name"]
    assert rows[0]["title"] == task["title"]


# ---------------------------------------------------------------------------
# _render_ps (stdout)
# ---------------------------------------------------------------------------

def test_render_ps_empty_list(capsys):
    """An empty row list prints the 'No running pipelines.' message."""
    _render_ps([])
    out = capsys.readouterr().out
    assert "No running pipelines." in out


def test_render_ps_suggests_watch_command_when_empty(capsys):
    """The empty-list output mentions 'kin watch'."""
    _render_ps([])
    out = capsys.readouterr().out
    assert "kin watch" in out


def test_render_ps_pid_null_displayed_as_dash(capsys):
    """Decision #374: PID=NULL → '-'."""
    task_id = "KIN-001"
    rows = [{
        "id": 1,
        "task_id": task_id,
        "title": "Short title",
        "project_name": "MyProject",
        "pid": None,
        "created_at": "2026-03-17 12:00:00",
        "current_agent": "dev",
        "parent_pipeline_id": None,
    }]
    with patch("cli.watch._format_elapsed", return_value="5s"):
        _render_ps(rows)
    out = capsys.readouterr().out

    # PID column should contain a dash, not None or empty.
    row_lines = [
        line for line in out.splitlines()
        if "Short title" in line or "#1" in line
    ]
    # The task id itself contains a dash, so it is stripped before looking
    # for one; otherwise this check passes even when the PID is not
    # rendered as '-'.
    assert any("-" in line.replace(task_id, "") for line in row_lines)


def test_render_ps_title_truncated_at_20_chars(capsys):
    """A 25-character title is cut to 20 characters followed by '…'."""
    long_title = "A" * 25
    rows = [{
        "id": 1,
        "task_id": "KIN-001",
        "title": long_title,
        "project_name": "P",
        "pid": 1234,
        "created_at": "2026-03-17 12:00:00",
        "current_agent": None,
        "parent_pipeline_id": None,
    }]
    with patch("cli.watch._format_elapsed", return_value="1s"):
        _render_ps(rows)
    out = capsys.readouterr().out

    truncated = "A" * 20 + "…"
    assert truncated in out
    assert long_title not in out


def test_render_ps_with_pid_shows_pid(capsys):
    """A non-null PID is printed as-is."""
    rows = [{
        "id": 2,
        "task_id": "KIN-002",
        "title": "Fix bug",
        "project_name": "Proj",
        "pid": 9999,
        "created_at": "2026-03-17 11:00:00",
        "current_agent": "pm",
        "parent_pipeline_id": None,
    }]
    with patch("cli.watch._format_elapsed", return_value="10m"):
        _render_ps(rows)
    out = capsys.readouterr().out

    assert "9999" in out


def test_render_ps_shows_running_count(capsys):
    """With two rows, the output contains the count '2'."""
    rows = [
        {
            "id": 1,
            "task_id": "KIN-001",
            "title": "T1",
            "project_name": "P",
            "pid": None,
            "created_at": "2026-03-17 12:00:00",
            "current_agent": None,
            "parent_pipeline_id": None,
        },
        {
            "id": 2,
            "task_id": "KIN-002",
            "title": "T2",
            "project_name": "P",
            "pid": 42,
            "created_at": "2026-03-17 12:01:00",
            "current_agent": "dev",
            "parent_pipeline_id": None,
        },
    ]
    with patch("cli.watch._format_elapsed", return_value="1s"):
        _render_ps(rows)
    out = capsys.readouterr().out

    # NOTE(review): weak check — "2" also occurs in the second row's own data
    # ("KIN-002", pid 42) if those are printed. Tighten once the exact
    # wording of the count line is confirmed.
    assert "2" in out


# ---------------------------------------------------------------------------
# _render_watch (stdout)
# ---------------------------------------------------------------------------

def test_render_watch_no_pipeline(capsys):
    """Without a pipeline the output says none has started yet."""
    task = {"id": "KIN-001", "title": "Test task", "status": "pending"}
    _render_watch(task, None, None, 0, "?")
    out = capsys.readouterr().out
    assert "No pipeline started yet." in out


def test_render_watch_pipeline_null_pid_shows_dash(capsys):
    """PID=NULL → 'PID: -'."""
    task = {"id": "KIN-001", "title": "Test task", "status": "in_progress"}
    pipeline = {
        "id": 1,
        "status": "running",
        "pid": None,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="30s"):
        _render_watch(task, pipeline, None, 0, 3)
    out = capsys.readouterr().out

    assert "PID: -" in out


def test_render_watch_pipeline_with_pid(capsys):
    """A non-null PID appears in the output."""
    task = {"id": "KIN-001", "title": "Active task", "status": "in_progress"}
    pipeline = {
        "id": 1,
        "status": "running",
        "pid": 5678,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="10s"):
        _render_watch(task, pipeline, None, 0, 2)
    out = capsys.readouterr().out

    assert "5678" in out


def test_render_watch_terminal_status_completed(capsys):
    """A completed pipeline prints the 'completed, exiting' footer."""
    task = {"id": "KIN-001", "title": "Done task", "status": "done"}
    pipeline = {
        "id": 1,
        "status": "completed",
        "pid": 123,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="1m"):
        _render_watch(task, pipeline, None, 2, 3)
    out = capsys.readouterr().out

    assert "[Pipeline completed. Exiting.]" in out


def test_render_watch_terminal_status_failed(capsys):
    """A failed pipeline prints the 'failed, exiting' footer."""
    task = {"id": "KIN-001", "title": "Failed task", "status": "failed"}
    pipeline = {
        "id": 1,
        "status": "failed",
        "pid": 123,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="2m"):
        _render_watch(task, pipeline, None, 1, 3)
    out = capsys.readouterr().out

    assert "[Pipeline failed. Exiting.]" in out


def test_render_watch_running_shows_update_hint(capsys):
    """A running pipeline prints the refresh / Ctrl+C hint."""
    task = {"id": "KIN-001", "title": "Active task", "status": "in_progress"}
    pipeline = {
        "id": 1,
        "status": "running",
        "pid": 42,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="10s"):
        _render_watch(task, pipeline, None, 0, 2)
    out = capsys.readouterr().out

    assert "[Updating every 5s. Ctrl+C to stop]" in out


def test_render_watch_truncates_long_title(capsys):
    """A 55-character title is shortened and marked with '…'."""
    long_title = "B" * 55
    task = {"id": "KIN-001", "title": long_title, "status": "pending"}
    _render_watch(task, None, None, 0, "?")
    out = capsys.readouterr().out

    assert "…" in out
    assert long_title not in out


def test_render_watch_shows_last_15_output_lines(capsys):
    """Output is truncated to its last 15 lines (like tail -f)."""
    task = {"id": "KIN-001", "title": "T", "status": "in_progress"}
    pipeline = {
        "id": 1,
        "status": "running",
        "pid": 1,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    summary_lines = [f"line{i}" for i in range(20)]
    log = {"agent_role": "dev", "output_summary": "\n".join(summary_lines)}
    with patch("cli.watch._format_elapsed", return_value="1s"):
        _render_watch(task, pipeline, log, 1, 3)
    out = capsys.readouterr().out

    assert "line19" in out      # the final line must be present
    assert "line5" in out       # 6th from the start, inside the last 15
    assert "line4" not in out   # 5th from the start, cut off


def test_render_watch_waiting_message_when_no_log(capsys):
    """No agent logs yet → 'Waiting for first agent...'."""
    task = {"id": "KIN-001", "title": "New task", "status": "pending"}
    pipeline = {
        "id": 1,
        "status": "running",
        "pid": None,
        "created_at": "2026-03-17 12:00:00",
        "steps": [],
    }
    with patch("cli.watch._format_elapsed", return_value="0s"):
        _render_watch(task, pipeline, None, 0, "?")
    out = capsys.readouterr().out

    assert "Waiting for first agent" in out


# ---------------------------------------------------------------------------
# cmd_watch: nonexistent task_id
# ---------------------------------------------------------------------------

def test_cmd_watch_nonexistent_task_prints_error(conn, capsys):
    """An unknown task id is echoed back together with a 'not found' message."""
    cmd_watch(conn, "KIN-NONEXISTENT")
    out = capsys.readouterr().out

    assert "KIN-NONEXISTENT" in out
    assert "not found" in out.lower()


def test_cmd_watch_nonexistent_task_does_not_sleep(conn):
    """cmd_watch must return immediately, without calling time.sleep."""
    with patch("cli.watch.time.sleep") as mock_sleep:
        cmd_watch(conn, "KIN-NONEXISTENT")
    mock_sleep.assert_not_called()