2026-03-17 22:18:19 +02:00
|
|
|
|
"""Regression tests for KIN-116 — destructive operation detection in Auto mode.
|
|
|
|
|
|
|
|
|
|
|
|
Root cause: agents executing DELETE/DROP/rm -rf in Auto mode could complete
|
|
|
|
|
|
without human review, silently destroying data.
|
|
|
|
|
|
|
|
|
|
|
|
Fix: runner._detect_destructive_operations() scans step outputs for destructive
|
|
|
|
|
|
patterns. If found in auto_complete mode, mode is downgraded to "review" and
|
|
|
|
|
|
the task is put into review status instead of done.
|
|
|
|
|
|
|
|
|
|
|
|
Coverage:
|
|
|
|
|
|
(1) _detect_destructive_operations returns [] for clean output
|
|
|
|
|
|
(2) _detect_destructive_operations detects `rm -rf`
|
|
|
|
|
|
(3) _detect_destructive_operations detects `rm -r`
|
|
|
|
|
|
(4) _detect_destructive_operations detects `rm -f`
|
|
|
|
|
|
(5) _detect_destructive_operations detects `DROP TABLE`
|
|
|
|
|
|
(6) _detect_destructive_operations detects `DELETE FROM` without a WHERE clause (a `DELETE FROM ... WHERE ...` statement is not flagged — KIN-SEC-001)
|
|
|
|
|
|
(7) _detect_destructive_operations detects `unlink /path`
|
|
|
|
|
|
(8) _detect_destructive_operations detects `shutil.rmtree(`
|
|
|
|
|
|
(9) _detect_destructive_operations detects `os.remove(`
|
|
|
|
|
|
(10) _detect_destructive_operations detects `os.unlink(`
|
|
|
|
|
|
(11) _detect_destructive_operations searches raw_output, not just output
|
|
|
|
|
|
(12) _detect_destructive_operations ignores failed steps (success=False)
|
|
|
|
|
|
(13) _detect_destructive_operations is case-insensitive for SQL keywords
|
|
|
|
|
|
(14) run_pipeline auto mode → done when no destructive ops
|
|
|
|
|
|
(15) run_pipeline auto mode → review when destructive op in step output
|
|
|
|
|
|
(16) run_pipeline review mode stays review regardless of destructive ops
|
|
|
|
|
|
(17) audit_log entry written when destructive ops detected in auto mode
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
|
import sqlite3
|
|
|
|
|
|
import unittest
|
|
|
|
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
|
|
|
|
|
|
|
|
from agents.runner import _detect_destructive_operations
|
|
|
|
|
|
from core.db import init_db
|
|
|
|
|
|
from core import models
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestDetectDestructivePatterns(unittest.TestCase):
    """Unit-level checks of the ``_detect_destructive_operations()`` helper.

    Every test hands the helper a list of step-result dicts and inspects the
    list it returns: empty means nothing was flagged, non-empty means at
    least one destructive pattern was reported.
    """

    def _make_result(self, raw_output="", output="", success=True):
        """Build a step-result dict carrying only the keys these tests set."""
        return dict(success=success, raw_output=raw_output, output=output)

    def _scan(self, *step_results):
        """Run the detector over the given step results and return its findings."""
        return _detect_destructive_operations(list(step_results))

    def _assert_flagged(self, *step_results):
        """Assert that the detector reports at least one finding."""
        findings = self._scan(*step_results)
        self.assertGreater(len(findings), 0)

    # (1) clean output → nothing reported
    def test_clean_output_returns_empty(self):
        step = self._make_result("I updated the config file", "{}")
        self.assertEqual(self._scan(step), [])

    # (2) `rm -rf`
    def test_rm_rf_detected(self):
        self._assert_flagged(self._make_result("rm -rf /tmp/old_build"))

    # (3) `rm -r`
    def test_rm_r_detected(self):
        self._assert_flagged(self._make_result("rm -r old_dir/"))

    # (4) `rm -f`
    def test_rm_f_detected(self):
        self._assert_flagged(self._make_result("rm -f lockfile.pid"))

    # (5) `DROP TABLE`
    def test_drop_table_detected(self):
        self._assert_flagged(self._make_result("DROP TABLE users;"))

    # (6) DELETE FROM without WHERE is detected; with WHERE it is not (KIN-SEC-001)
    def test_delete_from_no_where_detected(self):
        self._assert_flagged(self._make_result("DELETE FROM sessions;"))

    def test_delete_from_with_where_not_detected(self):
        step = self._make_result("DELETE FROM sessions WHERE expired = 1;")
        findings = self._scan(step)
        self.assertEqual(len(findings), 0)

    # (7) shell `unlink`
    def test_unlink_detected(self):
        self._assert_flagged(self._make_result("unlink /var/run/app.pid"))

    # (8) `shutil.rmtree(`
    def test_shutil_rmtree_detected(self):
        self._assert_flagged(self._make_result("shutil.rmtree(build_dir)"))

    # (9) `os.remove(`
    def test_os_remove_detected(self):
        self._assert_flagged(self._make_result("os.remove(path)"))

    # (10) `os.unlink(`
    def test_os_unlink_detected(self):
        self._assert_flagged(self._make_result("os.unlink(stale_file)"))

    # (11) the pattern sits in raw_output while output is an empty JSON object
    def test_searches_raw_output_field(self):
        step = self._make_result(raw_output="rm -rf /tmp/junk", output="{}")
        self._assert_flagged(step)

    # (12) steps with success=False are skipped
    def test_ignores_failed_steps(self):
        failed_step = self._make_result("rm -rf /entire/system", success=False)
        self.assertEqual(self._scan(failed_step), [])

    # (13) lower-case SQL keywords are still matched
    def test_case_insensitive_sql(self):
        self._assert_flagged(self._make_result("drop table tmp_cache;"))

    # several results — every step clean
    def test_multiple_results_all_clean(self):
        findings = self._scan(
            self._make_result("Updated config"),
            self._make_result("Ran migrations"),
        )
        self.assertEqual(findings, [])

    # several results — one step destructive
    def test_multiple_results_one_dirty(self):
        self._assert_flagged(
            self._make_result("Updated config"),
            self._make_result("DELETE FROM audit_log;"),
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunPipelineDestructiveAutoMode(unittest.TestCase):
    """Integration tests: run_pipeline with destructive ops in auto mode → review.

    Each test runs ``run_pipeline`` against an in-memory database with the
    agent call and the post-pipeline side effects patched out, then checks
    the task status (or the audit log) that the pipeline left behind.
    """

    def setUp(self):
        self.conn = init_db(":memory:")
        # Registered right away so the connection is closed even when the
        # fixture creation below raises (tearDown is skipped in that case).
        self.addCleanup(self.conn.close)
        self.project = models.create_project(
            self.conn, "proj-destructive", "DestructiveTest",
            path="/tmp/proj-destructive",
            execution_mode="auto_complete",
        )
        self.task = models.create_task(
            self.conn, "DEST-001", "proj-destructive", "Test destructive",
            brief={"route_type": "hotfix"},
        )

    def _mock_step_result(self, raw_output="clean output"):
        """Return a successful agent step result whose raw_output is *raw_output*."""
        return {
            "success": True,
            "output": {"status": "done"},
            "raw_output": raw_output,
            "cost_usd": 0.0,
            "tokens_used": 0,
            "duration_seconds": 1.0,
        }

    def _run_pipeline(self, steps, step_result, mode):
        """Run the pipeline for the fixture task with all collaborators patched.

        Args:
            steps: pipeline step dicts passed through to ``run_pipeline``.
            step_result: value every patched ``run_agent`` call returns.
            mode: value the patched ``models.get_effective_mode`` returns.
        """
        with patch("agents.runner.run_agent", return_value=step_result), \
                patch("agents.runner.models.get_effective_mode", return_value=mode), \
                patch("agents.runner._run_autocommit"), \
                patch("agents.runner._run_learning_extraction"), \
                patch("agents.runner.run_hooks"):
            # Imported inside the patched context, exactly as the individual
            # tests did before this helper was extracted.
            from agents.runner import run_pipeline
            run_pipeline(self.conn, self.task["id"], steps)

    def _task_status(self):
        """Return the current status of the fixture task as stored in the DB."""
        return models.get_task(self.conn, self.task["id"])["status"]

    # (14) clean output → auto-complete → done
    def test_auto_mode_clean_output_becomes_done(self):
        steps = [{"role": "tester", "model": "haiku"}]
        clean_result = self._mock_step_result("All tests pass. No changes made.")

        self._run_pipeline(steps, clean_result, "auto_complete")

        self.assertEqual(self._task_status(), "done")

    # (15) destructive op in output → ends in review even in auto mode
    def test_auto_mode_destructive_output_becomes_review(self):
        steps = [{"role": "tester", "model": "haiku"}]
        destructive_result = self._mock_step_result("rm -rf /tmp/old && tests pass")

        self._run_pipeline(steps, destructive_result, "auto_complete")

        self.assertEqual(self._task_status(), "review",
                         "Auto mode with destructive ops must end in review, not done")

    # (16) review mode stays review — clean output
    def test_review_mode_unaffected_by_detection(self):
        steps = [{"role": "backend_dev", "model": "haiku"}]
        clean_result = self._mock_step_result("Updated models.py")

        self._run_pipeline(steps, clean_result, "review")

        self.assertEqual(self._task_status(), "review")

    # (16) review mode stays review — destructive output, so the
    # "regardless of destructive ops" claim is exercised with a real hit
    def test_review_mode_with_destructive_output_stays_review(self):
        steps = [{"role": "backend_dev", "model": "haiku"}]
        destructive_result = self._mock_step_result("rm -rf /tmp/old_build")

        self._run_pipeline(steps, destructive_result, "review")

        self.assertEqual(self._task_status(), "review")

    # (17) audit log written when destructive ops detected in auto mode
    def test_audit_log_written_on_destructive_detection(self):
        steps = [{"role": "tester", "model": "haiku"}]
        destructive_result = self._mock_step_result("DELETE FROM cache;")

        self._run_pipeline(steps, destructive_result, "auto_complete")

        rows = self.conn.execute(
            "SELECT * FROM audit_log WHERE event_type = 'destructive_ops_detected'"
        ).fetchall()
        self.assertGreater(len(rows), 0, "Audit log must have destructive_ops_detected entry")
        # dict(row) relies on the connection yielding mapping-like rows —
        # presumably init_db sets a row factory; verify against core.db.
        row = dict(rows[0])
        self.assertEqual(row["task_id"], self.task["id"])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|