kin/tests/test_kin_116_regression.py

231 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Regression tests for KIN-116 — destructive operation detection in Auto mode.
Root cause: agents executing DELETE/DROP/rm -rf in Auto mode could complete
without human review, silently destroying data.
Fix: runner._detect_destructive_operations() scans step outputs for destructive
patterns. If found in auto_complete mode, mode is downgraded to "review" and
the task is put into review status instead of done.
Coverage:
(1) _detect_destructive_operations returns [] for clean output
(2) _detect_destructive_operations detects `rm -rf`
(3) _detect_destructive_operations detects `rm -r`
(4) _detect_destructive_operations detects `rm -f`
(5) _detect_destructive_operations detects `DROP TABLE`
(6) _detect_destructive_operations detects `DELETE FROM` without a WHERE clause (a WHERE-qualified delete is not flagged, KIN-SEC-001)
(7) _detect_destructive_operations detects `unlink /path`
(8) _detect_destructive_operations detects `shutil.rmtree(`
(9) _detect_destructive_operations detects `os.remove(`
(10) _detect_destructive_operations detects `os.unlink(`
(11) _detect_destructive_operations searches raw_output, not just output
(12) _detect_destructive_operations ignores failed steps (success=False)
(13) _detect_destructive_operations is case-insensitive for SQL keywords
(14) run_pipeline auto mode → done when no destructive ops
(15) run_pipeline auto mode → review when destructive op in step output
(16) run_pipeline review mode stays review regardless of destructive ops
(17) audit_log entry written when destructive ops detected in auto mode
"""
import json
import sqlite3
import unittest
from unittest.mock import MagicMock, patch
from agents.runner import _detect_destructive_operations
from core.db import init_db
from core import models
class TestDetectDestructivePatterns(unittest.TestCase):
    """Checks _detect_destructive_operations() against hand-built step results."""

    def _make_result(self, raw_output="", output="", success=True):
        # Minimal step-result dict carrying only the fields these tests set.
        return dict(success=success, raw_output=raw_output, output=output)

    def _assert_flagged(self, step_results):
        # Passes when the detector reports at least one finding.
        findings = _detect_destructive_operations(step_results)
        self.assertGreater(len(findings), 0)

    def _assert_nothing_reported(self, step_results):
        # Passes when the detector returns exactly an empty list.
        findings = _detect_destructive_operations(step_results)
        self.assertEqual(findings, [])

    # Harmless text must not produce any finding.
    def test_clean_output_returns_empty(self):
        harmless = self._make_result("I updated the config file", "{}")
        self._assert_nothing_reported([harmless])

    # Shell: recursive + forced removal.
    def test_rm_rf_detected(self):
        self._assert_flagged([self._make_result("rm -rf /tmp/old_build")])

    # Shell: recursive removal.
    def test_rm_r_detected(self):
        self._assert_flagged([self._make_result("rm -r old_dir/")])

    # Shell: forced removal.
    def test_rm_f_detected(self):
        self._assert_flagged([self._make_result("rm -f lockfile.pid")])

    # SQL: DROP TABLE.
    def test_drop_table_detected(self):
        self._assert_flagged([self._make_result("DROP TABLE users;")])

    # SQL: DELETE FROM without WHERE is detected; with WHERE it is not
    # (KIN-SEC-001).
    def test_delete_from_no_where_detected(self):
        self._assert_flagged([self._make_result("DELETE FROM sessions;")])

    def test_delete_from_with_where_not_detected(self):
        scoped_delete = self._make_result("DELETE FROM sessions WHERE expired = 1;")
        findings = _detect_destructive_operations([scoped_delete])
        self.assertEqual(len(findings), 0)

    # Shell: unlink with a path argument.
    def test_unlink_detected(self):
        self._assert_flagged([self._make_result("unlink /var/run/app.pid")])

    # Python: shutil.rmtree( call.
    def test_shutil_rmtree_detected(self):
        self._assert_flagged([self._make_result("shutil.rmtree(build_dir)")])

    # Python: os.remove( call.
    def test_os_remove_detected(self):
        self._assert_flagged([self._make_result("os.remove(path)")])

    # Python: os.unlink( call.
    def test_os_unlink_detected(self):
        self._assert_flagged([self._make_result("os.unlink(stale_file)")])

    # The destructive text lives only in raw_output; output is an empty JSON object.
    def test_searches_raw_output_field(self):
        step = self._make_result(raw_output="rm -rf /tmp/junk", output="{}")
        self._assert_flagged([step])

    # A step marked success=False must not contribute findings.
    def test_ignores_failed_steps(self):
        failed_step = self._make_result("rm -rf /entire/system", success=False)
        self._assert_nothing_reported([failed_step])

    # Lower-case SQL keywords are detected as well.
    def test_case_insensitive_sql(self):
        self._assert_flagged([self._make_result("drop table tmp_cache;")])

    # Several steps, none of them destructive.
    def test_multiple_results_all_clean(self):
        harmless_steps = [
            self._make_result(text)
            for text in ("Updated config", "Ran migrations")
        ]
        self._assert_nothing_reported(harmless_steps)

    # Several steps, exactly one of them destructive.
    def test_multiple_results_one_dirty(self):
        mixed_steps = [
            self._make_result(text)
            for text in ("Updated config", "DELETE FROM audit_log;")
        ]
        self._assert_flagged(mixed_steps)
class TestRunPipelineDestructiveAutoMode(unittest.TestCase):
    """Integration checks: run_pipeline's final task status when step output is or is not destructive."""

    def setUp(self):
        # Fresh in-memory database with one project and one task per test.
        conn = init_db(":memory:")
        self.conn = conn
        self.project = models.create_project(
            conn,
            "proj-destructive",
            "DestructiveTest",
            path="/tmp/proj-destructive",
            execution_mode="auto_complete",
        )
        self.task = models.create_task(
            conn,
            "DEST-001",
            "proj-destructive",
            "Test destructive",
            brief={"route_type": "hotfix"},
        )

    def tearDown(self):
        self.conn.close()

    def _mock_step_result(self, raw_output="clean output"):
        # Canned successful agent result; only raw_output varies between tests.
        return dict(
            success=True,
            output={"status": "done"},
            raw_output=raw_output,
            cost_usd=0.0,
            tokens_used=0,
            duration_seconds=1.0,
        )

    def _drive_pipeline(self, agent_result, effective_mode, role):
        # Run a single-step pipeline with the agent call, the effective-mode
        # lookup, autocommit, learning extraction and hooks all patched out.
        steps = [{"role": role, "model": "haiku"}]
        with patch("agents.runner.run_agent", return_value=agent_result), \
                patch("agents.runner.models.get_effective_mode", return_value=effective_mode), \
                patch("agents.runner._run_autocommit"), \
                patch("agents.runner._run_learning_extraction"), \
                patch("agents.runner.run_hooks"):
            from agents.runner import run_pipeline
            run_pipeline(self.conn, self.task["id"], steps)

    # Clean output in auto_complete mode: the task ends up "done".
    def test_auto_mode_clean_output_becomes_done(self):
        agent_result = self._mock_step_result("All tests pass. No changes made.")
        self._drive_pipeline(agent_result, "auto_complete", "tester")
        stored = models.get_task(self.conn, self.task["id"])
        self.assertEqual(stored["status"], "done")

    # Destructive output in auto_complete mode: the task ends up "review".
    def test_auto_mode_destructive_output_becomes_review(self):
        agent_result = self._mock_step_result("rm -rf /tmp/old && tests pass")
        self._drive_pipeline(agent_result, "auto_complete", "tester")
        stored = models.get_task(self.conn, self.task["id"])
        self.assertEqual(
            stored["status"],
            "review",
            "Auto mode with destructive ops must end in review, not done",
        )

    # Review mode: the task ends up "review".
    # NOTE(review): this case feeds clean output only, so destructive output
    # under review mode is not exercised here.
    def test_review_mode_unaffected_by_detection(self):
        agent_result = self._mock_step_result("Updated models.py")
        self._drive_pipeline(agent_result, "review", "backend_dev")
        stored = models.get_task(self.conn, self.task["id"])
        self.assertEqual(stored["status"], "review")

    # Destructive output in auto_complete mode must leave an audit_log row
    # of type destructive_ops_detected tied to this task.
    def test_audit_log_written_on_destructive_detection(self):
        agent_result = self._mock_step_result("DELETE FROM cache;")
        self._drive_pipeline(agent_result, "auto_complete", "tester")
        audit_rows = self.conn.execute(
            "SELECT * FROM audit_log WHERE event_type = 'destructive_ops_detected'"
        ).fetchall()
        self.assertGreater(
            len(audit_rows), 0, "Audit log must have destructive_ops_detected entry"
        )
        # presumably rows are mapping-like (e.g. sqlite3.Row) so dict() works; verify against init_db
        first_entry = dict(audit_rows[0])
        self.assertEqual(first_entry["task_id"], self.task["id"])
# When executed directly as a script, hand control to unittest's runner.
if __name__ == "__main__":
    unittest.main()