kin/tests/test_models.py

1012 lines
43 KiB
Python
Raw Normal View History

"""Tests for core/models.py — all functions, in-memory SQLite."""
import re
import pytest
from core.db import init_db
from core import models
from core.models import TASK_CATEGORIES
@pytest.fixture
def conn():
"""Fresh in-memory DB for each test."""
c = init_db(db_path=":memory:")
yield c
c.close()
# -- Projects --
def test_create_and_get_project(conn):
p = models.create_project(conn, "vdol", "В долю поперёк", "~/projects/vdolipoperek",
tech_stack=["vue3", "nuxt"])
assert p["id"] == "vdol"
assert p["tech_stack"] == ["vue3", "nuxt"]
assert p["status"] == "active"
fetched = models.get_project(conn, "vdol")
assert fetched["name"] == "В долю поперёк"
def test_get_project_not_found(conn):
assert models.get_project(conn, "nope") is None
def test_list_projects_filter(conn):
models.create_project(conn, "a", "A", "/a", status="active")
models.create_project(conn, "b", "B", "/b", status="paused")
models.create_project(conn, "c", "C", "/c", status="active")
assert len(models.list_projects(conn)) == 3
assert len(models.list_projects(conn, status="active")) == 2
assert len(models.list_projects(conn, status="paused")) == 1
def test_update_project(conn):
models.create_project(conn, "x", "X", "/x", priority=5)
updated = models.update_project(conn, "x", priority=1, status="maintenance")
assert updated["priority"] == 1
assert updated["status"] == "maintenance"
def test_update_project_tech_stack_json(conn):
models.create_project(conn, "x", "X", "/x", tech_stack=["python"])
updated = models.update_project(conn, "x", tech_stack=["python", "fastapi"])
assert updated["tech_stack"] == ["python", "fastapi"]
# -- project_type and SSH fields (KIN-071) --
def test_create_operations_project(conn):
"""KIN-071: operations project stores SSH fields. KIN-ARCH-005: path не передаётся."""
p = models.create_project(
conn, "srv1", "My Server",
project_type="operations",
ssh_host="10.0.0.1",
ssh_user="root",
ssh_key_path="~/.ssh/id_rsa",
ssh_proxy_jump="jumpt",
)
assert p["project_type"] == "operations"
assert p["ssh_host"] == "10.0.0.1"
assert p["ssh_user"] == "root"
assert p["ssh_key_path"] == "~/.ssh/id_rsa"
assert p["ssh_proxy_jump"] == "jumpt"
assert p["path"] is None
def test_create_development_project_defaults(conn):
"""KIN-071: development is default project_type."""
p = models.create_project(conn, "devp", "Dev Project", "/path")
assert p["project_type"] == "development"
assert p["ssh_host"] is None
def test_update_project_ssh_fields(conn):
"""KIN-071: update_project can set SSH fields."""
models.create_project(conn, "srv2", "Server 2", project_type="operations")
updated = models.update_project(conn, "srv2", ssh_host="192.168.1.1", ssh_user="pelmen")
assert updated["ssh_host"] == "192.168.1.1"
assert updated["ssh_user"] == "pelmen"
assert updated["path"] is None
# ---------------------------------------------------------------------------
# KIN-ARCH-003 — path nullable для operations-проектов
# Исправляет баг: workaround с пустой строкой ("") для operations-проектов
# ---------------------------------------------------------------------------
def test_kin_arch_003_operations_project_without_path_stores_null(conn):
"""KIN-ARCH-003: operations-проект без path сохраняется с path=NULL, не пустой строкой.
До фикса: workaround передавать path='' чтобы обойти NOT NULL constraint.
После фикса: path=None (NULL в БД) допустим для operations-проектов.
"""
p = models.create_project(
conn, "ops_null", "Ops Null Path",
project_type="operations",
ssh_host="10.0.0.1",
)
assert p["path"] is None, (
"KIN-ARCH-003 регрессия: path должен быть NULL, а не пустой строкой"
)
def test_kin_arch_003_check_constraint_rejects_null_path_for_development(conn):
"""KIN-ARCH-003: CHECK constraint (path IS NOT NULL OR project_type='operations')
отклоняет path=NULL для development-проектов."""
import sqlite3 as _sqlite3
with pytest.raises(_sqlite3.IntegrityError):
models.create_project(
conn, "dev_no_path", "Dev No Path",
path=None, project_type="development",
)
# -- validate_completion_mode (KIN-063) --
def test_validate_completion_mode_valid_auto_complete():
"""validate_completion_mode принимает 'auto_complete'."""
assert models.validate_completion_mode("auto_complete") == "auto_complete"
def test_validate_completion_mode_valid_review():
"""validate_completion_mode принимает 'review'."""
assert models.validate_completion_mode("review") == "review"
def test_validate_completion_mode_invalid_fallback():
"""validate_completion_mode возвращает 'review' для невалидных значений (фоллбэк)."""
assert models.validate_completion_mode("auto") == "review"
assert models.validate_completion_mode("") == "review"
assert models.validate_completion_mode("unknown") == "review"
# -- get_effective_mode (KIN-063) --
def test_get_effective_mode_task_overrides_project(conn):
"""Task execution_mode имеет приоритет над project execution_mode."""
models.create_project(conn, "p1", "P1", "/p1", execution_mode="review")
models.create_task(conn, "P1-001", "p1", "Task", execution_mode="auto_complete")
mode = models.get_effective_mode(conn, "p1", "P1-001")
assert mode == "auto_complete"
def test_get_effective_mode_falls_back_to_project(conn):
"""Если задача без execution_mode — применяется project execution_mode."""
models.create_project(conn, "p1", "P1", "/p1", execution_mode="auto_complete")
models.create_task(conn, "P1-001", "p1", "Task") # execution_mode=None
mode = models.get_effective_mode(conn, "p1", "P1-001")
assert mode == "auto_complete"
def test_get_effective_mode_project_review_overrides_default(conn):
"""Project execution_mode='review' + task без override → возвращает 'review'.
Сценарий: PM хотел auto_complete, но проект настроен на review человеком.
get_effective_mode должен вернуть project-level 'review'.
"""
models.create_project(conn, "p1", "P1", "/p1", execution_mode="review")
models.create_task(conn, "P1-001", "p1", "Task") # нет task-level override
mode = models.get_effective_mode(conn, "p1", "P1-001")
assert mode == "review"
# -- Tasks --
def test_create_and_get_task(conn):
models.create_project(conn, "p1", "P1", "/p1")
t = models.create_task(conn, "P1-001", "p1", "Fix bug",
brief={"summary": "broken login"})
assert t["id"] == "P1-001"
assert t["brief"] == {"summary": "broken login"}
assert t["status"] == "pending"
def test_list_tasks_filters(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.create_project(conn, "p2", "P2", "/p2")
models.create_task(conn, "P1-001", "p1", "Task A", status="pending")
models.create_task(conn, "P1-002", "p1", "Task B", status="done")
models.create_task(conn, "P2-001", "p2", "Task C", status="pending")
assert len(models.list_tasks(conn)) == 3
assert len(models.list_tasks(conn, project_id="p1")) == 2
assert len(models.list_tasks(conn, status="pending")) == 2
assert len(models.list_tasks(conn, project_id="p1", status="done")) == 1
def test_update_task(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "Task")
updated = models.update_task(conn, "P1-001", status="in_progress",
spec={"steps": [1, 2, 3]})
assert updated["status"] == "in_progress"
assert updated["spec"] == {"steps": [1, 2, 3]}
assert updated["updated_at"] is not None
def test_subtask(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "Parent")
child = models.create_task(conn, "P1-001a", "p1", "Child",
parent_task_id="P1-001")
assert child["parent_task_id"] == "P1-001"
# -- Decisions --
def test_add_and_get_decisions(conn):
models.create_project(conn, "p1", "P1", "/p1")
d = models.add_decision(conn, "p1", "gotcha", "iOS Safari bottom sheet",
"position:fixed breaks on iOS Safari",
category="ui", tags=["ios-safari", "css"])
assert d["type"] == "gotcha"
assert d["tags"] == ["ios-safari", "css"]
results = models.get_decisions(conn, "p1")
assert len(results) == 1
def test_decisions_filter_by_category(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.add_decision(conn, "p1", "decision", "Use WAL", "perf",
category="architecture")
models.add_decision(conn, "p1", "gotcha", "Safari bug", "css",
category="ui")
assert len(models.get_decisions(conn, "p1", category="ui")) == 1
def test_decisions_filter_by_tags(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.add_decision(conn, "p1", "gotcha", "Bug A", "desc",
tags=["safari", "css"])
models.add_decision(conn, "p1", "gotcha", "Bug B", "desc",
tags=["chrome", "js"])
models.add_decision(conn, "p1", "gotcha", "Bug C", "desc",
tags=["safari", "js"])
assert len(models.get_decisions(conn, "p1", tags=["safari"])) == 2
assert len(models.get_decisions(conn, "p1", tags=["js"])) == 2
assert len(models.get_decisions(conn, "p1", tags=["css"])) == 1
def test_decisions_filter_by_types(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.add_decision(conn, "p1", "decision", "A", "a")
models.add_decision(conn, "p1", "gotcha", "B", "b")
models.add_decision(conn, "p1", "workaround", "C", "c")
assert len(models.get_decisions(conn, "p1", types=["gotcha", "workaround"])) == 2
def test_decisions_limit(conn):
models.create_project(conn, "p1", "P1", "/p1")
for i in range(10):
models.add_decision(conn, "p1", "decision", f"D{i}", f"desc{i}")
assert len(models.get_decisions(conn, "p1", limit=3)) == 3
# -- Modules --
def test_add_and_get_modules(conn):
models.create_project(conn, "p1", "P1", "/p1")
m = models.add_module(conn, "p1", "search", "frontend", "src/search/",
description="Search UI", dependencies=["auth"])
assert m["name"] == "search"
assert m["dependencies"] == ["auth"]
mods = models.get_modules(conn, "p1")
assert len(mods) == 1
def test_add_module_created_true_for_new_module(conn):
"""KIN-081: add_module возвращает _created=True для нового модуля (INSERT)."""
models.create_project(conn, "p1", "P1", "/p1")
m = models.add_module(conn, "p1", "api", "backend", "src/api/")
assert m["_created"] is True
assert m["name"] == "api"
def test_add_module_created_false_for_duplicate_name(conn):
"""KIN-081: add_module возвращает _created=False при дублировании по имени (INSERT OR IGNORE).
UNIQUE constraint (project_id, name). Второй INSERT с тем же name игнорируется,
возвращается существующая запись с _created=False.
"""
models.create_project(conn, "p1", "P1", "/p1")
m1 = models.add_module(conn, "p1", "api", "backend", "src/api/")
assert m1["_created"] is True
# Same name, different path — should be ignored
m2 = models.add_module(conn, "p1", "api", "frontend", "src/api-v2/")
assert m2["_created"] is False
assert m2["name"] == "api"
# Only one module in DB
assert len(models.get_modules(conn, "p1")) == 1
def test_add_module_duplicate_returns_original_row(conn):
"""KIN-081: при дублировании add_module возвращает оригинальную запись (не новые данные)."""
models.create_project(conn, "p1", "P1", "/p1")
m1 = models.add_module(conn, "p1", "api", "backend", "src/api/",
description="original desc")
m2 = models.add_module(conn, "p1", "api", "frontend", "src/api-v2/",
description="new desc")
# Should return original row, not updated one
assert m2["type"] == "backend"
assert m2["description"] == "original desc"
assert m2["id"] == m1["id"]
def test_add_module_same_name_different_projects_are_independent(conn):
"""KIN-081: два проекта могут иметь одноимённые модули — UNIQUE per project_id."""
models.create_project(conn, "p1", "P1", "/p1")
models.create_project(conn, "p2", "P2", "/p2")
m1 = models.add_module(conn, "p1", "api", "backend", "src/api/")
m2 = models.add_module(conn, "p2", "api", "backend", "src/api/")
assert m1["_created"] is True
assert m2["_created"] is True
assert m1["id"] != m2["id"]
# -- delete_project --
def test_delete_project_removes_project_record(conn):
"""KIN-081: delete_project удаляет запись из таблицы projects."""
models.create_project(conn, "p1", "P1", "/p1")
assert models.get_project(conn, "p1") is not None
models.delete_project(conn, "p1")
assert models.get_project(conn, "p1") is None
def test_delete_project_cascades_to_related_tables(conn):
"""KIN-081: delete_project удаляет связанные modules, decisions, tasks, agent_logs."""
models.create_project(conn, "p1", "P1", "/p1")
models.add_module(conn, "p1", "api", "backend", "src/api/")
models.add_decision(conn, "p1", "gotcha", "Bug X", "desc")
models.create_task(conn, "P1-001", "p1", "Task")
models.log_agent_run(conn, "p1", "developer", "implement", task_id="P1-001")
models.delete_project(conn, "p1")
assert conn.execute("SELECT COUNT(*) FROM modules WHERE project_id='p1'").fetchone()[0] == 0
assert conn.execute("SELECT COUNT(*) FROM decisions WHERE project_id='p1'").fetchone()[0] == 0
assert conn.execute("SELECT COUNT(*) FROM tasks WHERE project_id='p1'").fetchone()[0] == 0
assert conn.execute("SELECT COUNT(*) FROM agent_logs WHERE project_id='p1'").fetchone()[0] == 0
def test_delete_project_nonexistent_does_not_raise(conn):
"""KIN-081: delete_project на несуществующий проект не бросает исключение."""
models.delete_project(conn, "nonexistent")
2026-03-17 22:41:47 +02:00
def test_delete_project_with_pipeline_and_handoffs(conn):
"""FK bug fix: delete_project не падает при наличии department_handoffs и pipeline_log."""
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "Task", status="in_progress")
pipeline = models.create_pipeline(conn, "P1-001", "p1", "hotfix", [{"role": "backend_dev"}])
models.create_handoff(conn, pipeline["id"], "P1-001", "engineering")
models.write_log(conn, pipeline["id"], "test log message")
models.log_audit_event(conn, "dangerous_skip", task_id="P1-001", project_id="p1")
# Must not raise OperationalError: FOREIGN KEY constraint failed
models.delete_project(conn, "p1")
assert conn.execute("SELECT COUNT(*) FROM department_handoffs").fetchone()[0] == 0
assert conn.execute("SELECT COUNT(*) FROM pipeline_log").fetchone()[0] == 0
assert conn.execute("SELECT COUNT(*) FROM audit_log WHERE project_id='p1'").fetchone()[0] == 0
assert conn.execute("SELECT COUNT(*) FROM pipelines WHERE project_id='p1'").fetchone()[0] == 0
assert conn.execute("SELECT COUNT(*) FROM tasks WHERE project_id='p1'").fetchone()[0] == 0
def test_delete_project_cleans_hooks_and_project_links(conn):
"""FK fix: delete_project удаляет hooks, hook_logs и project_links."""
models.create_project(conn, "p1", "P1", "/p1")
models.create_project(conn, "p2", "P2", "/p2")
# Create hook and hook_log for p1
conn.execute(
"INSERT INTO hooks (project_id, name, event, command) VALUES ('p1', 'h', 'pipeline_completed', 'echo ok')"
)
hook_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
conn.execute(
"INSERT INTO hook_logs (hook_id, project_id, success) VALUES (?, 'p1', 1)", (hook_id,)
)
conn.commit()
models.create_project_link(conn, "p1", "p2", "depends_on")
models.delete_project(conn, "p1")
assert conn.execute("SELECT COUNT(*) FROM hooks WHERE project_id='p1'").fetchone()[0] == 0
assert conn.execute("SELECT COUNT(*) FROM hook_logs WHERE project_id='p1'").fetchone()[0] == 0
assert conn.execute("SELECT COUNT(*) FROM project_links WHERE from_project='p1' OR to_project='p1'").fetchone()[0] == 0
# -- Agent Logs --
def test_log_agent_run(conn):
models.create_project(conn, "p1", "P1", "/p1")
log = models.log_agent_run(conn, "p1", "developer", "implement",
tokens_used=5000, model="sonnet",
cost_usd=0.015, duration_seconds=45)
assert log["agent_role"] == "developer"
assert log["cost_usd"] == 0.015
assert log["success"] == 1 # SQLite boolean
2026-03-17 18:23:04 +02:00
def test_count_agent_logs_since_returns_correct_count(conn):
"""count_agent_logs_since возвращает количество логов >= since_iso."""
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "Task")
models.log_agent_run(conn, "p1", "developer", "implement", task_id="P1-001")
models.log_agent_run(conn, "p1", "reviewer", "review", task_id="P1-001")
count = models.count_agent_logs_since(conn, "P1-001", "2000-01-01T00:00:00")
assert count == 2
def test_count_agent_logs_since_filters_by_task_id(conn):
"""count_agent_logs_since не считает логи других задач."""
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "Task A")
models.create_task(conn, "P1-002", "p1", "Task B")
models.log_agent_run(conn, "p1", "developer", "implement", task_id="P1-001")
models.log_agent_run(conn, "p1", "developer", "implement", task_id="P1-002")
assert models.count_agent_logs_since(conn, "P1-001", "2000-01-01T00:00:00") == 1
assert models.count_agent_logs_since(conn, "P1-002", "2000-01-01T00:00:00") == 1
def test_count_agent_logs_since_excludes_before_cutoff(conn):
"""count_agent_logs_since не считает логи строго до since_iso."""
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "Task")
models.log_agent_run(conn, "p1", "developer", "implement", task_id="P1-001")
# since_iso в далёком будущем — ни один лог не попадает
count = models.count_agent_logs_since(conn, "P1-001", "2099-01-01T00:00:00")
assert count == 0
def test_count_agent_logs_since_empty_returns_zero(conn):
"""count_agent_logs_since возвращает 0 при отсутствии логов."""
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "Task")
count = models.count_agent_logs_since(conn, "P1-001", "2000-01-01T00:00:00")
assert count == 0
# -- Pipelines --
def test_create_and_update_pipeline(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "Task")
pipe = models.create_pipeline(conn, "P1-001", "p1", "feature",
[{"step": "architect"}, {"step": "dev"}])
assert pipe["status"] == "running"
assert pipe["steps"] == [{"step": "architect"}, {"step": "dev"}]
updated = models.update_pipeline(conn, pipe["id"], status="completed",
total_cost_usd=0.05, total_tokens=10000)
assert updated["status"] == "completed"
assert updated["completed_at"] is not None
# -- Support --
def test_create_and_list_tickets(conn):
models.create_project(conn, "p1", "P1", "/p1")
t = models.create_ticket(conn, "p1", "telegram_bot", "Не работает поиск",
client_id="tg:12345", classification="bug")
assert t["source"] == "telegram_bot"
assert t["status"] == "new"
tickets = models.list_tickets(conn, project_id="p1")
assert len(tickets) == 1
assert len(models.list_tickets(conn, status="resolved")) == 0
# -- Statistics --
def test_project_summary(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "A", status="done")
models.create_task(conn, "P1-002", "p1", "B", status="in_progress")
models.create_task(conn, "P1-003", "p1", "C", status="blocked")
summary = models.get_project_summary(conn)
assert len(summary) == 1
s = summary[0]
assert s["total_tasks"] == 3
assert s["done_tasks"] == 1
assert s["active_tasks"] == 1
assert s["blocked_tasks"] == 1
def test_cost_summary(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.log_agent_run(conn, "p1", "dev", "implement",
cost_usd=0.10, tokens_used=5000)
models.log_agent_run(conn, "p1", "reviewer", "review",
cost_usd=0.05, tokens_used=2000)
costs = models.get_cost_summary(conn, days=1)
assert len(costs) == 1
assert costs[0]["total_cost_usd"] == pytest.approx(0.15)
assert costs[0]["total_tokens"] == 7000
assert costs[0]["runs"] == 2
def test_cost_summary_empty(conn):
models.create_project(conn, "p1", "P1", "/p1")
assert models.get_cost_summary(conn, days=7) == []
# -- add_decision_if_new --
def test_add_decision_if_new_adds_new_decision(conn):
models.create_project(conn, "p1", "P1", "/p1")
d = models.add_decision_if_new(conn, "p1", "gotcha", "Use WAL mode", "description")
assert d is not None
assert d["title"] == "Use WAL mode"
assert d["type"] == "gotcha"
def test_add_decision_if_new_skips_exact_duplicate(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.add_decision(conn, "p1", "gotcha", "Use WAL mode", "desc1")
result = models.add_decision_if_new(conn, "p1", "gotcha", "Use WAL mode", "desc2")
assert result is None
# Existing decision not duplicated
assert len(models.get_decisions(conn, "p1")) == 1
def test_add_decision_if_new_skips_case_insensitive_duplicate(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.add_decision(conn, "p1", "decision", "Use UUID for task IDs", "desc")
result = models.add_decision_if_new(conn, "p1", "decision", "use uuid for task ids", "other desc")
assert result is None
assert len(models.get_decisions(conn, "p1")) == 1
def test_add_decision_if_new_allows_same_title_different_type(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.add_decision(conn, "p1", "gotcha", "SQLite WAL", "desc")
result = models.add_decision_if_new(conn, "p1", "convention", "SQLite WAL", "other desc")
assert result is not None
assert len(models.get_decisions(conn, "p1")) == 2
def test_add_decision_if_new_skips_whitespace_duplicate(conn):
models.create_project(conn, "p1", "P1", "/p1")
models.add_decision(conn, "p1", "convention", "Run tests after each change", "desc")
result = models.add_decision_if_new(conn, "p1", "convention", " Run tests after each change ", "desc2")
assert result is None
assert len(models.get_decisions(conn, "p1")) == 1
# -- next_task_id (KIN-OBS-009) --
def test_next_task_id_with_category_first(conn):
"""Первая задача с category='SEC''VDOL-SEC-001'."""
models.create_project(conn, "vdol", "VDOL", "/vdol")
task_id = models.next_task_id(conn, "vdol", category="SEC")
assert task_id == "VDOL-SEC-001"
def test_next_task_id_with_category_increments(conn):
"""Вторая задача с category='SEC''VDOL-SEC-002'."""
models.create_project(conn, "vdol", "VDOL", "/vdol")
models.create_task(conn, "VDOL-SEC-001", "vdol", "Task 1", category="SEC")
task_id = models.next_task_id(conn, "vdol", category="SEC")
assert task_id == "VDOL-SEC-002"
def test_next_task_id_category_counters_independent(conn):
"""Счётчики категорий независимы: SEC-002 не влияет на UI-001."""
models.create_project(conn, "vdol", "VDOL", "/vdol")
models.create_task(conn, "VDOL-SEC-001", "vdol", "Sec Task 1", category="SEC")
models.create_task(conn, "VDOL-SEC-002", "vdol", "Sec Task 2", category="SEC")
task_id = models.next_task_id(conn, "vdol", category="UI")
assert task_id == "VDOL-UI-001"
def test_next_task_id_without_category_backward_compat(conn):
"""Задача без category → 'VDOL-001' (backward compat)."""
models.create_project(conn, "vdol", "VDOL", "/vdol")
task_id = models.next_task_id(conn, "vdol")
assert task_id == "VDOL-001"
def test_next_task_id_mixed_formats_no_collision(conn):
"""Смешанный проект: счётчики старого и нового форматов не пересекаются."""
models.create_project(conn, "kin", "KIN", "/kin")
models.create_task(conn, "KIN-001", "kin", "Old style task")
models.create_task(conn, "KIN-002", "kin", "Old style task 2")
# Новый формат с категорией не мешает старому
cat_id = models.next_task_id(conn, "kin", category="OBS")
assert cat_id == "KIN-OBS-001"
# Старый формат не мешает новому
old_id = models.next_task_id(conn, "kin")
assert old_id == "KIN-003"
# -- Obsidian sync regex (KIN-OBS-009, решение #75) --
_OBSIDIAN_TASK_PATTERN = re.compile(
r"^[-*]\s+\[([xX ])\]\s+([A-Z][A-Z0-9]*-(?:[A-Z][A-Z0-9]*-)?\d+)\s+(.+)$"
)
def test_obsidian_regex_matches_old_format():
"""Старый формат KIN-001 матчится."""
m = _OBSIDIAN_TASK_PATTERN.match("- [x] KIN-001 Fix login bug")
assert m is not None
assert m.group(2) == "KIN-001"
def test_obsidian_regex_matches_new_format():
"""Новый формат VDOL-SEC-001 матчится."""
m = _OBSIDIAN_TASK_PATTERN.match("- [ ] VDOL-SEC-001 Security audit")
assert m is not None
assert m.group(2) == "VDOL-SEC-001"
def test_obsidian_regex_matches_obs_format():
"""Формат KIN-OBS-009 матчится (проверяем задачу этой фичи)."""
m = _OBSIDIAN_TASK_PATTERN.match("* [X] KIN-OBS-009 Task ID по категориям")
assert m is not None
assert m.group(2) == "KIN-OBS-009"
def test_obsidian_regex_no_match_lowercase():
"""Нижний регистр не матчится."""
assert _OBSIDIAN_TASK_PATTERN.match("- [x] proj-001 lowercase id") is None
def test_obsidian_regex_no_match_numeric_prefix():
"""Числовой префикс не матчится."""
assert _OBSIDIAN_TASK_PATTERN.match("- [x] 123-abc invalid format") is None
def test_obsidian_regex_done_state(conn):
"""Статус done/pending корректно извлекается."""
m_done = _OBSIDIAN_TASK_PATTERN.match("- [x] KIN-UI-003 Done task")
m_pending = _OBSIDIAN_TASK_PATTERN.match("- [ ] KIN-UI-004 Pending task")
assert m_done.group(1) == "x"
assert m_pending.group(1) == " "
# -- next_task_id для всех 12 категорий (KIN-OBS-009) --
@pytest.mark.parametrize("cat", TASK_CATEGORIES)
def test_next_task_id_all_categories_generate_correct_format(conn, cat):
"""next_task_id генерирует ID формата PROJ-CAT-001 для каждой из 12 категорий."""
models.create_project(conn, "vdol", "VDOL", "/vdol")
task_id = models.next_task_id(conn, "vdol", category=cat)
assert task_id == f"VDOL-{cat}-001"
# -- update_task category не ломает brief (KIN-OBS-009, решение #74) --
def test_update_task_category_preserves_brief(conn):
"""update_task(category=...) не перетирает существующее поле brief."""
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "Task", brief={"summary": "important context"})
updated = models.update_task(conn, "P1-001", category="SEC")
assert updated["category"] == "SEC"
assert updated["brief"] == {"summary": "important context"}
def test_update_task_category_preserves_status_and_priority(conn):
"""update_task(category=...) не меняет остальные поля задачи."""
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-001", "p1", "Task", status="in_progress", priority=3)
updated = models.update_task(conn, "P1-001", category="UI")
assert updated["category"] == "UI"
assert updated["status"] == "in_progress"
assert updated["priority"] == 3
# -- KIN-ARCH-006: autocommit_enabled и obsidian_vault_path в SCHEMA --
def test_schema_project_has_autocommit_enabled_column(conn):
"""KIN-ARCH-006: таблица projects содержит колонку autocommit_enabled."""
cols = {r[1] for r in conn.execute("PRAGMA table_info(projects)").fetchall()}
assert "autocommit_enabled" in cols
def test_schema_project_has_obsidian_vault_path_column(conn):
"""KIN-ARCH-006: таблица projects содержит колонку obsidian_vault_path."""
cols = {r[1] for r in conn.execute("PRAGMA table_info(projects)").fetchall()}
assert "obsidian_vault_path" in cols
def test_autocommit_enabled_default_is_zero(conn):
"""KIN-ARCH-006: autocommit_enabled по умолчанию равен 0."""
models.create_project(conn, "p1", "P1", "/p1")
p = models.get_project(conn, "p1")
assert p["autocommit_enabled"] == 0
def test_obsidian_vault_path_default_is_none(conn):
"""KIN-ARCH-006: obsidian_vault_path по умолчанию равен NULL."""
models.create_project(conn, "p1", "P1", "/p1")
p = models.get_project(conn, "p1")
assert p["obsidian_vault_path"] is None
def test_autocommit_enabled_can_be_set_to_one(conn):
"""KIN-ARCH-006: autocommit_enabled можно установить в 1 через update_project."""
models.create_project(conn, "p1", "P1", "/p1")
updated = models.update_project(conn, "p1", autocommit_enabled=1)
assert updated["autocommit_enabled"] == 1
def test_obsidian_vault_path_can_be_set(conn):
"""KIN-ARCH-006: obsidian_vault_path можно установить через update_project."""
models.create_project(conn, "p1", "P1", "/p1")
updated = models.update_project(conn, "p1", obsidian_vault_path="/vault/my-notes")
assert updated["obsidian_vault_path"] == "/vault/my-notes"
# ---------------------------------------------------------------------------
# KIN-090: Task Attachments
# ---------------------------------------------------------------------------
@pytest.fixture
def task_conn(conn):
"""conn with seeded project and task for attachment tests."""
models.create_project(conn, "prj", "Project", "/tmp/prj")
models.create_task(conn, "PRJ-001", "prj", "Fix bug")
return conn
def test_create_attachment_returns_dict(task_conn):
"""KIN-090: create_attachment возвращает dict со всеми полями."""
att = models.create_attachment(
task_conn, "PRJ-001", "screenshot.png",
"/tmp/prj/.kin/attachments/PRJ-001/screenshot.png",
"image/png", 1024,
)
assert att["id"] is not None
assert att["task_id"] == "PRJ-001"
assert att["filename"] == "screenshot.png"
assert att["path"] == "/tmp/prj/.kin/attachments/PRJ-001/screenshot.png"
assert att["mime_type"] == "image/png"
assert att["size"] == 1024
assert att["created_at"] is not None
def test_create_attachment_persists_in_sqlite(task_conn):
"""KIN-090: AC4 — данные вложения персистируются в SQLite."""
att = models.create_attachment(
task_conn, "PRJ-001", "bug.png",
"/tmp/prj/.kin/attachments/PRJ-001/bug.png",
"image/png", 512,
)
fetched = models.get_attachment(task_conn, att["id"])
assert fetched is not None
assert fetched["filename"] == "bug.png"
assert fetched["size"] == 512
def test_list_attachments_empty_for_new_task(task_conn):
"""KIN-090: list_attachments возвращает [] для задачи без вложений."""
result = models.list_attachments(task_conn, "PRJ-001")
assert result == []
def test_list_attachments_returns_all_for_task(task_conn):
"""KIN-090: list_attachments возвращает все вложения задачи."""
models.create_attachment(task_conn, "PRJ-001", "a.png",
"/tmp/prj/.kin/attachments/PRJ-001/a.png", "image/png", 100)
models.create_attachment(task_conn, "PRJ-001", "b.jpg",
"/tmp/prj/.kin/attachments/PRJ-001/b.jpg", "image/jpeg", 200)
result = models.list_attachments(task_conn, "PRJ-001")
assert len(result) == 2
filenames = {a["filename"] for a in result}
assert filenames == {"a.png", "b.jpg"}
def test_list_attachments_isolated_by_task(task_conn):
"""KIN-090: list_attachments не возвращает вложения других задач."""
models.create_task(task_conn, "PRJ-002", "prj", "Other task")
models.create_attachment(task_conn, "PRJ-001", "a.png",
"/tmp/.kin/PRJ-001/a.png", "image/png", 100)
models.create_attachment(task_conn, "PRJ-002", "b.png",
"/tmp/.kin/PRJ-002/b.png", "image/png", 100)
assert len(models.list_attachments(task_conn, "PRJ-001")) == 1
assert len(models.list_attachments(task_conn, "PRJ-002")) == 1
def test_get_attachment_not_found_returns_none(task_conn):
"""KIN-090: get_attachment возвращает None если вложение не найдено."""
assert models.get_attachment(task_conn, 99999) is None
def test_delete_attachment_returns_true(task_conn):
"""KIN-090: delete_attachment возвращает True при успешном удалении."""
att = models.create_attachment(task_conn, "PRJ-001", "del.png",
"/tmp/del.png", "image/png", 50)
assert models.delete_attachment(task_conn, att["id"]) is True
assert models.get_attachment(task_conn, att["id"]) is None
def test_delete_attachment_not_found_returns_false(task_conn):
"""KIN-090: delete_attachment возвращает False если запись не найдена."""
assert models.delete_attachment(task_conn, 99999) is False
2026-03-17 15:40:31 +02:00
# ---------------------------------------------------------------------------
# KIN-ARCH-008: test_command на уровне проекта
# ---------------------------------------------------------------------------
def test_schema_project_has_test_command_column(conn):
"""KIN-ARCH-008: таблица projects содержит колонку test_command."""
cols = {row["name"] for row in conn.execute("PRAGMA table_info(projects)")}
assert "test_command" in cols
2026-03-17 19:30:15 +02:00
def test_test_command_default_is_null(conn):
"""KIN-101: новый проект без test_command получает NULL (авто-определение фреймворка)."""
2026-03-17 15:40:31 +02:00
p = models.create_project(conn, "prj_tc", "TC Project", "/tmp/tc")
2026-03-17 19:30:15 +02:00
assert p["test_command"] is None
2026-03-17 15:40:31 +02:00
def test_test_command_can_be_set(conn):
"""KIN-ARCH-008: update_project сохраняет кастомный test_command."""
models.create_project(conn, "prj_tc2", "TC Project 2", "/tmp/tc2")
updated = models.update_project(conn, "prj_tc2", test_command="pytest -v --tb=short")
assert updated["test_command"] == "pytest -v --tb=short"
2026-03-17 17:26:31 +02:00
# ---------------------------------------------------------------------------
# KIN-084: write_log() и get_pipeline_logs()
# ---------------------------------------------------------------------------
@pytest.fixture
def pipeline_conn(conn):
"""Возвращает (conn, pipeline_id) для тестов write_log / get_pipeline_logs."""
models.create_project(conn, "plog_proj", "Log Project", "/tmp/plog")
models.create_task(conn, "PLOG-001", "plog_proj", "Log Task")
pipe = models.create_pipeline(conn, "PLOG-001", "plog_proj", "feature", [{"role": "dev"}])
return conn, pipe["id"]
def test_write_log_returns_dict(pipeline_conn):
"""KIN-084: write_log возвращает dict с id, pipeline_id, message, level."""
db, pid = pipeline_conn
entry = models.write_log(db, pid, "Pipeline started")
assert isinstance(entry, dict)
assert entry["id"] is not None
assert entry["pipeline_id"] == pid
assert entry["message"] == "Pipeline started"
assert entry["level"] == "INFO"
assert entry["ts"] is not None
def test_write_log_extra_none_gives_null(pipeline_conn):
"""KIN-084: write_log без extra → extra_json=None."""
db, pid = pipeline_conn
entry = models.write_log(db, pid, "No extra", extra=None)
assert entry["extra_json"] is None
def test_write_log_extra_dict_decoded(pipeline_conn):
"""KIN-084: write_log с extra=dict → extra_json декодируется в dict."""
db, pid = pipeline_conn
entry = models.write_log(db, pid, "With extra", extra={"role": "dev", "model": "sonnet"})
assert isinstance(entry["extra_json"], dict)
assert entry["extra_json"]["role"] == "dev"
assert entry["extra_json"]["model"] == "sonnet"
def test_write_log_custom_ts_stored_exactly(pipeline_conn):
"""KIN-OBS-025: write_log с ts='...' сохраняет переданный timestamp без изменений (UTC-naive)."""
db, pid = pipeline_conn
custom_ts = "2026-03-17T10:00:05"
entry = models.write_log(db, pid, "PM start: task planning", ts=custom_ts, extra={"role": "pm"})
assert entry["ts"] == custom_ts
def test_write_log_no_ts_uses_db_default(pipeline_conn):
"""KIN-OBS-025: write_log без ts — таймстемп заполняется БД (не None)."""
db, pid = pipeline_conn
entry = models.write_log(db, pid, "Regular entry")
assert entry["ts"] is not None
# DB default — UTC-naive ISO string, no timezone suffix
assert "+" not in entry["ts"]
assert "Z" not in entry["ts"]
2026-03-17 17:26:31 +02:00
def test_get_pipeline_logs_since_id_zero_returns_all(pipeline_conn):
"""KIN-084: get_pipeline_logs(since_id=0) возвращает все записи."""
db, pid = pipeline_conn
models.write_log(db, pid, "Entry 1")
models.write_log(db, pid, "Entry 2")
models.write_log(db, pid, "Entry 3")
logs = models.get_pipeline_logs(db, pid, since_id=0)
assert len(logs) == 3
def test_get_pipeline_logs_since_id_filters(pipeline_conn):
"""KIN-084: get_pipeline_logs(since_id=N) возвращает только id > N."""
db, pid = pipeline_conn
e1 = models.write_log(db, pid, "Entry 1")
models.write_log(db, pid, "Entry 2")
models.write_log(db, pid, "Entry 3")
logs = models.get_pipeline_logs(db, pid, since_id=e1["id"])
assert len(logs) == 2
assert all(log["id"] > e1["id"] for log in logs)
def test_get_pipeline_logs_ordered_asc(pipeline_conn):
"""KIN-084: get_pipeline_logs возвращает записи в хронологическом порядке."""
db, pid = pipeline_conn
models.write_log(db, pid, "First")
models.write_log(db, pid, "Second")
models.write_log(db, pid, "Third")
logs = models.get_pipeline_logs(db, pid)
ids = [log["id"] for log in logs]
assert ids == sorted(ids)
2026-03-18 21:46:41 +02:00
# ---------------------------------------------------------------------------
# KIN-UI-018: Защита от circular references в has_open_children /
# _check_parent_completion (decision #816, #817)
# ---------------------------------------------------------------------------
def test_circular_reference_protection_has_open_children_returns_false(conn):
"""KIN-UI-018 (decision #816): has_open_children возвращает False при циклической ссылке A→B→A.
Задачи A и B создаются напрямую в БД с взаимными parent_task_id.
Ожидаемый результат: False (не True, не RecursionError).
"""
models.create_project(conn, "p1", "P1", "/p1")
# Создаём задачи без parent сначала
models.create_task(conn, "P1-CYC-A", "p1", "Task A")
models.create_task(conn, "P1-CYC-B", "p1", "Task B")
# Устанавливаем цикл напрямую в БД, минуя валидацию API
conn.execute("UPDATE tasks SET parent_task_id = 'P1-CYC-B' WHERE id = 'P1-CYC-A'")
conn.execute("UPDATE tasks SET parent_task_id = 'P1-CYC-A' WHERE id = 'P1-CYC-B'")
conn.commit()
result_a = models.has_open_children(conn, "P1-CYC-A")
result_b = models.has_open_children(conn, "P1-CYC-B")
assert result_a is False
assert result_b is False
def test_circular_reference_protection_check_parent_completion_returns_without_error(conn):
"""KIN-UI-018 (decision #817): _check_parent_completion не падает и не зависает при цикле A→B→A.
Задачи A и B в статусе 'revising' с взаимными parent_task_id.
Ожидаемый результат: возврат без RecursionError, статус задач не изменился.
"""
models.create_project(conn, "p1", "P1", "/p1")
models.create_task(conn, "P1-CPC-A", "p1", "Task A", status="revising")
models.create_task(conn, "P1-CPC-B", "p1", "Task B", status="revising")
# Устанавливаем цикл напрямую в БД
conn.execute("UPDATE tasks SET parent_task_id = 'P1-CPC-B' WHERE id = 'P1-CPC-A'")
conn.execute("UPDATE tasks SET parent_task_id = 'P1-CPC-A' WHERE id = 'P1-CPC-B'")
conn.commit()
# Не должно бросить RecursionError или зависнуть
models._check_parent_completion(conn, "P1-CPC-A")
models._check_parent_completion(conn, "P1-CPC-B")
# Статусы не изменились — цикл обнаружен и прерван
task_a = models.get_task(conn, "P1-CPC-A")
task_b = models.get_task(conn, "P1-CPC-B")
assert task_a["status"] == "revising"
assert task_b["status"] == "revising"
# ---------------------------------------------------------------------------
# KIN-UI-020: VALID_TASK_STATUSES — frozenset membership checks
# ---------------------------------------------------------------------------
def test_valid_task_statuses_is_frozenset():
"""KIN-UI-020: VALID_TASK_STATUSES должен быть frozenset, не list."""
assert isinstance(models.VALID_TASK_STATUSES, frozenset)
@pytest.mark.parametrize("status", [
"pending", "in_progress", "review", "done",
"blocked", "decomposed", "cancelled", "revising",
])
def test_valid_task_statuses_membership(status):
"""KIN-UI-020: каждый валидный статус присутствует в VALID_TASK_STATUSES (membership check)."""
assert status in models.VALID_TASK_STATUSES
def test_invalid_status_not_in_valid_task_statuses():
"""KIN-UI-020: невалидный статус отсутствует в VALID_TASK_STATUSES."""
assert "invalid_status" not in models.VALID_TASK_STATUSES
assert "" not in models.VALID_TASK_STATUSES
assert "active" not in models.VALID_TASK_STATUSES