kin/tests/test_db.py

585 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests for core/db.py — schema and migration (KIN-071, KIN-073)."""
import importlib
import os
import sqlite3
import pytest
from pathlib import Path
from unittest.mock import patch
from core.db import init_db, _migrate
@pytest.fixture
def conn():
c = init_db(db_path=":memory:")
yield c
c.close()
def _cols(conn, table: str) -> set[str]:
"""Return set of column names for a table."""
return {row["name"] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()}
# ---------------------------------------------------------------------------
# Schema: новые колонки KIN-071 присутствуют при свежей инициализации
# ---------------------------------------------------------------------------
class TestProjectsSchemaKin071:
"""PRAGMA table_info(projects) должен содержать новые KIN-071 колонки."""
def test_schema_has_project_type_column(self, conn):
assert "project_type" in _cols(conn, "projects")
def test_schema_has_ssh_host_column(self, conn):
assert "ssh_host" in _cols(conn, "projects")
def test_schema_has_ssh_user_column(self, conn):
assert "ssh_user" in _cols(conn, "projects")
def test_schema_has_ssh_key_path_column(self, conn):
assert "ssh_key_path" in _cols(conn, "projects")
def test_schema_has_ssh_proxy_jump_column(self, conn):
assert "ssh_proxy_jump" in _cols(conn, "projects")
def test_schema_has_description_column(self, conn):
assert "description" in _cols(conn, "projects")
def test_project_type_defaults_to_development(self, conn):
"""INSERT без project_type → значение по умолчанию 'development'."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('t1', 'T', '/t')"
)
conn.commit()
row = conn.execute(
"SELECT project_type FROM projects WHERE id='t1'"
).fetchone()
assert row["project_type"] == "development"
def test_ssh_fields_default_to_null(self, conn):
"""SSH-поля по умолчанию NULL."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('t2', 'T', '/t')"
)
conn.commit()
row = conn.execute(
"SELECT ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump FROM projects WHERE id='t2'"
).fetchone()
assert row["ssh_host"] is None
assert row["ssh_user"] is None
assert row["ssh_key_path"] is None
assert row["ssh_proxy_jump"] is None
# ---------------------------------------------------------------------------
# Migration: _migrate добавляет KIN-071 колонки в старую схему (без них)
# ---------------------------------------------------------------------------
def _old_schema_conn() -> sqlite3.Connection:
"""Создаёт соединение с минимальной 'старой' схемой без KIN-071 колонок."""
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript("""
CREATE TABLE projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
path TEXT NOT NULL,
status TEXT DEFAULT 'active',
language TEXT DEFAULT 'ru',
execution_mode TEXT NOT NULL DEFAULT 'review'
);
CREATE TABLE tasks (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
title TEXT NOT NULL,
status TEXT DEFAULT 'pending',
execution_mode TEXT
);
""")
conn.commit()
return conn
def test_migrate_adds_project_type_to_old_schema():
"""_migrate добавляет project_type в старую схему без этой колонки."""
conn = _old_schema_conn()
_migrate(conn)
assert "project_type" in _cols(conn, "projects")
conn.close()
def test_migrate_adds_ssh_host_to_old_schema():
"""_migrate добавляет ssh_host в старую схему."""
conn = _old_schema_conn()
_migrate(conn)
assert "ssh_host" in _cols(conn, "projects")
conn.close()
def test_migrate_adds_all_ssh_columns_to_old_schema():
"""_migrate добавляет все SSH-колонки разом в старую схему."""
conn = _old_schema_conn()
_migrate(conn)
cols = _cols(conn, "projects")
assert {"ssh_host", "ssh_user", "ssh_key_path", "ssh_proxy_jump", "description"}.issubset(cols)
conn.close()
def test_migrate_is_idempotent():
"""Повторный вызов _migrate не ломает схему."""
conn = init_db(":memory:")
before = _cols(conn, "projects")
_migrate(conn)
after = _cols(conn, "projects")
assert before == after
conn.close()
# ---------------------------------------------------------------------------
# Migration KIN-UI-002: рекреация таблицы на минимальной схеме не падает
# ---------------------------------------------------------------------------
def test_migrate_recreates_table_without_operationalerror():
"""_migrate не бросает OperationalError при рекреации projects на минимальной схеме.
Регрессионный тест KIN-UI-002: INSERT SELECT в блоке KIN-ARCH-003 ранее
падал на отсутствующих колонках (tech_stack, priority, pm_prompt и др.).
"""
conn = _old_schema_conn() # path NOT NULL — триггер рекреации
try:
_migrate(conn)
except Exception as exc:
pytest.fail(f"_migrate raised {type(exc).__name__}: {exc}")
conn.close()
def test_migrate_path_becomes_nullable_on_old_schema():
"""После миграции старой схемы (path NOT NULL) колонка path становится nullable."""
conn = _old_schema_conn()
_migrate(conn)
path_col = next(
r for r in conn.execute("PRAGMA table_info(projects)").fetchall()
if r[1] == "path"
)
assert path_col[3] == 0, "path должна быть nullable после миграции KIN-ARCH-003"
conn.close()
def test_migrate_preserves_existing_rows_on_recreation():
"""Рекреация таблицы сохраняет существующие строки."""
conn = _old_schema_conn()
conn.execute(
"INSERT INTO projects (id, name, path, status) VALUES ('p1', 'MyProj', '/p', 'active')"
)
conn.commit()
_migrate(conn)
row = conn.execute("SELECT id, name, path, status FROM projects WHERE id='p1'").fetchone()
assert row is not None
assert row["name"] == "MyProj"
assert row["path"] == "/p"
assert row["status"] == "active"
conn.close()
def test_migrate_adds_missing_columns_before_recreation():
"""_migrate добавляет tech_stack, priority, pm_prompt, claude_md_path, forgejo_repo, created_at перед рекреацией."""
conn = _old_schema_conn()
_migrate(conn)
cols = _cols(conn, "projects")
required = {"tech_stack", "priority", "pm_prompt", "claude_md_path", "forgejo_repo", "created_at"}
assert required.issubset(cols), f"Отсутствуют колонки: {required - cols}"
conn.close()
def test_migrate_operations_project_with_null_path():
"""После миграции можно вставить operations-проект с path=NULL."""
conn = _old_schema_conn()
_migrate(conn)
conn.execute(
"INSERT INTO projects (id, name, path, project_type) VALUES ('ops1', 'Ops', NULL, 'operations')"
)
conn.commit()
row = conn.execute("SELECT path, project_type FROM projects WHERE id='ops1'").fetchone()
assert row["path"] is None
assert row["project_type"] == "operations"
conn.close()
# ---------------------------------------------------------------------------
# Schema KIN-073: acceptance_criteria в таблице tasks
# ---------------------------------------------------------------------------
class TestTasksAcceptanceCriteriaSchema:
"""Колонка acceptance_criteria присутствует в таблице tasks."""
def test_schema_has_acceptance_criteria_column(self, conn):
assert "acceptance_criteria" in _cols(conn, "tasks")
def test_acceptance_criteria_defaults_to_null(self, conn):
"""Создание задачи без acceptance_criteria — поле NULL (nullable)."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('p1', 'P', '/p')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title) VALUES ('t1', 'p1', 'My Task')"
)
conn.commit()
row = conn.execute(
"SELECT acceptance_criteria FROM tasks WHERE id='t1'"
).fetchone()
assert row["acceptance_criteria"] is None
def test_create_task_with_acceptance_criteria_saves_field(self, conn):
"""Создание задачи с acceptance_criteria — значение сохраняется в БД."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('p2', 'P', '/p')"
)
criteria = "Поле должно сохраняться. GET возвращает значение."
conn.execute(
"INSERT INTO tasks (id, project_id, title, acceptance_criteria)"
" VALUES ('t2', 'p2', 'Task with criteria', ?)",
(criteria,),
)
conn.commit()
row = conn.execute(
"SELECT acceptance_criteria FROM tasks WHERE id='t2'"
).fetchone()
assert row["acceptance_criteria"] == criteria
def test_get_task_returns_acceptance_criteria(self, conn):
"""SELECT задачи возвращает acceptance_criteria (критерий приёмки 3)."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('p3', 'P', '/p')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title, acceptance_criteria)"
" VALUES ('t3', 'p3', 'T', 'AC value')",
)
conn.commit()
row = conn.execute("SELECT * FROM tasks WHERE id='t3'").fetchone()
assert row["acceptance_criteria"] == "AC value"
# ---------------------------------------------------------------------------
# Migration KIN-073: _migrate добавляет acceptance_criteria в старую схему
# ---------------------------------------------------------------------------
def test_migrate_adds_acceptance_criteria_to_old_schema():
"""_migrate добавляет acceptance_criteria в tasks если колонки нет."""
conn = _old_schema_conn()
_migrate(conn)
assert "acceptance_criteria" in _cols(conn, "tasks")
conn.close()
def test_migrate_acceptance_criteria_is_nullable_after_migration():
"""После миграции acceptance_criteria nullable — старые строки не ломаются."""
conn = _old_schema_conn()
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('pm', 'P', '/p')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title) VALUES ('tm', 'pm', 'Old Task')"
)
conn.commit()
_migrate(conn)
row = conn.execute("SELECT acceptance_criteria FROM tasks WHERE id='tm'").fetchone()
assert row["acceptance_criteria"] is None
conn.close()
# ---------------------------------------------------------------------------
# Regression KIN-FIX-014: guard "if pipelines in existing_tables"
# Финальный UPDATE pipelines в _migrate не должен падать на схемах без этой таблицы.
# ---------------------------------------------------------------------------
def _old_schema_with_pipelines_conn() -> sqlite3.Connection:
"""Минимальная схема С таблицей pipelines (dept_sub + running записи)."""
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript("""
CREATE TABLE projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
path TEXT,
status TEXT DEFAULT 'active',
language TEXT DEFAULT 'ru',
execution_mode TEXT NOT NULL DEFAULT 'review'
);
CREATE TABLE tasks (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
title TEXT NOT NULL,
status TEXT DEFAULT 'pending',
execution_mode TEXT
);
CREATE TABLE pipelines (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
project_id TEXT NOT NULL,
route_type TEXT NOT NULL,
steps TEXT NOT NULL DEFAULT '[]',
status TEXT DEFAULT 'running'
);
""")
conn.commit()
return conn
def test_migrate_without_pipelines_table_does_not_raise():
"""Regression KIN-FIX-014: _migrate на схеме без таблицы pipelines не бросает исключение.
До фикса: безусловный UPDATE pipelines падал с 'no such table: pipelines'
на тест-фикстурах с минимальной схемой (только projects + tasks).
"""
conn = _old_schema_conn()
try:
_migrate(conn)
except Exception as exc:
pytest.fail(
f"_migrate raised {type(exc).__name__} при отсутствии таблицы pipelines: {exc}"
)
conn.close()
def test_migrate_sets_orphaned_dept_sub_running_to_failed():
"""_migrate обновляет dept_sub pipelines со статусом 'running''failed'.
Исправляет призрачные записи от двойного создания пайплайна (KIN-ARCH-012).
"""
conn = _old_schema_with_pipelines_conn()
conn.execute(
"INSERT INTO projects (id, name) VALUES ('p1', 'P')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title) VALUES ('t1', 'p1', 'T')"
)
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, status)"
" VALUES ('t1', 'p1', 'dept_sub', 'running')"
)
conn.commit()
_migrate(conn)
row = conn.execute("SELECT status FROM pipelines WHERE route_type = 'dept_sub'").fetchone()
assert row["status"] == "failed", (
"dept_sub pipeline со статусом 'running' должен стать 'failed' после миграции"
)
conn.close()
def test_migrate_preserves_non_dept_sub_and_non_running_pipelines():
"""_migrate не трогает pipelines, которые не являются dept_sub+running."""
conn = _old_schema_with_pipelines_conn()
conn.execute("INSERT INTO projects (id, name) VALUES ('p2', 'P')")
conn.execute("INSERT INTO tasks (id, project_id, title) VALUES ('t2', 'p2', 'T')")
# dept_sub но уже completed — не трогать
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, status)"
" VALUES ('t2', 'p2', 'dept_sub', 'completed')"
)
# обычный running pipeline другого типа — не трогать
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, status)"
" VALUES ('t2', 'p2', 'direct', 'running')"
)
conn.commit()
_migrate(conn)
rows = {
r["route_type"]: r["status"]
for r in conn.execute("SELECT route_type, status FROM pipelines").fetchall()
}
assert rows["dept_sub"] == "completed", "completed pipeline не должен меняться"
assert rows["direct"] == "running", "non-dept_sub running pipeline не должен меняться"
conn.close()
# ---------------------------------------------------------------------------
# Schema KIN-OBS-014: pid INTEGER присутствует в SCHEMA-строке CREATE TABLE pipelines
# ---------------------------------------------------------------------------
def test_schema_string_contains_pid_integer_in_pipelines_ddl():
"""Регрессия KIN-OBS-014: SCHEMA-строка содержит 'pid INTEGER' в теле CREATE TABLE pipelines.
До фикса поле было только в _migrate(), но отсутствовало в SCHEMA —
при свежей инициализации БД колонка не создавалась.
Тест парсит блок CREATE TABLE pipelines напрямую из SCHEMA.
"""
from core.db import SCHEMA
start = SCHEMA.index("CREATE TABLE IF NOT EXISTS pipelines")
end = SCHEMA.index(");", start) + 2
pipelines_ddl = SCHEMA[start:end]
assert "pid INTEGER" in pipelines_ddl, (
"pid INTEGER не найден в SCHEMA CREATE TABLE pipelines — "
"поле должно быть в SCHEMA (core/db.py строки ~133-148), а не только в _migrate()"
)
# ---------------------------------------------------------------------------
# Schema KIN-OBS-018: pid INTEGER присутствует в таблице pipelines
# ---------------------------------------------------------------------------
class TestPipelinesSchemaKinObs018:
"""PRAGMA table_info(pipelines) должен содержать kolонку pid INTEGER."""
def test_schema_has_pid_column(self, conn):
"""Свежая инициализация: pipelines содержит колонку pid."""
assert "pid" in _cols(conn, "pipelines")
def test_pid_defaults_to_null(self, conn):
"""Вставка pipeline без pid — значение NULL."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('p_pid', 'P', '/p')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title) VALUES ('t_pid', 'p_pid', 'T')"
)
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, steps)"
" VALUES ('t_pid', 'p_pid', 'direct', '[]')"
)
conn.commit()
row = conn.execute(
"SELECT pid FROM pipelines WHERE task_id='t_pid'"
).fetchone()
assert row["pid"] is None
def test_pid_can_store_integer_value(self, conn):
"""Вставка pipeline с pid — значение сохраняется в БД."""
conn.execute(
"INSERT INTO projects (id, name, path) VALUES ('p_pid2', 'P', '/p')"
)
conn.execute(
"INSERT INTO tasks (id, project_id, title) VALUES ('t_pid2', 'p_pid2', 'T')"
)
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, steps, pid)"
" VALUES ('t_pid2', 'p_pid2', 'direct', '[]', 12345)"
)
conn.commit()
row = conn.execute(
"SELECT pid FROM pipelines WHERE task_id='t_pid2'"
).fetchone()
assert row["pid"] == 12345
# ---------------------------------------------------------------------------
# Migration KIN-OBS-018: _migrate добавляет pid в существующие БД без этой колонки
# Конвенция #384: три теста для _migrate guard
# ---------------------------------------------------------------------------
def test_migrate_adds_pid_to_pipelines_without_column():
"""_migrate добавляет pid в таблицу pipelines если колонки нет."""
conn = _old_schema_with_pipelines_conn() # схема без pid
_migrate(conn)
assert "pid" in _cols(conn, "pipelines")
conn.close()
def test_migrate_pid_is_nullable_after_migration():
"""После миграции pid nullable — существующие строки не ломаются (pid=NULL)."""
conn = _old_schema_with_pipelines_conn()
conn.execute("INSERT INTO projects (id, name) VALUES ('pm_pid', 'P')")
conn.execute("INSERT INTO tasks (id, project_id, title) VALUES ('tm_pid', 'pm_pid', 'T')")
conn.execute(
"INSERT INTO pipelines (task_id, project_id, route_type, status)"
" VALUES ('tm_pid', 'pm_pid', 'direct', 'completed')"
)
conn.commit()
_migrate(conn)
row = conn.execute("SELECT pid FROM pipelines WHERE task_id='tm_pid'").fetchone()
assert row["pid"] is None
conn.close()
def test_migrate_pid_guard_is_idempotent():
"""Повторный вызов _migrate не ломает схему pipelines (guard idempotent)."""
conn = _old_schema_with_pipelines_conn()
_migrate(conn)
before = _cols(conn, "pipelines")
_migrate(conn)
after = _cols(conn, "pipelines")
assert before == after
assert "pid" in after
conn.close()
# ---------------------------------------------------------------------------
# Schema KIN-084: pipeline_log в SCHEMA-строке и _migrate()
# Конвенция #387: DDL-парсинг SCHEMA-строки
# ---------------------------------------------------------------------------
def test_schema_string_contains_pipeline_log_table():
"""Регрессия KIN-084: SCHEMA-строка содержит CREATE TABLE pipeline_log."""
from core.db import SCHEMA
assert "CREATE TABLE IF NOT EXISTS pipeline_log" in SCHEMA
def test_schema_pipeline_log_has_required_columns():
"""SCHEMA pipeline_log содержит все обязательные колонки."""
from core.db import SCHEMA
start = SCHEMA.index("CREATE TABLE IF NOT EXISTS pipeline_log")
end = SCHEMA.index(");", start) + 2
ddl = SCHEMA[start:end]
assert "pipeline_id INTEGER" in ddl
assert "ts " in ddl
assert "level " in ddl
assert "message " in ddl
assert "extra_json " in ddl
def test_migrate_creates_pipeline_log_on_fresh_init(conn):
"""Свежая инициализация: pipeline_log существует."""
tables = {r["name"] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
assert "pipeline_log" in tables
def test_migrate_creates_pipeline_log_if_missing():
"""_migrate создаёт pipeline_log если таблица отсутствует в старой БД."""
conn = _old_schema_conn()
_migrate(conn)
tables = {r["name"] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
assert "pipeline_log" in tables
conn.close()
# ---------------------------------------------------------------------------
# KIN-129: DB_PATH резолвинг — KIN_DB_PATH env var и fallback на ~/.kin/kin.db
# ---------------------------------------------------------------------------
class TestDbPathResolution:
"""DB_PATH должен брать путь из KIN_DB_PATH или падать на ~/.kin/kin.db (KIN-129)."""
def test_db_path_uses_kin_db_path_when_set(self):
"""Если KIN_DB_PATH задан — DB_PATH равен именно ему."""
import core.db as db_module
custom = "/tmp/custom_kin_test_129.db"
with patch.dict("os.environ", {"KIN_DB_PATH": custom}):
importlib.reload(db_module)
assert str(db_module.DB_PATH) == custom
importlib.reload(db_module) # восстановить модуль после теста
def test_db_path_fallback_to_home_kin_when_env_absent(self):
"""Если KIN_DB_PATH не задан — DB_PATH = ~/.kin/kin.db."""
import core.db as db_module
env_without = {k: v for k, v in os.environ.items() if k != "KIN_DB_PATH"}
with patch.dict("os.environ", env_without, clear=True):
importlib.reload(db_module)
expected = Path.home() / ".kin" / "kin.db"
assert db_module.DB_PATH == expected
importlib.reload(db_module) # восстановить модуль после теста
def test_db_path_not_relative_to_source_file(self):
"""DB_PATH не должен быть относительным путём вида parent.parent/kin.db (регрессия KIN-129)."""
import core.db as db_module
env_without = {k: v for k, v in os.environ.items() if k != "KIN_DB_PATH"}
with patch.dict("os.environ", env_without, clear=True):
importlib.reload(db_module)
# Путь должен быть абсолютным и вести в ~/.kin/, а не в директорию проекта
assert db_module.DB_PATH.is_absolute()
assert ".kin" in db_module.DB_PATH.parts
importlib.reload(db_module)