# kin/tests/test_kin_089_regression.py

"""Regression tests for KIN-089: 500 Internal Server Error when adding credentials.
Root cause: DB schema had label/login/credential columns; code expected name/username/auth_value.
Fix: Migration in core/db.py (_migrate) renames columns label→name, login→username, credential→auth_value.
Acceptance criteria:
1. Credentials can be added without error (status 201, not 500)
2. Credentials are stored in DB (encrypted)
3. Sysadmin task brief contains environment fields for inventory
"""
import sqlite3
import pytest
from unittest.mock import patch, MagicMock
from core.db import init_db, _migrate
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _cols(conn: sqlite3.Connection, table: str) -> set[str]:
    """Return the column names of ``table`` as reported by SQLite.

    The table name is passed as a bound parameter to the ``pragma_table_info``
    table-valued function instead of being spliced into the SQL text, so table
    names that would need quoting (spaces, reserved words) work as well.

    Args:
        conn: Open SQLite connection to inspect.
        table: Name of the table whose columns are listed.

    Returns:
        The set of column names; an empty set when the table does not exist.
    """
    rows = conn.execute(
        "SELECT name FROM pragma_table_info(?)", (table,)
    ).fetchall()
    return {row[0] for row in rows}
def _conn_with_old_env_schema() -> sqlite3.Connection:
    """Create an in-memory DB with the OLD project_environments schema (label/login/credential).

    The database contains the ``projects``, ``tasks`` and ``project_environments``
    tables, one project row (``corelock``) and one environment row (``prod``)
    so that migration tests can check that existing data survives.

    Returns:
        An open connection with ``row_factory`` set to ``sqlite3.Row``.
        The caller owns the connection and must close it.

    Raises:
        sqlite3.Error: If the setup script fails; the connection is closed
            before the error propagates.
    """
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    try:
        # The SQL text is kept flush-left so the script string is unchanged.
        conn.executescript("""
CREATE TABLE projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
path TEXT,
status TEXT DEFAULT 'active',
language TEXT DEFAULT 'ru',
execution_mode TEXT NOT NULL DEFAULT 'review'
);
CREATE TABLE tasks (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
title TEXT NOT NULL,
status TEXT DEFAULT 'pending',
execution_mode TEXT
);
CREATE TABLE project_environments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id TEXT NOT NULL REFERENCES projects(id),
label TEXT NOT NULL,
host TEXT NOT NULL,
port INTEGER DEFAULT 22,
login TEXT NOT NULL,
auth_type TEXT NOT NULL DEFAULT 'password',
credential TEXT,
is_installed INTEGER NOT NULL DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(project_id, label)
);
INSERT INTO projects VALUES ('corelock', 'Corelock', '/corelock', 'active', 'ru', 'review');
INSERT INTO project_environments
(project_id, label, host, port, login, auth_type, credential, is_installed)
VALUES ('corelock', 'prod', '10.5.1.254', 22, 'pelmen', 'password', 'b64:c2VjcmV0', 0);
""")
        conn.commit()
    except Exception:
        # Do not leak the connection when the fixture itself cannot be built.
        conn.close()
        raise
    return conn
# ---------------------------------------------------------------------------
# Migration: label/login/credential → name/username/auth_value
# ---------------------------------------------------------------------------
class TestKin089Migration:
    """Regression: _migrate renames env columns from old schema to new schema.

    Tests that start from the old schema take the ``old_conn`` fixture, which
    closes the connection at teardown even when an assertion or ``_migrate``
    itself fails.
    """

    @pytest.fixture
    def old_conn(self):
        """Yield an in-memory connection on the OLD schema and close it afterwards."""
        conn = _conn_with_old_env_schema()
        yield conn
        conn.close()

    def test_migration_renames_label_to_name(self, old_conn):
        """After _migrate, 'name' exists and 'label' is gone."""
        _migrate(old_conn)
        cols = _cols(old_conn, "project_environments")
        assert "name" in cols, "After migration, 'name' column must exist"
        assert "label" not in cols, "After migration, 'label' column must not exist"

    def test_migration_renames_login_to_username(self, old_conn):
        """After _migrate, 'username' exists and 'login' is gone."""
        _migrate(old_conn)
        cols = _cols(old_conn, "project_environments")
        assert "username" in cols, "After migration, 'username' column must exist"
        assert "login" not in cols, "After migration, 'login' column must not exist"

    def test_migration_renames_credential_to_auth_value(self, old_conn):
        """After _migrate, 'auth_value' exists and 'credential' is gone."""
        _migrate(old_conn)
        cols = _cols(old_conn, "project_environments")
        assert "auth_value" in cols, "After migration, 'auth_value' column must exist"
        assert "credential" not in cols, "After migration, 'credential' column must not exist"

    def test_migration_preserves_existing_data(self, old_conn):
        """After migration, existing env rows must be accessible with new column names."""
        _migrate(old_conn)
        row = old_conn.execute(
            "SELECT name, username, auth_value FROM project_environments WHERE project_id = 'corelock'"
        ).fetchone()
        assert row is not None, "Existing row must survive migration"
        assert row["name"] == "prod"
        assert row["username"] == "pelmen"
        assert row["auth_value"] == "b64:c2VjcmV0"

    def test_migration_is_idempotent_on_new_schema(self):
        """Calling _migrate on a DB that already has new schema must not fail."""
        conn = init_db(":memory:")
        try:
            before = _cols(conn, "project_environments")
            _migrate(conn)
            after = _cols(conn, "project_environments")
            assert before == after, "_migrate must not alter schema when new columns already exist"
        finally:
            conn.close()

    def test_migration_preserves_unique_constraint(self, old_conn):
        """After migration, UNIQUE(project_id, name) constraint must still work."""
        _migrate(old_conn)
        # The seeded row already uses ('corelock', 'prod'), so this insert must collide.
        with pytest.raises(sqlite3.IntegrityError):
            old_conn.execute(
                "INSERT INTO project_environments (project_id, name, host, username) "
                "VALUES ('corelock', 'prod', '1.2.3.4', 'root')"
            )
# ---------------------------------------------------------------------------
# Endpoint regression: POST /environments must return 201, not 500
# ---------------------------------------------------------------------------
@pytest.fixture
def client(tmp_path, monkeypatch):
    """Return a TestClient bound to a fresh per-test database with one project.

    ``web.api.DB_PATH`` is pointed at a file under ``tmp_path`` through
    ``monkeypatch`` so the previous value is restored at teardown instead of
    leaking into later tests. The ``corelock`` project is created up front
    because the environment endpoints used by the tests are scoped to it.
    """
    import web.api as api_module

    monkeypatch.setattr(api_module, "DB_PATH", tmp_path / "test.db", raising=False)

    from web.api import app
    from fastapi.testclient import TestClient

    c = TestClient(app)
    c.post("/api/projects", json={"id": "corelock", "name": "Corelock", "path": "/corelock"})
    return c
def test_create_environment_returns_201_not_500(client):
    """Regression KIN-089: creating an environment must answer 201, never 500."""
    # Full payload, every field the endpoint accepts in this test suite.
    payload = dict(
        name="prod",
        host="10.5.1.254",
        username="pelmen",
        port=22,
        auth_type="password",
        auth_value="s3cr3t",
        is_installed=False,
    )
    r = client.post("/api/projects/corelock/environments", json=payload)
    assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}"
def test_create_environment_missing_kin_secret_key_returns_503(tmp_path, monkeypatch):
    """When KIN_SECRET_KEY is not set, POST /environments must return 503, not 500.

    503 = server misconfiguration (operator error), not 500 (code bug).

    Both the DB path override and the environment change go through
    ``monkeypatch`` so they are undone at teardown and cannot affect other tests.
    """
    import web.api as api_module
    from web.api import app
    from fastapi.testclient import TestClient

    monkeypatch.setattr(api_module, "DB_PATH", tmp_path / "test503.db", raising=False)
    # raising=False: the key may legitimately be absent in the developer's shell.
    monkeypatch.delenv("KIN_SECRET_KEY", raising=False)

    c = TestClient(app)
    c.post("/api/projects", json={"id": "corelock", "name": "Corelock", "path": "/corelock"})
    r = c.post("/api/projects/corelock/environments", json={
        "name": "prod",
        "host": "10.5.1.254",
        "username": "pelmen",
        "auth_value": "secret",
    })
    assert r.status_code == 503, (
        f"Missing KIN_SECRET_KEY must return 503 (not 500 or other), got {r.status_code}: {r.text}"
    )
# ---------------------------------------------------------------------------
# AC: Credentials stored in DB
# ---------------------------------------------------------------------------
def test_create_environment_auth_value_encrypted_in_db(client):
    """AC: auth_value is stored encrypted in DB, not plain text.

    The check is that the stored value is present and differs from the
    plaintext that was posted.
    """
    import web.api as api_module

    r = client.post("/api/projects/corelock/environments", json={
        "name": "db-creds-test",
        "host": "10.5.1.254",
        "username": "pelmen",
        "auth_value": "supersecret",
    })
    assert r.status_code == 201
    env_id = r.json()["id"]

    conn = init_db(api_module.DB_PATH)
    try:
        row = conn.execute(
            "SELECT auth_value FROM project_environments WHERE id = ?", (env_id,)
        ).fetchone()
    finally:
        # Close even if the query fails so the temp DB file is not left open.
        conn.close()

    assert row is not None, "Created environment row must exist in DB"
    assert row["auth_value"] is not None, "auth_value must be stored in DB"
    assert row["auth_value"] != "supersecret", "auth_value must NOT be stored as plain text"
def test_create_environment_auth_value_hidden_in_response(client):
    """AC: the API response must not expose auth_value."""
    payload = dict(
        name="hidden-creds",
        host="10.5.1.254",
        username="pelmen",
        auth_value="supersecret",
    )
    response = client.post("/api/projects/corelock/environments", json=payload)
    assert response.status_code == 201
    # A missing key and an explicit null both count as "hidden".
    returned_secret = response.json().get("auth_value")
    assert returned_secret is None, "auth_value must be None in response"
def test_create_environment_stored_credential_is_decryptable(client):
    """AC: Stored credential can be decrypted back to original value.

    Reads the raw ``auth_value`` column for the created environment and runs it
    through ``core.models._decrypt_auth``.
    """
    import web.api as api_module
    from core import models as m

    r = client.post("/api/projects/corelock/environments", json={
        "name": "decrypt-test",
        "host": "10.5.1.254",
        "username": "pelmen",
        "auth_value": "mypassword123",
    })
    assert r.status_code == 201
    env_id = r.json()["id"]

    conn = init_db(api_module.DB_PATH)
    try:
        row = conn.execute(
            "SELECT auth_value FROM project_environments WHERE id = ?", (env_id,)
        ).fetchone()
    finally:
        # Close even if the query fails so the temp DB file is not left open.
        conn.close()

    assert row is not None, "Created environment row must exist in DB"
    decrypted = m._decrypt_auth(row["auth_value"])
    assert decrypted == "mypassword123", "Stored credential must decrypt to original value"
# ---------------------------------------------------------------------------
# AC: Sysadmin sees environment fields in context for inventory
# ---------------------------------------------------------------------------
def test_sysadmin_task_created_with_env_fields_in_brief(client):
    """AC: with is_installed=True the created sysadmin task brief mentions host and username."""
    import web.api as api_module
    from core.db import init_db
    from core import models as m

    payload = dict(
        name="prod-scan",
        host="10.5.1.254",
        username="pelmen",
        is_installed=True,
    )
    # subprocess.Popen is replaced for the duration of the request so the test
    # does not start a real process.
    with patch("subprocess.Popen") as mock_popen:
        mock_popen.return_value = MagicMock(pid=12345)
        r = client.post("/api/projects/corelock/environments", json=payload)

    assert r.status_code == 201
    body = r.json()
    assert "scan_task_id" in body, "scan_task_id must be returned when is_installed=True"

    db = init_db(api_module.DB_PATH)
    task = m.get_task(db, body["scan_task_id"])
    db.close()

    assert task is not None, "Sysadmin task must be created in DB"
    assert task["assigned_role"] == "sysadmin"
    assert task["category"] == "INFRA"

    # The brief is searched through its string form.
    brief_text = str(task["brief"])
    assert "10.5.1.254" in brief_text, "Sysadmin brief must contain host for inventory"
    assert "pelmen" in brief_text, "Sysadmin brief must contain username for inventory"
def test_sysadmin_task_brief_is_dict_not_string(client):
    """Sysadmin task brief must be a structured dict (not raw string) for agent parsing.

    The response status is asserted before ``scan_task_id`` is read, so a
    failed request is reported with its status and body rather than as a
    ``KeyError``.
    """
    import web.api as api_module
    from core import models as m

    # subprocess.Popen is replaced for the duration of the request so the test
    # does not start a real process.
    with patch("subprocess.Popen") as mock_popen:
        mock_popen.return_value = MagicMock(pid=99999)
        r = client.post("/api/projects/corelock/environments", json={
            "name": "brief-type-test",
            "host": "10.5.1.1",
            "username": "root",
            "is_installed": True,
        })

    assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}"
    task_id = r.json()["scan_task_id"]

    conn = init_db(api_module.DB_PATH)
    try:
        task = m.get_task(conn, task_id)
    finally:
        # Close even if the lookup fails so the temp DB file is not left open.
        conn.close()

    assert isinstance(task["brief"], dict), (
        f"Sysadmin task brief must be a dict, got {type(task['brief'])}"
    )
def test_post_migration_create_environment_works(tmp_path, monkeypatch):
    """AC: After DB migration from old schema, create_environment works end-to-end.

    A file-based database with the old ``label/login/credential`` columns is
    created under ``tmp_path``, the API is pointed at it, and an environment is
    posted using the new field names.

    ``web.api.DB_PATH`` is overridden through ``monkeypatch`` so the previous
    value is restored at teardown, and the setup connection is closed even if
    the schema script fails.
    """
    import web.api as api_module
    from fastapi.testclient import TestClient

    # Set up DB with old schema using a file-based DB (to test init_db migration path)
    old_db_path = tmp_path / "old.db"
    conn = sqlite3.connect(str(old_db_path))
    try:
        # The SQL text is kept flush-left so the script string is unchanged.
        conn.executescript("""
CREATE TABLE projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
path TEXT,
status TEXT DEFAULT 'active',
language TEXT DEFAULT 'ru',
execution_mode TEXT NOT NULL DEFAULT 'review'
);
CREATE TABLE tasks (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
title TEXT NOT NULL,
status TEXT DEFAULT 'pending',
execution_mode TEXT
);
CREATE TABLE project_environments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id TEXT NOT NULL REFERENCES projects(id),
label TEXT NOT NULL,
host TEXT NOT NULL,
port INTEGER DEFAULT 22,
login TEXT NOT NULL,
auth_type TEXT NOT NULL DEFAULT 'password',
credential TEXT,
is_installed INTEGER NOT NULL DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(project_id, label)
);
INSERT INTO projects VALUES ('corelock', 'Corelock', '/corelock', 'active', 'ru', 'review');
""")
        conn.commit()
    finally:
        conn.close()

    # Switch the API to the old DB. NOTE(review): the API is expected to run
    # init_db/_migrate on it while handling the request; confirm in web.api.
    monkeypatch.setattr(api_module, "DB_PATH", old_db_path, raising=False)
    from web.api import app

    c = TestClient(app)
    # Trigger init_db migration by making a request
    r = c.post("/api/projects/corelock/environments", json={
        "name": "prod",
        "host": "10.5.1.254",
        "username": "pelmen",
        "auth_value": "topsecret",
    })
    assert r.status_code == 201, (
        f"After migration from old schema, create_environment must return 201, got {r.status_code}: {r.text}"
    )
    assert r.json()["name"] == "prod"
    assert r.json()["username"] == "pelmen"