kin/tests/test_migrate_pipeline_log.py

170 lines
7 KiB
Python
Raw Normal View History

2026-03-17 17:53:14 +02:00
"""Tests for core/db._migrate() — pipeline_log table migration (KIN-084).
Convention #384: three tests for conditional DDL guard.
Convention #385: paired schema helper.
"""
import sqlite3
from core.db import SCHEMA, _migrate, init_db
# ─────────────────────────────────────────────────────────────
# Helpers (Convention #385)
# ─────────────────────────────────────────────────────────────
def _get_tables(conn: sqlite3.Connection) -> set:
return {r[0] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()}
def _get_indexes(conn: sqlite3.Connection) -> set:
return {r[1] for r in conn.execute(
"SELECT * FROM sqlite_master WHERE type='index'"
).fetchall()}
def _get_columns(conn: sqlite3.Connection, table: str) -> set:
return {r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()}
def _make_db_without_pipeline_log() -> sqlite3.Connection:
"""In-memory DB with full schema minus pipeline_log (simulates legacy DB)."""
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
# Split on the pipeline_log section comment (last section in SCHEMA)
schema = SCHEMA.split("-- Live console log (KIN-084)")[0]
conn.executescript(schema)
conn.commit()
return conn
# ─────────────────────────────────────────────────────────────
# Тест 1: таблица отсутствует → _migrate() создаёт её
# ─────────────────────────────────────────────────────────────
def test_migrate_creates_pipeline_log_table_when_absent():
"""_migrate() создаёт таблицу pipeline_log, если она отсутствует."""
conn = _make_db_without_pipeline_log()
assert "pipeline_log" not in _get_tables(conn)
_migrate(conn)
assert "pipeline_log" in _get_tables(conn)
conn.close()
def test_migrate_creates_pipeline_log_index_when_absent():
"""_migrate() создаёт idx_pipeline_log_pipeline_id, если pipeline_log отсутствует."""
conn = _make_db_without_pipeline_log()
assert "idx_pipeline_log_pipeline_id" not in _get_indexes(conn)
_migrate(conn)
assert "idx_pipeline_log_pipeline_id" in _get_indexes(conn)
conn.close()
def test_migrate_created_pipeline_log_has_all_columns():
"""pipeline_log, созданная _migrate(), содержит все нужные колонки."""
conn = _make_db_without_pipeline_log()
_migrate(conn)
cols = _get_columns(conn, "pipeline_log")
assert {"id", "pipeline_id", "ts", "level", "message", "extra_json"} <= cols
conn.close()
# ─────────────────────────────────────────────────────────────
# Тест 2: таблица есть + полная схема → идемпотентность
# ─────────────────────────────────────────────────────────────
def test_migrate_idempotent_when_pipeline_log_exists():
"""_migrate() не ломает pipeline_log и не падает при повторном вызове."""
conn = init_db(":memory:")
assert "pipeline_log" in _get_tables(conn)
# Повторный вызов не должен бросить исключение
_migrate(conn)
assert "pipeline_log" in _get_tables(conn)
assert "idx_pipeline_log_pipeline_id" in _get_indexes(conn)
conn.close()
def test_migrate_idempotent_preserves_existing_pipeline_log_data():
"""_migrate() не удаляет данные из существующей pipeline_log."""
from core import models
conn = init_db(":memory:")
# Создаём минимальную цепочку project → task → pipeline → log
models.create_project(conn, "tp", "Test", "/tp")
models.create_task(conn, "TP-001", "tp", "T")
pipeline = models.create_pipeline(conn, "TP-001", "tp", "linear", [])
models.write_log(conn, pipeline["id"], "test-entry")
_migrate(conn)
rows = conn.execute("SELECT message FROM pipeline_log").fetchall()
assert len(rows) == 1
assert rows[0][0] == "test-entry"
conn.close()
# ─────────────────────────────────────────────────────────────
# Тест 3: таблица есть без extra_json → _migrate() не падает
# ─────────────────────────────────────────────────────────────
def test_migrate_no_crash_when_pipeline_log_missing_extra_json_column():
"""_migrate() не падает, если pipeline_log существует без колонки extra_json."""
conn = _make_db_without_pipeline_log()
# Создаём pipeline_log без extra_json (старая схема)
conn.executescript("""
CREATE TABLE pipeline_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id INTEGER NOT NULL,
ts TEXT NOT NULL DEFAULT (datetime('now')),
level TEXT NOT NULL DEFAULT 'INFO',
message TEXT NOT NULL
);
""")
conn.commit()
assert "pipeline_log" in _get_tables(conn)
assert "extra_json" not in _get_columns(conn, "pipeline_log")
# _migrate() должен завершиться без исключений
_migrate(conn)
# Таблица по-прежнему существует
assert "pipeline_log" in _get_tables(conn)
conn.close()
def test_migrate_does_not_add_extra_json_to_existing_pipeline_log():
"""_migrate() не добавляет extra_json к существующей pipeline_log (нет ALTER TABLE)."""
conn = _make_db_without_pipeline_log()
conn.executescript("""
CREATE TABLE pipeline_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id INTEGER NOT NULL,
ts TEXT NOT NULL DEFAULT (datetime('now')),
level TEXT NOT NULL DEFAULT 'INFO',
message TEXT NOT NULL
);
""")
conn.commit()
_migrate(conn)
# Документируем текущее поведение: колонка не добавляется
cols = _get_columns(conn, "pipeline_log")
assert "extra_json" not in cols
conn.close()