"""Tests for core/db.py — schema and migration (KIN-071).""" import sqlite3 import pytest from core.db import init_db, _migrate @pytest.fixture def conn(): c = init_db(db_path=":memory:") yield c c.close() def _cols(conn, table: str) -> set[str]: """Return set of column names for a table.""" return {row["name"] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()} # --------------------------------------------------------------------------- # Schema: новые колонки KIN-071 присутствуют при свежей инициализации # --------------------------------------------------------------------------- class TestProjectsSchemaKin071: """PRAGMA table_info(projects) должен содержать новые KIN-071 колонки.""" def test_schema_has_project_type_column(self, conn): assert "project_type" in _cols(conn, "projects") def test_schema_has_ssh_host_column(self, conn): assert "ssh_host" in _cols(conn, "projects") def test_schema_has_ssh_user_column(self, conn): assert "ssh_user" in _cols(conn, "projects") def test_schema_has_ssh_key_path_column(self, conn): assert "ssh_key_path" in _cols(conn, "projects") def test_schema_has_ssh_proxy_jump_column(self, conn): assert "ssh_proxy_jump" in _cols(conn, "projects") def test_schema_has_description_column(self, conn): assert "description" in _cols(conn, "projects") def test_project_type_defaults_to_development(self, conn): """INSERT без project_type → значение по умолчанию 'development'.""" conn.execute( "INSERT INTO projects (id, name, path) VALUES ('t1', 'T', '/t')" ) conn.commit() row = conn.execute( "SELECT project_type FROM projects WHERE id='t1'" ).fetchone() assert row["project_type"] == "development" def test_ssh_fields_default_to_null(self, conn): """SSH-поля по умолчанию NULL.""" conn.execute( "INSERT INTO projects (id, name, path) VALUES ('t2', 'T', '/t')" ) conn.commit() row = conn.execute( "SELECT ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump FROM projects WHERE id='t2'" ).fetchone() assert row["ssh_host"] is None assert row["ssh_user"] is None assert row["ssh_key_path"] is None assert row["ssh_proxy_jump"] is None # --------------------------------------------------------------------------- # Migration: _migrate добавляет KIN-071 колонки в старую схему (без них) # --------------------------------------------------------------------------- def _old_schema_conn() -> sqlite3.Connection: """Создаёт соединение с минимальной 'старой' схемой без KIN-071 колонок.""" conn = sqlite3.connect(":memory:") conn.row_factory = sqlite3.Row conn.executescript(""" CREATE TABLE projects ( id TEXT PRIMARY KEY, name TEXT NOT NULL, path TEXT NOT NULL, status TEXT DEFAULT 'active', language TEXT DEFAULT 'ru', execution_mode TEXT NOT NULL DEFAULT 'review' ); CREATE TABLE tasks ( id TEXT PRIMARY KEY, project_id TEXT NOT NULL, title TEXT NOT NULL, status TEXT DEFAULT 'pending', execution_mode TEXT ); """) conn.commit() return conn def test_migrate_adds_project_type_to_old_schema(): """_migrate добавляет project_type в старую схему без этой колонки.""" conn = _old_schema_conn() _migrate(conn) assert "project_type" in _cols(conn, "projects") conn.close() def test_migrate_adds_ssh_host_to_old_schema(): """_migrate добавляет ssh_host в старую схему.""" conn = _old_schema_conn() _migrate(conn) assert "ssh_host" in _cols(conn, "projects") conn.close() def test_migrate_adds_all_ssh_columns_to_old_schema(): """_migrate добавляет все SSH-колонки разом в старую схему.""" conn = _old_schema_conn() _migrate(conn) cols = _cols(conn, "projects") assert {"ssh_host", "ssh_user", "ssh_key_path", "ssh_proxy_jump", "description"}.issubset(cols) conn.close() def test_migrate_is_idempotent(): """Повторный вызов _migrate не ломает схему.""" conn = init_db(":memory:") before = _cols(conn, "projects") _migrate(conn) after = _cols(conn, "projects") assert before == after conn.close()