kin: auto-commit after pipeline

This commit is contained in:
Gros Frumos 2026-03-17 18:29:32 +02:00
parent 94591ab7ae
commit 3d9b5766ab
10 changed files with 250 additions and 30 deletions

View file

@ -0,0 +1,152 @@
"""Regression tests for KIN-OBS-022 — three fixes in cli/watch.py:
AC1: models.get_task(conn, task_id) called at the start of each while-True iteration.
AC2: 'Waiting for first agent...' appears exactly once (agent-role line);
output section uses 'No output yet.' instead of a second occurrence.
AC3: datetime.utcnow() absent from source; datetime.now(timezone.utc) used;
'timezone' imported from datetime stdlib.
"""
import inspect
import pytest
from datetime import datetime, timedelta, timezone
from unittest.mock import patch
from core.db import init_db
from core import models
from cli.watch import _format_elapsed, _render_watch, cmd_watch
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
c = init_db(db_path=":memory:")
yield c
c.close()
@pytest.fixture
def project_and_task(conn):
proj = models.create_project(conn, "p1", "Obs022 Project", "/obs022", tech_stack=["python"])
task = models.create_task(conn, "KIN-OBS-022-T1", "p1", "Watch task")
return proj, task
# ---------------------------------------------------------------------------
# AC1: task re-read from DB in each loop iteration
# ---------------------------------------------------------------------------
def test_cmd_watch_calls_get_task_inside_loop(conn, project_and_task):
"""models.get_task must be called at least twice: once before the loop,
once inside the loop body so status changes in DB are picked up."""
_, task = project_and_task
pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", [])
models.update_pipeline(conn, pipeline["id"], status="completed")
with patch("cli.watch.models.get_task", wraps=models.get_task) as mock_get_task, \
patch("cli.watch._clear_screen"), \
patch("cli.watch.time.sleep"):
cmd_watch(conn, task["id"])
# ≥2: one before the existence check, one inside the loop body
assert mock_get_task.call_count >= 2
def test_cmd_watch_loop_get_task_uses_correct_task_id(conn, project_and_task):
"""Every get_task call in cmd_watch passes the same task_id."""
_, task = project_and_task
pipeline = models.create_pipeline(conn, task["id"], "p1", "standard", [])
models.update_pipeline(conn, pipeline["id"], status="completed")
with patch("cli.watch.models.get_task", wraps=models.get_task) as mock_get_task, \
patch("cli.watch._clear_screen"), \
patch("cli.watch.time.sleep"):
cmd_watch(conn, task["id"])
for call in mock_get_task.call_args_list:
assert call.args[1] == task["id"]
# ---------------------------------------------------------------------------
# AC2: 'Waiting for first agent...' exactly once; output section → 'No output yet.'
# ---------------------------------------------------------------------------
def test_render_watch_waiting_message_appears_exactly_once(capsys):
"""When pipeline has no log, 'Waiting for first agent...' appears once
(agent-role line), and 'No output yet.' appears in the output section."""
task = {"id": "KIN-001", "title": "T", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": None,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="0s"):
_render_watch(task, pipeline, None, 0, "?")
out = capsys.readouterr().out
assert out.count("Waiting for first agent") == 1
def test_render_watch_output_section_shows_no_output_yet_not_waiting(capsys):
"""Output section must say 'No output yet.' — not 'Waiting for first agent...'."""
task = {"id": "KIN-001", "title": "T", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": None,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
with patch("cli.watch._format_elapsed", return_value="0s"):
_render_watch(task, pipeline, None, 0, "?")
out = capsys.readouterr().out
assert "No output yet." in out
def test_render_watch_log_with_empty_output_summary_shows_no_output_yet(capsys):
"""Log present but output_summary is None → 'No output yet.' in output section."""
task = {"id": "KIN-001", "title": "T", "status": "in_progress"}
pipeline = {
"id": 1, "status": "running", "pid": 42,
"created_at": "2026-03-17 12:00:00", "steps": [],
}
log = {"agent_role": "dev", "output_summary": None}
with patch("cli.watch._format_elapsed", return_value="5s"):
_render_watch(task, pipeline, log, 1, 3)
out = capsys.readouterr().out
assert "No output yet." in out
assert "Waiting for first agent" not in out
# ---------------------------------------------------------------------------
# AC3: datetime.utcnow() absent; timezone imported; now(timezone.utc) used
# ---------------------------------------------------------------------------
def test_watch_py_does_not_contain_utcnow():
"""cli/watch.py must not reference datetime.utcnow()."""
import cli.watch as watch_module
source = inspect.getsource(watch_module)
assert "utcnow" not in source
def test_watch_py_imports_timezone():
"""cli/watch.py must import 'timezone' from stdlib datetime."""
import cli.watch as watch_module
source = inspect.getsource(watch_module)
assert "timezone" in source
def test_format_elapsed_mocked_via_now_not_utcnow():
"""_format_elapsed is controllable through datetime.now(), not utcnow().
Verifies the call path changed to datetime.now(timezone.utc)."""
dt_iso = "2026-03-17 12:00:00"
started = datetime.fromisoformat("2026-03-17T12:00:00")
frozen_now = started + timedelta(seconds=75)
with patch("cli.watch.datetime") as mock_dt:
mock_dt.fromisoformat = datetime.fromisoformat
mock_dt.now.return_value = frozen_now.replace(tzinfo=timezone.utc)
result = _format_elapsed(dt_iso)
assert result == "1m 15s"
mock_dt.now.assert_called_once()
mock_dt.utcnow.assert_not_called()