kin/tests/test_db.py
2026-03-17 16:51:17 +02:00

504 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests for core/db.py — schema and migration (KIN-071, KIN-073)."""
import sqlite3
import pytest
from core.db import init_db, _migrate
@pytest.fixture
def conn():
c = init_db(db_path=":memory:")
yield c
c.close()
def _cols(conn, table: str) -> set[str]:
"""Return set of column names for a table."""
return {row["name"] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()}
# ---------------------------------------------------------------------------
# Schema: новые колонки KIN-071 присутствуют при свежей инициализации
# ---------------------------------------------------------------------------
class TestProjectsSchemaKin071:
"""PRAGMA table_info(projects) должен содержать новые KIN-071 колонки."""
def test_schema_has_project_type_column(self, conn):
assert "project_type" in _cols(conn, "projects")
def test_schema_has_ssh_host_column(self, conn):
assert "ssh_host" in _cols(conn, "projects")
def test_schema_has_ssh_user_column(self, conn):
assert "ssh_user" in _cols(conn, "projects")
def test_schema_has_ssh_key_path_column(self, conn):
assert "ssh_key_path" in _cols(conn, "projects")
def test_schema_has_ssh_proxy_jump_column(self, conn):
assert "ssh_proxy_jump" in _cols(conn, "projects")
def test_schema_has_description_column(self, conn):
assert "description" in _cols(conn, "projects")
def test_project_type_defaults_to_development(self, conn):
"""INSERT без project_type → значение по умолчанию 'development'."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('t1', 'T', '/t')"
)
conn.commit()
row = conn.execute(
"SELECT project_type FROM projects WHERE id='t1'"
).fetchone()
assert row["project_type"] == "development"
def test_ssh_fields_default_to_null(self, conn):
"""SSH-поля по умолчанию NULL."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('t2', 'T', '/t')"
)
conn.commit()
row = conn.execute(
"SELECT ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump FROM projects WHERE id='t2'"
).fetchone()
assert row["ssh_host"] is None
assert row["ssh_user"] is None
assert row["ssh_key_path"] is None
assert row["ssh_proxy_jump"] is None
# ---------------------------------------------------------------------------
# Migration: _migrate добавляет KIN-071 колонки в старую схему (без них)
# ---------------------------------------------------------------------------
def _old_schema_conn() -> sqlite3.Connection:
"""Создаёт соединение с минимальной 'старой' схемой без KIN-071 колонок."""
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript("""
CREATE TABLE projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
path TEXT NOT NULL,
status TEXT DEFAULT 'active',
language TEXT DEFAULT 'ru',
execution_mode TEXT NOT NULL DEFAULT 'review'
);
CREATE TABLE tasks (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
title TEXT NOT NULL,
status TEXT DEFAULT 'pending',
execution_mode TEXT
);
""")
conn.commit()
return conn
def test_migrate_adds_project_type_to_old_schema():
"""_migrate добавляет project_type в старую схему без этой колонки."""
conn = _old_schema_conn()
_migrate(conn)
assert "project_type" in _cols(conn, "projects")
conn.close()
def test_migrate_adds_ssh_host_to_old_schema():
"""_migrate добавляет ssh_host в старую схему."""
conn = _old_schema_conn()
_migrate(conn)
assert "ssh_host" in _cols(conn, "projects")
conn.close()
def test_migrate_adds_all_ssh_columns_to_old_schema():
"""_migrate добавляет все SSH-колонки разом в старую схему."""
conn = _old_schema_conn()
_migrate(conn)
cols = _cols(conn, "projects")
assert {"ssh_host", "ssh_user", "ssh_key_path", "ssh_proxy_jump", "description"}.issubset(cols)
conn.close()
def test_migrate_is_idempotent():
"""Повторный вызов _migrate не ломает схему."""
conn = init_db(":memory:")
before = _cols(conn, "projects")
_migrate(conn)
after = _cols(conn, "projects")
assert before == after
conn.close()
# ---------------------------------------------------------------------------
# Migration KIN-UI-002: рекреация таблицы на минимальной схеме не падает
# ---------------------------------------------------------------------------
def test_migrate_recreates_table_without_operationalerror():
"""_migrate не бросает OperationalError при рекреации projects на минимальной схеме.
Регрессионный тест KIN-UI-002: INSERT SELECT в блоке KIN-ARCH-003 ранее
падал на отсутствующих колонках (tech_stack, priority, pm_prompt и др.).
"""
conn = _old_schema_conn() # path NOT NULL — триггер рекреации
try:
_migrate(conn)
except Exception as exc:
pytest.fail(f"_migrate raised {type(exc).__name__}: {exc}")
conn.close()
def test_migrate_path_becomes_nullable_on_old_schema():
"""После миграции старой схемы (path NOT NULL) колонка path становится nullable."""
conn = _old_schema_conn()
_migrate(conn)
path_col = next(
r for r in conn.execute("PRAGMA table_info(projects)").fetchall()
if r[1] == "path"
)
assert path_col[3] == 0, "path должна быть nullable после миграции KIN-ARCH-003"
conn.close()
def test_migrate_preserves_existing_rows_on_recreation():
"""Рекреация таблицы сохраняет существующие строки."""
conn = _old_schema_conn()
conn.execute(
"INSERT INTO projects (id, name, path, status) VALUES ('p1', 'MyProj', '/p', 'active')"
)
conn.commit()
_migrate(conn)
row = conn.execute("SELECT id, name, path, status FROM projects WHERE id='p1'").fetchone()
assert row is not None
assert row["name"] == "MyProj"
assert row["path"] == "/p"
assert row["status"] == "active"
conn.close()
def test_migrate_adds_missing_columns_before_recreation():
"""_migrate добавляет tech_stack, priority, pm_prompt, claude_md_path, forgejo_repo, created_at перед рекреацией."""
conn = _old_schema_conn()
_migrate(conn)
cols = _cols(conn, "projects")
required = {"tech_stack", "priority", "pm_prompt", "claude_md_path", "forgejo_repo", "created_at"}
assert required.issubset(cols), f"Отсутствуют колонки: {required - cols}"
conn.close()
def test_migrate_operations_project_with_null_path():
"""После миграции можно вставить operations-проект с path=NULL."""
conn = _old_schema_conn()
_migrate(conn)
conn.execute(
"INSERT INTO projects (id, name, path, project_type) VALUES ('ops1', 'Ops', NULL, 'operations')"
)
conn.commit()
row = conn.execute("SELECT path, project_type FROM projects WHERE id='ops1'").fetchone()
assert row["path"] is None
assert row["project_type"] == "operations"
conn.close()
# ---------------------------------------------------------------------------
# Schema KIN-073: acceptance_criteria в таблице tasks
# ---------------------------------------------------------------------------
class TestTasksAcceptanceCriteriaSchema:
"""Колонка acceptance_criteria присутствует в таблице tasks."""
def test_schema_has_acceptance_criteria_column(self, conn):
assert "acceptance_criteria" in _cols(conn, "tasks")
def test_acceptance_criteria_defaults_to_null(self, conn):
"""Создание задачи без acceptance_criteria — поле NULL (nullable)."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('p1', 'P', '/p')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title) VALUES ('t1', 'p1', 'My Task')"
)
conn.commit()
row = conn.execute(
"SELECT acceptance_criteria FROM tasks WHERE id='t1'"
).fetchone()
assert row["acceptance_criteria"] is None
def test_create_task_with_acceptance_criteria_saves_field(self, conn):
"""Создание задачи с acceptance_criteria — значение сохраняется в БД."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('p2', 'P', '/p')"
)
criteria = "Поле должно сохраняться. GET возвращает значение."
conn.execute(
"INSERT INTO tasks (id, project_id, title, acceptance_criteria)"
" VALUES ('t2', 'p2', 'Task with criteria', ?)",
(criteria,),
)
conn.commit()
row = conn.execute(
"SELECT acceptance_criteria FROM tasks WHERE id='t2'"
).fetchone()
assert row["acceptance_criteria"] == criteria
def test_get_task_returns_acceptance_criteria(self, conn):
"""SELECT задачи возвращает acceptance_criteria (критерий приёмки 3)."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('p3', 'P', '/p')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title, acceptance_criteria)"
" VALUES ('t3', 'p3', 'T', 'AC value')",
)
conn.commit()
row = conn.execute("SELECT * FROM tasks WHERE id='t3'").fetchone()
assert row["acceptance_criteria"] == "AC value"
# ---------------------------------------------------------------------------
# Migration KIN-073: _migrate добавляет acceptance_criteria в старую схему
# ---------------------------------------------------------------------------
def test_migrate_adds_acceptance_criteria_to_old_schema():
"""_migrate добавляет acceptance_criteria в tasks если колонки нет."""
conn = _old_schema_conn()
_migrate(conn)
assert "acceptance_criteria" in _cols(conn, "tasks")
conn.close()
def test_migrate_acceptance_criteria_is_nullable_after_migration():
"""После миграции acceptance_criteria nullable — старые строки не ломаются."""
conn = _old_schema_conn()
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('pm', 'P', '/p')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title) VALUES ('tm', 'pm', 'Old Task')"
)
conn.commit()
_migrate(conn)
row = conn.execute("SELECT acceptance_criteria FROM tasks WHERE id='tm'").fetchone()
assert row["acceptance_criteria"] is None
conn.close()
# ---------------------------------------------------------------------------
# Regression KIN-FIX-014: guard "if pipelines in existing_tables"
# Финальный UPDATE pipelines в _migrate не должен падать на схемах без этой таблицы.
# ---------------------------------------------------------------------------
def _old_schema_with_pipelines_conn() -> sqlite3.Connection:
"""Минимальная схема С таблицей pipelines (dept_sub + running записи)."""
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript("""
CREATE TABLE projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
path TEXT,
status TEXT DEFAULT 'active',
language TEXT DEFAULT 'ru',
execution_mode TEXT NOT NULL DEFAULT 'review'
);
CREATE TABLE tasks (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
title TEXT NOT NULL,
status TEXT DEFAULT 'pending',
execution_mode TEXT
);
CREATE TABLE pipelines (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
project_id TEXT NOT NULL,
route_type TEXT NOT NULL,
steps TEXT NOT NULL DEFAULT '[]',
status TEXT DEFAULT 'running'
);
""")
conn.commit()
return conn
def test_migrate_without_pipelines_table_does_not_raise():
"""Regression KIN-FIX-014: _migrate на схеме без таблицы pipelines не бросает исключение.
До фикса: безусловный UPDATE pipelines падал с 'no such table: pipelines'
на тест-фикстурах с минимальной схемой (только projects + tasks).
"""
conn = _old_schema_conn()
try:
_migrate(conn)
except Exception as exc:
pytest.fail(
f"_migrate raised {type(exc).__name__} при отсутствии таблицы pipelines: {exc}"
)
conn.close()
def test_migrate_sets_orphaned_dept_sub_running_to_failed():
"""_migrate обновляет dept_sub pipelines со статусом 'running''failed'.
Исправляет призрачные записи от двойного создания пайплайна (KIN-ARCH-012).
"""
conn = _old_schema_with_pipelines_conn()
conn.execute(
"INSERT INTO projects (id, name) VALUES ('p1', 'P')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title) VALUES ('t1', 'p1', 'T')"
)
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, status)"
" VALUES ('t1', 'p1', 'dept_sub', 'running')"
)
conn.commit()
_migrate(conn)
row = conn.execute("SELECT status FROM pipelines WHERE route_type = 'dept_sub'").fetchone()
assert row["status"] == "failed", (
"dept_sub pipeline со статусом 'running' должен стать 'failed' после миграции"
)
conn.close()
def test_migrate_preserves_non_dept_sub_and_non_running_pipelines():
"""_migrate не трогает pipelines, которые не являются dept_sub+running."""
conn = _old_schema_with_pipelines_conn()
conn.execute("INSERT INTO projects (id, name) VALUES ('p2', 'P')")
conn.execute("INSERT INTO tasks (id, project_id, title) VALUES ('t2', 'p2', 'T')")
# dept_sub но уже completed — не трогать
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, status)"
" VALUES ('t2', 'p2', 'dept_sub', 'completed')"
)
# обычный running pipeline другого типа — не трогать
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, status)"
" VALUES ('t2', 'p2', 'direct', 'running')"
)
conn.commit()
_migrate(conn)
rows = {
r["route_type"]: r["status"]
for r in conn.execute("SELECT route_type, status FROM pipelines").fetchall()
}
assert rows["dept_sub"] == "completed", "completed pipeline не должен меняться"
assert rows["direct"] == "running", "non-dept_sub running pipeline не должен меняться"
conn.close()
# ---------------------------------------------------------------------------
# Schema KIN-OBS-014: pid INTEGER присутствует в SCHEMA-строке CREATE TABLE pipelines
# ---------------------------------------------------------------------------
def test_schema_string_contains_pid_integer_in_pipelines_ddl():
"""Регрессия KIN-OBS-014: SCHEMA-строка содержит 'pid INTEGER' в теле CREATE TABLE pipelines.
До фикса поле было только в _migrate(), но отсутствовало в SCHEMA —
при свежей инициализации БД колонка не создавалась.
Тест парсит блок CREATE TABLE pipelines напрямую из SCHEMA.
"""
from core.db import SCHEMA
start = SCHEMA.index("CREATE TABLE IF NOT EXISTS pipelines")
end = SCHEMA.index(");", start) + 2
pipelines_ddl = SCHEMA[start:end]
assert "pid INTEGER" in pipelines_ddl, (
"pid INTEGER не найден в SCHEMA CREATE TABLE pipelines — "
"поле должно быть в SCHEMA (core/db.py строки ~133-148), а не только в _migrate()"
)
# ---------------------------------------------------------------------------
# Schema KIN-OBS-018: pid INTEGER присутствует в таблице pipelines
# ---------------------------------------------------------------------------
class TestPipelinesSchemaKinObs018:
"""PRAGMA table_info(pipelines) должен содержать kolонку pid INTEGER."""
def test_schema_has_pid_column(self, conn):
"""Свежая инициализация: pipelines содержит колонку pid."""
assert "pid" in _cols(conn, "pipelines")
def test_pid_defaults_to_null(self, conn):
"""Вставка pipeline без pid — значение NULL."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('p_pid', 'P', '/p')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title) VALUES ('t_pid', 'p_pid', 'T')"
)
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, steps)"
" VALUES ('t_pid', 'p_pid', 'direct', '[]')"
)
conn.commit()
row = conn.execute(
"SELECT pid FROM pipelines WHERE task_id='t_pid'"
).fetchone()
assert row["pid"] is None
def test_pid_can_store_integer_value(self, conn):
"""Вставка pipeline с pid — значение сохраняется в БД."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('p_pid2', 'P', '/p')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title) VALUES ('t_pid2', 'p_pid2', 'T')"
)
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, steps, pid)"
" VALUES ('t_pid2', 'p_pid2', 'direct', '[]', 12345)"
)
conn.commit()
row = conn.execute(
"SELECT pid FROM pipelines WHERE task_id='t_pid2'"
).fetchone()
assert row["pid"] == 12345
# ---------------------------------------------------------------------------
# Migration KIN-OBS-018: _migrate добавляет pid в существующие БД без этой колонки
# Конвенция #384: три теста для _migrate guard
# ---------------------------------------------------------------------------
def test_migrate_adds_pid_to_pipelines_without_column():
"""_migrate добавляет pid в таблицу pipelines если колонки нет."""
conn = _old_schema_with_pipelines_conn() # схема без pid
_migrate(conn)
assert "pid" in _cols(conn, "pipelines")
conn.close()
def test_migrate_pid_is_nullable_after_migration():
"""После миграции pid nullable — существующие строки не ломаются (pid=NULL)."""
conn = _old_schema_with_pipelines_conn()
conn.execute("INSERT INTO projects (id, name) VALUES ('pm_pid', 'P')")
conn.execute("INSERT INTO tasks (id, project_id, title) VALUES ('tm_pid', 'pm_pid', 'T')")
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, status)"
" VALUES ('tm_pid', 'pm_pid', 'direct', 'completed')"
)
conn.commit()
_migrate(conn)
row = conn.execute("SELECT pid FROM pipelines WHERE task_id='tm_pid'").fetchone()
assert row["pid"] is None
conn.close()
def test_migrate_pid_guard_is_idempotent():
"""Повторный вызов _migrate не ломает схему pipelines (guard idempotent)."""
conn = _old_schema_with_pipelines_conn()
_migrate(conn)
before = _cols(conn, "pipelines")
_migrate(conn)
after = _cols(conn, "pipelines")
assert before == after
assert "pid" in after
conn.close()