kin: auto-commit after pipeline

This commit is contained in:
Gros Frumos 2026-03-17 22:24:05 +02:00
parent fb4dd03342
commit 99262fb920
2 changed files with 400 additions and 0 deletions

View file

@ -8,9 +8,12 @@ Business logic for project deployments:
"""
import shlex
import shutil
import sqlite3
import subprocess
import time
from datetime import datetime
from pathlib import Path
VALID_RUNTIMES = {"docker", "node", "python", "static"}
@ -24,6 +27,44 @@ RUNTIME_STEPS = {
}
def pre_deploy_backup(project: dict) -> dict:
    """Create a timestamped backup of the project's SQLite database before deploy.

    Looks for *.db / *.sqlite files in ``deploy_path`` (preferred) or ``path``.

    Args:
        project: Project record; only the "deploy_path" and "path" keys are read.

    Returns:
        {"status": "ok", "path": "..."} on success,
        {"status": "skipped"[, "reason": "..."]} if there is nothing to back up,
        {"status": "error", "reason": "..."} if copying the database failed.
    """
    raw_path = project.get("deploy_path") or project.get("path")
    # Guard the empty/missing case BEFORE constructing a Path: Path("")
    # normalizes to Path("."), which exists and would silently scan the
    # current working directory for databases.
    if not raw_path:
        return {"status": "skipped", "reason": "project path not found"}
    project_path = Path(raw_path)
    if not project_path.exists():
        return {"status": "skipped", "reason": "project path not found"}

    # Look for SQLite DB files (kin.db first, then any *.db / *.sqlite),
    # deduplicating while preserving that priority order.
    seen: set[Path] = set()
    db_files: list[Path] = []
    for pattern in ("kin.db", "*.db", "*.sqlite"):
        for f in project_path.glob(pattern):
            if f not in seen:
                seen.add(f)
                db_files.append(f)
    if not db_files:
        return {"status": "skipped"}

    db_file = db_files[0]
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = db_file.parent / f"{db_file.name}.{timestamp}.bak"
    try:
        # copy2 preserves mtime/metadata so the backup mirrors the source file.
        shutil.copy2(str(db_file), str(backup_path))
    except OSError as e:
        # Surface the failure as data, matching the documented contract:
        # callers treat an "error" status as a hard stop for the deploy.
        return {"status": "error", "reason": str(e)}
    return {"status": "ok", "path": str(backup_path)}
def build_deploy_steps(project: dict) -> list[str]:
"""Build deploy command list based on runtime and project config.
@ -84,6 +125,27 @@ def execute_deploy(project: dict, conn: sqlite3.Connection) -> dict:
"error": "No deploy steps: deploy_runtime not set or invalid",
}
# Mandatory pre-deploy backup — deploy without backup is impossible
try:
backup_result = pre_deploy_backup(project)
except Exception as e:
return {
"success": False,
"steps": steps,
"results": [],
"backup": {"status": "error", "reason": str(e)},
"error": f"Pre-deploy backup failed: {e}",
}
if backup_result.get("status") == "error":
return {
"success": False,
"steps": steps,
"results": [],
"backup": backup_result,
"error": f"Pre-deploy backup failed: {backup_result.get('reason')}",
}
deploy_path = project.get("deploy_path") or project.get("path") or None
results = []
overall_success = True
@ -142,6 +204,7 @@ def execute_deploy(project: dict, conn: sqlite3.Connection) -> dict:
"success": overall_success,
"steps": steps,
"results": results,
"backup": backup_result,
}

View file

@ -11,6 +11,7 @@ from agents.runner import (
_build_claude_env, _resolve_claude_cmd, _EXTRA_PATH_DIRS, _run_autocommit,
_parse_agent_blocked, _get_changed_files, _save_sysadmin_output,
check_claude_auth, ClaudeAuthError, _MODEL_TIMEOUTS,
_detect_destructive_operations,
)
@ -583,6 +584,162 @@ class TestTryParseJson:
def test_json_array(self):
    """A JSON array literal parses to the corresponding Python list."""
    parsed = _try_parse_json('[1, 2, 3]')
    assert parsed == [1, 2, 3]
def test_empty_array_returns_list_not_none(self):
    """Empty array [] must return [], not None — [] is falsy but valid JSON."""
    parsed = _try_parse_json('[]')
    assert parsed is not None
    assert parsed == []
def test_empty_object_returns_dict_not_none(self):
    """Empty object {} must return {}, not None."""
    parsed = _try_parse_json('{}')
    assert parsed is not None
    assert parsed == {}
# ---------------------------------------------------------------------------
# Falsy output preservation (empty array / empty dict from agent)
# ---------------------------------------------------------------------------
class TestRunAgentFalsyOutputPreservation:
    """Regression tests for bug: `parsed_output if parsed_output else output_text`
    drops falsy values like [] and {} instead of preserving them.

    Bug location: runner.py line ~236.
    """

    @patch("agents.runner.subprocess.run")
    def test_empty_array_output_preserved_not_replaced_by_raw_string(self, mock_run, conn):
        """When agent returns '[]', output must be [] (list), not the raw string '[]'."""
        mock = MagicMock()
        mock.stdout = json.dumps({"result": "[]"})
        mock.stderr = ""
        mock.returncode = 0
        mock_run.return_value = mock
        result = run_agent(conn, "tester", "VDOL-001", "vdol")
        # _try_parse_json parses the outer envelope; result['result'] is the
        # string '[]' which then gets parsed again -> [] list.  The original
        # version of this test first ran a weaker disjunctive assert
        # (== [] or == '[]') that the strict assert below made dead code;
        # only the strict assertion is kept.
        assert result["output"] == [], \
            f"Fix not applied: empty array lost, got raw string {result['output']!r}"

    @patch("agents.runner.subprocess.run")
    def test_empty_array_top_level_output_preserved(self, mock_run, conn):
        """When claude stdout IS '[]' (top-level array), output must be [] not '[]'."""
        mock = MagicMock()
        mock.stdout = "[]"
        mock.stderr = ""
        mock.returncode = 0
        mock_run.return_value = mock
        result = run_agent(conn, "tester", "VDOL-001", "vdol")
        # _try_parse_json('[]') returns [] (a list, falsy)
        # Bug: `parsed_output if parsed_output else output_text` drops [] → returns '[]'
        # Fix: `output_text if parsed_output is None else parsed_output`
        assert result["output"] == [], \
            f"Fix not applied: empty list dropped, got {result['output']!r}"

    @patch("agents.runner.subprocess.run")
    def test_empty_dict_top_level_output_preserved(self, mock_run, conn):
        """When claude stdout IS '{}', output must be {} not '{}'."""
        mock = MagicMock()
        mock.stdout = "{}"
        mock.stderr = ""
        mock.returncode = 0
        mock_run.return_value = mock
        result = run_agent(conn, "tester", "VDOL-001", "vdol")
        assert result["output"] == {}, \
            f"Fix not applied: empty dict dropped, got {result['output']!r}"

    @patch("agents.runner.subprocess.run")
    def test_nonempty_array_output_still_works(self, mock_run, conn):
        """Normal non-empty array output [1,2,3] must still be returned correctly."""
        mock = MagicMock()
        mock.stdout = "[1, 2, 3]"
        mock.stderr = ""
        mock.returncode = 0
        mock_run.return_value = mock
        result = run_agent(conn, "tester", "VDOL-001", "vdol")
        assert result["output"] == [1, 2, 3]

    @patch("agents.runner.subprocess.run")
    def test_none_parsed_output_falls_back_to_raw_string(self, mock_run, conn):
        """When output is not JSON at all, raw string must be returned."""
        mock = MagicMock()
        mock.stdout = "this is plain text"
        mock.stderr = ""
        mock.returncode = 0
        mock_run.return_value = mock
        result = run_agent(conn, "tester", "VDOL-001", "vdol")
        assert result["output"] == "this is plain text"
# ---------------------------------------------------------------------------
# _run_claude falsy result field (empty array in 'result' key)
# ---------------------------------------------------------------------------
class TestRunClaudeEmptyResultField:
    """Regression tests for bug in _run_claude: `parsed.get('result') or parsed.get('content')`
    drops empty array/dict in 'result' and falls through to 'content'.

    Bug location: runner.py line ~313.
    """

    @patch("agents.runner.subprocess.run")
    def test_empty_array_in_result_field_not_lost(self, mock_run, conn):
        """When claude returns {"result": []}, output must not fall back to 'content'."""
        mock = MagicMock()
        # Claude envelope: 'result' holds an empty array, 'content' is bait.
        mock.stdout = json.dumps({"result": [], "content": "fallback text"})
        mock.stderr = ""
        mock.returncode = 0
        mock_run.return_value = mock
        result = run_agent(conn, "tester", "VDOL-001", "vdol")
        # Bug: `parsed.get("result") or parsed.get("content")` → "fallback text"
        # Fix: explicit None check keeps the (falsy) 'result' value.
        # The original assertion here compared raw_output against the mocked
        # stdout, which is trivially true; assert directly that 'content'
        # never leaked into the output.
        assert result["output"] != "fallback text", \
            "output should not silently fall through to 'content' when 'result' is []"

    @patch("agents.runner.subprocess.run")
    def test_result_field_present_and_nonempty_used(self, mock_run, conn):
        """Normal case: non-empty result field is returned, content ignored."""
        mock = MagicMock()
        mock.stdout = json.dumps({"result": "done", "content": "ignored"})
        mock.stderr = ""
        mock.returncode = 0
        mock_run.return_value = mock
        result = run_agent(conn, "tester", "VDOL-001", "vdol")
        assert result["output"] == "done"

    @patch("agents.runner.subprocess.run")
    def test_result_field_missing_falls_back_to_content(self, mock_run, conn):
        """When 'result' key is absent, 'content' must be used as fallback."""
        mock = MagicMock()
        mock.stdout = json.dumps({"content": "content value"})
        mock.stderr = ""
        mock.returncode = 0
        mock_run.return_value = mock
        result = run_agent(conn, "tester", "VDOL-001", "vdol")
        assert result["output"] == "content value"
# ---------------------------------------------------------------------------
# Non-interactive mode
@ -2810,3 +2967,183 @@ class TestPMStepPipelineLog:
WHERE p.id IS NULL"""
).fetchall()
assert len(orphans) == 0
# ---------------------------------------------------------------------------
# KIN-116: _detect_destructive_operations — unit tests
# ---------------------------------------------------------------------------
class TestDetectDestructiveOperations:
    """Unit tests for _detect_destructive_operations().

    Exercises observable behaviour only: which output patterns are flagged
    as destructive and which pass clean.
    """

    def _result(self, text):
        """Build a successful step-result dict whose raw output is *text*."""
        return {"success": True, "raw_output": text, "output": ""}

    def test_returns_list_type(self):
        """The detector always returns a list."""
        assert isinstance(_detect_destructive_operations([]), list)

    def test_clean_output_returns_empty_list(self):
        """Output with no destructive operations yields an empty (falsy) list."""
        clean = self._result("All tests passed. 42 passed, 0 failed.")
        assert _detect_destructive_operations([clean]) == []

    def test_detects_rm_rf(self):
        """`rm -rf /path` is flagged."""
        hits = _detect_destructive_operations([self._result("Cleaning up: rm -rf /tmp/build")])
        assert hits

    def test_detects_rm_r(self):
        """`rm -r /path` (recursive without force) is flagged."""
        hits = _detect_destructive_operations([self._result("Removing: rm -r /tmp/old")])
        assert hits

    def test_detects_drop_table(self):
        """DROP TABLE is flagged."""
        hits = _detect_destructive_operations([self._result("I will run: DROP TABLE users")])
        assert hits

    def test_detects_drop_database(self):
        """DROP DATABASE is flagged."""
        hits = _detect_destructive_operations([self._result("DROP DATABASE legacy_db")])
        assert hits

    def test_detects_delete_from_no_where(self):
        """DELETE FROM without a WHERE clause is flagged."""
        hits = _detect_destructive_operations([self._result("DELETE FROM temp_table")])
        assert hits

    def test_detects_delete_from_with_where(self):
        """DELETE FROM with WHERE is also flagged (by the runner's design)."""
        hits = _detect_destructive_operations([self._result("DELETE FROM sessions WHERE expired=1")])
        assert hits

    def test_detects_unlink_shell(self):
        """Shell `unlink /path` is flagged."""
        hits = _detect_destructive_operations([self._result("unlink /tmp/lockfile")])
        assert hits

    def test_detects_shutil_rmtree(self):
        """shutil.rmtree() is flagged."""
        hits = _detect_destructive_operations([self._result("shutil.rmtree(build_dir)")])
        assert hits

    def test_detects_os_remove(self):
        """os.remove() is flagged."""
        hits = _detect_destructive_operations([self._result("os.remove(path)")])
        assert hits

    def test_detects_os_unlink(self):
        """os.unlink() is flagged."""
        hits = _detect_destructive_operations([self._result("os.unlink(filename)")])
        assert hits

    def test_ignores_failed_results(self):
        """A step with success=False is skipped by the detector entirely."""
        failed = {"success": False, "raw_output": "rm -rf /tmp/x", "output": ""}
        assert _detect_destructive_operations([failed]) == []

    def test_empty_results_list(self):
        """An empty results list produces an empty matches list."""
        assert _detect_destructive_operations([]) == []

    def test_detects_pattern_in_output_field(self):
        """The detector scans the 'output' field too, not only 'raw_output'."""
        step = {"success": True, "raw_output": "", "output": "shutil.rmtree('/tmp/x')"}
        assert _detect_destructive_operations([step])
# ---------------------------------------------------------------------------
# KIN-116: Pipeline — destructive ops guard in auto_complete mode
# ---------------------------------------------------------------------------
class TestDestructiveOpsPipeline:
    """run_pipeline must force 'review' status when destructive operations are detected."""

    # NOTE: @patch decorators apply bottom-up, so the innermost decorator
    # (subprocess.run) binds to the first mock parameter (mock_run), and so on.
    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_destructive_output_forces_review_in_auto_complete(
        self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn
    ):
        """`rm -rf` in output → task.status == 'review' even when mode == auto_complete."""
        mock_run.return_value = _mock_claude_success({"result": "Cleaning up: rm -rf /tmp/build"})
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        mock_learn.return_value = {"added": 0, "skipped": 0}
        models.update_project(conn, "vdol", execution_mode="auto_complete")
        steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "verify"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        # Pipeline itself still succeeds; only the resulting task status changes.
        assert result["success"] is True
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "review", "destructive ops должны принудительно ставить review"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_destructive_output_pipeline_mode_downgraded_to_review(
        self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn
    ):
        """With destructive operations, run_pipeline returns mode='review'."""
        mock_run.return_value = _mock_claude_success({"result": "shutil.rmtree(old_dir)"})
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        mock_learn.return_value = {"added": 0, "skipped": 0}
        models.update_project(conn, "vdol", execution_mode="auto_complete")
        steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "verify"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["mode"] == "review"

    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("core.followup.generate_followups")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_clean_output_auto_complete_sets_done(
        self, mock_run, mock_hooks, mock_followup, mock_learn, mock_autocommit, conn
    ):
        """Without destructive operations, auto_complete + tester as last step → status == 'done'."""
        mock_run.return_value = _mock_claude_success({"result": "All tests passed."})
        mock_hooks.return_value = []
        mock_followup.return_value = {"created": [], "pending_actions": []}
        mock_learn.return_value = {"added": 0, "skipped": 0}
        models.update_project(conn, "vdol", execution_mode="auto_complete")
        steps = [{"role": "developer", "brief": "fix"}, {"role": "tester", "brief": "verify"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is True
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "done"

    # This test patches one decorator fewer (no generate_followups),
    # hence the shorter parameter list.
    @patch("agents.runner._run_autocommit")
    @patch("agents.runner._run_learning_extraction")
    @patch("agents.runner.run_hooks")
    @patch("agents.runner.subprocess.run")
    def test_destructive_output_in_review_mode_stays_review(
        self, mock_run, mock_hooks, mock_learn, mock_autocommit, conn
    ):
        """In review mode destructive operations do not change the status (it is already review)."""
        mock_run.return_value = _mock_claude_success({"result": "DROP TABLE old_cache"})
        mock_hooks.return_value = []
        mock_learn.return_value = {"added": 0, "skipped": 0}
        # project stays in the default "review" execution mode
        steps = [{"role": "debugger", "brief": "check"}]
        result = run_pipeline(conn, "VDOL-001", steps)
        assert result["success"] is True
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "review"