"""Tests for core/models.py — all functions, in-memory SQLite.""" import re import pytest from unittest.mock import patch from core.db import init_db from core import models from core.models import TASK_CATEGORIES @pytest.fixture def conn(): """Fresh in-memory DB for each test.""" c = init_db(db_path=":memory:") yield c c.close() # -- Projects -- def test_create_and_get_project(conn): p = models.create_project(conn, "vdol", "В долю поперёк", "~/projects/vdolipoperek", tech_stack=["vue3", "nuxt"]) assert p["id"] == "vdol" assert p["tech_stack"] == ["vue3", "nuxt"] assert p["status"] == "active" fetched = models.get_project(conn, "vdol") assert fetched["name"] == "В долю поперёк" def test_get_project_not_found(conn): assert models.get_project(conn, "nope") is None def test_list_projects_filter(conn): models.create_project(conn, "a", "A", "/a", status="active") models.create_project(conn, "b", "B", "/b", status="paused") models.create_project(conn, "c", "C", "/c", status="active") assert len(models.list_projects(conn)) == 3 assert len(models.list_projects(conn, status="active")) == 2 assert len(models.list_projects(conn, status="paused")) == 1 def test_update_project(conn): models.create_project(conn, "x", "X", "/x", priority=5) updated = models.update_project(conn, "x", priority=1, status="maintenance") assert updated["priority"] == 1 assert updated["status"] == "maintenance" def test_update_project_tech_stack_json(conn): models.create_project(conn, "x", "X", "/x", tech_stack=["python"]) updated = models.update_project(conn, "x", tech_stack=["python", "fastapi"]) assert updated["tech_stack"] == ["python", "fastapi"] # -- project_type and SSH fields (KIN-071) -- def test_create_operations_project(conn): """KIN-071: operations project stores SSH fields. KIN-ARCH-005: path не передаётся.""" p = models.create_project( conn, "srv1", "My Server", project_type="operations", ssh_host="10.0.0.1", ssh_user="root", ssh_key_path="~/.ssh/id_rsa", ssh_proxy_jump="jumpt", ) assert p["project_type"] == "operations" assert p["ssh_host"] == "10.0.0.1" assert p["ssh_user"] == "root" assert p["ssh_key_path"] == "~/.ssh/id_rsa" assert p["ssh_proxy_jump"] == "jumpt" assert p["path"] is None def test_create_development_project_defaults(conn): """KIN-071: development is default project_type.""" p = models.create_project(conn, "devp", "Dev Project", "/path") assert p["project_type"] == "development" assert p["ssh_host"] is None def test_update_project_ssh_fields(conn): """KIN-071: update_project can set SSH fields.""" models.create_project(conn, "srv2", "Server 2", project_type="operations") updated = models.update_project(conn, "srv2", ssh_host="192.168.1.1", ssh_user="pelmen") assert updated["ssh_host"] == "192.168.1.1" assert updated["ssh_user"] == "pelmen" assert updated["path"] is None # --------------------------------------------------------------------------- # KIN-ARCH-003 — path nullable для operations-проектов # Исправляет баг: workaround с пустой строкой ("") для operations-проектов # --------------------------------------------------------------------------- def test_kin_arch_003_operations_project_without_path_stores_null(conn): """KIN-ARCH-003: operations-проект без path сохраняется с path=NULL, не пустой строкой. До фикса: workaround — передавать path='' чтобы обойти NOT NULL constraint. После фикса: path=None (NULL в БД) допустим для operations-проектов. """ p = models.create_project( conn, "ops_null", "Ops Null Path", project_type="operations", ssh_host="10.0.0.1", ) assert p["path"] is None, ( "KIN-ARCH-003 регрессия: path должен быть NULL, а не пустой строкой" ) def test_kin_arch_003_check_constraint_rejects_null_path_for_development(conn): """KIN-ARCH-003: CHECK constraint (path IS NOT NULL OR project_type='operations') отклоняет path=NULL для development-проектов.""" import sqlite3 as _sqlite3 with pytest.raises(_sqlite3.IntegrityError): models.create_project( conn, "dev_no_path", "Dev No Path", path=None, project_type="development", ) # -- validate_completion_mode (KIN-063) -- def test_validate_completion_mode_valid_auto_complete(): """validate_completion_mode принимает 'auto_complete'.""" assert models.validate_completion_mode("auto_complete") == "auto_complete" def test_validate_completion_mode_valid_review(): """validate_completion_mode принимает 'review'.""" assert models.validate_completion_mode("review") == "review" def test_validate_completion_mode_invalid_fallback(): """validate_completion_mode возвращает 'review' для невалидных значений (фоллбэк).""" assert models.validate_completion_mode("auto") == "review" assert models.validate_completion_mode("") == "review" assert models.validate_completion_mode("unknown") == "review" # -- get_effective_mode (KIN-063) -- def test_get_effective_mode_task_overrides_project(conn): """Task execution_mode имеет приоритет над project execution_mode.""" models.create_project(conn, "p1", "P1", "/p1", execution_mode="review") models.create_task(conn, "P1-001", "p1", "Task", execution_mode="auto_complete") mode = models.get_effective_mode(conn, "p1", "P1-001") assert mode == "auto_complete" def test_get_effective_mode_falls_back_to_project(conn): """Если задача без execution_mode — применяется project execution_mode.""" models.create_project(conn, "p1", "P1", "/p1", execution_mode="auto_complete") models.create_task(conn, "P1-001", "p1", "Task") # execution_mode=None mode = models.get_effective_mode(conn, "p1", "P1-001") assert mode == "auto_complete" def test_get_effective_mode_project_review_overrides_default(conn): """Project execution_mode='review' + task без override → возвращает 'review'. Сценарий: PM хотел auto_complete, но проект настроен на review человеком. get_effective_mode должен вернуть project-level 'review'. """ models.create_project(conn, "p1", "P1", "/p1", execution_mode="review") models.create_task(conn, "P1-001", "p1", "Task") # нет task-level override mode = models.get_effective_mode(conn, "p1", "P1-001") assert mode == "review" # -- Tasks -- def test_create_and_get_task(conn): models.create_project(conn, "p1", "P1", "/p1") t = models.create_task(conn, "P1-001", "p1", "Fix bug", brief={"summary": "broken login"}) assert t["id"] == "P1-001" assert t["brief"] == {"summary": "broken login"} assert t["status"] == "pending" def test_list_tasks_filters(conn): models.create_project(conn, "p1", "P1", "/p1") models.create_project(conn, "p2", "P2", "/p2") models.create_task(conn, "P1-001", "p1", "Task A", status="pending") models.create_task(conn, "P1-002", "p1", "Task B", status="done") models.create_task(conn, "P2-001", "p2", "Task C", status="pending") assert len(models.list_tasks(conn)) == 3 assert len(models.list_tasks(conn, project_id="p1")) == 2 assert len(models.list_tasks(conn, status="pending")) == 2 assert len(models.list_tasks(conn, project_id="p1", status="done")) == 1 def test_update_task(conn): models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "Task") updated = models.update_task(conn, "P1-001", status="in_progress", spec={"steps": [1, 2, 3]}) assert updated["status"] == "in_progress" assert updated["spec"] == {"steps": [1, 2, 3]} assert updated["updated_at"] is not None def test_subtask(conn): models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "Parent") child = models.create_task(conn, "P1-001a", "p1", "Child", parent_task_id="P1-001") assert child["parent_task_id"] == "P1-001" # -- Decisions -- def test_add_and_get_decisions(conn): models.create_project(conn, "p1", "P1", "/p1") d = models.add_decision(conn, "p1", "gotcha", "iOS Safari bottom sheet", "position:fixed breaks on iOS Safari", category="ui", tags=["ios-safari", "css"]) assert d["type"] == "gotcha" assert d["tags"] == ["ios-safari", "css"] results = models.get_decisions(conn, "p1") assert len(results) == 1 def test_decisions_filter_by_category(conn): models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "decision", "Use WAL", "perf", category="architecture") models.add_decision(conn, "p1", "gotcha", "Safari bug", "css", category="ui") assert len(models.get_decisions(conn, "p1", category="ui")) == 1 def test_decisions_filter_by_tags(conn): models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "gotcha", "Bug A", "desc", tags=["safari", "css"]) models.add_decision(conn, "p1", "gotcha", "Bug B", "desc", tags=["chrome", "js"]) models.add_decision(conn, "p1", "gotcha", "Bug C", "desc", tags=["safari", "js"]) assert len(models.get_decisions(conn, "p1", tags=["safari"])) == 2 assert len(models.get_decisions(conn, "p1", tags=["js"])) == 2 assert len(models.get_decisions(conn, "p1", tags=["css"])) == 1 def test_decisions_filter_by_types(conn): models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "decision", "A", "a") models.add_decision(conn, "p1", "gotcha", "B", "b") models.add_decision(conn, "p1", "workaround", "C", "c") assert len(models.get_decisions(conn, "p1", types=["gotcha", "workaround"])) == 2 def test_decisions_limit(conn): models.create_project(conn, "p1", "P1", "/p1") for i in range(10): models.add_decision(conn, "p1", "decision", f"D{i}", f"desc{i}") assert len(models.get_decisions(conn, "p1", limit=3)) == 3 # -- Modules -- def test_add_and_get_modules(conn): models.create_project(conn, "p1", "P1", "/p1") m = models.add_module(conn, "p1", "search", "frontend", "src/search/", description="Search UI", dependencies=["auth"]) assert m["name"] == "search" assert m["dependencies"] == ["auth"] mods = models.get_modules(conn, "p1") assert len(mods) == 1 def test_add_module_created_true_for_new_module(conn): """KIN-081: add_module возвращает _created=True для нового модуля (INSERT).""" models.create_project(conn, "p1", "P1", "/p1") m = models.add_module(conn, "p1", "api", "backend", "src/api/") assert m["_created"] is True assert m["name"] == "api" def test_add_module_created_false_for_duplicate_name(conn): """KIN-081: add_module возвращает _created=False при дублировании по имени (INSERT OR IGNORE). UNIQUE constraint — (project_id, name). Второй INSERT с тем же name игнорируется, возвращается существующая запись с _created=False. """ models.create_project(conn, "p1", "P1", "/p1") m1 = models.add_module(conn, "p1", "api", "backend", "src/api/") assert m1["_created"] is True # Same name, different path — should be ignored m2 = models.add_module(conn, "p1", "api", "frontend", "src/api-v2/") assert m2["_created"] is False assert m2["name"] == "api" # Only one module in DB assert len(models.get_modules(conn, "p1")) == 1 def test_add_module_duplicate_returns_original_row(conn): """KIN-081: при дублировании add_module возвращает оригинальную запись (не новые данные).""" models.create_project(conn, "p1", "P1", "/p1") m1 = models.add_module(conn, "p1", "api", "backend", "src/api/", description="original desc") m2 = models.add_module(conn, "p1", "api", "frontend", "src/api-v2/", description="new desc") # Should return original row, not updated one assert m2["type"] == "backend" assert m2["description"] == "original desc" assert m2["id"] == m1["id"] def test_add_module_same_name_different_projects_are_independent(conn): """KIN-081: два проекта могут иметь одноимённые модули — UNIQUE per project_id.""" models.create_project(conn, "p1", "P1", "/p1") models.create_project(conn, "p2", "P2", "/p2") m1 = models.add_module(conn, "p1", "api", "backend", "src/api/") m2 = models.add_module(conn, "p2", "api", "backend", "src/api/") assert m1["_created"] is True assert m2["_created"] is True assert m1["id"] != m2["id"] # -- delete_project -- def test_delete_project_removes_project_record(conn): """KIN-081: delete_project удаляет запись из таблицы projects.""" models.create_project(conn, "p1", "P1", "/p1") assert models.get_project(conn, "p1") is not None models.delete_project(conn, "p1") assert models.get_project(conn, "p1") is None def test_delete_project_cascades_to_related_tables(conn): """KIN-081: delete_project удаляет связанные modules, decisions, tasks, agent_logs.""" models.create_project(conn, "p1", "P1", "/p1") models.add_module(conn, "p1", "api", "backend", "src/api/") models.add_decision(conn, "p1", "gotcha", "Bug X", "desc") models.create_task(conn, "P1-001", "p1", "Task") models.log_agent_run(conn, "p1", "developer", "implement", task_id="P1-001") models.delete_project(conn, "p1") assert conn.execute("SELECT COUNT(*) FROM modules WHERE project_id='p1'").fetchone()[0] == 0 assert conn.execute("SELECT COUNT(*) FROM decisions WHERE project_id='p1'").fetchone()[0] == 0 assert conn.execute("SELECT COUNT(*) FROM tasks WHERE project_id='p1'").fetchone()[0] == 0 assert conn.execute("SELECT COUNT(*) FROM agent_logs WHERE project_id='p1'").fetchone()[0] == 0 def test_delete_project_nonexistent_does_not_raise(conn): """KIN-081: delete_project на несуществующий проект не бросает исключение.""" models.delete_project(conn, "nonexistent") def test_delete_project_with_pipeline_and_handoffs(conn): """FK bug fix: delete_project не падает при наличии department_handoffs и pipeline_log.""" models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "Task", status="in_progress") pipeline = models.create_pipeline(conn, "P1-001", "p1", "hotfix", [{"role": "backend_dev"}]) models.create_handoff(conn, pipeline["id"], "P1-001", "engineering") models.write_log(conn, pipeline["id"], "test log message") models.log_audit_event(conn, "dangerous_skip", task_id="P1-001", project_id="p1") # Must not raise OperationalError: FOREIGN KEY constraint failed models.delete_project(conn, "p1") assert conn.execute("SELECT COUNT(*) FROM department_handoffs").fetchone()[0] == 0 assert conn.execute("SELECT COUNT(*) FROM pipeline_log").fetchone()[0] == 0 assert conn.execute("SELECT COUNT(*) FROM audit_log WHERE project_id='p1'").fetchone()[0] == 0 assert conn.execute("SELECT COUNT(*) FROM pipelines WHERE project_id='p1'").fetchone()[0] == 0 assert conn.execute("SELECT COUNT(*) FROM tasks WHERE project_id='p1'").fetchone()[0] == 0 def test_delete_project_cleans_hooks_and_project_links(conn): """FK fix: delete_project удаляет hooks, hook_logs и project_links.""" models.create_project(conn, "p1", "P1", "/p1") models.create_project(conn, "p2", "P2", "/p2") # Create hook and hook_log for p1 conn.execute( "INSERT INTO hooks (project_id, name, event, command) VALUES ('p1', 'h', 'pipeline_completed', 'echo ok')" ) hook_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] conn.execute( "INSERT INTO hook_logs (hook_id, project_id, success) VALUES (?, 'p1', 1)", (hook_id,) ) conn.commit() models.create_project_link(conn, "p1", "p2", "depends_on") models.delete_project(conn, "p1") assert conn.execute("SELECT COUNT(*) FROM hooks WHERE project_id='p1'").fetchone()[0] == 0 assert conn.execute("SELECT COUNT(*) FROM hook_logs WHERE project_id='p1'").fetchone()[0] == 0 assert conn.execute("SELECT COUNT(*) FROM project_links WHERE from_project='p1' OR to_project='p1'").fetchone()[0] == 0 # -- Agent Logs -- def test_log_agent_run(conn): models.create_project(conn, "p1", "P1", "/p1") log = models.log_agent_run(conn, "p1", "developer", "implement", tokens_used=5000, model="sonnet", cost_usd=0.015, duration_seconds=45) assert log["agent_role"] == "developer" assert log["cost_usd"] == 0.015 assert log["success"] == 1 # SQLite boolean def test_count_agent_logs_since_returns_correct_count(conn): """count_agent_logs_since возвращает количество логов >= since_iso.""" models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "Task") models.log_agent_run(conn, "p1", "developer", "implement", task_id="P1-001") models.log_agent_run(conn, "p1", "reviewer", "review", task_id="P1-001") count = models.count_agent_logs_since(conn, "P1-001", "2000-01-01T00:00:00") assert count == 2 def test_count_agent_logs_since_filters_by_task_id(conn): """count_agent_logs_since не считает логи других задач.""" models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "Task A") models.create_task(conn, "P1-002", "p1", "Task B") models.log_agent_run(conn, "p1", "developer", "implement", task_id="P1-001") models.log_agent_run(conn, "p1", "developer", "implement", task_id="P1-002") assert models.count_agent_logs_since(conn, "P1-001", "2000-01-01T00:00:00") == 1 assert models.count_agent_logs_since(conn, "P1-002", "2000-01-01T00:00:00") == 1 def test_count_agent_logs_since_excludes_before_cutoff(conn): """count_agent_logs_since не считает логи строго до since_iso.""" models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "Task") models.log_agent_run(conn, "p1", "developer", "implement", task_id="P1-001") # since_iso в далёком будущем — ни один лог не попадает count = models.count_agent_logs_since(conn, "P1-001", "2099-01-01T00:00:00") assert count == 0 def test_count_agent_logs_since_empty_returns_zero(conn): """count_agent_logs_since возвращает 0 при отсутствии логов.""" models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "Task") count = models.count_agent_logs_since(conn, "P1-001", "2000-01-01T00:00:00") assert count == 0 # -- Pipelines -- def test_create_and_update_pipeline(conn): models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "Task") pipe = models.create_pipeline(conn, "P1-001", "p1", "feature", [{"step": "architect"}, {"step": "dev"}]) assert pipe["status"] == "running" assert pipe["steps"] == [{"step": "architect"}, {"step": "dev"}] updated = models.update_pipeline(conn, pipe["id"], status="completed", total_cost_usd=0.05, total_tokens=10000) assert updated["status"] == "completed" assert updated["completed_at"] is not None # -- Support -- def test_create_and_list_tickets(conn): models.create_project(conn, "p1", "P1", "/p1") t = models.create_ticket(conn, "p1", "telegram_bot", "Не работает поиск", client_id="tg:12345", classification="bug") assert t["source"] == "telegram_bot" assert t["status"] == "new" tickets = models.list_tickets(conn, project_id="p1") assert len(tickets) == 1 assert len(models.list_tickets(conn, status="resolved")) == 0 # -- Statistics -- def test_project_summary(conn): models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "A", status="done") models.create_task(conn, "P1-002", "p1", "B", status="in_progress") models.create_task(conn, "P1-003", "p1", "C", status="blocked") summary = models.get_project_summary(conn) assert len(summary) == 1 s = summary[0] assert s["total_tasks"] == 3 assert s["done_tasks"] == 1 assert s["active_tasks"] == 1 assert s["blocked_tasks"] == 1 def test_cost_summary(conn): models.create_project(conn, "p1", "P1", "/p1") models.log_agent_run(conn, "p1", "dev", "implement", cost_usd=0.10, tokens_used=5000) models.log_agent_run(conn, "p1", "reviewer", "review", cost_usd=0.05, tokens_used=2000) costs = models.get_cost_summary(conn, days=1) assert len(costs) == 1 assert costs[0]["total_cost_usd"] == pytest.approx(0.15) assert costs[0]["total_tokens"] == 7000 assert costs[0]["runs"] == 2 def test_cost_summary_empty(conn): models.create_project(conn, "p1", "P1", "/p1") assert models.get_cost_summary(conn, days=7) == [] # -- add_decision_if_new -- def test_add_decision_if_new_adds_new_decision(conn): models.create_project(conn, "p1", "P1", "/p1") d = models.add_decision_if_new(conn, "p1", "gotcha", "Use WAL mode", "description") assert d is not None assert d["title"] == "Use WAL mode" assert d["type"] == "gotcha" def test_add_decision_if_new_skips_exact_duplicate(conn): models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "gotcha", "Use WAL mode", "desc1") result = models.add_decision_if_new(conn, "p1", "gotcha", "Use WAL mode", "desc2") assert result is None # Existing decision not duplicated assert len(models.get_decisions(conn, "p1")) == 1 def test_add_decision_if_new_skips_case_insensitive_duplicate(conn): models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "decision", "Use UUID for task IDs", "desc") result = models.add_decision_if_new(conn, "p1", "decision", "use uuid for task ids", "other desc") assert result is None assert len(models.get_decisions(conn, "p1")) == 1 def test_add_decision_if_new_allows_same_title_different_type(conn): models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "gotcha", "SQLite WAL", "desc") result = models.add_decision_if_new(conn, "p1", "convention", "SQLite WAL", "other desc") assert result is not None assert len(models.get_decisions(conn, "p1")) == 2 def test_add_decision_if_new_skips_whitespace_duplicate(conn): models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "convention", "Run tests after each change", "desc") result = models.add_decision_if_new(conn, "p1", "convention", " Run tests after each change ", "desc2") assert result is None assert len(models.get_decisions(conn, "p1")) == 1 # -- next_task_id (KIN-OBS-009) -- def test_next_task_id_with_category_first(conn): """Первая задача с category='SEC' → 'VDOL-SEC-001'.""" models.create_project(conn, "vdol", "VDOL", "/vdol") task_id = models.next_task_id(conn, "vdol", category="SEC") assert task_id == "VDOL-SEC-001" def test_next_task_id_with_category_increments(conn): """Вторая задача с category='SEC' → 'VDOL-SEC-002'.""" models.create_project(conn, "vdol", "VDOL", "/vdol") models.create_task(conn, "VDOL-SEC-001", "vdol", "Task 1", category="SEC") task_id = models.next_task_id(conn, "vdol", category="SEC") assert task_id == "VDOL-SEC-002" def test_next_task_id_category_counters_independent(conn): """Счётчики категорий независимы: SEC-002 не влияет на UI-001.""" models.create_project(conn, "vdol", "VDOL", "/vdol") models.create_task(conn, "VDOL-SEC-001", "vdol", "Sec Task 1", category="SEC") models.create_task(conn, "VDOL-SEC-002", "vdol", "Sec Task 2", category="SEC") task_id = models.next_task_id(conn, "vdol", category="UI") assert task_id == "VDOL-UI-001" def test_next_task_id_without_category_backward_compat(conn): """Задача без category → 'VDOL-001' (backward compat).""" models.create_project(conn, "vdol", "VDOL", "/vdol") task_id = models.next_task_id(conn, "vdol") assert task_id == "VDOL-001" def test_next_task_id_mixed_formats_no_collision(conn): """Смешанный проект: счётчики старого и нового форматов не пересекаются.""" models.create_project(conn, "kin", "KIN", "/kin") models.create_task(conn, "KIN-001", "kin", "Old style task") models.create_task(conn, "KIN-002", "kin", "Old style task 2") # Новый формат с категорией не мешает старому cat_id = models.next_task_id(conn, "kin", category="OBS") assert cat_id == "KIN-OBS-001" # Старый формат не мешает новому old_id = models.next_task_id(conn, "kin") assert old_id == "KIN-003" # -- Obsidian sync regex (KIN-OBS-009, решение #75) -- _OBSIDIAN_TASK_PATTERN = re.compile( r"^[-*]\s+\[([xX ])\]\s+([A-Z][A-Z0-9]*-(?:[A-Z][A-Z0-9]*-)?\d+)\s+(.+)$" ) def test_obsidian_regex_matches_old_format(): """Старый формат KIN-001 матчится.""" m = _OBSIDIAN_TASK_PATTERN.match("- [x] KIN-001 Fix login bug") assert m is not None assert m.group(2) == "KIN-001" def test_obsidian_regex_matches_new_format(): """Новый формат VDOL-SEC-001 матчится.""" m = _OBSIDIAN_TASK_PATTERN.match("- [ ] VDOL-SEC-001 Security audit") assert m is not None assert m.group(2) == "VDOL-SEC-001" def test_obsidian_regex_matches_obs_format(): """Формат KIN-OBS-009 матчится (проверяем задачу этой фичи).""" m = _OBSIDIAN_TASK_PATTERN.match("* [X] KIN-OBS-009 Task ID по категориям") assert m is not None assert m.group(2) == "KIN-OBS-009" def test_obsidian_regex_no_match_lowercase(): """Нижний регистр не матчится.""" assert _OBSIDIAN_TASK_PATTERN.match("- [x] proj-001 lowercase id") is None def test_obsidian_regex_no_match_numeric_prefix(): """Числовой префикс не матчится.""" assert _OBSIDIAN_TASK_PATTERN.match("- [x] 123-abc invalid format") is None def test_obsidian_regex_done_state(conn): """Статус done/pending корректно извлекается.""" m_done = _OBSIDIAN_TASK_PATTERN.match("- [x] KIN-UI-003 Done task") m_pending = _OBSIDIAN_TASK_PATTERN.match("- [ ] KIN-UI-004 Pending task") assert m_done.group(1) == "x" assert m_pending.group(1) == " " # -- next_task_id для всех 12 категорий (KIN-OBS-009) -- @pytest.mark.parametrize("cat", TASK_CATEGORIES) def test_next_task_id_all_categories_generate_correct_format(conn, cat): """next_task_id генерирует ID формата PROJ-CAT-001 для каждой из 12 категорий.""" models.create_project(conn, "vdol", "VDOL", "/vdol") task_id = models.next_task_id(conn, "vdol", category=cat) assert task_id == f"VDOL-{cat}-001" # -- update_task category не ломает brief (KIN-OBS-009, решение #74) -- def test_update_task_category_preserves_brief(conn): """update_task(category=...) не перетирает существующее поле brief.""" models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "Task", brief={"summary": "important context"}) updated = models.update_task(conn, "P1-001", category="SEC") assert updated["category"] == "SEC" assert updated["brief"] == {"summary": "important context"} def test_update_task_category_preserves_status_and_priority(conn): """update_task(category=...) не меняет остальные поля задачи.""" models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-001", "p1", "Task", status="in_progress", priority=3) updated = models.update_task(conn, "P1-001", category="UI") assert updated["category"] == "UI" assert updated["status"] == "in_progress" assert updated["priority"] == 3 # -- KIN-ARCH-006: autocommit_enabled и obsidian_vault_path в SCHEMA -- def test_schema_project_has_autocommit_enabled_column(conn): """KIN-ARCH-006: таблица projects содержит колонку autocommit_enabled.""" cols = {r[1] for r in conn.execute("PRAGMA table_info(projects)").fetchall()} assert "autocommit_enabled" in cols def test_schema_project_has_obsidian_vault_path_column(conn): """KIN-ARCH-006: таблица projects содержит колонку obsidian_vault_path.""" cols = {r[1] for r in conn.execute("PRAGMA table_info(projects)").fetchall()} assert "obsidian_vault_path" in cols def test_autocommit_enabled_default_is_zero(conn): """KIN-ARCH-006: autocommit_enabled по умолчанию равен 0.""" models.create_project(conn, "p1", "P1", "/p1") p = models.get_project(conn, "p1") assert p["autocommit_enabled"] == 0 def test_obsidian_vault_path_default_is_none(conn): """KIN-ARCH-006: obsidian_vault_path по умолчанию равен NULL.""" models.create_project(conn, "p1", "P1", "/p1") p = models.get_project(conn, "p1") assert p["obsidian_vault_path"] is None def test_autocommit_enabled_can_be_set_to_one(conn): """KIN-ARCH-006: autocommit_enabled можно установить в 1 через update_project.""" models.create_project(conn, "p1", "P1", "/p1") updated = models.update_project(conn, "p1", autocommit_enabled=1) assert updated["autocommit_enabled"] == 1 def test_obsidian_vault_path_can_be_set(conn): """KIN-ARCH-006: obsidian_vault_path можно установить через update_project.""" models.create_project(conn, "p1", "P1", "/p1") updated = models.update_project(conn, "p1", obsidian_vault_path="/vault/my-notes") assert updated["obsidian_vault_path"] == "/vault/my-notes" # --------------------------------------------------------------------------- # KIN-090: Task Attachments # --------------------------------------------------------------------------- @pytest.fixture def task_conn(conn): """conn with seeded project and task for attachment tests.""" models.create_project(conn, "prj", "Project", "/tmp/prj") models.create_task(conn, "PRJ-001", "prj", "Fix bug") return conn def test_create_attachment_returns_dict(task_conn): """KIN-090: create_attachment возвращает dict со всеми полями.""" att = models.create_attachment( task_conn, "PRJ-001", "screenshot.png", "/tmp/prj/.kin/attachments/PRJ-001/screenshot.png", "image/png", 1024, ) assert att["id"] is not None assert att["task_id"] == "PRJ-001" assert att["filename"] == "screenshot.png" assert att["path"] == "/tmp/prj/.kin/attachments/PRJ-001/screenshot.png" assert att["mime_type"] == "image/png" assert att["size"] == 1024 assert att["created_at"] is not None def test_create_attachment_persists_in_sqlite(task_conn): """KIN-090: AC4 — данные вложения персистируются в SQLite.""" att = models.create_attachment( task_conn, "PRJ-001", "bug.png", "/tmp/prj/.kin/attachments/PRJ-001/bug.png", "image/png", 512, ) fetched = models.get_attachment(task_conn, att["id"]) assert fetched is not None assert fetched["filename"] == "bug.png" assert fetched["size"] == 512 def test_list_attachments_empty_for_new_task(task_conn): """KIN-090: list_attachments возвращает [] для задачи без вложений.""" result = models.list_attachments(task_conn, "PRJ-001") assert result == [] def test_list_attachments_returns_all_for_task(task_conn): """KIN-090: list_attachments возвращает все вложения задачи.""" models.create_attachment(task_conn, "PRJ-001", "a.png", "/tmp/prj/.kin/attachments/PRJ-001/a.png", "image/png", 100) models.create_attachment(task_conn, "PRJ-001", "b.jpg", "/tmp/prj/.kin/attachments/PRJ-001/b.jpg", "image/jpeg", 200) result = models.list_attachments(task_conn, "PRJ-001") assert len(result) == 2 filenames = {a["filename"] for a in result} assert filenames == {"a.png", "b.jpg"} def test_list_attachments_isolated_by_task(task_conn): """KIN-090: list_attachments не возвращает вложения других задач.""" models.create_task(task_conn, "PRJ-002", "prj", "Other task") models.create_attachment(task_conn, "PRJ-001", "a.png", "/tmp/.kin/PRJ-001/a.png", "image/png", 100) models.create_attachment(task_conn, "PRJ-002", "b.png", "/tmp/.kin/PRJ-002/b.png", "image/png", 100) assert len(models.list_attachments(task_conn, "PRJ-001")) == 1 assert len(models.list_attachments(task_conn, "PRJ-002")) == 1 def test_get_attachment_not_found_returns_none(task_conn): """KIN-090: get_attachment возвращает None если вложение не найдено.""" assert models.get_attachment(task_conn, 99999) is None def test_delete_attachment_returns_true(task_conn): """KIN-090: delete_attachment возвращает True при успешном удалении.""" att = models.create_attachment(task_conn, "PRJ-001", "del.png", "/tmp/del.png", "image/png", 50) assert models.delete_attachment(task_conn, att["id"]) is True assert models.get_attachment(task_conn, att["id"]) is None def test_delete_attachment_not_found_returns_false(task_conn): """KIN-090: delete_attachment возвращает False если запись не найдена.""" assert models.delete_attachment(task_conn, 99999) is False # --------------------------------------------------------------------------- # KIN-ARCH-008: test_command на уровне проекта # --------------------------------------------------------------------------- def test_schema_project_has_test_command_column(conn): """KIN-ARCH-008: таблица projects содержит колонку test_command.""" cols = {row["name"] for row in conn.execute("PRAGMA table_info(projects)")} assert "test_command" in cols def test_test_command_default_is_null(conn): """KIN-101: новый проект без test_command получает NULL (авто-определение фреймворка).""" p = models.create_project(conn, "prj_tc", "TC Project", "/tmp/tc") assert p["test_command"] is None def test_test_command_can_be_set(conn): """KIN-ARCH-008: update_project сохраняет кастомный test_command.""" models.create_project(conn, "prj_tc2", "TC Project 2", "/tmp/tc2") updated = models.update_project(conn, "prj_tc2", test_command="pytest -v --tb=short") assert updated["test_command"] == "pytest -v --tb=short" # --------------------------------------------------------------------------- # KIN-084: write_log() и get_pipeline_logs() # --------------------------------------------------------------------------- @pytest.fixture def pipeline_conn(conn): """Возвращает (conn, pipeline_id) для тестов write_log / get_pipeline_logs.""" models.create_project(conn, "plog_proj", "Log Project", "/tmp/plog") models.create_task(conn, "PLOG-001", "plog_proj", "Log Task") pipe = models.create_pipeline(conn, "PLOG-001", "plog_proj", "feature", [{"role": "dev"}]) return conn, pipe["id"] def test_write_log_returns_dict(pipeline_conn): """KIN-084: write_log возвращает dict с id, pipeline_id, message, level.""" db, pid = pipeline_conn entry = models.write_log(db, pid, "Pipeline started") assert isinstance(entry, dict) assert entry["id"] is not None assert entry["pipeline_id"] == pid assert entry["message"] == "Pipeline started" assert entry["level"] == "INFO" assert entry["ts"] is not None def test_write_log_extra_none_gives_null(pipeline_conn): """KIN-084: write_log без extra → extra_json=None.""" db, pid = pipeline_conn entry = models.write_log(db, pid, "No extra", extra=None) assert entry["extra_json"] is None def test_write_log_extra_dict_decoded(pipeline_conn): """KIN-084: write_log с extra=dict → extra_json декодируется в dict.""" db, pid = pipeline_conn entry = models.write_log(db, pid, "With extra", extra={"role": "dev", "model": "sonnet"}) assert isinstance(entry["extra_json"], dict) assert entry["extra_json"]["role"] == "dev" assert entry["extra_json"]["model"] == "sonnet" def test_write_log_custom_ts_stored_exactly(pipeline_conn): """KIN-OBS-025: write_log с ts='...' сохраняет переданный timestamp без изменений (UTC-naive).""" db, pid = pipeline_conn custom_ts = "2026-03-17T10:00:05" entry = models.write_log(db, pid, "PM start: task planning", ts=custom_ts, extra={"role": "pm"}) assert entry["ts"] == custom_ts def test_write_log_no_ts_uses_db_default(pipeline_conn): """KIN-OBS-025: write_log без ts — таймстемп заполняется БД (не None).""" db, pid = pipeline_conn entry = models.write_log(db, pid, "Regular entry") assert entry["ts"] is not None # DB default — UTC-naive ISO string, no timezone suffix assert "+" not in entry["ts"] assert "Z" not in entry["ts"] def test_get_pipeline_logs_since_id_zero_returns_all(pipeline_conn): """KIN-084: get_pipeline_logs(since_id=0) возвращает все записи.""" db, pid = pipeline_conn models.write_log(db, pid, "Entry 1") models.write_log(db, pid, "Entry 2") models.write_log(db, pid, "Entry 3") logs = models.get_pipeline_logs(db, pid, since_id=0) assert len(logs) == 3 def test_get_pipeline_logs_since_id_filters(pipeline_conn): """KIN-084: get_pipeline_logs(since_id=N) возвращает только id > N.""" db, pid = pipeline_conn e1 = models.write_log(db, pid, "Entry 1") models.write_log(db, pid, "Entry 2") models.write_log(db, pid, "Entry 3") logs = models.get_pipeline_logs(db, pid, since_id=e1["id"]) assert len(logs) == 2 assert all(log["id"] > e1["id"] for log in logs) def test_get_pipeline_logs_ordered_asc(pipeline_conn): """KIN-084: get_pipeline_logs возвращает записи в хронологическом порядке.""" db, pid = pipeline_conn models.write_log(db, pid, "First") models.write_log(db, pid, "Second") models.write_log(db, pid, "Third") logs = models.get_pipeline_logs(db, pid) ids = [log["id"] for log in logs] assert ids == sorted(ids) # --------------------------------------------------------------------------- # KIN-UI-018: Защита от circular references в has_open_children / # _check_parent_completion (decision #816, #817) # --------------------------------------------------------------------------- def test_circular_reference_protection_has_open_children_returns_false(conn): """KIN-UI-018 (decision #816): has_open_children возвращает False при циклической ссылке A→B→A. Задачи A и B в статусе 'done' создаются напрямую в БД с взаимными parent_task_id. Статус 'done' необходим: рекурсия запускается только для done/cancelled потомков, поэтому только с done-задачами цикл реально проходит проверку visited-защиты. Ожидаемый результат: False (не True, не RecursionError). """ models.create_project(conn, "p1", "P1", "/p1") # Создаём задачи в статусе 'done' — рекурсия проходит через них models.create_task(conn, "P1-CYC-A", "p1", "Task A", status="done") models.create_task(conn, "P1-CYC-B", "p1", "Task B", status="done") # Устанавливаем цикл напрямую в БД, минуя валидацию API conn.execute("UPDATE tasks SET parent_task_id = 'P1-CYC-B' WHERE id = 'P1-CYC-A'") conn.execute("UPDATE tasks SET parent_task_id = 'P1-CYC-A' WHERE id = 'P1-CYC-B'") conn.commit() result_a = models.has_open_children(conn, "P1-CYC-A") result_b = models.has_open_children(conn, "P1-CYC-B") assert result_a is False assert result_b is False def test_circular_reference_protection_check_parent_completion_returns_without_error(conn): """KIN-UI-018 (decision #817): _check_parent_completion не падает и не зависает при цикле A→B→A. Задачи A и B в статусе 'revising' с взаимными parent_task_id. Ожидаемый результат: возврат без RecursionError, статус задач не изменился. """ models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-CPC-A", "p1", "Task A", status="revising") models.create_task(conn, "P1-CPC-B", "p1", "Task B", status="revising") # Устанавливаем цикл напрямую в БД conn.execute("UPDATE tasks SET parent_task_id = 'P1-CPC-B' WHERE id = 'P1-CPC-A'") conn.execute("UPDATE tasks SET parent_task_id = 'P1-CPC-A' WHERE id = 'P1-CPC-B'") conn.commit() # Не должно бросить RecursionError или зависнуть models._check_parent_completion(conn, "P1-CPC-A") models._check_parent_completion(conn, "P1-CPC-B") # Статусы не изменились — цикл обнаружен и прерван task_a = models.get_task(conn, "P1-CPC-A") task_b = models.get_task(conn, "P1-CPC-B") assert task_a["status"] == "revising" assert task_b["status"] == "revising" # --------------------------------------------------------------------------- # KIN-UI-020: VALID_TASK_STATUSES — frozenset membership checks # --------------------------------------------------------------------------- def test_valid_task_statuses_is_frozenset(): """KIN-UI-020: VALID_TASK_STATUSES должен быть frozenset, не list.""" assert isinstance(models.VALID_TASK_STATUSES, frozenset) @pytest.mark.parametrize("status", [ "pending", "in_progress", "review", "done", "blocked", "decomposed", "cancelled", "revising", ]) def test_valid_task_statuses_membership(status): """KIN-UI-020: каждый валидный статус присутствует в VALID_TASK_STATUSES (membership check).""" assert status in models.VALID_TASK_STATUSES def test_invalid_status_not_in_valid_task_statuses(): """KIN-UI-020: невалидный статус отсутствует в VALID_TASK_STATUSES.""" assert "invalid_status" not in models.VALID_TASK_STATUSES assert "" not in models.VALID_TASK_STATUSES assert "active" not in models.VALID_TASK_STATUSES # --------------------------------------------------------------------------- # KIN-UI-023: _check_parent_completion — атомарность каскада # --------------------------------------------------------------------------- def test_check_parent_completion_happy_path_full_cascade(conn): """KIN-UI-023 (happy path): grandchild→child→parent→grandparent — все переходят в done атомарно. Иерархия: P1-GRAND (revising) → P1-PAR (revising) → P1-CHILD (revising) → P1-GC (done). После _check_parent_completion(P1-GC) все три предка должны стать done за одну транзакцию. """ models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-GRAND", "p1", "Grandparent", status="revising") models.create_task(conn, "P1-PAR", "p1", "Parent", status="revising", parent_task_id="P1-GRAND") models.create_task(conn, "P1-CHILD", "p1", "Child", status="revising", parent_task_id="P1-PAR") models.create_task(conn, "P1-GC", "p1", "Grandchild", status="done", parent_task_id="P1-CHILD") conn.commit() models._check_parent_completion(conn, "P1-GC") child = models.get_task(conn, "P1-CHILD") parent = models.get_task(conn, "P1-PAR") grandparent = models.get_task(conn, "P1-GRAND") assert child["status"] == "done" assert parent["status"] == "done" assert grandparent["status"] == "done" assert child["completed_at"] is not None assert parent["completed_at"] is not None assert grandparent["completed_at"] is not None def test_check_parent_completion_rollback_on_mid_cascade_exception(conn): """KIN-UI-023 (atomicity): исключение в середине каскада откатывает все изменения. Иерархия: P1-GRAND (revising) → P1-PAR (revising) → P1-CHILD (revising) → P1-GC (done). Инжектируем RuntimeError на 2-м вызове has_open_children (при проверке P1-PAR). К тому моменту P1-CHILD уже получил UPDATE→done в pending-транзакции. После rollback ни один из предков не должен остаться в состоянии done. """ models.create_project(conn, "p1", "P1", "/p1") models.create_task(conn, "P1-GRAND", "p1", "Grandparent", status="revising") models.create_task(conn, "P1-PAR", "p1", "Parent", status="revising", parent_task_id="P1-GRAND") models.create_task(conn, "P1-CHILD", "p1", "Child", status="revising", parent_task_id="P1-PAR") models.create_task(conn, "P1-GC", "p1", "Grandchild", status="done", parent_task_id="P1-CHILD") conn.commit() call_count = [0] original_has_open_children = models.has_open_children def failing_has_open_children(c, tid, visited=None): call_count[0] += 1 if call_count[0] >= 2: raise RuntimeError("Simulated mid-cascade failure") return original_has_open_children(c, tid, visited) with patch.object(models, "has_open_children", side_effect=failing_has_open_children): with pytest.raises(RuntimeError, match="Simulated mid-cascade failure"): models._check_parent_completion(conn, "P1-GC") # После rollback все предки должны оставаться в revising — БД не в промежуточном состоянии child = models.get_task(conn, "P1-CHILD") parent = models.get_task(conn, "P1-PAR") grandparent = models.get_task(conn, "P1-GRAND") assert child["status"] == "revising", "P1-CHILD должен остаться revising после rollback" assert parent["status"] == "revising", "P1-PAR должен остаться revising после rollback" assert grandparent["status"] == "revising", "P1-GRAND должен остаться revising после rollback" # --------------------------------------------------------------------------- # Регрессия: get_decisions — edge case с пустым массивом [] # Баг: `if types:` и `if tags:` не различают None (нет фильтра) и [] (пустой фильтр). # Пустой список фальшивый в Python → фильтр не применяется → возвращаются ВСЕ записи. # Ожидаемое поведение: types=[] и tags=[] должны возвращать 0 результатов. # --------------------------------------------------------------------------- def test_get_decisions_empty_types_returns_no_results(conn): """Регрессия: types=[] должен вернуть 0 решений, а не все. Баг: `if types:` фальшивый для [] → фильтр не применяется → возвращаются все решения. Правильное поведение: пустой список типов означает «ни один тип не подходит» → 0 результатов. """ models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "decision", "Решение A", "описание") models.add_decision(conn, "p1", "gotcha", "Ловушка B", "описание") result = models.get_decisions(conn, "p1", types=[]) assert result == [], ( f"types=[] должен вернуть [], получено {len(result)} записей — " "регрессия: `if types:` не различает None и []" ) def test_get_decisions_empty_tags_returns_no_results(conn): """Регрессия: tags=[] должен вернуть 0 решений, а не все. Баг: `if tags:` фальшивый для [] → фильтр не применяется → возвращаются все решения. Правильное поведение: пустой список тегов означает «ни один тег не подходит» → 0 результатов. """ models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "gotcha", "Ловушка 1", "desc", tags=["safari", "css"]) models.add_decision(conn, "p1", "gotcha", "Ловушка 2", "desc", tags=["chrome"]) result = models.get_decisions(conn, "p1", tags=[]) assert result == [], ( f"tags=[] должен вернуть [], получено {len(result)} записей — " "регрессия: `if tags:` не различает None и []" ) def test_get_decisions_none_types_returns_all(conn): """types=None не должен фильтровать — возвращает все решения (нет фильтра).""" models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "decision", "Решение A", "описание") models.add_decision(conn, "p1", "gotcha", "Ловушка B", "описание") result = models.get_decisions(conn, "p1", types=None) assert len(result) == 2, "types=None должен вернуть все 2 решения" def test_get_decisions_none_tags_returns_all(conn): """tags=None не должен фильтровать — возвращает все решения (нет фильтра).""" models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "gotcha", "Ловушка 1", "desc", tags=["safari"]) models.add_decision(conn, "p1", "gotcha", "Ловушка 2", "desc", tags=["chrome"]) result = models.get_decisions(conn, "p1", tags=None) assert len(result) == 2, "tags=None должен вернуть все 2 решения" def test_get_decisions_empty_types_differs_from_none(conn): """types=[] (пустой фильтр → 0 результатов) семантически отличается от types=None (нет фильтра → все).""" models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "decision", "Решение A", "описание") models.add_decision(conn, "p1", "gotcha", "Ловушка B", "описание") result_empty = models.get_decisions(conn, "p1", types=[]) result_none = models.get_decisions(conn, "p1", types=None) assert len(result_empty) == 0, "types=[] должен давать 0 результатов" assert len(result_none) == 2, "types=None должен давать все 2 результата" def test_get_decisions_empty_tags_differs_from_none(conn): """tags=[] (пустой фильтр → 0 результатов) семантически отличается от tags=None (нет фильтра → все).""" models.create_project(conn, "p1", "P1", "/p1") models.add_decision(conn, "p1", "gotcha", "Ловушка 1", "desc", tags=["safari"]) models.add_decision(conn, "p1", "gotcha", "Ловушка 2", "desc", tags=["chrome"]) result_empty = models.get_decisions(conn, "p1", tags=[]) result_none = models.get_decisions(conn, "p1", tags=None) assert len(result_empty) == 0, "tags=[] должен давать 0 результатов" assert len(result_none) == 2, "tags=None должен давать все 2 результата"