"""Tests for core/db._migrate() — pipeline_log table migration (KIN-084). Convention #384: three tests for conditional DDL guard. Convention #385: paired schema helper. """ import sqlite3 from core.db import SCHEMA, _migrate, init_db # ───────────────────────────────────────────────────────────── # Helpers (Convention #385) # ───────────────────────────────────────────────────────────── def _get_tables(conn: sqlite3.Connection) -> set: return {r[0] for r in conn.execute( "SELECT name FROM sqlite_master WHERE type='table'" ).fetchall()} def _get_indexes(conn: sqlite3.Connection) -> set: return {r[1] for r in conn.execute( "SELECT * FROM sqlite_master WHERE type='index'" ).fetchall()} def _get_columns(conn: sqlite3.Connection, table: str) -> set: return {r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()} def _make_db_without_pipeline_log() -> sqlite3.Connection: """In-memory DB with full schema minus pipeline_log (simulates legacy DB).""" conn = sqlite3.connect(":memory:") conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") conn.row_factory = sqlite3.Row # Split on the pipeline_log section comment (last section in SCHEMA) schema = SCHEMA.split("-- Live console log (KIN-084)")[0] conn.executescript(schema) conn.commit() return conn # ───────────────────────────────────────────────────────────── # Тест 1: таблица отсутствует → _migrate() создаёт её # ───────────────────────────────────────────────────────────── def test_migrate_creates_pipeline_log_table_when_absent(): """_migrate() создаёт таблицу pipeline_log, если она отсутствует.""" conn = _make_db_without_pipeline_log() assert "pipeline_log" not in _get_tables(conn) _migrate(conn) assert "pipeline_log" in _get_tables(conn) conn.close() def test_migrate_creates_pipeline_log_index_when_absent(): """_migrate() создаёт idx_pipeline_log_pipeline_id, если pipeline_log отсутствует.""" conn = _make_db_without_pipeline_log() assert "idx_pipeline_log_pipeline_id" not in _get_indexes(conn) _migrate(conn) assert "idx_pipeline_log_pipeline_id" in _get_indexes(conn) conn.close() def test_migrate_created_pipeline_log_has_all_columns(): """pipeline_log, созданная _migrate(), содержит все нужные колонки.""" conn = _make_db_without_pipeline_log() _migrate(conn) cols = _get_columns(conn, "pipeline_log") assert {"id", "pipeline_id", "ts", "level", "message", "extra_json"} <= cols conn.close() # ───────────────────────────────────────────────────────────── # Тест 2: таблица есть + полная схема → идемпотентность # ───────────────────────────────────────────────────────────── def test_migrate_idempotent_when_pipeline_log_exists(): """_migrate() не ломает pipeline_log и не падает при повторном вызове.""" conn = init_db(":memory:") assert "pipeline_log" in _get_tables(conn) # Повторный вызов не должен бросить исключение _migrate(conn) assert "pipeline_log" in _get_tables(conn) assert "idx_pipeline_log_pipeline_id" in _get_indexes(conn) conn.close() def test_migrate_idempotent_preserves_existing_pipeline_log_data(): """_migrate() не удаляет данные из существующей pipeline_log.""" from core import models conn = init_db(":memory:") # Создаём минимальную цепочку project → task → pipeline → log models.create_project(conn, "tp", "Test", "/tp") models.create_task(conn, "TP-001", "tp", "T") pipeline = models.create_pipeline(conn, "TP-001", "tp", "linear", []) models.write_log(conn, pipeline["id"], "test-entry") _migrate(conn) rows = conn.execute("SELECT message FROM pipeline_log").fetchall() assert len(rows) == 1 assert rows[0][0] == "test-entry" conn.close() # ───────────────────────────────────────────────────────────── # Тест 3: таблица есть без extra_json → _migrate() не падает # ───────────────────────────────────────────────────────────── def test_migrate_no_crash_when_pipeline_log_missing_extra_json_column(): """_migrate() не падает, если pipeline_log существует без колонки extra_json.""" conn = _make_db_without_pipeline_log() # Создаём pipeline_log без extra_json (старая схема) conn.executescript(""" CREATE TABLE pipeline_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, pipeline_id INTEGER NOT NULL, ts TEXT NOT NULL DEFAULT (datetime('now')), level TEXT NOT NULL DEFAULT 'INFO', message TEXT NOT NULL ); """) conn.commit() assert "pipeline_log" in _get_tables(conn) assert "extra_json" not in _get_columns(conn, "pipeline_log") # _migrate() должен завершиться без исключений _migrate(conn) # Таблица по-прежнему существует assert "pipeline_log" in _get_tables(conn) conn.close() def test_migrate_does_not_add_extra_json_to_existing_pipeline_log(): """_migrate() не добавляет extra_json к существующей pipeline_log (нет ALTER TABLE).""" conn = _make_db_without_pipeline_log() conn.executescript(""" CREATE TABLE pipeline_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, pipeline_id INTEGER NOT NULL, ts TEXT NOT NULL DEFAULT (datetime('now')), level TEXT NOT NULL DEFAULT 'INFO', message TEXT NOT NULL ); """) conn.commit() _migrate(conn) # Документируем текущее поведение: колонка не добавляется cols = _get_columns(conn, "pipeline_log") assert "extra_json" not in cols conn.close()