"""Tests for core/db.py — schema and migration (KIN-071, KIN-073).""" import sqlite3 import pytest from core.db import init_db, _migrate @pytest.fixture def conn(): c = init_db(db_path=":memory:") yield c c.close() def _cols(conn, table: str) -> set[str]: """Return set of column names for a table.""" return {row["name"] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()} # --------------------------------------------------------------------------- # Schema: новые колонки KIN-071 присутствуют при свежей инициализации # --------------------------------------------------------------------------- class TestProjectsSchemaKin071: """PRAGMA table_info(projects) должен содержать новые KIN-071 колонки.""" def test_schema_has_project_type_column(self, conn): assert "project_type" in _cols(conn, "projects") def test_schema_has_ssh_host_column(self, conn): assert "ssh_host" in _cols(conn, "projects") def test_schema_has_ssh_user_column(self, conn): assert "ssh_user" in _cols(conn, "projects") def test_schema_has_ssh_key_path_column(self, conn): assert "ssh_key_path" in _cols(conn, "projects") def test_schema_has_ssh_proxy_jump_column(self, conn): assert "ssh_proxy_jump" in _cols(conn, "projects") def test_schema_has_description_column(self, conn): assert "description" in _cols(conn, "projects") def test_project_type_defaults_to_development(self, conn): """INSERT без project_type → значение по умолчанию 'development'.""" conn.execute( "INSERT INTO projects (id, name, path) VALUES ('t1', 'T', '/t')" ) conn.commit() row = conn.execute( "SELECT project_type FROM projects WHERE id='t1'" ).fetchone() assert row["project_type"] == "development" def test_ssh_fields_default_to_null(self, conn): """SSH-поля по умолчанию NULL.""" conn.execute( "INSERT INTO projects (id, name, path) VALUES ('t2', 'T', '/t')" ) conn.commit() row = conn.execute( "SELECT ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump FROM projects WHERE id='t2'" ).fetchone() assert row["ssh_host"] is None assert row["ssh_user"] is None assert row["ssh_key_path"] is None assert row["ssh_proxy_jump"] is None # --------------------------------------------------------------------------- # Migration: _migrate добавляет KIN-071 колонки в старую схему (без них) # --------------------------------------------------------------------------- def _old_schema_conn() -> sqlite3.Connection: """Создаёт соединение с минимальной 'старой' схемой без KIN-071 колонок.""" conn = sqlite3.connect(":memory:") conn.row_factory = sqlite3.Row conn.executescript(""" CREATE TABLE projects ( id TEXT PRIMARY KEY, name TEXT NOT NULL, path TEXT NOT NULL, status TEXT DEFAULT 'active', language TEXT DEFAULT 'ru', execution_mode TEXT NOT NULL DEFAULT 'review' ); CREATE TABLE tasks ( id TEXT PRIMARY KEY, project_id TEXT NOT NULL, title TEXT NOT NULL, status TEXT DEFAULT 'pending', execution_mode TEXT ); """) conn.commit() return conn def test_migrate_adds_project_type_to_old_schema(): """_migrate добавляет project_type в старую схему без этой колонки.""" conn = _old_schema_conn() _migrate(conn) assert "project_type" in _cols(conn, "projects") conn.close() def test_migrate_adds_ssh_host_to_old_schema(): """_migrate добавляет ssh_host в старую схему.""" conn = _old_schema_conn() _migrate(conn) assert "ssh_host" in _cols(conn, "projects") conn.close() def test_migrate_adds_all_ssh_columns_to_old_schema(): """_migrate добавляет все SSH-колонки разом в старую схему.""" conn = _old_schema_conn() _migrate(conn) cols = _cols(conn, "projects") assert {"ssh_host", "ssh_user", "ssh_key_path", "ssh_proxy_jump", "description"}.issubset(cols) conn.close() def test_migrate_is_idempotent(): """Повторный вызов _migrate не ломает схему.""" conn = init_db(":memory:") before = _cols(conn, "projects") _migrate(conn) after = _cols(conn, "projects") assert before == after conn.close() # --------------------------------------------------------------------------- # Migration KIN-UI-002: рекреация таблицы на минимальной схеме не падает # --------------------------------------------------------------------------- def test_migrate_recreates_table_without_operationalerror(): """_migrate не бросает OperationalError при рекреации projects на минимальной схеме. Регрессионный тест KIN-UI-002: INSERT SELECT в блоке KIN-ARCH-003 ранее падал на отсутствующих колонках (tech_stack, priority, pm_prompt и др.). """ conn = _old_schema_conn() # path NOT NULL — триггер рекреации try: _migrate(conn) except Exception as exc: pytest.fail(f"_migrate raised {type(exc).__name__}: {exc}") conn.close() def test_migrate_path_becomes_nullable_on_old_schema(): """После миграции старой схемы (path NOT NULL) колонка path становится nullable.""" conn = _old_schema_conn() _migrate(conn) path_col = next( r for r in conn.execute("PRAGMA table_info(projects)").fetchall() if r[1] == "path" ) assert path_col[3] == 0, "path должна быть nullable после миграции KIN-ARCH-003" conn.close() def test_migrate_preserves_existing_rows_on_recreation(): """Рекреация таблицы сохраняет существующие строки.""" conn = _old_schema_conn() conn.execute( "INSERT INTO projects (id, name, path, status) VALUES ('p1', 'MyProj', '/p', 'active')" ) conn.commit() _migrate(conn) row = conn.execute("SELECT id, name, path, status FROM projects WHERE id='p1'").fetchone() assert row is not None assert row["name"] == "MyProj" assert row["path"] == "/p" assert row["status"] == "active" conn.close() def test_migrate_adds_missing_columns_before_recreation(): """_migrate добавляет tech_stack, priority, pm_prompt, claude_md_path, forgejo_repo, created_at перед рекреацией.""" conn = _old_schema_conn() _migrate(conn) cols = _cols(conn, "projects") required = {"tech_stack", "priority", "pm_prompt", "claude_md_path", "forgejo_repo", "created_at"} assert required.issubset(cols), f"Отсутствуют колонки: {required - cols}" conn.close() def test_migrate_operations_project_with_null_path(): """После миграции можно вставить operations-проект с path=NULL.""" conn = _old_schema_conn() _migrate(conn) conn.execute( "INSERT INTO projects (id, name, path, project_type) VALUES ('ops1', 'Ops', NULL, 'operations')" ) conn.commit() row = conn.execute("SELECT path, project_type FROM projects WHERE id='ops1'").fetchone() assert row["path"] is None assert row["project_type"] == "operations" conn.close() # --------------------------------------------------------------------------- # Schema KIN-073: acceptance_criteria в таблице tasks # --------------------------------------------------------------------------- class TestTasksAcceptanceCriteriaSchema: """Колонка acceptance_criteria присутствует в таблице tasks.""" def test_schema_has_acceptance_criteria_column(self, conn): assert "acceptance_criteria" in _cols(conn, "tasks") def test_acceptance_criteria_defaults_to_null(self, conn): """Создание задачи без acceptance_criteria — поле NULL (nullable).""" conn.execute( "INSERT INTO projects (id, name, path) VALUES ('p1', 'P', '/p')" ) conn.execute( "INSERT INTO tasks (id, project_id, title) VALUES ('t1', 'p1', 'My Task')" ) conn.commit() row = conn.execute( "SELECT acceptance_criteria FROM tasks WHERE id='t1'" ).fetchone() assert row["acceptance_criteria"] is None def test_create_task_with_acceptance_criteria_saves_field(self, conn): """Создание задачи с acceptance_criteria — значение сохраняется в БД.""" conn.execute( "INSERT INTO projects (id, name, path) VALUES ('p2', 'P', '/p')" ) criteria = "Поле должно сохраняться. GET возвращает значение." conn.execute( "INSERT INTO tasks (id, project_id, title, acceptance_criteria)" " VALUES ('t2', 'p2', 'Task with criteria', ?)", (criteria,), ) conn.commit() row = conn.execute( "SELECT acceptance_criteria FROM tasks WHERE id='t2'" ).fetchone() assert row["acceptance_criteria"] == criteria def test_get_task_returns_acceptance_criteria(self, conn): """SELECT задачи возвращает acceptance_criteria (критерий приёмки 3).""" conn.execute( "INSERT INTO projects (id, name, path) VALUES ('p3', 'P', '/p')" ) conn.execute( "INSERT INTO tasks (id, project_id, title, acceptance_criteria)" " VALUES ('t3', 'p3', 'T', 'AC value')", ) conn.commit() row = conn.execute("SELECT * FROM tasks WHERE id='t3'").fetchone() assert row["acceptance_criteria"] == "AC value" # --------------------------------------------------------------------------- # Migration KIN-073: _migrate добавляет acceptance_criteria в старую схему # --------------------------------------------------------------------------- def test_migrate_adds_acceptance_criteria_to_old_schema(): """_migrate добавляет acceptance_criteria в tasks если колонки нет.""" conn = _old_schema_conn() _migrate(conn) assert "acceptance_criteria" in _cols(conn, "tasks") conn.close() def test_migrate_acceptance_criteria_is_nullable_after_migration(): """После миграции acceptance_criteria nullable — старые строки не ломаются.""" conn = _old_schema_conn() conn.execute( "INSERT INTO projects (id, name, path) VALUES ('pm', 'P', '/p')" ) conn.execute( "INSERT INTO tasks (id, project_id, title) VALUES ('tm', 'pm', 'Old Task')" ) conn.commit() _migrate(conn) row = conn.execute("SELECT acceptance_criteria FROM tasks WHERE id='tm'").fetchone() assert row["acceptance_criteria"] is None conn.close() # --------------------------------------------------------------------------- # Regression KIN-FIX-014: guard "if pipelines in existing_tables" # Финальный UPDATE pipelines в _migrate не должен падать на схемах без этой таблицы. # --------------------------------------------------------------------------- def _old_schema_with_pipelines_conn() -> sqlite3.Connection: """Минимальная схема С таблицей pipelines (dept_sub + running записи).""" conn = sqlite3.connect(":memory:") conn.row_factory = sqlite3.Row conn.executescript(""" CREATE TABLE projects ( id TEXT PRIMARY KEY, name TEXT NOT NULL, path TEXT, status TEXT DEFAULT 'active', language TEXT DEFAULT 'ru', execution_mode TEXT NOT NULL DEFAULT 'review' ); CREATE TABLE tasks ( id TEXT PRIMARY KEY, project_id TEXT NOT NULL, title TEXT NOT NULL, status TEXT DEFAULT 'pending', execution_mode TEXT ); CREATE TABLE pipelines ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL, project_id TEXT NOT NULL, route_type TEXT NOT NULL, steps TEXT NOT NULL DEFAULT '[]', status TEXT DEFAULT 'running' ); """) conn.commit() return conn def test_migrate_without_pipelines_table_does_not_raise(): """Regression KIN-FIX-014: _migrate на схеме без таблицы pipelines не бросает исключение. До фикса: безусловный UPDATE pipelines падал с 'no such table: pipelines' на тест-фикстурах с минимальной схемой (только projects + tasks). """ conn = _old_schema_conn() try: _migrate(conn) except Exception as exc: pytest.fail( f"_migrate raised {type(exc).__name__} при отсутствии таблицы pipelines: {exc}" ) conn.close() def test_migrate_sets_orphaned_dept_sub_running_to_failed(): """_migrate обновляет dept_sub pipelines со статусом 'running' → 'failed'. Исправляет призрачные записи от двойного создания пайплайна (KIN-ARCH-012). """ conn = _old_schema_with_pipelines_conn() conn.execute( "INSERT INTO projects (id, name) VALUES ('p1', 'P')" ) conn.execute( "INSERT INTO tasks (id, project_id, title) VALUES ('t1', 'p1', 'T')" ) conn.execute( "INSERT INTO pipelines (task_id, project_id, route_type, status)" " VALUES ('t1', 'p1', 'dept_sub', 'running')" ) conn.commit() _migrate(conn) row = conn.execute("SELECT status FROM pipelines WHERE route_type = 'dept_sub'").fetchone() assert row["status"] == "failed", ( "dept_sub pipeline со статусом 'running' должен стать 'failed' после миграции" ) conn.close() def test_migrate_preserves_non_dept_sub_and_non_running_pipelines(): """_migrate не трогает pipelines, которые не являются dept_sub+running.""" conn = _old_schema_with_pipelines_conn() conn.execute("INSERT INTO projects (id, name) VALUES ('p2', 'P')") conn.execute("INSERT INTO tasks (id, project_id, title) VALUES ('t2', 'p2', 'T')") # dept_sub но уже completed — не трогать conn.execute( "INSERT INTO pipelines (task_id, project_id, route_type, status)" " VALUES ('t2', 'p2', 'dept_sub', 'completed')" ) # обычный running pipeline другого типа — не трогать conn.execute( "INSERT INTO pipelines (task_id, project_id, route_type, status)" " VALUES ('t2', 'p2', 'direct', 'running')" ) conn.commit() _migrate(conn) rows = { r["route_type"]: r["status"] for r in conn.execute("SELECT route_type, status FROM pipelines").fetchall() } assert rows["dept_sub"] == "completed", "completed pipeline не должен меняться" assert rows["direct"] == "running", "non-dept_sub running pipeline не должен меняться" conn.close() # --------------------------------------------------------------------------- # Schema KIN-OBS-014: pid INTEGER присутствует в SCHEMA-строке CREATE TABLE pipelines # --------------------------------------------------------------------------- def test_schema_string_contains_pid_integer_in_pipelines_ddl(): """Регрессия KIN-OBS-014: SCHEMA-строка содержит 'pid INTEGER' в теле CREATE TABLE pipelines. До фикса поле было только в _migrate(), но отсутствовало в SCHEMA — при свежей инициализации БД колонка не создавалась. Тест парсит блок CREATE TABLE pipelines напрямую из SCHEMA. """ from core.db import SCHEMA start = SCHEMA.index("CREATE TABLE IF NOT EXISTS pipelines") end = SCHEMA.index(");", start) + 2 pipelines_ddl = SCHEMA[start:end] assert "pid INTEGER" in pipelines_ddl, ( "pid INTEGER не найден в SCHEMA CREATE TABLE pipelines — " "поле должно быть в SCHEMA (core/db.py строки ~133-148), а не только в _migrate()" ) # --------------------------------------------------------------------------- # Schema KIN-OBS-018: pid INTEGER присутствует в таблице pipelines # --------------------------------------------------------------------------- class TestPipelinesSchemaKinObs018: """PRAGMA table_info(pipelines) должен содержать kolонку pid INTEGER.""" def test_schema_has_pid_column(self, conn): """Свежая инициализация: pipelines содержит колонку pid.""" assert "pid" in _cols(conn, "pipelines") def test_pid_defaults_to_null(self, conn): """Вставка pipeline без pid — значение NULL.""" conn.execute( "INSERT INTO projects (id, name, path) VALUES ('p_pid', 'P', '/p')" ) conn.execute( "INSERT INTO tasks (id, project_id, title) VALUES ('t_pid', 'p_pid', 'T')" ) conn.execute( "INSERT INTO pipelines (task_id, project_id, route_type, steps)" " VALUES ('t_pid', 'p_pid', 'direct', '[]')" ) conn.commit() row = conn.execute( "SELECT pid FROM pipelines WHERE task_id='t_pid'" ).fetchone() assert row["pid"] is None def test_pid_can_store_integer_value(self, conn): """Вставка pipeline с pid — значение сохраняется в БД.""" conn.execute( "INSERT INTO projects (id, name, path) VALUES ('p_pid2', 'P', '/p')" ) conn.execute( "INSERT INTO tasks (id, project_id, title) VALUES ('t_pid2', 'p_pid2', 'T')" ) conn.execute( "INSERT INTO pipelines (task_id, project_id, route_type, steps, pid)" " VALUES ('t_pid2', 'p_pid2', 'direct', '[]', 12345)" ) conn.commit() row = conn.execute( "SELECT pid FROM pipelines WHERE task_id='t_pid2'" ).fetchone() assert row["pid"] == 12345 # --------------------------------------------------------------------------- # Migration KIN-OBS-018: _migrate добавляет pid в существующие БД без этой колонки # Конвенция #384: три теста для _migrate guard # --------------------------------------------------------------------------- def test_migrate_adds_pid_to_pipelines_without_column(): """_migrate добавляет pid в таблицу pipelines если колонки нет.""" conn = _old_schema_with_pipelines_conn() # схема без pid _migrate(conn) assert "pid" in _cols(conn, "pipelines") conn.close() def test_migrate_pid_is_nullable_after_migration(): """После миграции pid nullable — существующие строки не ломаются (pid=NULL).""" conn = _old_schema_with_pipelines_conn() conn.execute("INSERT INTO projects (id, name) VALUES ('pm_pid', 'P')") conn.execute("INSERT INTO tasks (id, project_id, title) VALUES ('tm_pid', 'pm_pid', 'T')") conn.execute( "INSERT INTO pipelines (task_id, project_id, route_type, status)" " VALUES ('tm_pid', 'pm_pid', 'direct', 'completed')" ) conn.commit() _migrate(conn) row = conn.execute("SELECT pid FROM pipelines WHERE task_id='tm_pid'").fetchone() assert row["pid"] is None conn.close() def test_migrate_pid_guard_is_idempotent(): """Повторный вызов _migrate не ломает схему pipelines (guard idempotent).""" conn = _old_schema_with_pipelines_conn() _migrate(conn) before = _cols(conn, "pipelines") _migrate(conn) after = _cols(conn, "pipelines") assert before == after assert "pid" in after conn.close() # --------------------------------------------------------------------------- # Schema KIN-084: pipeline_log в SCHEMA-строке и _migrate() # Конвенция #387: DDL-парсинг SCHEMA-строки # --------------------------------------------------------------------------- def test_schema_string_contains_pipeline_log_table(): """Регрессия KIN-084: SCHEMA-строка содержит CREATE TABLE pipeline_log.""" from core.db import SCHEMA assert "CREATE TABLE IF NOT EXISTS pipeline_log" in SCHEMA def test_schema_pipeline_log_has_required_columns(): """SCHEMA pipeline_log содержит все обязательные колонки.""" from core.db import SCHEMA start = SCHEMA.index("CREATE TABLE IF NOT EXISTS pipeline_log") end = SCHEMA.index(");", start) + 2 ddl = SCHEMA[start:end] assert "pipeline_id INTEGER" in ddl assert "ts " in ddl assert "level " in ddl assert "message " in ddl assert "extra_json " in ddl def test_migrate_creates_pipeline_log_on_fresh_init(conn): """Свежая инициализация: pipeline_log существует.""" tables = {r["name"] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()} assert "pipeline_log" in tables def test_migrate_creates_pipeline_log_if_missing(): """_migrate создаёт pipeline_log если таблица отсутствует в старой БД.""" conn = _old_schema_conn() _migrate(conn) tables = {r["name"] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()} assert "pipeline_log" in tables conn.close()