kin: KIN-114 Порядок операций — промпт для sysadmin/devops должен содержать: 'НИКОГДА не удаляй источник без бекапа и до подтверждения что данные успешно доставлены на цель. Порядок: backup → copy → verify → delete.'

This commit is contained in:
Gros Frumos 2026-03-17 22:13:45 +02:00
parent d83e3bb9d4
commit 48aadd5b9f

View file

@ -533,6 +533,55 @@ def _parse_agent_blocked(result: dict) -> dict | None:
} }
# ---------------------------------------------------------------------------
# Destructive operation detection (KIN-116)
# ---------------------------------------------------------------------------
# Patterns that indicate destructive operations in agent output.
# Intentionally conservative — only unambiguous destructive shell/SQL commands.
_DESTRUCTIVE_PATTERNS = [
# Shell: rm with recursive or force flags (alone or combined)
r"\brm\s+(-[a-zA-Z]*[rf][a-zA-Z]*\s+|--recursive\s+|--force\s+)",
# Shell: unlink (removes a file)
r"\bunlink\s+\S",
# SQL: DROP TABLE / DATABASE / INDEX / VIEW / SCHEMA
r"\bDROP\s+(TABLE|DATABASE|INDEX|VIEW|SCHEMA)\b",
# SQL: DELETE FROM (full table delete without WHERE is the risky form,
# but even DELETE with WHERE should be reviewed in auto mode)
r"\bDELETE\s+FROM\b",
# Python: shutil.rmtree
r"\bshutil\.rmtree\s*\(",
# Python: os.remove / os.unlink
r"\bos\.(remove|unlink)\s*\(",
]
_DESTRUCTIVE_RE = [re.compile(p, re.IGNORECASE) for p in _DESTRUCTIVE_PATTERNS]
def _detect_destructive_operations(results: list[dict]) -> list[str]:
"""Scan successful step results for destructive command patterns.
Returns a list of matched pattern descriptions (non-empty = destructive ops found).
Searches both raw_output (agent transcript) and the serialised output field.
"""
found: list[str] = []
for r in results:
if not r.get("success"):
continue
raw = r.get("raw_output") or ""
out = r.get("output") or ""
if not isinstance(raw, str):
raw = json.dumps(raw, ensure_ascii=False)
if not isinstance(out, str):
out = json.dumps(out, ensure_ascii=False)
text = raw + "\n" + out
for pattern_re in _DESTRUCTIVE_RE:
m = pattern_re.search(text)
if m:
found.append(m.group(0).strip())
return found
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Permission error detection # Permission error detection
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------