"""Regression tests for KIN-116 — destructive operation detection in Auto mode.

Root cause: agents executing DELETE/DROP/rm -rf in Auto mode could complete
without human review, silently destroying data.

Fix: runner._detect_destructive_operations() scans step outputs for destructive
patterns. If found in auto_complete mode, mode is downgraded to "review" and
the task is put into review status instead of done.

Coverage:
(1)  _detect_destructive_operations returns [] for clean output
(2)  _detect_destructive_operations detects `rm -rf`
(3)  _detect_destructive_operations detects `rm -r`
(4)  _detect_destructive_operations detects `rm -f`
(5)  _detect_destructive_operations detects `DROP TABLE`
(6)  _detect_destructive_operations detects `DELETE FROM`
(7)  _detect_destructive_operations detects `unlink /path`
(8)  _detect_destructive_operations detects `shutil.rmtree(`
(9)  _detect_destructive_operations detects `os.remove(`
(10) _detect_destructive_operations detects `os.unlink(`
(11) _detect_destructive_operations searches raw_output, not just output
(12) _detect_destructive_operations ignores failed steps (success=False)
(13) _detect_destructive_operations is case-insensitive for SQL keywords
(14) run_pipeline auto mode → done when no destructive ops
(15) run_pipeline auto mode → review when destructive op in step output
(16) run_pipeline review mode stays review regardless of destructive ops
     (checked with both clean and destructive step output)
(17) audit_log entry written when destructive ops detected in auto mode

Additional unit cases (unnumbered): lists of several step results, either all
clean or with exactly one destructive entry.
"""

import json
import sqlite3
import unittest
from unittest.mock import MagicMock, patch

from agents.runner import _detect_destructive_operations, run_pipeline
from core.db import init_db
from core import models


class TestDetectDestructivePatterns(unittest.TestCase):
    """Unit tests for the _detect_destructive_operations() helper."""

    def _make_result(self, raw_output="", output="", success=True):
        """Build a minimal step-result dict with the three keys these tests set."""
        return {"success": success, "raw_output": raw_output, "output": output}

    def _assert_flagged(self, raw_output):
        """Assert that one successful step with *raw_output* yields a detection."""
        detected = _detect_destructive_operations([self._make_result(raw_output)])
        self.assertGreater(
            len(detected), 0,
            f"expected a destructive-op detection for {raw_output!r}",
        )

    # (1) clean output → no detection
    def test_clean_output_returns_empty(self):
        results = [self._make_result("I updated the config file", "{}")]
        self.assertEqual(_detect_destructive_operations(results), [])

    # (2) rm -rf detected
    def test_rm_rf_detected(self):
        self._assert_flagged("rm -rf /tmp/old_build")

    # (3) rm -r detected
    def test_rm_r_detected(self):
        self._assert_flagged("rm -r old_dir/")

    # (4) rm -f detected
    def test_rm_f_detected(self):
        self._assert_flagged("rm -f lockfile.pid")

    # (5) DROP TABLE detected
    def test_drop_table_detected(self):
        self._assert_flagged("DROP TABLE users;")

    # (6) DELETE FROM detected
    def test_delete_from_detected(self):
        self._assert_flagged("DELETE FROM sessions WHERE expired = 1;")

    # (7) unlink detected
    def test_unlink_detected(self):
        self._assert_flagged("unlink /var/run/app.pid")

    # (8) shutil.rmtree detected
    def test_shutil_rmtree_detected(self):
        self._assert_flagged("shutil.rmtree(build_dir)")

    # (9) os.remove detected
    def test_os_remove_detected(self):
        self._assert_flagged("os.remove(path)")

    # (10) os.unlink detected
    def test_os_unlink_detected(self):
        self._assert_flagged("os.unlink(stale_file)")

    # (11) searches raw_output field
    def test_searches_raw_output_field(self):
        # The pattern lives only in raw_output; the parsed output is an
        # innocuous "{}" and must not mask it.
        results = [self._make_result(raw_output="rm -rf /tmp/junk", output="{}")]
        self.assertGreater(len(_detect_destructive_operations(results)), 0)

    # (12) ignores failed steps
    def test_ignores_failed_steps(self):
        results = [self._make_result("rm -rf /entire/system", success=False)]
        self.assertEqual(_detect_destructive_operations(results), [])

    # (13) case-insensitive SQL
    def test_case_insensitive_sql(self):
        self._assert_flagged("drop table tmp_cache;")

    # (extra) multiple results — only clean ones
    def test_multiple_results_all_clean(self):
        results = [
            self._make_result("Updated config"),
            self._make_result("Ran migrations"),
        ]
        self.assertEqual(_detect_destructive_operations(results), [])

    # (extra) multiple results — one dirty
    def test_multiple_results_one_dirty(self):
        results = [
            self._make_result("Updated config"),
            self._make_result("DELETE FROM audit_log;"),
        ]
        self.assertGreater(len(_detect_destructive_operations(results)), 0)


class TestRunPipelineDestructiveAutoMode(unittest.TestCase):
    """Integration tests: run_pipeline with destructive ops in auto mode → review."""

    def setUp(self):
        # Fresh in-memory database per test, so task status and audit_log rows
        # never leak between tests.
        self.conn = init_db(":memory:")
        self.project = models.create_project(
            self.conn,
            "proj-destructive",
            "DestructiveTest",
            path="/tmp/proj-destructive",
            execution_mode="auto_complete",
        )
        self.task = models.create_task(
            self.conn,
            "DEST-001",
            "proj-destructive",
            "Test destructive",
            brief={"route_type": "hotfix"},
        )

    def tearDown(self):
        self.conn.close()

    def _mock_step_result(self, raw_output="clean output"):
        """Return a successful step result whose raw_output is *raw_output*."""
        return {
            "success": True,
            "output": {"status": "done"},
            "raw_output": raw_output,
            "cost_usd": 0.0,
            "tokens_used": 0,
            "duration_seconds": 1.0,
        }

    def _run_pipeline(self, step_result, mode, role="tester"):
        """Run a one-step pipeline for the fixture task with collaborators mocked.

        While the pipeline runs, ``run_agent`` returns *step_result*,
        ``get_effective_mode`` returns *mode*, and ``_run_autocommit``,
        ``_run_learning_extraction`` and ``run_hooks`` are replaced by mocks.

        Returns the task re-read from the database after the run.
        """
        steps = [{"role": role, "model": "haiku"}]
        with patch("agents.runner.run_agent", return_value=step_result), \
             patch("agents.runner.models.get_effective_mode", return_value=mode), \
             patch("agents.runner._run_autocommit"), \
             patch("agents.runner._run_learning_extraction"), \
             patch("agents.runner.run_hooks"):
            run_pipeline(self.conn, self.task["id"], steps)
        return models.get_task(self.conn, self.task["id"])

    # (14) clean output → auto-complete → done
    def test_auto_mode_clean_output_becomes_done(self):
        clean_result = self._mock_step_result("All tests pass. No changes made.")
        task = self._run_pipeline(clean_result, "auto_complete")
        self.assertEqual(task["status"], "done")

    # (15) destructive op in output → stays review even in auto mode
    def test_auto_mode_destructive_output_becomes_review(self):
        destructive_result = self._mock_step_result("rm -rf /tmp/old && tests pass")
        task = self._run_pipeline(destructive_result, "auto_complete")
        self.assertEqual(
            task["status"], "review",
            "Auto mode with destructive ops must end in review, not done",
        )

    # (16) review mode stays review regardless — clean output
    def test_review_mode_unaffected_by_detection(self):
        clean_result = self._mock_step_result("Updated models.py")
        task = self._run_pipeline(clean_result, "review", role="backend_dev")
        self.assertEqual(task["status"], "review")

    # (16) review mode stays review regardless — destructive output
    def test_review_mode_destructive_output_stays_review(self):
        # The clean-output case alone never triggers detection, so it cannot
        # show that review mode is unaffected by it; this case does.
        destructive_result = self._mock_step_result("rm -rf /tmp/old && tests pass")
        task = self._run_pipeline(destructive_result, "review", role="backend_dev")
        self.assertEqual(task["status"], "review")

    # (17) audit log written when destructive ops detected in auto mode
    def test_audit_log_written_on_destructive_detection(self):
        destructive_result = self._mock_step_result("DELETE FROM cache;")
        self._run_pipeline(destructive_result, "auto_complete")
        rows = self.conn.execute(
            "SELECT * FROM audit_log WHERE event_type = 'destructive_ops_detected'"
        ).fetchall()
        self.assertGreater(
            len(rows), 0,
            "Audit log must have destructive_ops_detected entry",
        )
        row = dict(rows[0])
        self.assertEqual(row["task_id"], self.task["id"])


if __name__ == "__main__":
    unittest.main()