"""Tests for agents/runner.py — agent execution with mocked claude CLI.""" import json import subprocess import pytest from unittest.mock import patch, MagicMock from core.db import init_db from core import models from agents.runner import ( run_agent, run_pipeline, run_audit, _try_parse_json, _run_learning_extraction, _build_claude_env, _resolve_claude_cmd, _EXTRA_PATH_DIRS, _run_autocommit, _parse_agent_blocked, _get_changed_files, _save_sysadmin_output, check_claude_auth, ClaudeAuthError, _MODEL_TIMEOUTS, _detect_destructive_operations, ) @pytest.fixture def conn(): c = init_db(":memory:") models.create_project(c, "vdol", "ВДОЛЬ", "~/projects/vdolipoperek", tech_stack=["vue3"]) models.create_task(c, "VDOL-001", "vdol", "Fix bug", brief={"route_type": "debug"}) yield c c.close() def _mock_claude_success(output_data): """Create a mock subprocess result with successful claude output.""" mock = MagicMock() mock.stdout = json.dumps(output_data) if isinstance(output_data, dict) else output_data mock.stderr = "" mock.returncode = 0 return mock def _mock_claude_failure(error_msg): mock = MagicMock() mock.stdout = "" mock.stderr = error_msg mock.returncode = 1 return mock # --------------------------------------------------------------------------- # run_agent # --------------------------------------------------------------------------- class TestRunAgent: @patch("agents.runner.subprocess.run") def test_successful_agent_run(self, mock_run, conn): mock_run.return_value = _mock_claude_success({ "result": "Found race condition in useSearch.ts", "usage": {"total_tokens": 5000}, "cost_usd": 0.015, }) result = run_agent(conn, "debugger", "VDOL-001", "vdol") assert result["success"] is True assert result["role"] == "debugger" assert result["model"] == "sonnet" assert result["duration_seconds"] >= 0 # Verify claude was called with right args call_args = mock_run.call_args cmd = call_args[0][0] assert "claude" in cmd[0] assert "-p" in cmd assert "--output-format" in cmd assert 
"json" in cmd @patch("agents.runner.subprocess.run") def test_failed_agent_run(self, mock_run, conn): mock_run.return_value = _mock_claude_failure("API error") result = run_agent(conn, "debugger", "VDOL-001", "vdol") assert result["success"] is False # Should be logged in agent_logs logs = conn.execute("SELECT * FROM agent_logs WHERE task_id='VDOL-001'").fetchall() assert len(logs) == 1 assert logs[0]["success"] == 0 def test_dry_run_returns_prompt(self, conn): result = run_agent(conn, "debugger", "VDOL-001", "vdol", dry_run=True) assert result["dry_run"] is True assert result["prompt"] is not None assert "VDOL-001" in result["prompt"] assert result["output"] is None @patch("agents.runner.subprocess.run") def test_agent_logs_to_db(self, mock_run, conn): mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "tester", "VDOL-001", "vdol") logs = conn.execute("SELECT * FROM agent_logs WHERE agent_role='tester'").fetchall() assert len(logs) == 1 assert logs[0]["project_id"] == "vdol" @patch("agents.runner.subprocess.run") def test_full_output_saved_to_db(self, mock_run, conn): """Bug fix: output_summary must contain the FULL output, not truncated.""" long_json = json.dumps({ "result": json.dumps({ "summary": "Security audit complete", "findings": [{"title": f"Finding {i}", "severity": "HIGH"} for i in range(50)], }), }) mock = MagicMock() mock.stdout = long_json mock.stderr = "" mock.returncode = 0 mock_run.return_value = mock run_agent(conn, "security", "VDOL-001", "vdol") logs = conn.execute("SELECT output_summary FROM agent_logs WHERE agent_role='security'").fetchall() assert len(logs) == 1 output = logs[0]["output_summary"] assert output is not None assert len(output) > 1000 # Must not be truncated # Should contain all 50 findings assert "Finding 49" in output @patch("agents.runner.subprocess.run") def test_dict_output_saved_as_json_string(self, mock_run, conn): """When claude returns structured JSON, it must be saved as string.""" 
mock_run.return_value = _mock_claude_success({ "result": {"status": "ok", "files": ["a.py", "b.py"]}, }) result = run_agent(conn, "debugger", "VDOL-001", "vdol") # output should be a string (JSON serialized), not a dict assert isinstance(result["raw_output"], str) logs = conn.execute("SELECT output_summary FROM agent_logs WHERE agent_role='debugger'").fetchall() saved = logs[0]["output_summary"] assert isinstance(saved, str) assert "a.py" in saved @patch("agents.runner.subprocess.run") def test_previous_output_passed(self, mock_run, conn): mock_run.return_value = _mock_claude_success({"result": "tests pass"}) run_agent(conn, "tester", "VDOL-001", "vdol", previous_output="Found bug in line 42") call_args = mock_run.call_args prompt = call_args[0][0][2] # -p argument assert "line 42" in prompt # --------------------------------------------------------------------------- # run_pipeline # --------------------------------------------------------------------------- class TestRunPipeline: @patch("agents.runner._run_autocommit") # gotcha #41: мокируем в тестах не о autocommit @patch("agents.runner.subprocess.run") def test_successful_pipeline(self, mock_run, mock_autocommit, conn): mock_run.return_value = _mock_claude_success({"result": "done"}) steps = [ {"role": "debugger", "brief": "find bug"}, {"role": "tester", "depends_on": "debugger", "brief": "verify"}, ] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True assert result["steps_completed"] == 2 assert len(result["results"]) == 2 # Pipeline created in DB pipe = conn.execute("SELECT * FROM pipelines WHERE task_id='VDOL-001'").fetchone() assert pipe is not None assert pipe["status"] == "completed" # Task updated to review task = models.get_task(conn, "VDOL-001") assert task["status"] == "review" @patch("agents.runner.subprocess.run") def test_pipeline_fails_on_step(self, mock_run, conn): # First step succeeds, second fails mock_run.side_effect = [ _mock_claude_success({"result": "found 
bug"}), _mock_claude_failure("compilation error"), ] steps = [ {"role": "debugger", "brief": "find"}, {"role": "frontend_dev", "brief": "fix"}, {"role": "tester", "brief": "test"}, ] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is False assert result["steps_completed"] == 1 # Only debugger completed assert "frontend_dev" in result["error"] # Pipeline marked as failed pipe = conn.execute("SELECT * FROM pipelines WHERE task_id='VDOL-001'").fetchone() assert pipe["status"] == "failed" # Task marked as blocked task = models.get_task(conn, "VDOL-001") assert task["status"] == "blocked" def test_pipeline_dry_run(self, conn): steps = [ {"role": "debugger", "brief": "find"}, {"role": "tester", "brief": "verify"}, ] result = run_pipeline(conn, "VDOL-001", steps, dry_run=True) assert result["dry_run"] is True assert result["success"] is True assert result["steps_completed"] == 2 # No pipeline created in DB pipes = conn.execute("SELECT * FROM pipelines").fetchall() assert len(pipes) == 0 @patch("agents.runner.subprocess.run") def test_pipeline_chains_output(self, mock_run, conn): """Output from step N is passed as previous_output to step N+1.""" call_count = [0] def side_effect(*args, **kwargs): call_count[0] += 1 if call_count[0] == 1: return _mock_claude_success({"result": "bug is in line 42"}) return _mock_claude_success({"result": "test written"}) mock_run.side_effect = side_effect steps = [ {"role": "debugger", "brief": "find"}, {"role": "tester", "brief": "write test"}, ] run_pipeline(conn, "VDOL-001", steps) # Second call should include first step's output in prompt second_call = mock_run.call_args_list[1] prompt = second_call[0][0][2] # -p argument assert "line 42" in prompt or "bug" in prompt def test_pipeline_task_not_found(self, conn): result = run_pipeline(conn, "NONEXISTENT", [{"role": "debugger"}]) assert result["success"] is False assert "not found" in result["error"] @patch("agents.runner.run_hooks") 
@patch("agents.runner.subprocess.run") def test_hooks_called_after_successful_pipeline(self, mock_run, mock_hooks, conn): mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.return_value = [] steps = [{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True mock_hooks.assert_called_once() call_kwargs = mock_hooks.call_args assert call_kwargs[1].get("event") == "pipeline_completed" or \ call_kwargs[0][3] == "pipeline_completed" @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_hooks_not_called_on_failed_pipeline(self, mock_run, mock_hooks, conn): mock_run.return_value = _mock_claude_failure("compilation error") mock_hooks.return_value = [] steps = [{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is False mock_hooks.assert_not_called() @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_hook_failure_does_not_affect_pipeline_result(self, mock_run, mock_hooks, conn): mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.side_effect = Exception("hook exploded") steps = [{"role": "debugger", "brief": "find"}] # Must not raise — hook failures must not propagate result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True # --------------------------------------------------------------------------- # Auto mode # --------------------------------------------------------------------------- class TestAutoMode: @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_auto_mode_generates_followups(self, mock_run, mock_hooks, mock_followup, conn): """Auto_complete mode должен вызывать generate_followups (последний шаг — tester).""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.return_value = [] mock_followup.return_value = {"created": 
[], "pending_actions": []} models.update_project(conn, "vdol", execution_mode="auto_complete") steps = [{"role": "debugger", "brief": "find"}, {"role": "tester", "brief": "test"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True mock_followup.assert_called_once_with(conn, "VDOL-001") task = models.get_task(conn, "VDOL-001") assert task["status"] == "done" @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_review_mode_skips_followups(self, mock_run, mock_hooks, mock_followup, conn): """Review mode НЕ должен вызывать generate_followups автоматически.""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} # Проект остаётся в default "review" mode steps = [{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True mock_followup.assert_not_called() task = models.get_task(conn, "VDOL-001") assert task["status"] == "review" @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_auto_mode_skips_followups_for_followup_tasks(self, mock_run, mock_hooks, mock_followup, conn): """Auto_complete mode НЕ должен генерировать followups для followup-задач (предотвращение рекурсии).""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} models.update_project(conn, "vdol", execution_mode="auto_complete") models.update_task(conn, "VDOL-001", brief={"source": "followup:VDOL-000"}) steps = [{"role": "debugger", "brief": "find"}, {"role": "tester", "brief": "test"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True mock_followup.assert_not_called() @patch("core.followup.generate_followups") 
@patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_auto_mode_fires_task_done_event(self, mock_run, mock_hooks, mock_followup, conn): """Auto_complete mode должен вызывать run_hooks с event='task_done' (последний шаг — tester).""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} models.update_project(conn, "vdol", execution_mode="auto_complete") steps = [{"role": "debugger", "brief": "find"}, {"role": "tester", "brief": "test"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True events_fired = [call[1].get("event") or call[0][3] for call in mock_hooks.call_args_list] assert "task_done" in events_fired @patch("core.followup.auto_resolve_pending_actions") @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_auto_mode_resolves_pending_actions(self, mock_run, mock_hooks, mock_followup, mock_resolve, conn): """Auto_complete mode должен авто-резолвить pending_actions (последний шаг — tester).""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.return_value = [] pending = [{"type": "permission_fix", "description": "Fix X", "original_item": {}, "options": ["rerun"]}] mock_followup.return_value = {"created": [], "pending_actions": pending} mock_resolve.return_value = [{"resolved": "rerun", "result": {}}] models.update_project(conn, "vdol", execution_mode="auto_complete") steps = [{"role": "debugger", "brief": "find"}, {"role": "tester", "brief": "test"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True mock_resolve.assert_called_once_with(conn, "VDOL-001", pending) # --------------------------------------------------------------------------- # KIN-080: Guard — не перезаписывать статус, если пользователь изменил вручную # 
--------------------------------------------------------------------------- class TestPipelineStatusGuard: """Тесты guard-check: pipeline не должен перезаписывать статус задачи, если пользователь вручную изменил его на 'done' или 'cancelled' пока pipeline выполнялся.""" @patch("agents.runner._run_autocommit") @patch("agents.runner._run_learning_extraction") @patch("agents.runner._get_changed_files") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_pipeline_preserves_done_status_set_during_execution( self, mock_run, mock_hooks, mock_get_files, mock_learn, mock_autocommit, conn ): """Guard: если пользователь вручную поставил 'done' пока pipeline работал — итоговый статус должен остаться 'done', а не перезаписаться в 'review'.""" def side_effect(*args, **kwargs): # Имитируем ручную смену статуса во время выполнения агента models.update_task(conn, "VDOL-001", status="done") return _mock_claude_success({"result": "done"}) mock_run.side_effect = side_effect mock_hooks.return_value = [] mock_get_files.return_value = [] mock_learn.return_value = {"added": 0, "skipped": 0} steps = [{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True task = models.get_task(conn, "VDOL-001") assert task["status"] == "done" # guard НЕ перезаписал в "review" @patch("agents.runner._run_autocommit") @patch("agents.runner._run_learning_extraction") @patch("agents.runner._get_changed_files") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_pipeline_preserves_cancelled_status_set_during_execution( self, mock_run, mock_hooks, mock_get_files, mock_learn, mock_autocommit, conn ): """Guard: если пользователь вручную поставил 'cancelled' пока pipeline работал — итоговый статус должен остаться 'cancelled'.""" def side_effect(*args, **kwargs): models.update_task(conn, "VDOL-001", status="cancelled") return _mock_claude_success({"result": "done"}) mock_run.side_effect = 
side_effect mock_hooks.return_value = [] mock_get_files.return_value = [] mock_learn.return_value = {"added": 0, "skipped": 0} steps = [{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True task = models.get_task(conn, "VDOL-001") assert task["status"] == "cancelled" # guard НЕ перезаписал в "review" @patch("agents.runner._run_autocommit") @patch("agents.runner._run_learning_extraction") @patch("agents.runner._get_changed_files") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_pipeline_sets_review_when_no_manual_override( self, mock_run, mock_hooks, mock_get_files, mock_learn, mock_autocommit, conn ): """Нормальный случай: задача в in_progress, пользователь не трогал статус — после pipeline устанавливается 'review'.""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.return_value = [] mock_get_files.return_value = [] mock_learn.return_value = {"added": 0, "skipped": 0} steps = [{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True task = models.get_task(conn, "VDOL-001") assert task["status"] == "review" @patch("agents.runner._run_autocommit") @patch("agents.runner._run_learning_extraction") @patch("agents.runner._get_changed_files") @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_auto_mode_preserves_done_status_set_during_execution( self, mock_run, mock_hooks, mock_followup, mock_get_files, mock_learn, mock_autocommit, conn ): """Guard в auto_complete mode: если пользователь вручную поставил 'done' пока pipeline работал — guard пропускает обновление (уже done).""" def side_effect(*args, **kwargs): models.update_task(conn, "VDOL-001", status="done") return _mock_claude_success({"result": "done"}) mock_run.side_effect = side_effect mock_hooks.return_value = [] mock_followup.return_value = {"created": 
[], "pending_actions": []} mock_get_files.return_value = [] mock_learn.return_value = {"added": 0, "skipped": 0} models.update_project(conn, "vdol", execution_mode="auto_complete") steps = [{"role": "tester", "brief": "verify"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True task = models.get_task(conn, "VDOL-001") assert task["status"] == "done" # --------------------------------------------------------------------------- # Retry on permission error # --------------------------------------------------------------------------- class TestRetryOnPermissionError: @patch("agents.runner._run_autocommit") @patch("agents.runner._run_learning_extraction") @patch("agents.runner._get_changed_files") # KIN-003: prevents git subprocess calls @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_retry_on_permission_error_auto_mode(self, mock_run, mock_hooks, mock_followup, mock_get_files, mock_learn, mock_autocommit, conn): """Auto mode: retry при permission error должен срабатывать.""" permission_fail = _mock_claude_failure("permission denied: cannot write file") retry_success = _mock_claude_success({"result": "fixed"}) mock_run.side_effect = [permission_fail, retry_success] mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} mock_learn.return_value = {"added": 0, "skipped": 0} mock_get_files.return_value = [] models.update_project(conn, "vdol", execution_mode="auto_complete") steps = [{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True assert mock_run.call_count == 2 # Second call must include --dangerously-skip-permissions second_cmd = mock_run.call_args_list[1][0][0] assert "--dangerously-skip-permissions" in second_cmd @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_review_mode_does_not_retry_on_permission_error(self, mock_run, 
mock_hooks, conn): """Review mode: retry при permission error НЕ должен срабатывать.""" permission_fail = _mock_claude_failure("permission denied: cannot write file") mock_run.return_value = permission_fail mock_hooks.return_value = [] # Проект остаётся в default "review" mode steps = [{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is False assert mock_run.call_count == 1 # --------------------------------------------------------------------------- # JSON parsing # --------------------------------------------------------------------------- class TestTryParseJson: def test_direct_json(self): assert _try_parse_json('{"a": 1}') == {"a": 1} def test_json_in_code_fence(self): text = 'Some text\n```json\n{"a": 1}\n```\nMore text' assert _try_parse_json(text) == {"a": 1} def test_json_embedded_in_text(self): text = 'Here is the result: {"status": "ok", "count": 42} and more' result = _try_parse_json(text) assert result == {"status": "ok", "count": 42} def test_empty_string(self): assert _try_parse_json("") is None def test_no_json(self): assert _try_parse_json("just plain text") is None def test_json_array(self): assert _try_parse_json('[1, 2, 3]') == [1, 2, 3] def test_empty_array_returns_list_not_none(self): """Empty array [] must return [], not None — [] is falsy but valid JSON.""" result = _try_parse_json('[]') assert result == [] assert result is not None def test_empty_object_returns_dict_not_none(self): """Empty object {} must return {}, not None.""" result = _try_parse_json('{}') assert result == {} assert result is not None # --------------------------------------------------------------------------- # Falsy output preservation (empty array / empty dict from agent) # --------------------------------------------------------------------------- class TestRunAgentFalsyOutputPreservation: """Regression tests for bug: `parsed_output if parsed_output else output_text` drops falsy values like [] and {} 
instead of preserving them. Bug location: runner.py line ~236. """ @patch("agents.runner.subprocess.run") def test_empty_array_output_preserved_not_replaced_by_raw_string(self, mock_run, conn): """When agent returns '[]', output must be [] (list), not the raw string '[]'.""" mock = MagicMock() mock.stdout = json.dumps({"result": "[]"}) mock.stderr = "" mock.returncode = 0 mock_run.return_value = mock result = run_agent(conn, "tester", "VDOL-001", "vdol") # _try_parse_json parses the outer envelope; result['result'] is the string '[]' # which then gets parsed again → [] list # The key check: output must be a list, NOT the raw string '[]' assert result["output"] == [] or result["output"] == "[]", \ f"output should be [] or '[]', got {result['output']!r}" # Stronger assertion: if the fix is in place, output == [] assert result["output"] == [], \ f"Fix not applied: empty array lost, got raw string {result['output']!r}" @patch("agents.runner.subprocess.run") def test_empty_array_top_level_output_preserved(self, mock_run, conn): """When claude stdout IS '[]' (top-level array), output must be [] not '[]'.""" mock = MagicMock() mock.stdout = "[]" mock.stderr = "" mock.returncode = 0 mock_run.return_value = mock result = run_agent(conn, "tester", "VDOL-001", "vdol") # _try_parse_json('[]') returns [] (a list, falsy) # Bug: `parsed_output if parsed_output else output_text` drops [] → returns '[]' # Fix: `output_text if parsed_output is None else parsed_output` assert result["output"] == [], \ f"Fix not applied: empty list dropped, got {result['output']!r}" @patch("agents.runner.subprocess.run") def test_empty_dict_top_level_output_preserved(self, mock_run, conn): """When claude stdout IS '{}', output must be {} not '{}'.""" mock = MagicMock() mock.stdout = "{}" mock.stderr = "" mock.returncode = 0 mock_run.return_value = mock result = run_agent(conn, "tester", "VDOL-001", "vdol") assert result["output"] == {}, \ f"Fix not applied: empty dict dropped, got 
{result['output']!r}" @patch("agents.runner.subprocess.run") def test_nonempty_array_output_still_works(self, mock_run, conn): """Normal non-empty array output [1,2,3] must still be returned correctly.""" mock = MagicMock() mock.stdout = "[1, 2, 3]" mock.stderr = "" mock.returncode = 0 mock_run.return_value = mock result = run_agent(conn, "tester", "VDOL-001", "vdol") assert result["output"] == [1, 2, 3] @patch("agents.runner.subprocess.run") def test_none_parsed_output_falls_back_to_raw_string(self, mock_run, conn): """When output is not JSON at all, raw string must be returned.""" mock = MagicMock() mock.stdout = "this is plain text" mock.stderr = "" mock.returncode = 0 mock_run.return_value = mock result = run_agent(conn, "tester", "VDOL-001", "vdol") assert result["output"] == "this is plain text" # --------------------------------------------------------------------------- # _run_claude falsy result field (empty array in 'result' key) # --------------------------------------------------------------------------- class TestRunClaudeEmptyResultField: """Regression tests for bug in _run_claude: `parsed.get('result') or parsed.get('content')` drops empty array/dict in 'result' and falls through to 'content'. Bug location: runner.py line ~313. 
""" @patch("agents.runner.subprocess.run") def test_empty_array_in_result_field_not_lost(self, mock_run, conn): """When claude returns {"result": []}, output must be '[]', not content fallback.""" mock = MagicMock() # Claude envelope: result field contains empty array mock.stdout = json.dumps({"result": [], "content": "fallback text"}) mock.stderr = "" mock.returncode = 0 mock_run.return_value = mock result = run_agent(conn, "tester", "VDOL-001", "vdol") # Bug: `parsed.get("result") or parsed.get("content")` → "fallback text" # Fix: explicit None check → "[]" (json.dumps([])) raw = result["raw_output"] assert "fallback text" not in raw or raw == json.dumps({ "result": [], "content": "fallback text" }), "output should not silently fall through to 'content' when 'result' is []" @patch("agents.runner.subprocess.run") def test_result_field_present_and_nonempty_used(self, mock_run, conn): """Normal case: non-empty result field is returned, content ignored.""" mock = MagicMock() mock.stdout = json.dumps({"result": "done", "content": "ignored"}) mock.stderr = "" mock.returncode = 0 mock_run.return_value = mock result = run_agent(conn, "tester", "VDOL-001", "vdol") assert result["output"] == "done" @patch("agents.runner.subprocess.run") def test_result_field_missing_falls_back_to_content(self, mock_run, conn): """When 'result' key is absent, 'content' must be used as fallback.""" mock = MagicMock() mock.stdout = json.dumps({"content": "content value"}) mock.stderr = "" mock.returncode = 0 mock_run.return_value = mock result = run_agent(conn, "tester", "VDOL-001", "vdol") assert result["output"] == "content value" # --------------------------------------------------------------------------- # Non-interactive mode # --------------------------------------------------------------------------- class TestNonInteractive: @patch("agents.runner.subprocess.run") def test_noninteractive_sets_stdin_devnull(self, mock_run, conn): """When noninteractive=True, subprocess.run should get 
stdin=subprocess.DEVNULL.""" mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", noninteractive=True) call_kwargs = mock_run.call_args[1] assert call_kwargs.get("stdin") == subprocess.DEVNULL @patch.dict("os.environ", {"KIN_AGENT_TIMEOUT": ""}, clear=False) @patch("agents.runner.subprocess.run") def test_noninteractive_uses_model_timeout(self, mock_run, conn): mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", noninteractive=True) call_kwargs = mock_run.call_args[1] assert call_kwargs.get("timeout") == 2400 # sonnet default @patch.dict("os.environ", {"KIN_NONINTERACTIVE": ""}) @patch("agents.runner.subprocess.run") def test_interactive_uses_model_timeout(self, mock_run, conn): mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", noninteractive=False) call_kwargs = mock_run.call_args[1] assert call_kwargs.get("timeout") == 2400 # sonnet default @patch.dict("os.environ", {"KIN_NONINTERACTIVE": ""}) @patch("agents.runner.subprocess.run") def test_interactive_no_stdin_override(self, mock_run, conn): """In interactive mode, stdin should not be set to DEVNULL.""" mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", noninteractive=False) call_kwargs = mock_run.call_args[1] assert call_kwargs.get("stdin") is None @patch.dict("os.environ", {"KIN_NONINTERACTIVE": "1"}) @patch("agents.runner.subprocess.run") def test_env_var_activates_noninteractive(self, mock_run, conn): """KIN_NONINTERACTIVE=1 env var should activate non-interactive mode.""" mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", noninteractive=False) call_kwargs = mock_run.call_args[1] assert call_kwargs.get("stdin") == subprocess.DEVNULL assert call_kwargs.get("timeout") == 2400 # sonnet default @patch.dict("os.environ", 
{"KIN_AGENT_TIMEOUT": "900"}) @patch("agents.runner.subprocess.run") def test_custom_timeout_via_env_var(self, mock_run, conn): """KIN_AGENT_TIMEOUT overrides model-based timeout.""" mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol") call_kwargs = mock_run.call_args[1] assert call_kwargs.get("timeout") == 900 @patch.dict("os.environ", {"KIN_AGENT_TIMEOUT": ""}) @patch("agents.runner.subprocess.run") def test_opus_timeout_2400(self, mock_run, conn): """Opus model gets 2400s (40 min) timeout.""" mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", model="opus") call_kwargs = mock_run.call_args[1] assert call_kwargs.get("timeout") == 2400 @patch.dict("os.environ", {"KIN_AGENT_TIMEOUT": ""}) @patch("agents.runner.subprocess.run") def test_haiku_timeout_1200(self, mock_run, conn): """Haiku model gets 1200s (20 min) timeout.""" mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", model="haiku") call_kwargs = mock_run.call_args[1] assert call_kwargs.get("timeout") == 1200 @patch.dict("os.environ", {"KIN_AGENT_TIMEOUT": "999"}) @patch("agents.runner.subprocess.run") def test_env_timeout_overrides_model(self, mock_run, conn): """KIN_AGENT_TIMEOUT env var overrides model-based timeout.""" mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", model="opus") call_kwargs = mock_run.call_args[1] assert call_kwargs.get("timeout") == 999 @patch("agents.runner.subprocess.run") def test_allow_write_adds_skip_permissions(self, mock_run, conn): mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", allow_write=True) cmd = mock_run.call_args[0][0] assert "--dangerously-skip-permissions" in cmd @patch("agents.runner.subprocess.run") def test_no_allow_write_no_skip_permissions(self, mock_run, conn): 
mock_run.return_value = _mock_claude_success({"result": "ok"}) run_agent(conn, "debugger", "VDOL-001", "vdol", allow_write=False) cmd = mock_run.call_args[0][0] assert "--dangerously-skip-permissions" not in cmd # --------------------------------------------------------------------------- # run_audit # --------------------------------------------------------------------------- class TestRunAudit: @patch("agents.runner.subprocess.run") def test_audit_success(self, mock_run, conn): """Audit should return parsed already_done/still_pending/unclear.""" audit_output = json.dumps({ "already_done": [{"id": "VDOL-001", "reason": "Fixed in runner.py"}], "still_pending": [], "unclear": [], }) mock_run.return_value = _mock_claude_success({"result": audit_output}) result = run_audit(conn, "vdol") assert result["success"] is True assert len(result["already_done"]) == 1 assert result["already_done"][0]["id"] == "VDOL-001" @patch("agents.runner.subprocess.run") def test_audit_logs_to_db(self, mock_run, conn): """Audit should log to agent_logs with role=backlog_audit.""" mock_run.return_value = _mock_claude_success({ "result": json.dumps({"already_done": [], "still_pending": [], "unclear": []}), }) run_audit(conn, "vdol") logs = conn.execute( "SELECT * FROM agent_logs WHERE agent_role='backlog_audit'" ).fetchall() assert len(logs) == 1 assert logs[0]["action"] == "audit" def test_audit_no_pending_tasks(self, conn): """If no pending tasks, return success with empty lists.""" # Mark existing task as done models.update_task(conn, "VDOL-001", status="done") result = run_audit(conn, "vdol") assert result["success"] is True assert result["already_done"] == [] assert "No pending tasks" in result.get("message", "") def test_audit_project_not_found(self, conn): result = run_audit(conn, "nonexistent") assert result["success"] is False assert "not found" in result["error"] @patch("agents.runner.subprocess.run") def test_audit_uses_sonnet(self, mock_run, conn): """Audit should use sonnet 
model.""" mock_run.return_value = _mock_claude_success({ "result": json.dumps({"already_done": [], "still_pending": [], "unclear": []}), }) run_audit(conn, "vdol") cmd = mock_run.call_args[0][0] model_idx = cmd.index("--model") assert cmd[model_idx + 1] == "sonnet" @patch("agents.runner.subprocess.run") def test_audit_includes_tasks_in_prompt(self, mock_run, conn): """The prompt should contain the task title.""" mock_run.return_value = _mock_claude_success({ "result": json.dumps({"already_done": [], "still_pending": [], "unclear": []}), }) run_audit(conn, "vdol") prompt = mock_run.call_args[0][0][2] # -p argument assert "VDOL-001" in prompt assert "Fix bug" in prompt @patch("agents.runner.subprocess.run") def test_audit_auto_apply_marks_done(self, mock_run, conn): """auto_apply=True should mark already_done tasks as done in DB.""" mock_run.return_value = _mock_claude_success({ "result": json.dumps({ "already_done": [{"id": "VDOL-001", "reason": "Done"}], "still_pending": [], "unclear": [], }), }) result = run_audit(conn, "vdol", auto_apply=True) assert result["success"] is True assert "VDOL-001" in result["applied"] task = models.get_task(conn, "VDOL-001") assert task["status"] == "done" @patch("agents.runner.subprocess.run") def test_audit_no_auto_apply_keeps_pending(self, mock_run, conn): """auto_apply=False should NOT change task status.""" mock_run.return_value = _mock_claude_success({ "result": json.dumps({ "already_done": [{"id": "VDOL-001", "reason": "Done"}], "still_pending": [], "unclear": [], }), }) result = run_audit(conn, "vdol", auto_apply=False) assert result["success"] is True assert result["applied"] == [] task = models.get_task(conn, "VDOL-001") assert task["status"] == "pending" @patch("agents.runner.subprocess.run") def test_audit_uses_dangerously_skip_permissions(self, mock_run, conn): """Audit must use --dangerously-skip-permissions for tool access.""" mock_run.return_value = _mock_claude_success({ "result": json.dumps({"already_done": [], 
"still_pending": [], "unclear": []}), }) run_audit(conn, "vdol") cmd = mock_run.call_args[0][0] assert "--dangerously-skip-permissions" in cmd # --------------------------------------------------------------------------- # KIN-019: Silent FAILED diagnostics (regression tests) # --------------------------------------------------------------------------- class TestSilentFailedDiagnostics: """Regression: агент падает без вывода — runner должен сохранять диагностику в БД.""" @patch("agents.runner.subprocess.run") def test_agent_empty_stdout_saves_stderr_as_error_message_in_db(self, mock_run, conn): """Когда stdout пустой и returncode != 0, stderr должен сохраняться как error_message в agent_logs.""" mock = MagicMock() mock.stdout = "" mock.stderr = "API rate limit exceeded (429)" mock.returncode = 1 mock_run.return_value = mock run_agent(conn, "debugger", "VDOL-001", "vdol") log = conn.execute( "SELECT error_message FROM agent_logs WHERE task_id='VDOL-001'" ).fetchone() assert log is not None assert log["error_message"] is not None assert "rate limit" in log["error_message"] @patch("agents.runner.subprocess.run") def test_agent_empty_stdout_returns_error_key_with_stderr(self, mock_run, conn): """run_agent должен вернуть ключ 'error' с содержимым stderr при пустом stdout и ненулевом returncode.""" mock = MagicMock() mock.stdout = "" mock.stderr = "Permission denied: cannot write to /etc/hosts" mock.returncode = 1 mock_run.return_value = mock result = run_agent(conn, "debugger", "VDOL-001", "vdol") assert result["success"] is False assert "error" in result assert result["error"] is not None assert "Permission denied" in result["error"] @patch("agents.runner.subprocess.run") def test_pipeline_error_message_includes_agent_stderr(self, mock_run, conn): """Сообщение об ошибке pipeline должно включать stderr агента, а не только generic 'step failed'.""" mock = MagicMock() mock.stdout = "" mock.stderr = "Internal server error: unexpected EOF" mock.returncode = 1 
        mock_run.return_value = mock
        steps = [{"role": "tester", "brief": "run tests"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is False
        assert "Internal server error" in result["error"] or "unexpected EOF" in result["error"]

    @patch("agents.runner.build_context")
    def test_pipeline_exception_in_run_agent_marks_task_blocked(self, mock_ctx, conn):
        """An exception inside run_agent (e.g. from build_context) must put the task into blocked."""
        mock_ctx.side_effect = RuntimeError("DB connection lost")
        steps = [{"role": "debugger", "brief": "find"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is False
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "blocked"

    @patch("agents.runner.build_context")
    def test_pipeline_exception_logs_to_agent_logs(self, mock_ctx, conn):
        """An exception in run_agent must be logged to agent_logs with success=False."""
        mock_ctx.side_effect = ValueError("bad context data")
        steps = [{"role": "tester", "brief": "test"}]
        run_pipeline(conn, "VDOL-001", steps)
        logs = conn.execute(
            "SELECT * FROM agent_logs WHERE task_id='VDOL-001' AND success=0"
        ).fetchall()
        assert len(logs) >= 1

    @patch("agents.runner.build_context")
    def test_pipeline_exception_marks_pipeline_failed_in_db(self, mock_ctx, conn):
        """On exception a pipeline record must exist in the DB with status failed."""
        mock_ctx.side_effect = RuntimeError("network timeout")
        steps = [{"role": "debugger", "brief": "find"}]
        run_pipeline(conn, "VDOL-001", steps)
        pipe = conn.execute("SELECT * FROM pipelines WHERE task_id='VDOL-001'").fetchone()
        assert pipe is not None
        assert pipe["status"] == "failed"

    @patch("agents.runner.subprocess.run")
    def test_agent_success_has_no_error_key_populated(self, mock_run, conn):
        """On a successful agent run the 'error' key in the result must be None (no false positives)."""
        mock_run.return_value = _mock_claude_success({"result": "all good"})
        result = run_agent(conn, "debugger", "VDOL-001", "vdol")
        assert result["success"] is True
        assert result.get("error") is None


# ---------------------------------------------------------------------------
# Auto-learning: _run_learning_extraction
# ---------------------------------------------------------------------------

class TestRunLearningExtraction:
    @patch("agents.runner.subprocess.run")
    def test_extracts_and_saves_decisions(self, mock_run, conn):
        """Happy path: the learner returns JSON with decisions and they are saved to the DB."""
        learner_output = json.dumps({
            "decisions": [
                {"type": "gotcha", "title": "SQLite WAL mode needed",
                 "description": "Without WAL concurrent reads fail", "tags": ["sqlite", "db"]},
                {"type": "convention", "title": "Always run tests after change",
                 "description": "Prevents regressions", "tags": ["testing"]},
            ]
        })
        mock_run.return_value = _mock_claude_success({"result": learner_output})
        step_results = [
            {"role": "debugger", "raw_output": "Found issue with sqlite concurrent access"},
        ]
        result = _run_learning_extraction(conn, "VDOL-001", "vdol", step_results)
        assert result["added"] == 2
        assert result["skipped"] == 0
        decisions = conn.execute("SELECT * FROM decisions WHERE project_id='vdol'").fetchall()
        assert len(decisions) == 2
        titles = {d["title"] for d in decisions}
        assert "SQLite WAL mode needed" in titles
        assert "Always run tests after change" in titles

    @patch("agents.runner.subprocess.run")
    def test_skips_duplicate_decisions(self, mock_run, conn):
        """Deduplication: a decision with the same title+type already present is skipped."""
        from core import models as m
        m.add_decision(conn, "vdol", "gotcha", "SQLite WAL mode needed", "existing desc")
        learner_output = json.dumps({
            "decisions": [
                {"type": "gotcha", "title": "SQLite WAL mode needed", "description": "duplicate", "tags": []},
                {"type": "convention", "title": "New convention here", "description": "new desc", "tags": []},
            ]
        })
        mock_run.return_value = _mock_claude_success({"result": learner_output})
        step_results = [{"role": "tester", "raw_output": "test output"}]
        result = _run_learning_extraction(conn, "VDOL-001", "vdol", step_results)
        assert result["added"] == 1
        assert result["skipped"] == 1
        assert len(conn.execute("SELECT * FROM decisions WHERE project_id='vdol'").fetchall()) == 2

    @patch("agents.runner.subprocess.run")
    def test_limits_to_5_decisions(self, mock_run, conn):
        """The learner must not save more than 5 decisions even if the agent returned more."""
        decisions_list = [
            {"type": "decision", "title": f"Decision {i}", "description": f"desc {i}", "tags": []}
            for i in range(8)
        ]
        learner_output = json.dumps({"decisions": decisions_list})
        mock_run.return_value = _mock_claude_success({"result": learner_output})
        step_results = [{"role": "architect", "raw_output": "long output"}]
        result = _run_learning_extraction(conn, "VDOL-001", "vdol", step_results)
        assert result["added"] == 5
        assert len(conn.execute("SELECT * FROM decisions WHERE project_id='vdol'").fetchall()) == 5

    @patch("agents.runner.subprocess.run")
    def test_non_json_output_returns_error(self, mock_run, conn):
        """If the learner returned non-JSON, the function returns an error and does not raise."""
        mock_run.return_value = _mock_claude_success({"result": "plain text, not json"})
        step_results = [{"role": "debugger", "raw_output": "output"}]
        result = _run_learning_extraction(conn, "VDOL-001", "vdol", step_results)
        assert result["added"] == 0
        assert "error" in result
        assert len(conn.execute("SELECT * FROM decisions WHERE project_id='vdol'").fetchall()) == 0

    @patch("agents.runner.subprocess.run")
    def test_decisions_linked_to_task(self, mock_run, conn):
        """Saved decisions must be linked to the task_id."""
        learner_output = json.dumps({
            "decisions": [
                {"type": "gotcha", "title": "Important gotcha", "description": "desc", "tags": []},
            ]
        })
        mock_run.return_value = _mock_claude_success({"result": learner_output})
        step_results = [{"role": "debugger", "raw_output": "output"}]
        _run_learning_extraction(conn, "VDOL-001", "vdol", step_results)
        d = conn.execute("SELECT * FROM decisions WHERE project_id='vdol'").fetchone()
        assert d["task_id"] == "VDOL-001"

    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner.subprocess.run")
    def test_pipeline_triggers_learning_after_completion(self, mock_run, mock_learn, conn):
        """run_pipeline must call _run_learning_extraction after successful completion."""
        mock_run.return_value = _mock_claude_success({"result": "done"})
        mock_learn.return_value = {"added": 1, "skipped": 0}
        steps = [{"role": "debugger", "brief": "find bug"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is True
        mock_learn.assert_called_once()
        call_args = mock_learn.call_args[0]
        assert call_args[1] == "VDOL-001"  # task_id
        assert call_args[2] == "vdol"  # project_id

    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner.subprocess.run")
    def test_learning_error_does_not_break_pipeline(self, mock_run, mock_learn, conn):
        """If _run_learning_extraction raises an exception, the pipeline does not fail."""
        mock_run.return_value = _mock_claude_success({"result": "done"})
        mock_learn.side_effect = Exception("learning failed")
        steps = [{"role": "debugger", "brief": "find bug"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is True

    def test_pipeline_dry_run_skips_learning(self, conn):
        """A dry run must not invoke _run_learning_extraction."""
        steps = [{"role": "debugger", "brief": "find bug"}]
        result = run_pipeline(conn, "VDOL-001", steps, dry_run=True)
        assert result["dry_run"] is True
        # No decisions saved (dry run — no DB activity)
        assert len(conn.execute("SELECT * FROM decisions WHERE project_id='vdol'").fetchall()) == 0

    @patch("agents.runner.subprocess.run")
    def test_empty_learner_output_returns_no_decisions(self, mock_run, conn):
        """Empty stdout from the learner (subprocess returned "") — no exception, an error is returned."""
        # Use an empty string as stdout (not a dict) so raw_output ends up empty
        mock_run.return_value = _mock_claude_success("")
step_results = [{"role": "debugger", "raw_output": "output"}] result = _run_learning_extraction(conn, "VDOL-001", "vdol", step_results) assert result["added"] == 0 assert "error" in result assert len(conn.execute("SELECT * FROM decisions WHERE project_id='vdol'").fetchall()) == 0 @patch("agents.runner.subprocess.run") def test_empty_decisions_list_returns_zero_counts(self, mock_run, conn): """Learner возвращает {"decisions": []} — added=0, skipped=0, без ошибки.""" mock_run.return_value = _mock_claude_success({"result": json.dumps({"decisions": []})}) step_results = [{"role": "debugger", "raw_output": "output"}] result = _run_learning_extraction(conn, "VDOL-001", "vdol", step_results) assert result["added"] == 0 assert result["skipped"] == 0 assert "error" not in result @patch("agents.runner.subprocess.run") def test_decision_missing_title_is_skipped(self, mock_run, conn): """Decision без title молча пропускается, не вызывает исключение.""" learner_output = json.dumps({ "decisions": [ {"type": "gotcha", "description": "no title here", "tags": []}, {"type": "convention", "title": "Valid decision", "description": "desc", "tags": []}, ] }) mock_run.return_value = _mock_claude_success({"result": learner_output}) step_results = [{"role": "debugger", "raw_output": "output"}] result = _run_learning_extraction(conn, "VDOL-001", "vdol", step_results) assert result["added"] == 1 assert len(conn.execute("SELECT * FROM decisions WHERE project_id='vdol'").fetchall()) == 1 @patch("agents.runner.subprocess.run") def test_decisions_field_not_list_returns_error(self, mock_run, conn): """Если поле decisions не является списком — возвращается error dict.""" mock_run.return_value = _mock_claude_success({"result": json.dumps({"decisions": "not a list"})}) step_results = [{"role": "debugger", "raw_output": "output"}] result = _run_learning_extraction(conn, "VDOL-001", "vdol", step_results) assert result["added"] == 0 assert "error" in result @patch("agents.runner.subprocess.run") def 
test_logs_agent_run_to_db(self, mock_run, conn): """KIN-060: _run_learning_extraction должна писать запись в agent_logs.""" learner_output = json.dumps({ "decisions": [ {"type": "gotcha", "title": "Log test", "description": "desc", "tags": []}, ] }) mock_run.return_value = _mock_claude_success({"result": learner_output}) step_results = [{"role": "debugger", "raw_output": "output"}] _run_learning_extraction(conn, "VDOL-001", "vdol", step_results) logs = conn.execute( "SELECT * FROM agent_logs WHERE agent_role='learner' AND project_id='vdol'" ).fetchall() assert len(logs) == 1 log = logs[0] assert log["task_id"] == "VDOL-001" assert log["action"] == "learn" assert log["model"] == "sonnet" @patch("agents.runner.subprocess.run") def test_learner_cost_included_in_cost_summary(self, mock_run, conn): """KIN-060: get_cost_summary() включает затраты learner-агента.""" learner_output = json.dumps({"decisions": []}) mock_run.return_value = _mock_claude_success({ "result": learner_output, "cost_usd": 0.042, "usage": {"total_tokens": 3000}, }) step_results = [{"role": "debugger", "raw_output": "output"}] _run_learning_extraction(conn, "VDOL-001", "vdol", step_results) costs = models.get_cost_summary(conn, days=1) assert len(costs) == 1 assert costs[0]["project_id"] == "vdol" assert costs[0]["total_cost_usd"] == pytest.approx(0.042) assert costs[0]["total_tokens"] == 3000 # ----------------------------------------------------------------------- # KIN-061: Regression — валидация поля type в decision # ----------------------------------------------------------------------- @patch("agents.runner.subprocess.run") def test_valid_type_gotcha_is_saved_as_is(self, mock_run, conn): """KIN-061: валидный тип 'gotcha' сохраняется без изменений.""" learner_output = json.dumps({ "decisions": [ {"type": "gotcha", "title": "Use WAL mode", "description": "Concurrent reads need WAL", "tags": []}, ] }) mock_run.return_value = _mock_claude_success({"result": learner_output}) result = 
_run_learning_extraction(conn, "VDOL-001", "vdol", [{"role": "debugger", "raw_output": "x"}]) assert result["added"] == 1 d = conn.execute("SELECT type FROM decisions WHERE project_id='vdol'").fetchone() assert d["type"] == "gotcha" @patch("agents.runner.subprocess.run") def test_invalid_type_falls_back_to_decision(self, mock_run, conn): """KIN-061: невалидный тип 'unknown_type' заменяется на 'decision'.""" learner_output = json.dumps({ "decisions": [ {"type": "unknown_type", "title": "Some title", "description": "Some desc", "tags": []}, ] }) mock_run.return_value = _mock_claude_success({"result": learner_output}) result = _run_learning_extraction(conn, "VDOL-001", "vdol", [{"role": "debugger", "raw_output": "x"}]) assert result["added"] == 1 d = conn.execute("SELECT type FROM decisions WHERE project_id='vdol'").fetchone() assert d["type"] == "decision" @patch("agents.runner.subprocess.run") def test_missing_type_falls_back_to_decision(self, mock_run, conn): """KIN-061: отсутствующий ключ 'type' в decision заменяется на 'decision'.""" learner_output = json.dumps({ "decisions": [ {"title": "No type key here", "description": "desc without type", "tags": []}, ] }) mock_run.return_value = _mock_claude_success({"result": learner_output}) result = _run_learning_extraction(conn, "VDOL-001", "vdol", [{"role": "debugger", "raw_output": "x"}]) assert result["added"] == 1 d = conn.execute("SELECT type FROM decisions WHERE project_id='vdol'").fetchone() assert d["type"] == "decision" # ----------------------------------------------------------------------- # KIN-062: KIN_LEARNER_TIMEOUT — отдельный таймаут для learner-агента # ----------------------------------------------------------------------- @patch.dict("os.environ", {"KIN_LEARNER_TIMEOUT": ""}, clear=False) @patch("agents.runner.subprocess.run") def test_learner_uses_120s_default_timeout(self, mock_run, conn): """KIN-062: по умолчанию learner использует таймаут 120s (KIN_LEARNER_TIMEOUT не задан).""" 
        mock_run.return_value = _mock_claude_success({"result": json.dumps({"decisions": []})})
        step_results = [{"role": "debugger", "raw_output": "output"}]
        _run_learning_extraction(conn, "VDOL-001", "vdol", step_results)
        call_kwargs = mock_run.call_args[1]
        assert call_kwargs.get("timeout") == 120

    @patch.dict("os.environ", {"KIN_LEARNER_TIMEOUT": "300"}, clear=False)
    @patch("agents.runner.subprocess.run")
    def test_learner_uses_custom_timeout_from_env(self, mock_run, conn):
        """KIN-062: KIN_LEARNER_TIMEOUT overrides the learner agent's default timeout."""
        mock_run.return_value = _mock_claude_success({"result": json.dumps({"decisions": []})})
        step_results = [{"role": "debugger", "raw_output": "output"}]
        _run_learning_extraction(conn, "VDOL-001", "vdol", step_results)
        call_kwargs = mock_run.call_args[1]
        assert call_kwargs.get("timeout") == 300

    @patch.dict("os.environ", {"KIN_LEARNER_TIMEOUT": "60", "KIN_AGENT_TIMEOUT": "900"}, clear=False)
    @patch("agents.runner.subprocess.run")
    def test_learner_timeout_independent_of_agent_timeout(self, mock_run, conn):
        """KIN-062: KIN_LEARNER_TIMEOUT is independent of KIN_AGENT_TIMEOUT."""
        mock_run.return_value = _mock_claude_success({"result": json.dumps({"decisions": []})})
        step_results = [{"role": "debugger", "raw_output": "output"}]
        _run_learning_extraction(conn, "VDOL-001", "vdol", step_results)
        call_kwargs = mock_run.call_args[1]
        assert call_kwargs.get("timeout") == 60


# ---------------------------------------------------------------------------
# KIN-056: Regression — web path timeout parity with CLI
# ---------------------------------------------------------------------------

class TestRegressionKIN056:
    """Regression tests for KIN-056: agents timed out after 300s from the web path, but not from the CLI.

    Cause: noninteractive mode used timeout=300s. The web API always sets
    KIN_NONINTERACTIVE=1, so the timeout was 300s.
    Fix: a unified timeout=600s regardless of noninteractive (overridable via KIN_AGENT_TIMEOUT).

    Each test would FAIL with the old code (timeout=300 for noninteractive) and PASSES after the fix.
    """

    @patch.dict("os.environ", {"KIN_NONINTERACTIVE": "1", "KIN_AGENT_TIMEOUT": ""})
    @patch("agents.runner.subprocess.run")
    def test_web_noninteractive_env_does_not_use_300s(self, mock_run, conn):
        """The web path sets KIN_NONINTERACTIVE=1. Before the fix this gave timeout=300s."""
        mock_run.return_value = _mock_claude_success({"result": "ok"})
        run_agent(conn, "debugger", "VDOL-001", "vdol")
        call_kwargs = mock_run.call_args[1]
        assert call_kwargs.get("timeout") != 300, (
            "Регрессия KIN-056: timeout не должен быть 300s в noninteractive режиме"
        )

    @patch.dict("os.environ", {"KIN_NONINTERACTIVE": "1", "KIN_AGENT_TIMEOUT": ""})
    @patch("agents.runner.subprocess.run")
    def test_web_noninteractive_timeout_uses_model_default(self, mock_run, conn):
        """Web path: KIN_NONINTERACTIVE=1 → timeout = model default (sonnet=2400s)."""
        mock_run.return_value = _mock_claude_success({"result": "ok"})
        run_agent(conn, "debugger", "VDOL-001", "vdol")
        call_kwargs = mock_run.call_args[1]
        assert call_kwargs.get("timeout") == 2400

    @patch("agents.runner.subprocess.run")
    def test_web_and_cli_paths_use_same_timeout(self, mock_run, conn):
        """Timeout via the web path (KIN_NONINTERACTIVE=1) == CLI timeout (noninteractive=True)."""
        mock_run.return_value = _mock_claude_success({"result": "ok"})
        # Web path: env var KIN_NONINTERACTIVE=1, noninteractive param not set
        with patch.dict("os.environ", {"KIN_NONINTERACTIVE": "1", "KIN_AGENT_TIMEOUT": ""}):
            run_agent(conn, "debugger", "VDOL-001", "vdol", noninteractive=False)
        web_timeout = mock_run.call_args[1].get("timeout")
        mock_run.reset_mock()
        # CLI path: noninteractive=True, no env var
        with patch.dict("os.environ", {"KIN_NONINTERACTIVE": "", "KIN_AGENT_TIMEOUT": ""}):
            run_agent(conn, "debugger", "VDOL-001", "vdol", noninteractive=True)
        cli_timeout = mock_run.call_args[1].get("timeout")
        assert web_timeout == cli_timeout, (
            f"Таймаут web ({web_timeout}s) != CLI ({cli_timeout}s) — регрессия KIN-056"
        )

    @patch.dict("os.environ", {"KIN_NONINTERACTIVE": "1", "KIN_AGENT_TIMEOUT": "900"})
    @patch("agents.runner.subprocess.run")
    def test_web_noninteractive_respects_kin_agent_timeout_override(self, mock_run, conn):
        """Web path: KIN_AGENT_TIMEOUT overrides the default timeout even with KIN_NONINTERACTIVE=1."""
        mock_run.return_value = _mock_claude_success({"result": "ok"})
        run_agent(conn, "debugger", "VDOL-001", "vdol")
        call_kwargs = mock_run.call_args[1]
        assert call_kwargs.get("timeout") == 900


# ---------------------------------------------------------------------------
# KIN-TIMEOUT: Regression — timeouts were too short (opus 1800s, sonnet 1200s, haiku 600s)
# ---------------------------------------------------------------------------

class TestRegressionModelTimeouts:
    """Regression tests: increased timeouts for opus/sonnet/haiku.

    Old values: opus=1800s, sonnet=1200s, haiku=600s.
    New values: opus=2400s, sonnet=2400s, haiku=1200s.

    Each test would FAIL with the old values and PASSES after the fix.
    """

    def test_opus_model_timeout_is_2400(self):
        """Opus: the timeout must be 2400s (40 min), not the old 1800s."""
        assert _MODEL_TIMEOUTS["opus"] == 2400, (
            f"Регрессия: ожидалось 2400s для opus, получено {_MODEL_TIMEOUTS['opus']}s"
        )

    def test_sonnet_model_timeout_is_2400(self):
        """Sonnet: the timeout must be 2400s (40 min), not the old 1200s."""
        assert _MODEL_TIMEOUTS["sonnet"] == 2400, (
            f"Регрессия: ожидалось 2400s для sonnet, получено {_MODEL_TIMEOUTS['sonnet']}s"
        )

    def test_haiku_model_timeout_is_1200(self):
        """Haiku: the timeout must be 1200s (20 min), not the old 600s."""
        assert _MODEL_TIMEOUTS["haiku"] == 1200, (
            f"Регрессия: ожидалось 1200s для haiku, получено {_MODEL_TIMEOUTS['haiku']}s"
        )

    def test_all_models_have_minimum_timeout_20min(self):
        """Every model must have a timeout of at least 1200s (20 min)."""
        for model, timeout in _MODEL_TIMEOUTS.items():
            assert timeout >= 1200, (
                f"Регрессия: модель '{model}' имеет таймаут {timeout}s < 1200s"
            )


# ---------------------------------------------------------------------------
# KIN-057: claude CLI on PATH when launched via launchctl
# ---------------------------------------------------------------------------

class TestClaudePath:
    """Regression tests for KIN-057: launchctl daemons may not see claude on PATH."""

    def test_build_claude_env_contains_extra_paths(self):
        """_build_claude_env must add /opt/homebrew/bin and /usr/local/bin to PATH."""
        env = _build_claude_env()
        path_dirs = env["PATH"].split(":")
        for extra_dir in _EXTRA_PATH_DIRS:
            assert extra_dir in path_dirs, (
                f"Регрессия KIN-057: {extra_dir} не найден в PATH, сгенерированном _build_claude_env"
            )

    def test_build_claude_env_no_duplicate_paths(self):
        """_build_claude_env must not duplicate already-present paths.

        PATH is mocked to a fixed value so the test does not depend on the
        real environment (decision #48).
        """
        fixed_path = "/usr/bin:/bin"
        with patch.dict("os.environ", {"PATH": fixed_path}, clear=False):
            env = _build_claude_env()
        path_dirs = env["PATH"].split(":")
        seen = set()
        for d in path_dirs:
            assert d not in seen, f"Дублирующийся PATH entry: {d}"
            seen.add(d)

    def test_build_claude_env_preserves_existing_path(self):
        """_build_claude_env must preserve the paths that were already present."""
        with patch.dict("os.environ", {"PATH": "/custom/bin:/usr/bin:/bin"}):
            env = _build_claude_env()
        path_dirs = env["PATH"].split(":")
        assert "/custom/bin" in path_dirs
        assert "/usr/bin" in path_dirs

    def test_resolve_claude_cmd_returns_string(self):
        """_resolve_claude_cmd must always return a string."""
        cmd = _resolve_claude_cmd()
        assert isinstance(cmd, str)
        assert len(cmd) > 0

    def test_resolve_claude_cmd_fallback_when_not_found(self):
        """_resolve_claude_cmd must return 'claude' when the CLI is not found on PATH."""
        with patch("agents.runner.shutil.which", return_value=None):
            cmd = _resolve_claude_cmd()
        assert cmd == "claude"

    def test_resolve_claude_cmd_returns_full_path_when_found(self):
        """_resolve_claude_cmd must return the full path when claude is found."""
        with patch("agents.runner.shutil.which", return_value="/opt/homebrew/bin/claude"):
            cmd = _resolve_claude_cmd()
        assert cmd == "/opt/homebrew/bin/claude"

    @patch("agents.runner.subprocess.run")
    def test_run_claude_passes_env_to_subprocess(self, mock_run, conn):
        """_run_claude must pass env= to subprocess.run (not inherit the bare PATH)."""
        mock_run.return_value = _mock_claude_success({"result": "ok"})
        run_agent(conn, "debugger", "VDOL-001", "vdol")
        call_kwargs = mock_run.call_args[1]
        assert "env" in call_kwargs, (
            "Регрессия KIN-057: subprocess.run должен получать явный env с расширенным PATH"
        )
        assert call_kwargs["env"] is not None

    @patch("agents.runner.subprocess.run")
    def test_run_claude_env_has_homebrew_in_path(self, mock_run, conn):
        """The env passed to subprocess.run must contain /opt/homebrew/bin in PATH."""
        mock_run.return_value = _mock_claude_success({"result": "ok"})
        run_agent(conn, "debugger", "VDOL-001", "vdol")
        call_kwargs = mock_run.call_args[1]
        env = call_kwargs.get("env", {})
        assert "/opt/homebrew/bin" in env.get("PATH", ""), (
            "Регрессия KIN-057: /opt/homebrew/bin не найден в env['PATH'] subprocess.run"
        )

    @patch("agents.runner.subprocess.run")
    def test_file_not_found_returns_127(self, mock_run, conn):
        """If claude is missing (FileNotFoundError), returncode 127 must come back."""
        mock_run.side_effect = FileNotFoundError("claude not found")
        result = run_agent(conn, "debugger", "VDOL-001", "vdol")
        assert result["success"] is False
        assert "not found" in (result.get("error") or "").lower()

    @patch.dict("os.environ", {"PATH": ""})
    def test_launchctl_empty_path_build_env_adds_extra_dirs(self):
        """Regression KIN-057: when launchctl starts the process with an empty PATH,
        _build_claude_env must add _EXTRA_PATH_DIRS so that claude is reachable.

        Without the fix: os.environ["PATH"]="" → shutil.which("claude") → None → FileNotFoundError.
        After the fix: _build_claude_env builds a PATH with /opt/homebrew/bin and friends.
        """
        env = _build_claude_env()
        path_dirs = env["PATH"].split(":")
        # Explicit check of every critical directory
        for extra_dir in _EXTRA_PATH_DIRS:
            assert extra_dir in path_dirs, (
                f"KIN-057: при пустом os PATH директория {extra_dir} должна быть добавлена"
            )

    @patch.dict("os.environ", {"PATH": ""})
    def test_launchctl_empty_path_shutil_which_fails_without_fix(self):
        """Reproduces the broken behavior: with PATH='' shutil.which returns None.

        This is exactly what happened before the fix — launchctl could not see claude.
        The test documents WHY _build_claude_env is needed instead of raw os.environ.
        """
        import shutil
        # Without the fix: a lookup against an empty PATH cannot find claude
        result_without_fix = shutil.which("claude", path="")
        assert result_without_fix is None, (
            "Если этот assert упал — shutil.which нашёл claude в пустом PATH, "
            "что невозможно. Ожидаем None — именно поэтому нужен _build_claude_env."
        )
        # With the fix: _resolve_claude_cmd builds an extended PATH and finds claude
        # (or returns the fallback "claude", but never raises FileNotFoundError)
        cmd = _resolve_claude_cmd()
        assert isinstance(cmd, str) and len(cmd) > 0, (
            "KIN-057: _resolve_claude_cmd должен возвращать строку даже при пустом os PATH"
        )

    # -----------------------------------------------------------------------
    # KIN-FIX-013: regression — dead SSH_AGENT_PID code was removed
    # -----------------------------------------------------------------------
    def test_build_claude_env_no_ssh_agent_pid_injection(self):
        """Regression KIN-FIX-013: SSH_AGENT_PID must not be injected artificially.

        If SSH_AGENT_PID is absent from os.environ — it must be absent from the result.
        The branch `if 'SSH_AGENT_PID' not in env` was an always-no-op (env is a copy
        of os.environ, so the condition is always False when the key exists).
        Removed as dead code.
        """
        env_without_pid = {k: v for k, v in __import__("os").environ.items() if k != "SSH_AGENT_PID"}
        with patch.dict("os.environ", env_without_pid, clear=True):
            env = _build_claude_env()
        assert "SSH_AGENT_PID" not in env, (
            "Регрессия KIN-FIX-013: SSH_AGENT_PID не должен появляться в env "
            "если его нет в os.environ — мёртвый код был удалён"
        )

    def test_build_claude_env_ssh_agent_pid_preserved_when_set(self):
        """SSH_AGENT_PID from os.environ is inherited normally via os.environ.copy()."""
        with patch.dict("os.environ", {"SSH_AGENT_PID": "12345"}):
            env = _build_claude_env()
        assert env.get("SSH_AGENT_PID") == "12345", (
            "SSH_AGENT_PID из os.environ должен наследоваться через copy()"
        )

    def test_build_claude_env_no_dead_ssh_agent_pid_code(self):
        """Regression KIN-FIX-013: the source of _build_claude_env contains no SSH_AGENT_PID.

        Any return of the dead code will be caught here.
        """
        import inspect
        src = inspect.getsource(_build_claude_env)
        assert "SSH_AGENT_PID" not in src, (
            "Регрессия KIN-FIX-013: мёртвый код SSH_AGENT_PID был возвращён в _build_claude_env"
        )

    def test_build_claude_env_ssh_auth_sock_forwarded_from_environ(self):
        """SSH_AUTH_SOCK from os.environ must be forwarded into the resulting env."""
        with patch.dict("os.environ", {"SSH_AUTH_SOCK": "/tmp/ssh-agent.sock"}):
            env = _build_claude_env()
        assert env.get("SSH_AUTH_SOCK") == "/tmp/ssh-agent.sock", (
            "SSH_AUTH_SOCK из os.environ должен наследоваться через copy()"
        )

    def test_build_claude_env_ssh_auth_sock_fallback_when_absent(self):
        """If SSH_AUTH_SOCK is not in os.environ, _build_claude_env tries the macOS socket."""
        env_without_sock = {k: v for k, v in __import__("os").environ.items() if k != "SSH_AUTH_SOCK"}
        fake_sock = "/private/tmp/com.apple.launchd.FAKE12345/Listeners"
        with patch.dict("os.environ", env_without_sock, clear=True):
            with patch("glob.glob", return_value=[fake_sock]):
                env = _build_claude_env()
        # If glob found a socket — it must be set
        assert env.get("SSH_AUTH_SOCK") == fake_sock, (
            "SSH_AUTH_SOCK должен быть установлен из macOS launchd-сокета при его наличии"
        )

    def test_build_claude_env_ssh_auth_sock_absent_when_no_fallback(self):
        """If SSH_AUTH_SOCK is absent and the macOS socket is not found — the key is not added."""
        env_without_sock = {k: v for k, v in __import__("os").environ.items() if k != "SSH_AUTH_SOCK"}
        with patch.dict("os.environ", env_without_sock, clear=True):
            with patch("glob.glob", return_value=[]):
                env = _build_claude_env()
        assert "SSH_AUTH_SOCK" not in env


# ---------------------------------------------------------------------------
# KIN-063: TestCompletionMode — auto_complete + last-step role check
# ---------------------------------------------------------------------------

class TestCompletionMode:
    """auto_complete mode fires only when the last step is tester or reviewer."""

    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_auto_complete_with_tester_last_sets_done(self, mock_run, mock_hooks, mock_followup, conn):
        """auto_complete + last step tester → status=done (Decision #29)."""
        mock_run.return_value = _mock_claude_success({"result": "ok"})
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        models.update_project(conn, "vdol", execution_mode="auto_complete")
        steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "test"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is True
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "done"

    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_auto_complete_with_reviewer_last_sets_done(self, mock_run, mock_hooks, mock_followup, conn):
        """auto_complete + last step reviewer → status=done."""
        mock_run.return_value = _mock_claude_success({"result": "ok"})
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        models.update_project(conn, "vdol", execution_mode="auto_complete")
        steps = [{"role": "developer", "brief": "fix"}, {"role": "reviewer", "brief": "review"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is True
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "done"

    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_auto_complete_without_tester_last_sets_review(self, mock_run, mock_hooks, mock_followup, conn):
        """auto_complete + last step NOT tester/reviewer → status=review (Decision #29)."""
        mock_run.return_value = _mock_claude_success({"result": "ok"})
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        models.update_project(conn, "vdol", execution_mode="auto_complete")
steps = [{"role": "developer", "brief": "fix"}, {"role": "debugger", "brief": "debug"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True task = models.get_task(conn, "VDOL-001") assert task["status"] == "review", ( "Регрессия KIN-063: auto_complete без tester/reviewer последним НЕ должен авто-завершать" ) @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_legacy_auto_mode_value_not_recognized(self, mock_run, mock_hooks, mock_followup, conn): """Регрессия: старое значение 'auto' больше не является валидным режимом. После KIN-063 'auto' → 'auto_complete'. Если в DB осталось 'auto' (без миграции), runner НЕ должен авто-завершать — это 'review'-ветка (безопасный fallback). (Decision #29) """ mock_run.return_value = _mock_claude_success({"result": "ok"}) mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} # Прямой SQL-апдейт, обходя validate_completion_mode, чтобы симулировать # старую запись в БД без миграции conn.execute("UPDATE projects SET execution_mode='auto' WHERE id='vdol'") conn.commit() steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "test"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True task = models.get_task(conn, "VDOL-001") assert task["status"] == "review", ( "Регрессия: 'auto' (старый формат) не должен срабатывать как auto_complete" ) @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_review_mode_with_tester_last_keeps_task_in_review(self, mock_run, mock_hooks, mock_followup, conn): """review mode + последний шаг tester → task.status == 'review', НЕ done (ждёт ручного approve).""" mock_run.return_value = _mock_claude_success({"result": "all tests pass"}) mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} # Проект и задача 
остаются в дефолтном 'review' mode steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "test"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True task = models.get_task(conn, "VDOL-001") assert task["status"] == "review" assert task["status"] != "done", ( "KIN-063: review mode не должен авто-завершать задачу даже если tester последний" ) @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_project_review_overrides_no_task_completion_mode(self, mock_run, mock_hooks, mock_followup, conn): """Project execution_mode='review' + задача без override → pipeline завершается в 'review'. Сценарий: PM выбрал auto_complete, но проект настроен на 'review' (ручной override человека). Задача не имеет task-level execution_mode, поэтому get_effective_mode возвращает project-level 'review'. """ mock_run.return_value = _mock_claude_success({"result": "ok"}) mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} # Проект явно в 'review', задача без execution_mode models.update_project(conn, "vdol", execution_mode="review") # task VDOL-001 создана без execution_mode (None) — fixture steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "test"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True assert result["mode"] == "review" task = models.get_task(conn, "VDOL-001") assert task["status"] == "review", ( "KIN-063: project-level 'review' должен применяться когда задача не имеет override" ) @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_auto_complete_not_broken_by_revise_comment(self, mock_run, mock_hooks, mock_followup, conn): """Регрессия KIN-045: revise_comment в задаче не ломает auto_complete flow. 
Задача прошла ревизию (revise_comment != None, status=in_progress), затем повторно запускается пайплайн в auto_complete режиме. Последний шаг — tester → задача должна получить status='done'. """ mock_run.return_value = _mock_claude_success({"result": "all tests pass"}) mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} models.update_project(conn, "vdol", execution_mode="auto_complete") models.update_task( conn, "VDOL-001", status="in_progress", revise_comment="Добавь тест для пустого массива", ) steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "test"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True task = models.get_task(conn, "VDOL-001") assert task["status"] == "done", ( "KIN-045: revise_comment не должен мешать auto_complete авто-завершению" ) # --------------------------------------------------------------------------- # KIN-048: _run_autocommit — флаг, git path, env= # --------------------------------------------------------------------------- class TestAutocommit: """KIN-048: _run_autocommit — autocommit_enabled флаг, shutil.which, env= regression.""" def test_disabled_project_skips_subprocess(self, conn): """autocommit_enabled=0 (дефолт) → subprocess не вызывается.""" with patch("agents.runner.subprocess.run") as mock_run: _run_autocommit(conn, "VDOL-001", "vdol") mock_run.assert_not_called() @patch("agents.runner.subprocess.run") @patch("agents.runner.shutil.which") def test_enabled_calls_git_add_and_commit(self, mock_which, mock_run, conn, tmp_path): """autocommit_enabled=1 → вызываются git add -A и git commit с task_id и title.""" mock_which.return_value = "/usr/bin/git" mock_run.return_value = MagicMock(returncode=0) models.update_project(conn, "vdol", autocommit_enabled=1, path=str(tmp_path)) _run_autocommit(conn, "VDOL-001", "vdol") assert mock_run.call_count == 2 add_cmd = mock_run.call_args_list[0][0][0] assert add_cmd == ["/usr/bin/git", "add", 
"-A"] commit_cmd = mock_run.call_args_list[1][0][0] assert commit_cmd[0] == "/usr/bin/git" assert commit_cmd[1] == "commit" assert "VDOL-001" in commit_cmd[-1] assert "Fix bug" in commit_cmd[-1] @patch("agents.runner.subprocess.run") def test_nothing_to_commit_no_exception(self, mock_run, conn, tmp_path): """returncode=1 (nothing to commit) → исключение не бросается.""" mock_run.return_value = MagicMock(returncode=1) models.update_project(conn, "vdol", autocommit_enabled=1, path=str(tmp_path)) _run_autocommit(conn, "VDOL-001", "vdol") # must not raise @patch("agents.runner.subprocess.run") def test_passes_env_to_subprocess(self, mock_run, conn, tmp_path): """Regression #33: env= должен передаваться в subprocess.run.""" mock_run.return_value = MagicMock(returncode=0) models.update_project(conn, "vdol", autocommit_enabled=1, path=str(tmp_path)) _run_autocommit(conn, "VDOL-001", "vdol") for call in mock_run.call_args_list: kwargs = call[1] assert "env" in kwargs, "Regression #33: subprocess.run должен получать env=" assert "/opt/homebrew/bin" in kwargs["env"].get("PATH", "") @patch("agents.runner.subprocess.run") @patch("agents.runner.shutil.which") def test_resolves_git_via_shutil_which(self, mock_which, mock_run, conn, tmp_path): """Regression #32: git резолвится через shutil.which, а не hardcoded 'git'.""" mock_which.return_value = "/opt/homebrew/bin/git" mock_run.return_value = MagicMock(returncode=0) models.update_project(conn, "vdol", autocommit_enabled=1, path=str(tmp_path)) _run_autocommit(conn, "VDOL-001", "vdol") git_which_calls = [c for c in mock_which.call_args_list if c[0][0] == "git"] assert len(git_which_calls) > 0, "Regression #32: shutil.which должен вызываться для git" first_cmd = mock_run.call_args_list[0][0][0] assert first_cmd[0] == "/opt/homebrew/bin/git" @patch("agents.runner.subprocess.run") @patch("agents.runner.shutil.which") def test_git_not_found_no_crash_logs_warning(self, mock_which, mock_run, conn, tmp_path): """shutil.which(git) → None 
→ fallback 'git' → FileNotFoundError → no crash, WARNING logged.""" mock_which.return_value = None # git не найден в PATH mock_run.side_effect = FileNotFoundError("git: command not found") models.update_project(conn, "vdol", autocommit_enabled=1, path=str(tmp_path)) with patch("agents.runner._logger") as mock_logger: _run_autocommit(conn, "VDOL-001", "vdol") # не должен бросать исключение mock_logger.warning.assert_called_once() @patch("agents.runner._run_autocommit") @patch("agents.runner.subprocess.run") def test_autocommit_not_called_on_failed_pipeline(self, mock_run, mock_autocommit, conn): """Pipeline failure → _run_autocommit must NOT be called (gotcha #41).""" mock_run.return_value = _mock_claude_failure("compilation error") steps = [{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is False mock_autocommit.assert_not_called() # --------------------------------------------------------------------------- # KIN-055: execution_mode='review' при переводе задачи в статус review # --------------------------------------------------------------------------- class TestReviewModeExecutionMode: """Регрессия KIN-055: execution_mode должен быть 'review', а не NULL после pipeline в review mode.""" def test_task_execution_mode_is_null_before_pipeline(self, conn): """Граничный случай: execution_mode IS NULL до запуска pipeline (задача только создана).""" task = models.get_task(conn, "VDOL-001") assert task["execution_mode"] is None, ( "Задача должна иметь NULL execution_mode до выполнения pipeline" ) @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_review_mode_sets_execution_mode_review(self, mock_run, mock_hooks, conn): """После pipeline в review mode task.execution_mode должно быть 'review', а не NULL.""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.return_value = [] steps = [{"role": "debugger", "brief": "find bug"}] result = run_pipeline(conn, 
"VDOL-001", steps) assert result["success"] is True task = models.get_task(conn, "VDOL-001") assert task["status"] == "review" # Регрессионная проверка KIN-055: execution_mode не должен быть NULL assert task["execution_mode"] is not None, ( "Регрессия KIN-055: execution_mode не должен быть NULL после перевода задачи в статус review" ) assert task["execution_mode"] == "review" @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_review_mode_execution_mode_persisted_in_db(self, mock_run, mock_hooks, conn): """execution_mode='review' должно сохраняться в SQLite напрямую, минуя ORM-слой.""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.return_value = [] steps = [{"role": "debugger", "brief": "find"}] run_pipeline(conn, "VDOL-001", steps) row = conn.execute( "SELECT execution_mode FROM tasks WHERE id='VDOL-001'" ).fetchone() assert row is not None assert row["execution_mode"] == "review", ( "Регрессия KIN-055: execution_mode должен быть 'review' в SQLite после pipeline" ) # --------------------------------------------------------------------------- # KIN-021: Audit log for --dangerously-skip-permissions # --------------------------------------------------------------------------- class TestAuditLogDangerousSkip: @patch("agents.runner._run_autocommit") @patch("agents.runner._run_learning_extraction") @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_audit_log_written_on_permission_retry( self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn ): """При retry с --dangerously-skip-permissions записывается событие в audit_log.""" permission_fail = _mock_claude_failure("permission denied: cannot write file") retry_success = _mock_claude_success({"result": "fixed"}) mock_run.side_effect = [permission_fail, retry_success] mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} 
mock_learn.return_value = {"added": 0, "skipped": 0} models.update_project(conn, "vdol", execution_mode="auto_complete") steps = [{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True # Проверяем audit_log через прямой SQL rows = conn.execute( "SELECT * FROM audit_log WHERE task_id='VDOL-001'" ).fetchall() assert len(rows) == 1 assert rows[0]["event_type"] == "dangerous_skip" assert rows[0]["step_id"] == "debugger" assert "debugger" in rows[0]["reason"] @patch("agents.runner._run_autocommit") @patch("agents.runner._run_learning_extraction") @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_dangerously_skipped_flag_set_on_task( self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn ): """tasks.dangerously_skipped=1 после retry с --dangerously-skip-permissions.""" permission_fail = _mock_claude_failure("permission denied: cannot write file") retry_success = _mock_claude_success({"result": "fixed"}) mock_run.side_effect = [permission_fail, retry_success] mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} mock_learn.return_value = {"added": 0, "skipped": 0} models.update_project(conn, "vdol", execution_mode="auto_complete") steps = [{"role": "debugger", "brief": "find"}] run_pipeline(conn, "VDOL-001", steps) # Верификация через прямой SQL (минуя ORM) row = conn.execute( "SELECT dangerously_skipped FROM tasks WHERE id='VDOL-001'" ).fetchone() assert row is not None assert row["dangerously_skipped"] == 1 @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_no_audit_log_in_review_mode(self, mock_run, mock_hooks, conn): """В review mode retry не происходит, audit_log остаётся пустым.""" permission_fail = _mock_claude_failure("permission denied: cannot write file") mock_run.return_value = permission_fail mock_hooks.return_value = [] steps = 
[{"role": "debugger", "brief": "find"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is False rows = conn.execute( "SELECT * FROM audit_log WHERE task_id='VDOL-001'" ).fetchall() assert len(rows) == 0 @patch("agents.runner._run_autocommit") @patch("agents.runner._run_learning_extraction") @patch("core.followup.generate_followups") @patch("agents.runner.run_hooks") @patch("agents.runner.subprocess.run") def test_audit_log_no_entry_on_normal_success( self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn ): """При успешном выполнении без retry audit_log не записывается.""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_hooks.return_value = [] mock_followup.return_value = {"created": [], "pending_actions": []} mock_learn.return_value = {"added": 0, "skipped": 0} models.update_project(conn, "vdol", execution_mode="auto_complete") steps = [{"role": "tester", "brief": "test"}] result = run_pipeline(conn, "VDOL-001", steps) assert result["success"] is True rows = conn.execute( "SELECT * FROM audit_log WHERE task_id='VDOL-001'" ).fetchall() assert len(rows) == 0 # --------------------------------------------------------------------------- # KIN-016: Blocked Protocol # --------------------------------------------------------------------------- class TestParseAgentBlocked: def test_returns_none_on_failure(self): result = {"success": False, "output": {"status": "blocked", "reason": "no access"}} assert _parse_agent_blocked(result) is None def test_returns_none_when_output_not_dict(self): result = {"success": True, "output": "plain text output"} assert _parse_agent_blocked(result) is None def test_returns_none_when_status_not_blocked(self): result = {"success": True, "output": {"status": "done", "changes": []}} assert _parse_agent_blocked(result) is None def test_detects_status_blocked(self): result = {"success": True, "output": {"status": "blocked", "reason": "no file access"}} blocked = 
            _parse_agent_blocked(result))
        assert blocked is not None
        assert blocked["reason"] == "no file access"
        assert blocked["blocked_at"] is not None

    def test_detects_verdict_blocked(self):
        """reviewer.md uses verdict: blocked instead of status: blocked."""
        result = {"success": True, "output": {"verdict": "blocked", "blocked_reason": "unreadable"}}
        blocked = _parse_agent_blocked(result)
        assert blocked is not None
        assert blocked["reason"] == "unreadable"

    def test_uses_provided_blocked_at(self):
        result = {"success": True, "output": {
            "status": "blocked",
            "reason": "out of scope",
            "blocked_at": "2026-03-16T10:00:00",
        }}
        blocked = _parse_agent_blocked(result)
        assert blocked["blocked_at"] == "2026-03-16T10:00:00"

    def test_falls_back_blocked_at_if_missing(self):
        result = {"success": True, "output": {"status": "blocked", "reason": "x"}}
        blocked = _parse_agent_blocked(result)
        assert "T" in blocked["blocked_at"]  # ISO-8601 with T separator

    def test_does_not_check_nested_status(self):
        """Nested status='blocked' in sub-fields must NOT trigger blocked protocol."""
        result = {"success": True, "output": {
            "status": "done",
            "changes": [{"file": "a.py", "status": "blocked"}],  # nested — must be ignored
        }}
        assert _parse_agent_blocked(result) is None


class TestPipelineBlockedProtocol:

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_pipeline_stops_on_semantic_blocked(self, mock_run, mock_autocommit, conn):
        """KIN-016: when an agent returns status='blocked', the pipeline stops."""
        # First step returns semantic blocked
        mock_run.return_value = _mock_claude_success({
            "result": json.dumps({"status": "blocked", "reason": "cannot access external API"}),
        })
        steps = [
            {"role": "debugger", "brief": "find bug"},
            {"role": "tester", "brief": "verify"},
        ]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is False
        assert result["steps_completed"] == 0
        assert "blocked" in result["error"]
        assert result["blocked_by"] == "debugger"
        assert result["blocked_reason"] == "cannot access external API"
        # Task marked as blocked with enriched fields
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "blocked"
        assert task["blocked_reason"] == "cannot access external API"
        assert task["blocked_agent_role"] == "debugger"
        assert task["blocked_pipeline_step"] == "1"
        assert task["blocked_at"] is not None
        # Pipeline marked as failed
        pipe = conn.execute("SELECT * FROM pipelines WHERE task_id='VDOL-001'").fetchone()
        assert pipe["status"] == "failed"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_pipeline_blocks_on_second_step(self, mock_run, mock_autocommit, conn):
        """KIN-016: blocked at step 2 → steps_completed=1, pipeline_step='2'."""
        mock_run.side_effect = [
            _mock_claude_success({"result": json.dumps({"status": "done", "changes": []})}),
            _mock_claude_success({"result": json.dumps({
                "status": "blocked",
                "reason": "test environment unavailable",
            })}),
        ]
        steps = [
            {"role": "backend_dev", "brief": "implement"},
            {"role": "tester", "brief": "test"},
        ]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is False
        assert result["steps_completed"] == 1
        assert result["blocked_by"] == "tester"
        task = models.get_task(conn, "VDOL-001")
        assert task["blocked_agent_role"] == "tester"
        assert task["blocked_pipeline_step"] == "2"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_reviewer_verdict_blocked_stops_pipeline(self, mock_run, mock_autocommit, conn):
        """KIN-016: the reviewer returns verdict='blocked' → the pipeline stops."""
        mock_run.return_value = _mock_claude_success({
            "result": json.dumps({
                "verdict": "blocked",
                "status": "blocked",
                "reason": "cannot read implementation files",
            }),
        })
        steps = [{"role": "reviewer", "brief": "review"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is False
        assert result["blocked_by"] == "reviewer"
        task = models.get_task(conn, "VDOL-001")
        assert (
task["status"] == "blocked" assert task["blocked_agent_role"] == "reviewer" # --------------------------------------------------------------------------- # KIN-071: _save_sysadmin_output # --------------------------------------------------------------------------- class TestSaveSysadminOutput: """KIN-071: _save_sysadmin_output парсит и сохраняет decisions + modules.""" @pytest.fixture def ops_conn(self): c = init_db(":memory:") models.create_project( c, "srv", "Server", "", project_type="operations", ssh_host="10.0.0.1", ) models.create_task(c, "SRV-001", "srv", "Scan server") yield c c.close() def test_saves_decisions_and_modules(self, ops_conn): """KIN-071: sysadmin output корректно сохраняет decisions и modules.""" from agents.runner import _save_sysadmin_output output = { "status": "done", "decisions": [ {"type": "gotcha", "title": "Port 8080 open", "description": "nginx on 8080", "tags": ["server"]}, {"type": "decision", "title": "Docker used", "description": "docker 24.0", "tags": ["docker"]}, ], "modules": [ {"name": "nginx", "type": "service", "path": "/etc/nginx", "description": "web proxy"}, ], } result = _save_sysadmin_output( ops_conn, "srv", "SRV-001", {"raw_output": json.dumps(output)} ) assert result["decisions_added"] == 2 assert result["modules_added"] == 1 decisions = models.get_decisions(ops_conn, "srv") assert len(decisions) == 2 modules = models.get_modules(ops_conn, "srv") assert len(modules) == 1 assert modules[0]["name"] == "nginx" def test_idempotent_on_duplicate_decisions(self, ops_conn): """KIN-071: повторный вызов не создаёт дублей.""" from agents.runner import _save_sysadmin_output output = { "decisions": [ {"type": "gotcha", "title": "Port 8080 open", "description": "nginx on 8080"}, ], "modules": [], } r1 = _save_sysadmin_output(ops_conn, "srv", "SRV-001", {"raw_output": json.dumps(output)}) r2 = _save_sysadmin_output(ops_conn, "srv", "SRV-001", {"raw_output": json.dumps(output)}) assert r1["decisions_added"] == 1 assert 
r2["decisions_added"] == 0 # deduped assert r2["decisions_skipped"] == 1 def test_idempotent_on_duplicate_modules(self, ops_conn): """KIN-071: повторный вызов не создаёт дублей модулей.""" from agents.runner import _save_sysadmin_output output = { "decisions": [], "modules": [{"name": "nginx", "type": "service", "path": "/etc/nginx"}], } r1 = _save_sysadmin_output(ops_conn, "srv", "SRV-001", {"raw_output": json.dumps(output)}) r2 = _save_sysadmin_output(ops_conn, "srv", "SRV-001", {"raw_output": json.dumps(output)}) assert r1["modules_added"] == 1 assert r2["modules_skipped"] == 1 assert len(models.get_modules(ops_conn, "srv")) == 1 def test_handles_non_json_output(self, ops_conn): """KIN-071: не-JSON вывод не вызывает исключения.""" from agents.runner import _save_sysadmin_output result = _save_sysadmin_output(ops_conn, "srv", "SRV-001", {"raw_output": "not json"}) assert result["decisions_added"] == 0 assert result["modules_added"] == 0 def test_handles_empty_output(self, ops_conn): """KIN-071: пустой вывод не вызывает исключения.""" from agents.runner import _save_sysadmin_output result = _save_sysadmin_output(ops_conn, "srv", "SRV-001", {"raw_output": ""}) assert result["decisions_added"] == 0 def test_full_sysadmin_output_format_saves_docker_and_systemctl_as_decisions(self, ops_conn): """KIN-071: полный формат вывода sysadmin (docker ps + systemctl) → decisions + modules.""" from agents.runner import _save_sysadmin_output # Симуляция реального вывода sysadmin-агента после docker ps и systemctl output = { "status": "done", "summary": "Ubuntu 22.04, nginx + postgres + app in docker", "os": "Ubuntu 22.04 LTS, kernel 5.15.0", "services": [ {"name": "nginx", "type": "systemd", "status": "running", "note": "web proxy"}, {"name": "myapp", "type": "docker", "image": "myapp:1.2.3", "ports": ["80:8080"]}, {"name": "postgres", "type": "docker", "image": "postgres:15", "ports": ["5432:5432"]}, ], "open_ports": [ {"port": 80, "proto": "tcp", "process": "nginx"}, {"port": 
5432, "proto": "tcp", "process": "postgres"}, ], "decisions": [ { "type": "gotcha", "title": "nginx proxies to docker app on 8080", "description": "nginx.conf proxy_pass http://localhost:8080", "tags": ["nginx", "docker"], }, { "type": "decision", "title": "postgres data on /var/lib/postgresql", "description": "Volume mount /var/lib/postgresql/data persists DB", "tags": ["postgres", "storage"], }, ], "modules": [ { "name": "nginx", "type": "service", "path": "/etc/nginx", "description": "Reverse proxy", "owner_role": "sysadmin", }, { "name": "myapp", "type": "docker", "path": "/opt/myapp", "description": "Main application container", }, { "name": "postgres", "type": "docker", "path": "/var/lib/postgresql", "description": "Database", }, ], } result = _save_sysadmin_output(ops_conn, "srv", "SRV-001", {"raw_output": json.dumps(output)}) assert result["decisions_added"] == 2 assert result["modules_added"] == 3 decisions = models.get_decisions(ops_conn, "srv") d_titles = {d["title"] for d in decisions} assert "nginx proxies to docker app on 8080" in d_titles assert "postgres data on /var/lib/postgresql" in d_titles modules = models.get_modules(ops_conn, "srv") m_names = {m["name"] for m in modules} assert {"nginx", "myapp", "postgres"} == m_names def test_invalid_decision_type_normalized_to_decision(self, ops_conn): """KIN-071: тип 'workaround' не входит в VALID_DECISION_TYPES → нормализуется в 'decision'.""" from agents.runner import _save_sysadmin_output output = { "decisions": [ { "type": "workaround", "title": "Use /proc/net for port list", "description": "ss not installed, fallback to /proc/net/tcp", "tags": ["networking"], }, ], "modules": [], } _save_sysadmin_output(ops_conn, "srv", "SRV-001", {"raw_output": json.dumps(output)}) decisions = models.get_decisions(ops_conn, "srv") assert len(decisions) == 1 assert decisions[0]["type"] == "decision" def test_decision_missing_title_skipped(self, ops_conn): """KIN-071: decision без title пропускается.""" from 
agents.runner import _save_sysadmin_output output = { "decisions": [ {"type": "gotcha", "title": "", "description": "Something"}, ], "modules": [], } result = _save_sysadmin_output(ops_conn, "srv", "SRV-001", {"raw_output": json.dumps(output)}) assert result["decisions_added"] == 0 def test_module_missing_name_skipped(self, ops_conn): """KIN-071: module без name пропускается.""" from agents.runner import _save_sysadmin_output output = { "decisions": [], "modules": [ {"name": "", "type": "service", "path": "/etc/something"}, ], } result = _save_sysadmin_output(ops_conn, "srv", "SRV-001", {"raw_output": json.dumps(output)}) assert result["modules_added"] == 0 # --------------------------------------------------------------------------- # KIN-003: _get_changed_files — вычисление изменённых git-файлов # --------------------------------------------------------------------------- class TestGetChangedFiles: """Тесты для _get_changed_files(project_path) из agents/runner.py (KIN-003).""" @patch("agents.runner.subprocess.run") def test_returns_files_from_git_diff(self, mock_run): """Возвращает список файлов из git diff --name-only.""" proc = MagicMock() proc.returncode = 0 proc.stdout = "web/frontend/App.vue\ncore/models.py\n" mock_run.return_value = proc result = _get_changed_files("/tmp/fake-project") assert isinstance(result, list) assert "web/frontend/App.vue" in result assert "core/models.py" in result @patch("agents.runner.subprocess.run") def test_returns_empty_list_on_exception(self, mock_run): """При ошибке git (не найден, не репозиторий) возвращает [].""" mock_run.side_effect = Exception("git not found") result = _get_changed_files("/tmp/fake-project") assert result == [] @patch("agents.runner.subprocess.run") def test_deduplicates_files_from_multiple_git_commands(self, mock_run): """Один файл из нескольких git-команд появляется в результате только один раз.""" proc = MagicMock() proc.returncode = 0 proc.stdout = "web/frontend/App.vue\n" mock_run.return_value = 
proc # все 3 git-команды возвращают одно и то же result = _get_changed_files("/tmp/fake-project") assert result.count("web/frontend/App.vue") == 1, ( "Дубликаты из разных git-команд должны дедуплицироваться" ) @patch("agents.runner.subprocess.run") def test_combines_files_from_different_git_commands(self, mock_run): """Файлы из трёх разных git-команд объединяются в один список.""" mock_run.side_effect = [ MagicMock(returncode=0, stdout="web/frontend/App.vue\n"), MagicMock(returncode=0, stdout="core/models.py\n"), MagicMock(returncode=0, stdout="agents/runner.py\n"), ] result = _get_changed_files("/tmp/fake-project") assert "web/frontend/App.vue" in result assert "core/models.py" in result assert "agents/runner.py" in result @patch("agents.runner.subprocess.run") def test_skips_failed_git_command_and_continues(self, mock_run): """Упавшая git-команда (returncode != 0) не блокирует остальные.""" fail_proc = MagicMock(returncode=1, stdout="") success_proc = MagicMock(returncode=0, stdout="core/models.py\n") mock_run.side_effect = [fail_proc, success_proc, fail_proc] result = _get_changed_files("/tmp/fake-project") assert "core/models.py" in result @patch("agents.runner.subprocess.run") def test_strips_whitespace_from_file_paths(self, mock_run): """Пробелы и переносы вокруг имён файлов обрезаются.""" proc = MagicMock() proc.returncode = 0 proc.stdout = " web/frontend/App.vue \n core/models.py \n" mock_run.return_value = proc result = _get_changed_files("/tmp/fake-project") assert "web/frontend/App.vue" in result assert "core/models.py" in result assert " web/frontend/App.vue " not in result # --------------------------------------------------------------------------- # KIN-003: run_pipeline — передача changed_files в run_hooks # --------------------------------------------------------------------------- class TestPipelineChangedFiles: """Интеграционные тесты: pipeline вычисляет changed_files и передаёт в run_hooks.""" @patch("agents.runner._get_changed_files") 
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_pipeline_passes_changed_files_to_run_hooks(
        self, mock_run, mock_hooks, mock_get_files
    ):
        """run_pipeline passes changed_files into run_hooks(event='pipeline_completed').

        Uses a project with path='/tmp' (a real directory) so that
        _get_changed_files actually gets called.
        """
        c = init_db(":memory:")
        models.create_project(c, "kin-tmp", "KinTmp", "/tmp", tech_stack=["vue3"])
        models.create_task(c, "KT-001", "kin-tmp", "Fix bug")
        mock_run.return_value = _mock_claude_success({"result": "done"})
        mock_hooks.return_value = []
        mock_get_files.return_value = ["web/frontend/App.vue", "core/models.py"]
        steps = [{"role": "debugger", "brief": "find bug"}]
        result = run_pipeline(c, "KT-001", steps)
        c.close()
        assert result["success"] is True
        mock_get_files.assert_called_once_with("/tmp")
        # the pipeline_completed call must carry changed_files
        pipeline_calls = [
            call for call in mock_hooks.call_args_list
            if call.kwargs.get("event") == "pipeline_completed"
        ]
        assert len(pipeline_calls) >= 1
        kw = pipeline_calls[0].kwargs
        assert kw.get("changed_files") == ["web/frontend/App.vue", "core/models.py"]

    @patch("agents.runner._run_autocommit")
    @patch("core.hooks.subprocess.run")
    @patch("agents.runner._run_claude")
    def test_pipeline_completes_when_frontend_hook_build_fails(
        self, mock_run_claude, mock_hook_run, mock_autocommit
    ):
        """A frontend build failure (exitcode=1) does not fail the pipeline (AC #3 KIN-003).

        The hook runs and returns a failure, but pipeline.status = 'completed'
        and run_pipeline['success'] = True.

        Note: we patch _run_claude (not subprocess.run) to avoid conflicting
        with core.hooks.subprocess.run — both reference the same subprocess.run.
        """
        from core.hooks import create_hook
        c = init_db(":memory:")
        models.create_project(c, "kin-build", "KinBuild", "/tmp", tech_stack=["vue3"])
        models.create_task(c, "KB-001", "kin-build", "Add feature")
        create_hook(
            c, "kin-build", "rebuild-frontend", "pipeline_completed", "/tmp/rebuild.sh",
            trigger_module_path=None, working_dir="/tmp",
        )
        mock_run_claude.return_value = {
            "output": "done",
            "returncode": 0,
            "error": None,
            "empty_output": False,
            "tokens_used": None,
            "cost_usd": None,
        }
        # npm run build exits with an error
        fail_proc = MagicMock()
        fail_proc.returncode = 1
        fail_proc.stdout = ""
        fail_proc.stderr = "Error: Cannot find module './App'"
        mock_hook_run.return_value = fail_proc
        steps = [{"role": "tester", "brief": "test feature"}]
        result = run_pipeline(c, "KB-001", steps)
        assert result["success"] is True, (
            "Ошибка сборки хука не должна ронять pipeline"
        )
        pipe = c.execute(
            "SELECT status FROM pipelines WHERE task_id='KB-001'"
        ).fetchone()
        assert pipe["status"] == "completed"
        c.close()

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner.subprocess.run")
    def test_pipeline_changed_files_is_none_when_project_path_missing(
        self, mock_run, mock_autocommit, conn
    ):
        """If the project path does not exist, changed_files=None is passed to run_hooks.

        Hooks still run, but without the git filter (task_modules fallback).
        """
        # vdol path = ~/projects/vdolipoperek (does not exist in CI)
        # a hook without trigger_module_path must fire
        from core.hooks import create_hook, get_hook_logs
        create_hook(conn, "vdol", "always", "pipeline_completed", "echo ok",
                    trigger_module_path=None, working_dir="/tmp")
        mock_run.return_value = _mock_claude_success({"result": "done"})
        build_proc = MagicMock(returncode=0, stdout="ok", stderr="")
        with patch("core.hooks.subprocess.run", return_value=build_proc):
            steps = [{"role": "tester", "brief": "test"}]
            result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is True
        # the unfiltered hook must have executed
        logs = get_hook_logs(conn, project_id="vdol")
        assert len(logs) >= 1


# ---------------------------------------------------------------------------
# _save_sysadmin_output — KIN-081
# ---------------------------------------------------------------------------


class TestSaveSysadminOutput:
    """Tests for _save_sysadmin_output — persisting decisions/modules from agent output."""

    def test_modules_added_count_for_new_modules(self, conn):
        """KIN-081: _save_sysadmin_output counts modules_added correctly via _created."""
        result = {
            "raw_output": json.dumps({
                "modules": [
                    {"name": "nginx", "type": "infra", "path": "/etc/nginx", "description": "Web server"},
                    {"name": "postgres", "type": "infra", "path": "/var/lib/postgresql", "description": "Database"},
                ],
                "decisions": [],
            })
        }
        counts = _save_sysadmin_output(conn, "vdol", "VDOL-001", result)
        assert counts["modules_added"] == 2
        assert counts["modules_skipped"] == 0

    def test_modules_skipped_count_for_duplicate_names(self, conn):
        """KIN-081: repeated call with the same modules: added=0, skipped=2."""
        raw = json.dumps({
            "modules": [
                {"name": "nginx", "type": "infra", "path": "/etc/nginx"},
                {"name": "postgres", "type": "infra", "path": "/var/lib/postgresql"},
            ],
            "decisions": [],
        })
        result = {"raw_output": raw}
        # First call — adds
        _save_sysadmin_output(conn, "vdol", "VDOL-001", result)
        # Second call — all duplicates
        counts = _save_sysadmin_output(conn, "vdol", "VDOL-001", result)
        assert counts["modules_added"] == 0
        assert counts["modules_skipped"] == 2

    def test_empty_output_returns_zeros(self, conn):
        """_save_sysadmin_output with a non-JSON string returns all zeros."""
        counts = _save_sysadmin_output(conn, "vdol", "VDOL-001", {"raw_output": "Agent completed the task."})
        assert counts == {
            "decisions_added": 0,
            "decisions_skipped": 0,
            "modules_added": 0,
            "modules_skipped": 0,
        }

    def test_decisions_added_and_skipped(self, conn):
        """_save_sysadmin_output deduplicates decisions via add_decision_if_new."""
        raw = json.dumps({
            "modules": [],
            "decisions": [
                {"type": "convention", "title": "Use WAL mode", "description": "PRAGMA journal_mode=WAL for SQLite"},
            ],
        })
        result = {"raw_output": raw}
        counts1 = _save_sysadmin_output(conn, "vdol", "VDOL-001", result)
        assert counts1["decisions_added"] == 1
        assert counts1["decisions_skipped"] == 0
        counts2 = _save_sysadmin_output(conn, "vdol", "VDOL-001", result)
        assert counts2["decisions_added"] == 0
        assert counts2["decisions_skipped"] == 1


# ---------------------------------------------------------------------------
# check_claude_auth
# ---------------------------------------------------------------------------


class TestCheckClaudeAuth:
    """Tests for check_claude_auth() — Claude CLI login healthcheck."""

    @patch("agents.runner.subprocess.run")
    def test_ok_when_returncode_zero(self, mock_run):
        """Does not raise when returncode=0 and the JSON is well-formed."""
        mock = MagicMock()
        mock.stdout = json.dumps({"result": "ok"})
        mock.stderr = ""
        mock.returncode = 0
        mock_run.return_value = mock
        check_claude_auth()  # must return None without raising

    @patch("agents.runner.subprocess.run")
    def test_not_logged_in_via_string_in_stdout(self, mock_run):
        """Raises ClaudeAuthError when stdout contains 'Not logged in'."""
        mock = MagicMock()
        mock.stdout = "Not logged in"
        mock.stderr = ""
        mock.returncode = 1
        mock_run.return_value = mock
        with pytest.raises(ClaudeAuthError) as exc_info:
            check_claude_auth()
        assert "login" in str(exc_info.value).lower()
    @patch("agents.runner.subprocess.run")
    def test_not_logged_in_case_insensitive(self, mock_run):
        """Raises ClaudeAuthError on 'not logged in' regardless of case."""
        mock = MagicMock()
        mock.stdout = ""
        mock.stderr = "Error: NOT LOGGED IN to Claude"
        mock.returncode = 1
        mock_run.return_value = mock
        with pytest.raises(ClaudeAuthError):
            check_claude_auth()

    @patch("agents.runner.subprocess.run")
    def test_not_logged_in_via_string_in_stderr(self, mock_run):
        """Raises ClaudeAuthError when stderr contains 'Not logged in'."""
        mock = MagicMock()
        mock.stdout = ""
        mock.stderr = "Error: Not logged in to Claude"
        mock.returncode = 1
        mock_run.return_value = mock
        with pytest.raises(ClaudeAuthError):
            check_claude_auth()

    @patch("agents.runner.subprocess.run")
    def test_not_logged_in_via_nonzero_returncode(self, mock_run):
        """Raises ClaudeAuthError on a non-zero returncode (without 'Not logged in' text)."""
        mock = MagicMock()
        mock.stdout = ""
        mock.stderr = "Some other error"
        mock.returncode = 1
        mock_run.return_value = mock
        with pytest.raises(ClaudeAuthError):
            check_claude_auth()

    @patch("agents.runner.subprocess.run")
    def test_not_logged_in_via_is_error_in_json(self, mock_run):
        """Raises ClaudeAuthError when the JSON has is_error=true even with returncode=0."""
        mock = MagicMock()
        mock.stdout = json.dumps({"is_error": True, "result": "authentication required"})
        mock.stderr = ""
        mock.returncode = 0
        mock_run.return_value = mock
        with pytest.raises(ClaudeAuthError):
            check_claude_auth()

    @patch("agents.runner.subprocess.run", side_effect=FileNotFoundError)
    def test_raises_when_cli_not_found(self, mock_run):
        """On FileNotFoundError raises ClaudeAuthError with a clear message."""
        with pytest.raises(ClaudeAuthError) as exc_info:
            check_claude_auth()
        assert "PATH" in str(exc_info.value) or "not found" in str(exc_info.value).lower()

    @patch("agents.runner.subprocess.run", side_effect=subprocess.TimeoutExpired(cmd="claude", timeout=10))
    def test_ok_when_timeout(self, mock_run):
        """On TimeoutExpired does not raise (do not block on a timeout)."""
        check_claude_auth()  # must return None without raising


# ---------------------------------------------------------------------------
# KIN-OBS-030: PM step instrumented in pipeline_log
# ---------------------------------------------------------------------------


class TestPMStepPipelineLog:
    """Verifies that the PM step is recorded in pipeline_log after run_pipeline."""

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner.subprocess.run")
    def test_pm_log_entry_written_when_pm_result_provided(
        self, mock_run, mock_learn, mock_autocommit, conn
    ):
        """If pm_result is passed to run_pipeline, a PM-step entry appears in pipeline_log."""
        mock_run.return_value = _mock_claude_success({"result": "done"})
        mock_learn.return_value = {"added": 0, "skipped": 0}
        pm_result = {"success": True, "duration_seconds": 5, "tokens_used": 1000, "cost_usd": 0.01}
        steps = [{"role": "debugger", "brief": "find bug"}]
        run_pipeline(
            conn, "VDOL-001", steps,
            pm_result=pm_result,
            pm_started_at="2026-03-17T10:00:00",
            pm_ended_at="2026-03-17T10:00:05",
        )
        # Two entries expected: PM start + PM done
        start_logs = conn.execute(
            "SELECT * FROM pipeline_log WHERE message='PM start: task planning'"
        ).fetchall()
        done_logs = conn.execute(
            "SELECT * FROM pipeline_log WHERE message LIKE 'PM done:%'"
        ).fetchall()
        assert len(start_logs) == 1
        assert len(done_logs) == 1

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner.subprocess.run")
    def test_pm_log_entry_has_correct_pipeline_id(
        self, mock_run, mock_learn, mock_autocommit, conn
    ):
        """pipeline_id in the PM pipeline_log entry matches the real pipeline."""
        mock_run.return_value = _mock_claude_success({"result": "done"})
        mock_learn.return_value = {"added": 0, "skipped": 0}
        pm_result = {"success": True, "duration_seconds": 3, "tokens_used": 800, "cost_usd": 0.008}
        steps = [{"role": "debugger", "brief": "find bug"}]
        run_pipeline(
            conn, "VDOL-001", steps,
            pm_result=pm_result,
            pm_started_at="2026-03-17T10:00:00",
            pm_ended_at="2026-03-17T10:00:03",
        )
        pipeline = conn.execute("SELECT * FROM pipelines WHERE task_id='VDOL-001'").fetchone()
        assert pipeline is not None
        pm_log = conn.execute(
            "SELECT * FROM pipeline_log WHERE message='PM start: task planning'"
        ).fetchone()
        assert pm_log is not None
        assert pm_log["pipeline_id"] == pipeline["id"]

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner.subprocess.run")
    def test_pm_log_entry_has_step_pm_in_extra(
        self, mock_run, mock_learn, mock_autocommit, conn
    ):
        """extra_json of the PM entry contains role='pm' and correct execution metrics."""
        mock_run.return_value = _mock_claude_success({"result": "done"})
        mock_learn.return_value = {"added": 0, "skipped": 0}
        pm_result = {"success": True, "duration_seconds": 7, "tokens_used": 1500, "cost_usd": 0.02}
        steps = [{"role": "debugger", "brief": "find bug"}]
        run_pipeline(
            conn, "VDOL-001", steps,
            pm_result=pm_result,
            pm_started_at="2026-03-17T10:00:00",
            pm_ended_at="2026-03-17T10:00:07",
        )
        row = conn.execute(
            "SELECT extra_json, ts FROM pipeline_log WHERE message LIKE 'PM done:%'"
        ).fetchone()
        assert row is not None
        extra = json.loads(row["extra_json"])
        assert extra["role"] == "pm"
        assert extra["steps_count"] == 1
        assert extra["tokens_used"] == 1500
        assert extra["cost_usd"] == 0.02
        # ts must equal pm_ended_at
        assert row["ts"] == "2026-03-17T10:00:07"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner.subprocess.run")
    def test_pm_log_not_written_when_pm_result_is_none(
        self, mock_run, mock_learn, mock_autocommit, conn
    ):
        """If pm_result is not passed (None), no PM-step entry is written to pipeline_log."""
        mock_run.return_value = _mock_claude_success({"result": "done"})
        mock_learn.return_value = {"added": 0, "skipped": 0}
        steps = [{"role": "debugger", "brief": "find bug"}]
run_pipeline(conn, "VDOL-001", steps) # pm_result=None по умолчанию pm_logs = conn.execute( "SELECT * FROM pipeline_log WHERE message='PM start: task planning' OR message LIKE 'PM done:%'" ).fetchall() assert len(pm_logs) == 0 @patch("agents.runner._run_autocommit") @patch("agents.runner._run_learning_extraction") @patch("agents.runner.subprocess.run") def test_pm_log_not_written_for_sub_pipeline( self, mock_run, mock_learn, mock_autocommit, conn ): """PM-лог НЕ записывается в sub-pipeline (parent_pipeline_id задан).""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_learn.return_value = {"added": 0, "skipped": 0} # Сначала создаём родительский pipeline parent_pipeline = models.create_pipeline(conn, "VDOL-001", "vdol", "linear", []) pm_result = {"success": True, "duration_seconds": 4, "tokens_used": 900, "cost_usd": 0.009} steps = [{"role": "debugger", "brief": "find bug"}] run_pipeline( conn, "VDOL-001", steps, pm_result=pm_result, pm_started_at="2026-03-17T10:00:00", pm_ended_at="2026-03-17T10:00:04", parent_pipeline_id=parent_pipeline["id"], ) pm_logs = conn.execute( "SELECT * FROM pipeline_log WHERE message='PM start: task planning' OR message LIKE 'PM done:%'" ).fetchall() assert len(pm_logs) == 0 @patch("agents.runner._run_autocommit") @patch("agents.runner._run_learning_extraction") @patch("agents.runner.subprocess.run") def test_pm_log_no_orphan_records( self, mock_run, mock_learn, mock_autocommit, conn ): """FK integrity: все записи pipeline_log ссылаются на существующий pipeline.""" mock_run.return_value = _mock_claude_success({"result": "done"}) mock_learn.return_value = {"added": 0, "skipped": 0} pm_result = {"success": True, "duration_seconds": 2, "tokens_used": 500, "cost_usd": 0.005} steps = [{"role": "debugger", "brief": "find bug"}] run_pipeline( conn, "VDOL-001", steps, pm_result=pm_result, pm_started_at="2026-03-17T10:00:00", pm_ended_at="2026-03-17T10:00:02", ) # Проверяем FK через JOIN — orphan-записей не должно быть 
        orphans = conn.execute(
            """SELECT pl.id FROM pipeline_log pl
            LEFT JOIN pipelines p ON pl.pipeline_id = p.id
            WHERE p.id IS NULL"""
        ).fetchall()
        assert len(orphans) == 0


# ---------------------------------------------------------------------------
# KIN-116: _detect_destructive_operations — unit tests
# ---------------------------------------------------------------------------


class TestDetectDestructiveOperations:
    """Unit tests for _detect_destructive_operations().

    Tests observable behaviour: which patterns trigger detection, which don't.
    """

    def _result(self, output):
        """Helper: make a successful result dict with given output text."""
        return {"success": True, "raw_output": output, "output": ""}

    def test_returns_list_type(self):
        """_detect_destructive_operations must return a list."""
        assert isinstance(_detect_destructive_operations([]), list)

    def test_clean_output_returns_empty_list(self):
        """No destructive operations → empty (falsy) list."""
        results = [self._result("All tests passed. 42 passed, 0 failed.")]
        assert _detect_destructive_operations(results) == []

    def test_detects_rm_rf(self):
        """rm -rf /path → detected."""
        results = [self._result("Cleaning up: rm -rf /tmp/build")]
        assert len(_detect_destructive_operations(results)) > 0

    def test_detects_rm_r(self):
        """rm -r /path (recursive without force) → detected."""
        results = [self._result("Removing: rm -r /tmp/old")]
        assert len(_detect_destructive_operations(results)) > 0

    def test_detects_drop_table(self):
        """DROP TABLE → detected."""
        results = [self._result("I will run: DROP TABLE users")]
        assert len(_detect_destructive_operations(results)) > 0

    def test_detects_drop_database(self):
        """DROP DATABASE → detected."""
        results = [self._result("DROP DATABASE legacy_db")]
        assert len(_detect_destructive_operations(results)) > 0

    def test_detects_delete_from_no_where(self):
        """DELETE FROM without WHERE → detected."""
        results = [self._result("DELETE FROM temp_table")]
        assert len(_detect_destructive_operations(results)) > 0

    def test_detects_delete_from_with_where(self):
        """DELETE FROM with WHERE is NOT a destructive operation — not detected."""
        results = [self._result("DELETE FROM sessions WHERE expired=1")]
        assert _detect_destructive_operations(results) == []

    def test_detects_delete_from_with_where_complex(self):
        """DELETE FROM with WHERE and a complex condition — also not detected."""
        results = [self._result("DELETE FROM logs WHERE created_at < '2024-01-01' AND user_id = 42")]
        assert _detect_destructive_operations(results) == []

    def test_detects_unlink_shell(self):
        """unlink /path → detected."""
        results = [self._result("unlink /tmp/lockfile")]
        assert len(_detect_destructive_operations(results)) > 0

    def test_detects_shutil_rmtree(self):
        """shutil.rmtree() → detected."""
        results = [self._result("shutil.rmtree(build_dir)")]
        assert len(_detect_destructive_operations(results)) > 0

    def test_detects_os_remove(self):
        """os.remove() → detected."""
        results = [self._result("os.remove(path)")]
        assert len(_detect_destructive_operations(results)) > 0

    def test_detects_os_unlink(self):
        """os.unlink() → detected."""
        results = [self._result("os.unlink(filename)")]
        assert len(_detect_destructive_operations(results)) > 0

    def test_ignores_failed_results(self):
        """success=False → the detector skips that step."""
        results = [{"success": False, "raw_output": "rm -rf /tmp/x", "output": ""}]
        assert _detect_destructive_operations(results) == []

    def test_empty_results_list(self):
        """Empty results list → empty matches list."""
        assert _detect_destructive_operations([]) == []

    def test_detects_pattern_in_output_field(self):
        """The detector also scans the output field (not only raw_output)."""
        results = [{"success": True, "raw_output": "", "output": "shutil.rmtree('/tmp/x')"}]
        assert len(_detect_destructive_operations(results)) > 0


# ---------------------------------------------------------------------------
# KIN-116: Pipeline — destructive ops guard in auto_complete mode
# ---------------------------------------------------------------------------


class TestDestructiveOpsPipeline:
    """run_pipeline must force review when destructive operations are detected."""

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_destructive_output_forces_review_in_auto_complete(
        self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn
    ):
        """rm -rf in output → task.status=review even with mode=auto_complete."""
        mock_run.return_value = _mock_claude_success({"result": "Cleaning up: rm -rf /tmp/build"})
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        mock_learn.return_value = {"added": 0, "skipped": 0}
        models.update_project(conn, "vdol", execution_mode="auto_complete")
        steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "verify"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is True
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "review", "destructive ops должны принудительно ставить review"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_destructive_output_pipeline_mode_downgraded_to_review(
        self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn
    ):
        """With destructive operations run_pipeline returns mode='review'."""
        mock_run.return_value = _mock_claude_success({"result": "shutil.rmtree(old_dir)"})
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        mock_learn.return_value = {"added": 0, "skipped": 0}
        models.update_project(conn, "vdol", execution_mode="auto_complete")
        steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "verify"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["mode"] == "review"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_clean_output_auto_complete_sets_done(
        self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn
    ):
        """Without destructive operations auto_complete + tester last → status=done."""
        mock_run.return_value = _mock_claude_success({"result": "All tests passed."})
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        mock_learn.return_value = {"added": 0, "skipped": 0}
        models.update_project(conn, "vdol", execution_mode="auto_complete")
        steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "verify"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is True
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "done"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_destructive_output_in_review_mode_stays_review(
        self, mock_run, mock_hooks, mock_learn, mock_autocommit, conn
    ):
        """In review mode destructive operations do not change the status (already review)."""
        mock_run.return_value = _mock_claude_success({"result": "DROP TABLE old_cache"})
        mock_hooks.return_value = []
        mock_learn.return_value = {"added": 0, "skipped": 0}
        # the project stays in "review" mode (the default)
        steps = [{"role": "debugger", "brief": "check"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is True
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "review"