133 lines
4.6 KiB
Python
133 lines
4.6 KiB
Python
|
|
"""Tests for core/db.py — schema and migration (KIN-071)."""
|
|||
|
|
|
|||
|
|
import sqlite3
|
|||
|
|
import pytest
|
|||
|
|
from core.db import init_db, _migrate
|
|||
|
|
|
|||
|
|
|
|||
|
|
@pytest.fixture
|
|||
|
|
def conn():
|
|||
|
|
c = init_db(db_path=":memory:")
|
|||
|
|
yield c
|
|||
|
|
c.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _cols(conn, table: str) -> set[str]:
|
|||
|
|
"""Return set of column names for a table."""
|
|||
|
|
return {row["name"] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()}
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Schema: новые колонки KIN-071 присутствуют при свежей инициализации
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
class TestProjectsSchemaKin071:
|
|||
|
|
"""PRAGMA table_info(projects) должен содержать новые KIN-071 колонки."""
|
|||
|
|
|
|||
|
|
def test_schema_has_project_type_column(self, conn):
|
|||
|
|
assert "project_type" in _cols(conn, "projects")
|
|||
|
|
|
|||
|
|
def test_schema_has_ssh_host_column(self, conn):
|
|||
|
|
assert "ssh_host" in _cols(conn, "projects")
|
|||
|
|
|
|||
|
|
def test_schema_has_ssh_user_column(self, conn):
|
|||
|
|
assert "ssh_user" in _cols(conn, "projects")
|
|||
|
|
|
|||
|
|
def test_schema_has_ssh_key_path_column(self, conn):
|
|||
|
|
assert "ssh_key_path" in _cols(conn, "projects")
|
|||
|
|
|
|||
|
|
def test_schema_has_ssh_proxy_jump_column(self, conn):
|
|||
|
|
assert "ssh_proxy_jump" in _cols(conn, "projects")
|
|||
|
|
|
|||
|
|
def test_schema_has_description_column(self, conn):
|
|||
|
|
assert "description" in _cols(conn, "projects")
|
|||
|
|
|
|||
|
|
def test_project_type_defaults_to_development(self, conn):
|
|||
|
|
"""INSERT без project_type → значение по умолчанию 'development'."""
|
|||
|
|
conn.execute(
|
|||
|
|
"INSERT INTO projects (id, name, path) VALUES ('t1', 'T', '/t')"
|
|||
|
|
)
|
|||
|
|
conn.commit()
|
|||
|
|
row = conn.execute(
|
|||
|
|
"SELECT project_type FROM projects WHERE id='t1'"
|
|||
|
|
).fetchone()
|
|||
|
|
assert row["project_type"] == "development"
|
|||
|
|
|
|||
|
|
def test_ssh_fields_default_to_null(self, conn):
|
|||
|
|
"""SSH-поля по умолчанию NULL."""
|
|||
|
|
conn.execute(
|
|||
|
|
"INSERT INTO projects (id, name, path) VALUES ('t2', 'T', '/t')"
|
|||
|
|
)
|
|||
|
|
conn.commit()
|
|||
|
|
row = conn.execute(
|
|||
|
|
"SELECT ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump FROM projects WHERE id='t2'"
|
|||
|
|
).fetchone()
|
|||
|
|
assert row["ssh_host"] is None
|
|||
|
|
assert row["ssh_user"] is None
|
|||
|
|
assert row["ssh_key_path"] is None
|
|||
|
|
assert row["ssh_proxy_jump"] is None
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Migration: _migrate добавляет KIN-071 колонки в старую схему (без них)
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
def _old_schema_conn() -> sqlite3.Connection:
|
|||
|
|
"""Создаёт соединение с минимальной 'старой' схемой без KIN-071 колонок."""
|
|||
|
|
conn = sqlite3.connect(":memory:")
|
|||
|
|
conn.row_factory = sqlite3.Row
|
|||
|
|
conn.executescript("""
|
|||
|
|
CREATE TABLE projects (
|
|||
|
|
id TEXT PRIMARY KEY,
|
|||
|
|
name TEXT NOT NULL,
|
|||
|
|
path TEXT NOT NULL,
|
|||
|
|
status TEXT DEFAULT 'active',
|
|||
|
|
language TEXT DEFAULT 'ru',
|
|||
|
|
execution_mode TEXT NOT NULL DEFAULT 'review'
|
|||
|
|
);
|
|||
|
|
CREATE TABLE tasks (
|
|||
|
|
id TEXT PRIMARY KEY,
|
|||
|
|
project_id TEXT NOT NULL,
|
|||
|
|
title TEXT NOT NULL,
|
|||
|
|
status TEXT DEFAULT 'pending',
|
|||
|
|
execution_mode TEXT
|
|||
|
|
);
|
|||
|
|
""")
|
|||
|
|
conn.commit()
|
|||
|
|
return conn
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_migrate_adds_project_type_to_old_schema():
|
|||
|
|
"""_migrate добавляет project_type в старую схему без этой колонки."""
|
|||
|
|
conn = _old_schema_conn()
|
|||
|
|
_migrate(conn)
|
|||
|
|
assert "project_type" in _cols(conn, "projects")
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_migrate_adds_ssh_host_to_old_schema():
|
|||
|
|
"""_migrate добавляет ssh_host в старую схему."""
|
|||
|
|
conn = _old_schema_conn()
|
|||
|
|
_migrate(conn)
|
|||
|
|
assert "ssh_host" in _cols(conn, "projects")
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_migrate_adds_all_ssh_columns_to_old_schema():
|
|||
|
|
"""_migrate добавляет все SSH-колонки разом в старую схему."""
|
|||
|
|
conn = _old_schema_conn()
|
|||
|
|
_migrate(conn)
|
|||
|
|
cols = _cols(conn, "projects")
|
|||
|
|
assert {"ssh_host", "ssh_user", "ssh_key_path", "ssh_proxy_jump", "description"}.issubset(cols)
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_migrate_is_idempotent():
|
|||
|
|
"""Повторный вызов _migrate не ломает схему."""
|
|||
|
|
conn = init_db(":memory:")
|
|||
|
|
before = _cols(conn, "projects")
|
|||
|
|
_migrate(conn)
|
|||
|
|
after = _cols(conn, "projects")
|
|||
|
|
assert before == after
|
|||
|
|
conn.close()
|