kin/tests/test_api_pipeline_logs.py

172 lines
7.4 KiB
Python
Raw Normal View History

2026-03-17 17:53:14 +02:00
"""Tests for GET /api/pipelines/{id}/logs endpoint (KIN-084 Live Console).
Convention #418: since_id cursor pagination for append-only tables.
Convention #420: 404 for non-existent resources.
"""
import pytest
from fastapi.testclient import TestClient
import web.api as api_module
from core.db import init_db
from core import models
# ─────────────────────────────────────────────────────────────
# Fixtures
# ─────────────────────────────────────────────────────────────
@pytest.fixture
def client(tmp_path):
"""Bare TestClient с изолированной БД (без предварительных данных)."""
db_path = tmp_path / "test.db"
api_module.DB_PATH = db_path
from web.api import app
return TestClient(app)
@pytest.fixture
def pipeline_client(tmp_path):
"""TestClient с project + task + pipeline, готовый к тестированию логов."""
db_path = tmp_path / "test.db"
api_module.DB_PATH = db_path
from web.api import app
c = TestClient(app)
# Seed project + task через API
c.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"})
c.post("/api/tasks", json={"project_id": "p1", "title": "Task 1"})
# Создаём pipeline напрямую в БД
conn = init_db(db_path)
pipeline = models.create_pipeline(conn, "P1-001", "p1", "linear", ["step1"])
conn.close()
yield c, pipeline["id"], db_path
# ─────────────────────────────────────────────────────────────
# Тест: пустой pipeline → пустой список
# ─────────────────────────────────────────────────────────────
def test_get_pipeline_logs_empty_returns_empty_list(pipeline_client):
"""GET /api/pipelines/{id}/logs возвращает [] для pipeline без записей."""
c, pipeline_id, _ = pipeline_client
r = c.get(f"/api/pipelines/{pipeline_id}/logs")
assert r.status_code == 200
assert r.json() == []
# ─────────────────────────────────────────────────────────────
# Тест: несуществующий pipeline → 404 (Convention #420)
# ─────────────────────────────────────────────────────────────
def test_get_pipeline_logs_nonexistent_pipeline_returns_404(client):
"""GET /api/pipelines/99999/logs возвращает 404 для несуществующего pipeline."""
r = client.get("/api/pipelines/99999/logs")
assert r.status_code == 404
# ─────────────────────────────────────────────────────────────
# Тест: 3 записи → правильные поля (id, ts, level, message, extra_json)
# ─────────────────────────────────────────────────────────────
def test_get_pipeline_logs_returns_three_entries_with_correct_fields(pipeline_client):
"""После write_log() x3 GET возвращает 3 записи с полями id, ts, level, message, extra_json."""
c, pipeline_id, db_path = pipeline_client
conn = init_db(db_path)
models.write_log(conn, pipeline_id, "PM started", level="INFO")
models.write_log(conn, pipeline_id, "Running agent", level="DEBUG")
models.write_log(conn, pipeline_id, "Agent error", level="ERROR", extra={"code": 500})
conn.close()
r = c.get(f"/api/pipelines/{pipeline_id}/logs")
assert r.status_code == 200
logs = r.json()
assert len(logs) == 3
# Проверяем наличие всех обязательных полей
first = logs[0]
for field in ("id", "ts", "level", "message", "extra_json"):
assert field in first, f"Поле '{field}' отсутствует в ответе"
assert first["message"] == "PM started"
assert first["level"] == "INFO"
assert first["extra_json"] is None
assert logs[2]["level"] == "ERROR"
assert logs[2]["extra_json"] == {"code": 500}
def test_get_pipeline_logs_returns_entries_in_chronological_order(pipeline_client):
"""Записи возвращаются в хронологическом порядке (по id ASC)."""
c, pipeline_id, db_path = pipeline_client
conn = init_db(db_path)
for msg in ("first", "second", "third"):
models.write_log(conn, pipeline_id, msg)
conn.close()
logs = c.get(f"/api/pipelines/{pipeline_id}/logs").json()
assert [e["message"] for e in logs] == ["first", "second", "third"]
# ─────────────────────────────────────────────────────────────
# Тест: since_id cursor pagination (Convention #418)
# ─────────────────────────────────────────────────────────────
def test_get_pipeline_logs_since_id_returns_entries_after_cursor(pipeline_client):
"""GET ?since_id=<id> возвращает только записи с id > since_id."""
c, pipeline_id, db_path = pipeline_client
conn = init_db(db_path)
for i in range(1, 6): # 5 записей
models.write_log(conn, pipeline_id, f"Message {i}")
conn.close()
# Получаем все записи
all_logs = c.get(f"/api/pipelines/{pipeline_id}/logs").json()
assert len(all_logs) == 5
# Берём id третьей записи как курсор
cursor_id = all_logs[2]["id"]
r = c.get(f"/api/pipelines/{pipeline_id}/logs?since_id={cursor_id}")
assert r.status_code == 200
partial = r.json()
assert len(partial) == 2
for entry in partial:
assert entry["id"] > cursor_id
def test_get_pipeline_logs_since_id_zero_returns_all(pipeline_client):
"""GET ?since_id=0 возвращает все записи (значение по умолчанию)."""
c, pipeline_id, db_path = pipeline_client
conn = init_db(db_path)
models.write_log(conn, pipeline_id, "A")
models.write_log(conn, pipeline_id, "B")
conn.close()
r = c.get(f"/api/pipelines/{pipeline_id}/logs?since_id=0")
assert r.status_code == 200
assert len(r.json()) == 2
def test_get_pipeline_logs_since_id_beyond_last_returns_empty(pipeline_client):
"""GET ?since_id=<last_id> возвращает [] (нет записей после последней)."""
c, pipeline_id, db_path = pipeline_client
conn = init_db(db_path)
models.write_log(conn, pipeline_id, "Only entry")
conn.close()
all_logs = c.get(f"/api/pipelines/{pipeline_id}/logs").json()
last_id = all_logs[-1]["id"]
r = c.get(f"/api/pipelines/{pipeline_id}/logs?since_id={last_id}")
assert r.status_code == 200
assert r.json() == []