# kin/tests/test_kin_116_regression.py
# Last modified: 2026-03-17 22:18:19 +02:00
"""Regression tests for KIN-116 — destructive operation detection in Auto mode.
Root cause: agents executing DELETE/DROP/rm -rf in Auto mode could complete
without human review, silently destroying data.
Fix: runner._detect_destructive_operations() scans step outputs for destructive
patterns. If found in auto_complete mode, mode is downgraded to "review" and
the task is put into review status instead of done.
Coverage:
(1) _detect_destructive_operations returns [] for clean output
(2) _detect_destructive_operations detects `rm -rf`
(3) _detect_destructive_operations detects `rm -r`
(4) _detect_destructive_operations detects `rm -f`
(5) _detect_destructive_operations detects `DROP TABLE`
(6) _detect_destructive_operations detects `DELETE FROM`
(7) _detect_destructive_operations detects `unlink /path`
(8) _detect_destructive_operations detects `shutil.rmtree(`
(9) _detect_destructive_operations detects `os.remove(`
(10) _detect_destructive_operations detects `os.unlink(`
(11) _detect_destructive_operations searches raw_output, not just output
(12) _detect_destructive_operations ignores failed steps (success=False)
(13) _detect_destructive_operations is case-insensitive for SQL keywords
(14) run_pipeline auto mode → done when no destructive ops
(15) run_pipeline auto mode → review when destructive op in step output
(16) run_pipeline review mode stays review regardless of destructive ops
(17) audit_log entry written when destructive ops detected in auto mode
"""
import json
import sqlite3
import unittest
from unittest.mock import MagicMock, patch
from agents.runner import _detect_destructive_operations
from core.db import init_db
from core import models
class TestDetectDestructivePatterns(unittest.TestCase):
    """Unit tests for the _detect_destructive_operations() helper.

    Each destructive pattern gets its own test method so a regression in
    any single pattern shows up as exactly one failing test.
    """

    @staticmethod
    def _make_result(raw_output="", output="", success=True):
        # Minimal step-result dict shaped like what the runner produces.
        return {"success": success, "raw_output": raw_output, "output": output}

    def _assert_flagged(self, results):
        # The detector must report at least one destructive operation.
        self.assertTrue(_detect_destructive_operations(results))

    def _assert_clean(self, results):
        # The detector must report nothing at all.
        self.assertEqual(_detect_destructive_operations(results), [])

    # Clean output → no detections.
    def test_clean_output_returns_empty(self):
        self._assert_clean([self._make_result("I updated the config file", "{}")])

    # Shell removal commands: rm -rf / rm -r / rm -f.
    def test_rm_rf_detected(self):
        self._assert_flagged([self._make_result("rm -rf /tmp/old_build")])

    def test_rm_r_detected(self):
        self._assert_flagged([self._make_result("rm -r old_dir/")])

    def test_rm_f_detected(self):
        self._assert_flagged([self._make_result("rm -f lockfile.pid")])

    # Destructive SQL statements.
    def test_drop_table_detected(self):
        self._assert_flagged([self._make_result("DROP TABLE users;")])

    def test_delete_from_detected(self):
        self._assert_flagged(
            [self._make_result("DELETE FROM sessions WHERE expired = 1;")]
        )

    # File deletion via shell unlink and Python stdlib calls.
    def test_unlink_detected(self):
        self._assert_flagged([self._make_result("unlink /var/run/app.pid")])

    def test_shutil_rmtree_detected(self):
        self._assert_flagged([self._make_result("shutil.rmtree(build_dir)")])

    def test_os_remove_detected(self):
        self._assert_flagged([self._make_result("os.remove(path)")])

    def test_os_unlink_detected(self):
        self._assert_flagged([self._make_result("os.unlink(stale_file)")])

    # The detector must scan raw_output, not just the parsed output field.
    def test_searches_raw_output_field(self):
        self._assert_flagged(
            [self._make_result(raw_output="rm -rf /tmp/junk", output="{}")]
        )

    # Failed steps never ran their commands, so they are skipped entirely.
    def test_ignores_failed_steps(self):
        self._assert_clean(
            [self._make_result("rm -rf /entire/system", success=False)]
        )

    # SQL keyword matching is case-insensitive.
    def test_case_insensitive_sql(self):
        self._assert_flagged([self._make_result("drop table tmp_cache;")])

    # Several results, all clean → nothing flagged.
    def test_multiple_results_all_clean(self):
        self._assert_clean([
            self._make_result("Updated config"),
            self._make_result("Ran migrations"),
        ])

    # Several results, one destructive → flagged.
    def test_multiple_results_one_dirty(self):
        self._assert_flagged([
            self._make_result("Updated config"),
            self._make_result("DELETE FROM audit_log;"),
        ])
class TestRunPipelineDestructiveAutoMode(unittest.TestCase):
    """Integration tests: run_pipeline with destructive ops in auto mode → review.

    The repeated five-patch mock stack is factored into _run_pipeline() so each
    test states only its input (step output, effective mode) and its expected
    final task status.
    """

    def setUp(self):
        # Fresh in-memory DB per test; project defaults to auto_complete mode.
        self.conn = init_db(":memory:")
        self.project = models.create_project(
            self.conn, "proj-destructive", "DestructiveTest",
            path="/tmp/proj-destructive",
            execution_mode="auto_complete",
        )
        self.task = models.create_task(
            self.conn, "DEST-001", "proj-destructive", "Test destructive",
            brief={"route_type": "hotfix"},
        )

    def tearDown(self):
        self.conn.close()

    def _mock_step_result(self, raw_output="clean output"):
        """Successful step result whose raw_output each test controls."""
        return {
            "success": True,
            "output": {"status": "done"},
            "raw_output": raw_output,
            "cost_usd": 0.0,
            "tokens_used": 0,
            "duration_seconds": 1.0,
        }

    def _run_pipeline(self, steps, step_result, mode):
        """Run run_pipeline under the standard mock stack.

        Stubs the agent execution (returns *step_result*), pins the effective
        execution mode to *mode*, and neutralizes the side-effecting
        post-steps (autocommit, learning extraction, hooks) so only the
        status-transition logic under test runs.
        """
        with patch("agents.runner.run_agent", return_value=step_result), \
             patch("agents.runner.models.get_effective_mode", return_value=mode), \
             patch("agents.runner._run_autocommit"), \
             patch("agents.runner._run_learning_extraction"), \
             patch("agents.runner.run_hooks"):
            # Imported late so the patches above are active before the
            # pipeline entry point is resolved.
            from agents.runner import run_pipeline
            run_pipeline(self.conn, self.task["id"], steps)

    def _task_status(self):
        """Re-read the task row and return its current status."""
        return models.get_task(self.conn, self.task["id"])["status"]

    # (14) clean output in auto mode → task auto-completes to done.
    def test_auto_mode_clean_output_becomes_done(self):
        steps = [{"role": "tester", "model": "haiku"}]
        clean = self._mock_step_result("All tests pass. No changes made.")
        self._run_pipeline(steps, clean, "auto_complete")
        self.assertEqual(self._task_status(), "done")

    # (15) destructive op in output → downgraded to review even in auto mode.
    def test_auto_mode_destructive_output_becomes_review(self):
        steps = [{"role": "tester", "model": "haiku"}]
        destructive = self._mock_step_result("rm -rf /tmp/old && tests pass")
        self._run_pipeline(steps, destructive, "auto_complete")
        self.assertEqual(self._task_status(), "review",
                         "Auto mode with destructive ops must end in review, not done")

    # (16) review mode stays review regardless of detection outcome.
    def test_review_mode_unaffected_by_detection(self):
        steps = [{"role": "backend_dev", "model": "haiku"}]
        clean = self._mock_step_result("Updated models.py")
        self._run_pipeline(steps, clean, "review")
        self.assertEqual(self._task_status(), "review")

    # (17) audit_log gains a destructive_ops_detected entry in auto mode.
    def test_audit_log_written_on_destructive_detection(self):
        steps = [{"role": "tester", "model": "haiku"}]
        destructive = self._mock_step_result("DELETE FROM cache;")
        self._run_pipeline(steps, destructive, "auto_complete")
        rows = self.conn.execute(
            "SELECT * FROM audit_log WHERE event_type = 'destructive_ops_detected'"
        ).fetchall()
        self.assertGreater(len(rows), 0,
                           "Audit log must have destructive_ops_detected entry")
        row = dict(rows[0])
        self.assertEqual(row["task_id"], self.task["id"])
# Allow running this regression suite directly: python test_kin_116_regression.py
if __name__ == "__main__":
    unittest.main()