"""Tests for GET /api/pipelines/{id}/logs endpoint (KIN-084 Live Console). Convention #418: since_id cursor pagination for append-only tables. Convention #420: 404 for non-existent resources. """ import pytest from fastapi.testclient import TestClient import web.api as api_module from core.db import init_db from core import models # ───────────────────────────────────────────────────────────── # Fixtures # ───────────────────────────────────────────────────────────── @pytest.fixture def client(tmp_path): """Bare TestClient с изолированной БД (без предварительных данных).""" db_path = tmp_path / "test.db" api_module.DB_PATH = db_path from web.api import app return TestClient(app) @pytest.fixture def pipeline_client(tmp_path): """TestClient с project + task + pipeline, готовый к тестированию логов.""" db_path = tmp_path / "test.db" api_module.DB_PATH = db_path from web.api import app c = TestClient(app) # Seed project + task через API c.post("/api/projects", json={"id": "p1", "name": "P1", "path": "/p1"}) c.post("/api/tasks", json={"project_id": "p1", "title": "Task 1"}) # Создаём pipeline напрямую в БД conn = init_db(db_path) pipeline = models.create_pipeline(conn, "P1-001", "p1", "linear", ["step1"]) conn.close() yield c, pipeline["id"], db_path # ───────────────────────────────────────────────────────────── # Тест: пустой pipeline → пустой список # ───────────────────────────────────────────────────────────── def test_get_pipeline_logs_empty_returns_empty_list(pipeline_client): """GET /api/pipelines/{id}/logs возвращает [] для pipeline без записей.""" c, pipeline_id, _ = pipeline_client r = c.get(f"/api/pipelines/{pipeline_id}/logs") assert r.status_code == 200 assert r.json() == [] # ───────────────────────────────────────────────────────────── # Тест: несуществующий pipeline → 404 (Convention #420) # ───────────────────────────────────────────────────────────── def test_get_pipeline_logs_nonexistent_pipeline_returns_404(client): """GET /api/pipelines/99999/logs возвращает 404 для несуществующего pipeline.""" r = client.get("/api/pipelines/99999/logs") assert r.status_code == 404 # ───────────────────────────────────────────────────────────── # Тест: 3 записи → правильные поля (id, ts, level, message, extra_json) # ───────────────────────────────────────────────────────────── def test_get_pipeline_logs_returns_three_entries_with_correct_fields(pipeline_client): """После write_log() x3 GET возвращает 3 записи с полями id, ts, level, message, extra_json.""" c, pipeline_id, db_path = pipeline_client conn = init_db(db_path) models.write_log(conn, pipeline_id, "PM started", level="INFO") models.write_log(conn, pipeline_id, "Running agent", level="DEBUG") models.write_log(conn, pipeline_id, "Agent error", level="ERROR", extra={"code": 500}) conn.close() r = c.get(f"/api/pipelines/{pipeline_id}/logs") assert r.status_code == 200 logs = r.json() assert len(logs) == 3 # Проверяем наличие всех обязательных полей first = logs[0] for field in ("id", "ts", "level", "message", "extra_json"): assert field in first, f"Поле '{field}' отсутствует в ответе" assert first["message"] == "PM started" assert first["level"] == "INFO" assert first["extra_json"] is None assert logs[2]["level"] == "ERROR" assert logs[2]["extra_json"] == {"code": 500} def test_get_pipeline_logs_returns_entries_in_chronological_order(pipeline_client): """Записи возвращаются в хронологическом порядке (по id ASC).""" c, pipeline_id, db_path = pipeline_client conn = init_db(db_path) for msg in ("first", "second", "third"): models.write_log(conn, pipeline_id, msg) conn.close() logs = c.get(f"/api/pipelines/{pipeline_id}/logs").json() assert [e["message"] for e in logs] == ["first", "second", "third"] # ───────────────────────────────────────────────────────────── # Тест: since_id cursor pagination (Convention #418) # ───────────────────────────────────────────────────────────── def test_get_pipeline_logs_since_id_returns_entries_after_cursor(pipeline_client): """GET ?since_id= возвращает только записи с id > since_id.""" c, pipeline_id, db_path = pipeline_client conn = init_db(db_path) for i in range(1, 6): # 5 записей models.write_log(conn, pipeline_id, f"Message {i}") conn.close() # Получаем все записи all_logs = c.get(f"/api/pipelines/{pipeline_id}/logs").json() assert len(all_logs) == 5 # Берём id третьей записи как курсор cursor_id = all_logs[2]["id"] r = c.get(f"/api/pipelines/{pipeline_id}/logs?since_id={cursor_id}") assert r.status_code == 200 partial = r.json() assert len(partial) == 2 for entry in partial: assert entry["id"] > cursor_id def test_get_pipeline_logs_since_id_zero_returns_all(pipeline_client): """GET ?since_id=0 возвращает все записи (значение по умолчанию).""" c, pipeline_id, db_path = pipeline_client conn = init_db(db_path) models.write_log(conn, pipeline_id, "A") models.write_log(conn, pipeline_id, "B") conn.close() r = c.get(f"/api/pipelines/{pipeline_id}/logs?since_id=0") assert r.status_code == 200 assert len(r.json()) == 2 def test_get_pipeline_logs_since_id_beyond_last_returns_empty(pipeline_client): """GET ?since_id= возвращает [] (нет записей после последней).""" c, pipeline_id, db_path = pipeline_client conn = init_db(db_path) models.write_log(conn, pipeline_id, "Only entry") conn.close() all_logs = c.get(f"/api/pipelines/{pipeline_id}/logs").json() last_id = all_logs[-1]["id"] r = c.get(f"/api/pipelines/{pipeline_id}/logs?since_id={last_id}") assert r.status_code == 200 assert r.json() == []