kin: KIN-021 Аудит-лог для --dangerously-skip-permissions в auto mode

This commit is contained in:
Gros Frumos 2026-03-16 07:13:32 +02:00
parent 67071c757d
commit a0b0976d8d
16 changed files with 1477 additions and 14 deletions

View file

@ -608,6 +608,42 @@ def test_run_kin_040_allow_write_true_ignored(client):
# KIN-058 — регрессионный тест: stderr=DEVNULL у Popen в web API
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# KIN-020 — manual_escalation задачи: PATCH status='done' резолвит задачу
# ---------------------------------------------------------------------------
def test_patch_manual_escalation_task_to_done(client):
    """A manual_escalation task accepts PATCH status='done' and is resolved — KIN-020."""
    from core.db import init_db
    from core import models

    db = init_db(api_module.DB_PATH)
    escalation_brief = {
        "task_type": "manual_escalation",
        "source": "followup:P1-001",
        "description": "Ручное применение .dockerignore",
    }
    models.create_task(db, "P1-002", "p1", "Fix .dockerignore manually",
                       brief=escalation_brief)
    db.close()

    resp = client.patch("/api/tasks/P1-002", json={"status": "done"})
    assert resp.status_code == 200
    assert resp.json()["status"] == "done"
def test_manual_escalation_task_brief_preserved_after_patch(client):
    """PATCH must not clobber brief.task_type on a manual_escalation task — KIN-020."""
    from core.db import init_db
    from core import models

    db = init_db(api_module.DB_PATH)
    models.create_task(
        db, "P1-002", "p1", "Fix manually",
        brief={"task_type": "manual_escalation", "source": "followup:P1-001"},
    )
    db.close()

    client.patch("/api/tasks/P1-002", json={"status": "done"})

    resp = client.get("/api/tasks/P1-002")
    assert resp.status_code == 200
    assert resp.json()["brief"]["task_type"] == "manual_escalation"
def test_run_sets_stderr_devnull(client):
"""Регрессионный тест KIN-058: stderr=DEVNULL всегда устанавливается в Popen,
чтобы stderr дочернего процесса не загрязнял логи uvicorn."""
@ -626,3 +662,186 @@ def test_run_sets_stderr_devnull(client):
"Регрессия KIN-058: stderr у Popen должен быть DEVNULL, "
"иначе вывод агента попадает в логи uvicorn"
)
# ---------------------------------------------------------------------------
# KIN-065 — PATCH /api/projects/{id} — autocommit_enabled toggle
# ---------------------------------------------------------------------------
def test_patch_project_autocommit_enabled_true(client):
    """PATCH with autocommit_enabled=true responds 200 and the field becomes 1."""
    resp = client.patch("/api/projects/p1", json={"autocommit_enabled": True})
    assert resp.status_code == 200
    body = resp.json()
    assert body["autocommit_enabled"] == 1
def test_patch_project_autocommit_enabled_false(client):
    """After enabling, PATCH autocommit_enabled=false responds 200 and stores 0."""
    client.patch("/api/projects/p1", json={"autocommit_enabled": True})
    resp = client.patch("/api/projects/p1", json={"autocommit_enabled": False})
    assert resp.status_code == 200
    assert resp.json()["autocommit_enabled"] == 0
def test_patch_project_autocommit_persisted_via_sql(client):
    """Direct SQL confirms value 1 after PATCH autocommit_enabled=True."""
    from core.db import init_db

    client.patch("/api/projects/p1", json={"autocommit_enabled": True})

    # Verify persistence by reading the table directly, bypassing the API.
    db = init_db(api_module.DB_PATH)
    stored = db.execute("SELECT autocommit_enabled FROM projects WHERE id = 'p1'").fetchone()
    db.close()

    assert stored is not None
    assert stored[0] == 1
def test_patch_project_autocommit_false_persisted_via_sql(client):
    """Direct SQL confirms value 0 after toggling autocommit_enabled back off."""
    from core.db import init_db

    client.patch("/api/projects/p1", json={"autocommit_enabled": True})
    client.patch("/api/projects/p1", json={"autocommit_enabled": False})

    # Verify persistence by reading the table directly, bypassing the API.
    db = init_db(api_module.DB_PATH)
    stored = db.execute("SELECT autocommit_enabled FROM projects WHERE id = 'p1'").fetchone()
    db.close()

    assert stored is not None
    assert stored[0] == 0
def test_patch_project_autocommit_null_before_first_update(client):
    """A freshly created project has falsy autocommit_enabled (NULL or 0)."""
    from core.db import init_db

    client.post("/api/projects", json={"id": "p_new", "name": "New", "path": "/new"})

    db = init_db(api_module.DB_PATH)
    stored = db.execute("SELECT autocommit_enabled FROM projects WHERE id = 'p_new'").fetchone()
    db.close()

    assert stored is not None
    assert not stored[0]  # DEFAULT 0 or NULL — falsy either way
def test_patch_project_empty_body_returns_400(client):
    """A project PATCH with no fields at all is rejected with 400."""
    resp = client.patch("/api/projects/p1", json={})
    assert resp.status_code == 400
def test_patch_project_not_found(client):
    """PATCH on an unknown project id yields 404."""
    resp = client.patch("/api/projects/NOPE", json={"autocommit_enabled": True})
    assert resp.status_code == 404
def test_patch_project_autocommit_and_execution_mode_together(client):
    """A single PATCH may update both autocommit_enabled and execution_mode."""
    payload = {
        "autocommit_enabled": True,
        "execution_mode": "auto_complete",
    }
    resp = client.patch("/api/projects/p1", json=payload)
    assert resp.status_code == 200

    body = resp.json()
    assert body["autocommit_enabled"] == 1
    assert body["execution_mode"] == "auto_complete"
def test_patch_project_returns_full_project_object(client):
    """PATCH responds with the complete project object (id, name, autocommit_enabled)."""
    resp = client.patch("/api/projects/p1", json={"autocommit_enabled": True})
    assert resp.status_code == 200

    body = resp.json()
    assert body["id"] == "p1"
    assert body["name"] == "P1"
    assert "autocommit_enabled" in body
# ---------------------------------------------------------------------------
# KIN-008 — PATCH priority и route_type задачи
# ---------------------------------------------------------------------------
def test_patch_task_priority(client):
    """PATCH priority updates the field and returns the task."""
    resp = client.patch("/api/tasks/P1-001", json={"priority": 3})
    assert resp.status_code == 200
    assert resp.json()["priority"] == 3
def test_patch_task_priority_persisted(client):
    """A follow-up GET reflects the priority set by a previous PATCH."""
    client.patch("/api/tasks/P1-001", json={"priority": 7})
    resp = client.get("/api/tasks/P1-001")
    assert resp.status_code == 200
    assert resp.json()["priority"] == 7
def test_patch_task_priority_invalid_zero(client):
    """priority=0 is below the valid range and is rejected with 400."""
    resp = client.patch("/api/tasks/P1-001", json={"priority": 0})
    assert resp.status_code == 400
def test_patch_task_priority_invalid_eleven(client):
    """priority=11 is above the valid range and is rejected with 400."""
    resp = client.patch("/api/tasks/P1-001", json={"priority": 11})
    assert resp.status_code == 400
def test_patch_task_route_type_set(client):
    """PATCH route_type stores the value inside the task brief."""
    resp = client.patch("/api/tasks/P1-001", json={"route_type": "feature"})
    assert resp.status_code == 200
    assert resp.json()["brief"]["route_type"] == "feature"
def test_patch_task_route_type_all_valid(client):
    """Every allowed route_type value is accepted and echoed back."""
    valid_types = ("debug", "feature", "refactor", "hotfix")
    for rt in valid_types:
        resp = client.patch("/api/tasks/P1-001", json={"route_type": rt})
        assert resp.status_code == 200, f"route_type={rt} rejected"
        assert resp.json()["brief"]["route_type"] == rt
def test_patch_task_route_type_invalid(client):
    """An unrecognised route_type value is rejected with 400."""
    resp = client.patch("/api/tasks/P1-001", json={"route_type": "unknown"})
    assert resp.status_code == 400
def test_patch_task_route_type_clear(client):
    """PATCH route_type='' removes the field from the brief."""
    client.patch("/api/tasks/P1-001", json={"route_type": "debug"})

    resp = client.patch("/api/tasks/P1-001", json={"route_type": ""})
    assert resp.status_code == 200

    brief = resp.json().get("brief")
    if brief:
        assert "route_type" not in brief
def test_patch_task_route_type_merges_brief(client):
    """route_type is merged into the brief without overwriting other fields."""
    from core.db import init_db
    from core import models

    db = init_db(api_module.DB_PATH)
    models.update_task(db, "P1-001", brief={"extra": "data"})
    db.close()

    resp = client.patch("/api/tasks/P1-001", json={"route_type": "hotfix"})
    assert resp.status_code == 200

    merged = resp.json()["brief"]
    assert merged["route_type"] == "hotfix"
    assert merged["extra"] == "data"
def test_patch_task_priority_and_route_type_together(client):
    """A single PATCH may update priority and route_type at once."""
    resp = client.patch("/api/tasks/P1-001", json={"priority": 2, "route_type": "refactor"})
    assert resp.status_code == 200

    body = resp.json()
    assert body["priority"] == 2
    assert body["brief"]["route_type"] == "refactor"
def test_patch_task_empty_body_still_returns_400(client):
    """An empty PATCH body still yields 400 (KIN-008 regression guard)."""
    resp = client.patch("/api/tasks/P1-001", json={})
    assert resp.status_code == 400

View file

@ -219,6 +219,35 @@ class TestResolvePendingAction:
# _run_claude with allow_write=True
assert result["rerun_result"]["success"] is True
def test_manual_task_brief_has_task_type_manual_escalation(self, conn):
    """brief["task_type"] must equal 'manual_escalation' — KIN-020."""
    pending = {
        "type": "permission_fix",
        "original_item": {
            "title": "Fix .dockerignore",
            "type": "hotfix",
            "priority": 3,
            "brief": "Create .dockerignore",
        },
    }
    outcome = resolve_pending_action(conn, "VDOL-001", pending, "manual_task")
    assert outcome is not None
    assert outcome["brief"]["task_type"] == "manual_escalation"
def test_manual_task_brief_includes_source(self, conn):
    """brief["source"] must point back to the parent task — KIN-020."""
    pending = {"type": "permission_fix", "original_item": {"title": "Fix X"}}
    outcome = resolve_pending_action(conn, "VDOL-001", pending, "manual_task")
    assert outcome["brief"]["source"] == "followup:VDOL-001"
def test_manual_task_brief_includes_description(self, conn):
    """brief["description"] is copied from original_item.brief — KIN-020."""
    pending = {
        "type": "permission_fix",
        "original_item": {"title": "Fix Y", "brief": "Detailed context here"},
    }
    outcome = resolve_pending_action(conn, "VDOL-001", pending, "manual_task")
    assert outcome["brief"]["description"] == "Detailed context here"
def test_nonexistent_task(self, conn):
    """Resolving against an unknown task id yields None."""
    pending = {"type": "permission_fix", "original_item": {}}
    assert resolve_pending_action(conn, "NOPE", pending, "skip") is None
@ -261,6 +290,22 @@ class TestAutoResolvePendingActions:
tasks = models.list_tasks(conn, project_id="vdol")
assert len(tasks) == 2 # VDOL-001 + новая manual task
@patch("agents.runner._run_claude")
def test_escalated_manual_task_has_task_type_manual_escalation(self, mock_claude, conn):
    """Escalation after a failed rerun creates a task with task_type='manual_escalation' — KIN-020."""
    # Force the rerun to fail so the pipeline falls back to manual escalation.
    mock_claude.return_value = {"output": "", "returncode": 1}
    pending = {
        "type": "permission_fix",
        "description": "Fix X",
        "original_item": {"title": "Fix X", "type": "frontend_dev", "brief": "Apply fix"},
        "options": ["rerun", "manual_task", "skip"],
    }

    outcomes = auto_resolve_pending_actions(conn, "VDOL-001", [pending])

    assert outcomes[0]["resolved"] == "manual_task"
    escalated = outcomes[0]["result"]
    assert escalated["brief"]["task_type"] == "manual_escalation"
@patch("agents.runner._run_claude")
def test_empty_pending_actions(self, mock_claude, conn):
"""Пустой список — пустой результат."""

tests/test_obsidian_sync.py — new file, 186 lines added
View file

@ -0,0 +1,186 @@
"""Tests for core/obsidian_sync.py — KIN-013."""
import sqlite3
import tempfile
from pathlib import Path
import pytest
from core.db import init_db
from core.obsidian_sync import (
export_decisions_to_md,
parse_task_checkboxes,
sync_obsidian,
)
from core import models
@pytest.fixture
def tmp_vault(tmp_path):
    """Path to a temporary vault root (directory itself is not created here)."""
    vault_root = tmp_path / "vault"
    return vault_root
@pytest.fixture
def db(tmp_path):
    """SQLite connection backed by a temp file: schema initialised plus one project."""
    conn = init_db(tmp_path / "test.db")
    models.create_project(conn, "proj1", "Test Project", "/tmp/proj1")
    yield conn
    conn.close()
# ---------------------------------------------------------------------------
# 1. export creates files with correct frontmatter
# ---------------------------------------------------------------------------
def test_export_decisions_creates_md_files(tmp_vault):
    """Export writes one .md file per decision, with frontmatter and body content."""
    decision = {
        "id": 42,
        "project_id": "proj1",
        "type": "gotcha",
        "category": "testing",
        "title": "Proxy через SSH не работает без ssh-agent",
        "description": "При подключении через ProxyJump ssh-agent должен быть запущен.",
        "tags": ["testing", "mock", "subprocess"],
        "created_at": "2026-03-10T12:00:00",
    }
    tmp_vault.mkdir(parents=True)

    created = export_decisions_to_md("proj1", [decision], tmp_vault)

    assert len(created) == 1
    md_path = created[0]
    assert md_path.exists()

    text = md_path.read_text(encoding="utf-8")
    # Frontmatter fields, date, heading, and description must all appear.
    expected_fragments = (
        "kin_decision_id: 42",
        "project: proj1",
        "type: gotcha",
        "category: testing",
        "2026-03-10",
        "# Proxy через SSH не работает без ssh-agent",
        "При подключении через ProxyJump",
    )
    for fragment in expected_fragments:
        assert fragment in text
# ---------------------------------------------------------------------------
# 2. export is idempotent (overwrite, not duplicate)
# ---------------------------------------------------------------------------
def test_export_idempotent(tmp_vault):
    """Re-exporting the same decision overwrites the file instead of duplicating it."""
    decision = {
        "id": 1,
        "project_id": "p",
        "type": "decision",
        "category": None,
        "title": "Use SQLite",
        "description": "SQLite is the source of truth.",
        "tags": [],
        "created_at": "2026-01-01",
    }
    tmp_vault.mkdir(parents=True)

    # Export twice — the second run must not add a second file.
    for _ in range(2):
        export_decisions_to_md("p", [decision], tmp_vault)

    md_files = list((tmp_vault / "p" / "decisions").glob("*.md"))
    assert len(md_files) == 1
# ---------------------------------------------------------------------------
# 3. parse_task_checkboxes — done checkbox
# ---------------------------------------------------------------------------
def test_parse_task_checkboxes_done(tmp_vault):
    """A checked checkbox is reported with done=True and its parsed title."""
    board_dir = tmp_vault / "proj1" / "tasks"
    board_dir.mkdir(parents=True)
    (board_dir / "kanban.md").write_text(
        "- [x] KIN-001 Implement login\n- [ ] KIN-002 Add tests\n",
        encoding="utf-8",
    )

    parsed = parse_task_checkboxes(tmp_vault, "proj1")

    matches = [item for item in parsed if item["task_id"] == "KIN-001"]
    assert len(matches) == 1
    assert matches[0]["done"] is True
    assert matches[0]["title"] == "Implement login"
# ---------------------------------------------------------------------------
# 4. parse_task_checkboxes — pending checkbox
# ---------------------------------------------------------------------------
def test_parse_task_checkboxes_pending(tmp_vault):
    """An unchecked checkbox is reported with done=False."""
    board_dir = tmp_vault / "proj1" / "tasks"
    board_dir.mkdir(parents=True)
    (board_dir / "kanban.md").write_text(
        "- [ ] KIN-002 Add tests\n",
        encoding="utf-8",
    )

    parsed = parse_task_checkboxes(tmp_vault, "proj1")

    matches = [item for item in parsed if item["task_id"] == "KIN-002"]
    assert len(matches) == 1
    assert matches[0]["done"] is False
# ---------------------------------------------------------------------------
# 5. parse_task_checkboxes — lines without task ID are skipped
# ---------------------------------------------------------------------------
def test_parse_task_checkboxes_no_id(tmp_vault):
    """Checkbox lines without a task identifier are skipped entirely."""
    board_dir = tmp_vault / "proj1" / "tasks"
    board_dir.mkdir(parents=True)
    (board_dir / "notes.md").write_text(
        "- [x] Some task without ID\n"
        "- [ ] Another line without identifier\n"
        "- [x] KIN-003 With ID\n",
        encoding="utf-8",
    )

    parsed = parse_task_checkboxes(tmp_vault, "proj1")

    # Only the single line carrying a KIN-* identifier survives parsing.
    assert all(item["task_id"].startswith("KIN-") for item in parsed)
    assert len(parsed) == 1
    assert parsed[0]["task_id"] == "KIN-003"
# ---------------------------------------------------------------------------
# 6. sync_obsidian updates task status when done=True
# ---------------------------------------------------------------------------
def test_sync_updates_task_status(db, tmp_vault):
    """sync_obsidian flips an in_progress task to done when its checkbox is ticked."""
    tmp_vault.mkdir(parents=True)
    models.update_project(db, "proj1", obsidian_vault_path=str(tmp_vault))
    created = models.create_task(db, "proj1-001", "proj1", "Do something", status="in_progress")
    assert created["status"] == "in_progress"

    # Drop a checked-off checkbox into the vault's task board.
    board_dir = tmp_vault / "proj1" / "tasks"
    board_dir.mkdir(parents=True)
    (board_dir / "sprint.md").write_text(
        "- [x] proj1-001 Do something\n",
        encoding="utf-8",
    )

    outcome = sync_obsidian(db, "proj1")
    assert outcome["tasks_updated"] == 1
    assert not outcome["errors"]

    refreshed = models.get_task(db, "proj1-001")
    assert refreshed["status"] == "done"
# ---------------------------------------------------------------------------
# 7. sync_obsidian raises ValueError when vault_path not set
# ---------------------------------------------------------------------------
def test_sync_no_vault_path(db):
    """sync_obsidian raises ValueError when obsidian_vault_path was never set."""
    # The fixture project exists, but its obsidian_vault_path is still NULL.
    with pytest.raises(ValueError, match="obsidian_vault_path not set"):
        sync_obsidian(db, "proj1")

View file

@ -1557,3 +1557,110 @@ class TestReviewModeExecutionMode:
assert row["execution_mode"] == "review", (
"Регрессия KIN-055: execution_mode должен быть 'review' в SQLite после pipeline"
)
# ---------------------------------------------------------------------------
# KIN-021: Audit log for --dangerously-skip-permissions
# ---------------------------------------------------------------------------
class TestAuditLogDangerousSkip:
    """KIN-021: audit trail for --dangerously-skip-permissions retries."""

    @staticmethod
    def _stub_pipeline_mocks(mock_hooks, mock_followup, mock_learn):
        """Neutralise hooks/followup/learning so the pipeline runs end to end."""
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        mock_learn.return_value = {"added": 0, "skipped": 0}

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_audit_log_written_on_permission_retry(
        self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn
    ):
        """A retry with --dangerously-skip-permissions writes an audit_log event."""
        # First claude call fails on permissions, the retry succeeds.
        mock_run.side_effect = [
            _mock_claude_failure("permission denied: cannot write file"),
            _mock_claude_success({"result": "fixed"}),
        ]
        self._stub_pipeline_mocks(mock_hooks, mock_followup, mock_learn)
        models.update_project(conn, "vdol", execution_mode="auto_complete")

        outcome = run_pipeline(conn, "VDOL-001", [{"role": "debugger", "brief": "find"}])
        assert outcome["success"] is True

        # Verify the audit trail via direct SQL.
        entries = conn.execute(
            "SELECT * FROM audit_log WHERE task_id='VDOL-001'"
        ).fetchall()
        assert len(entries) == 1
        entry = entries[0]
        assert entry["event_type"] == "dangerous_skip"
        assert entry["step_id"] == "debugger"
        assert "debugger" in entry["reason"]

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_dangerously_skipped_flag_set_on_task(
        self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn
    ):
        """tasks.dangerously_skipped becomes 1 after a dangerous-skip retry."""
        mock_run.side_effect = [
            _mock_claude_failure("permission denied: cannot write file"),
            _mock_claude_success({"result": "fixed"}),
        ]
        self._stub_pipeline_mocks(mock_hooks, mock_followup, mock_learn)
        models.update_project(conn, "vdol", execution_mode="auto_complete")

        run_pipeline(conn, "VDOL-001", [{"role": "debugger", "brief": "find"}])

        # Verify via direct SQL, bypassing the model layer.
        flag_row = conn.execute(
            "SELECT dangerously_skipped FROM tasks WHERE id='VDOL-001'"
        ).fetchone()
        assert flag_row is not None
        assert flag_row["dangerously_skipped"] == 1

    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_no_audit_log_in_review_mode(self, mock_run, mock_hooks, conn):
        """Review mode never retries, so audit_log stays empty."""
        mock_run.return_value = _mock_claude_failure("permission denied: cannot write file")
        mock_hooks.return_value = []

        outcome = run_pipeline(conn, "VDOL-001", [{"role": "debugger", "brief": "find"}])
        assert outcome["success"] is False

        entries = conn.execute(
            "SELECT * FROM audit_log WHERE task_id='VDOL-001'"
        ).fetchall()
        assert len(entries) == 0

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_audit_log_no_entry_on_normal_success(
        self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn
    ):
        """A clean success without any retry leaves audit_log empty."""
        mock_run.return_value = _mock_claude_success({"result": "done"})
        self._stub_pipeline_mocks(mock_hooks, mock_followup, mock_learn)
        models.update_project(conn, "vdol", execution_mode="auto_complete")

        outcome = run_pipeline(conn, "VDOL-001", [{"role": "tester", "brief": "test"}])
        assert outcome["success"] is True

        entries = conn.execute(
            "SELECT * FROM audit_log WHERE task_id='VDOL-001'"
        ).fetchall()
        assert len(entries) == 0