kin: KIN-089 При попытке добавить креды прод сервера для проекта corelock вылетает 500 Internal Server Error
This commit is contained in:
parent
e80e50ba0c
commit
4a65d90218
13 changed files with 1215 additions and 4 deletions
377
tests/test_kin_089_regression.py
Normal file
377
tests/test_kin_089_regression.py
Normal file
|
|
@ -0,0 +1,377 @@
|
|||
"""Regression tests for KIN-089: 500 Internal Server Error when adding credentials.
|
||||
|
||||
Root cause: DB schema had label/login/credential columns; code expected name/username/auth_value.
|
||||
Fix: Migration in core/db.py (_migrate) renames columns label→name, login→username, credential→auth_value.
|
||||
|
||||
Acceptance criteria:
|
||||
1. Credentials can be added without error (status 201, not 500)
|
||||
2. Credentials are stored in DB (encrypted)
|
||||
3. Sysadmin task brief contains environment fields for inventory
|
||||
"""
|
||||
import sqlite3
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from core.db import init_db, _migrate
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _cols(conn: sqlite3.Connection, table: str) -> set[str]:
|
||||
return {r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()}
|
||||
|
||||
|
||||
def _conn_with_old_env_schema() -> sqlite3.Connection:
|
||||
"""Creates in-memory DB with OLD project_environments schema (label/login/credential)."""
|
||||
conn = sqlite3.connect(":memory:")
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.executescript("""
|
||||
CREATE TABLE projects (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
path TEXT,
|
||||
status TEXT DEFAULT 'active',
|
||||
language TEXT DEFAULT 'ru',
|
||||
execution_mode TEXT NOT NULL DEFAULT 'review'
|
||||
);
|
||||
CREATE TABLE tasks (
|
||||
id TEXT PRIMARY KEY,
|
||||
project_id TEXT NOT NULL,
|
||||
title TEXT NOT NULL,
|
||||
status TEXT DEFAULT 'pending',
|
||||
execution_mode TEXT
|
||||
);
|
||||
CREATE TABLE project_environments (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
project_id TEXT NOT NULL REFERENCES projects(id),
|
||||
label TEXT NOT NULL,
|
||||
host TEXT NOT NULL,
|
||||
port INTEGER DEFAULT 22,
|
||||
login TEXT NOT NULL,
|
||||
auth_type TEXT NOT NULL DEFAULT 'password',
|
||||
credential TEXT,
|
||||
is_installed INTEGER NOT NULL DEFAULT 0,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE(project_id, label)
|
||||
);
|
||||
INSERT INTO projects VALUES ('corelock', 'Corelock', '/corelock', 'active', 'ru', 'review');
|
||||
INSERT INTO project_environments
|
||||
(project_id, label, host, port, login, auth_type, credential, is_installed)
|
||||
VALUES ('corelock', 'prod', '10.5.1.254', 22, 'pelmen', 'password', 'b64:c2VjcmV0', 0);
|
||||
""")
|
||||
conn.commit()
|
||||
return conn
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Migration: label/login/credential → name/username/auth_value
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestKin089Migration:
|
||||
"""Regression: _migrate renames env columns from old schema to new schema."""
|
||||
|
||||
def test_migration_renames_label_to_name(self):
|
||||
conn = _conn_with_old_env_schema()
|
||||
_migrate(conn)
|
||||
cols = _cols(conn, "project_environments")
|
||||
assert "name" in cols, "After migration, 'name' column must exist"
|
||||
assert "label" not in cols, "After migration, 'label' column must not exist"
|
||||
conn.close()
|
||||
|
||||
def test_migration_renames_login_to_username(self):
|
||||
conn = _conn_with_old_env_schema()
|
||||
_migrate(conn)
|
||||
cols = _cols(conn, "project_environments")
|
||||
assert "username" in cols, "After migration, 'username' column must exist"
|
||||
assert "login" not in cols, "After migration, 'login' column must not exist"
|
||||
conn.close()
|
||||
|
||||
def test_migration_renames_credential_to_auth_value(self):
|
||||
conn = _conn_with_old_env_schema()
|
||||
_migrate(conn)
|
||||
cols = _cols(conn, "project_environments")
|
||||
assert "auth_value" in cols, "After migration, 'auth_value' column must exist"
|
||||
assert "credential" not in cols, "After migration, 'credential' column must not exist"
|
||||
conn.close()
|
||||
|
||||
def test_migration_preserves_existing_data(self):
|
||||
"""After migration, existing env rows must be accessible with new column names."""
|
||||
conn = _conn_with_old_env_schema()
|
||||
_migrate(conn)
|
||||
row = conn.execute(
|
||||
"SELECT name, username, auth_value FROM project_environments WHERE project_id = 'corelock'"
|
||||
).fetchone()
|
||||
assert row is not None, "Existing row must survive migration"
|
||||
assert row["name"] == "prod"
|
||||
assert row["username"] == "pelmen"
|
||||
assert row["auth_value"] == "b64:c2VjcmV0"
|
||||
conn.close()
|
||||
|
||||
def test_migration_is_idempotent_on_new_schema(self):
|
||||
"""Calling _migrate on a DB that already has new schema must not fail."""
|
||||
conn = init_db(":memory:")
|
||||
before = _cols(conn, "project_environments")
|
||||
_migrate(conn)
|
||||
after = _cols(conn, "project_environments")
|
||||
assert before == after, "_migrate must not alter schema when new columns already exist"
|
||||
conn.close()
|
||||
|
||||
def test_migration_preserves_unique_constraint(self):
|
||||
"""After migration, UNIQUE(project_id, name) constraint must still work."""
|
||||
conn = _conn_with_old_env_schema()
|
||||
_migrate(conn)
|
||||
with pytest.raises(sqlite3.IntegrityError):
|
||||
conn.execute(
|
||||
"INSERT INTO project_environments (project_id, name, host, username) "
|
||||
"VALUES ('corelock', 'prod', '1.2.3.4', 'root')"
|
||||
)
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Endpoint regression: POST /environments must return 201, not 500
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
|
||||
def client(tmp_path):
|
||||
import web.api as api_module
|
||||
api_module.DB_PATH = tmp_path / "test.db"
|
||||
from web.api import app
|
||||
from fastapi.testclient import TestClient
|
||||
c = TestClient(app)
|
||||
c.post("/api/projects", json={"id": "corelock", "name": "Corelock", "path": "/corelock"})
|
||||
return c
|
||||
|
||||
|
||||
def test_create_environment_returns_201_not_500(client):
|
||||
"""Regression KIN-089: POST /environments must not return 500."""
|
||||
r = client.post("/api/projects/corelock/environments", json={
|
||||
"name": "prod",
|
||||
"host": "10.5.1.254",
|
||||
"username": "pelmen",
|
||||
"port": 22,
|
||||
"auth_type": "password",
|
||||
"auth_value": "s3cr3t",
|
||||
"is_installed": False,
|
||||
})
|
||||
assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}"
|
||||
|
||||
|
||||
def test_create_environment_missing_kin_secret_key_returns_503(tmp_path):
|
||||
"""When KIN_SECRET_KEY is not set, POST /environments must return 503, not 500.
|
||||
|
||||
503 = server misconfiguration (operator error), not 500 (code bug).
|
||||
"""
|
||||
import os
|
||||
import web.api as api_module
|
||||
api_module.DB_PATH = tmp_path / "test503.db"
|
||||
from web.api import app
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
env_without_key = {k: v for k, v in os.environ.items() if k != "KIN_SECRET_KEY"}
|
||||
with patch.dict(os.environ, env_without_key, clear=True):
|
||||
c = TestClient(app)
|
||||
c.post("/api/projects", json={"id": "corelock", "name": "Corelock", "path": "/corelock"})
|
||||
r = c.post("/api/projects/corelock/environments", json={
|
||||
"name": "prod",
|
||||
"host": "10.5.1.254",
|
||||
"username": "pelmen",
|
||||
"auth_value": "secret",
|
||||
})
|
||||
assert r.status_code == 503, (
|
||||
f"Missing KIN_SECRET_KEY must return 503 (not 500 or other), got {r.status_code}: {r.text}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC: Credentials stored in DB
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_create_environment_auth_value_encrypted_in_db(client):
|
||||
"""AC: auth_value is stored encrypted in DB, not plain text."""
|
||||
import web.api as api_module
|
||||
from core.db import init_db
|
||||
from core import models as m
|
||||
|
||||
r = client.post("/api/projects/corelock/environments", json={
|
||||
"name": "db-creds-test",
|
||||
"host": "10.5.1.254",
|
||||
"username": "pelmen",
|
||||
"auth_value": "supersecret",
|
||||
})
|
||||
assert r.status_code == 201
|
||||
env_id = r.json()["id"]
|
||||
|
||||
conn = init_db(api_module.DB_PATH)
|
||||
row = conn.execute(
|
||||
"SELECT auth_value FROM project_environments WHERE id = ?", (env_id,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
|
||||
assert row["auth_value"] is not None, "auth_value must be stored in DB"
|
||||
assert row["auth_value"] != "supersecret", "auth_value must NOT be stored as plain text"
|
||||
|
||||
|
||||
def test_create_environment_auth_value_hidden_in_response(client):
|
||||
"""AC: auth_value is never returned in API response."""
|
||||
r = client.post("/api/projects/corelock/environments", json={
|
||||
"name": "hidden-creds",
|
||||
"host": "10.5.1.254",
|
||||
"username": "pelmen",
|
||||
"auth_value": "supersecret",
|
||||
})
|
||||
assert r.status_code == 201
|
||||
assert r.json().get("auth_value") is None, "auth_value must be None in response"
|
||||
|
||||
|
||||
def test_create_environment_stored_credential_is_decryptable(client):
|
||||
"""AC: Stored credential can be decrypted back to original value."""
|
||||
import web.api as api_module
|
||||
from core.db import init_db
|
||||
from core import models as m
|
||||
|
||||
r = client.post("/api/projects/corelock/environments", json={
|
||||
"name": "decrypt-test",
|
||||
"host": "10.5.1.254",
|
||||
"username": "pelmen",
|
||||
"auth_value": "mypassword123",
|
||||
})
|
||||
assert r.status_code == 201
|
||||
env_id = r.json()["id"]
|
||||
|
||||
conn = init_db(api_module.DB_PATH)
|
||||
row = conn.execute(
|
||||
"SELECT auth_value FROM project_environments WHERE id = ?", (env_id,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
|
||||
decrypted = m._decrypt_auth(row["auth_value"])
|
||||
assert decrypted == "mypassword123", "Stored credential must decrypt to original value"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC: Sysadmin sees environment fields in context for inventory
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_sysadmin_task_created_with_env_fields_in_brief(client):
|
||||
"""AC: When is_installed=True, sysadmin task brief contains host and username."""
|
||||
import web.api as api_module
|
||||
from core.db import init_db
|
||||
from core import models as m
|
||||
|
||||
with patch("subprocess.Popen") as mock_popen:
|
||||
mock_popen.return_value = MagicMock(pid=12345)
|
||||
r = client.post("/api/projects/corelock/environments", json={
|
||||
"name": "prod-scan",
|
||||
"host": "10.5.1.254",
|
||||
"username": "pelmen",
|
||||
"is_installed": True,
|
||||
})
|
||||
|
||||
assert r.status_code == 201
|
||||
assert "scan_task_id" in r.json(), "scan_task_id must be returned when is_installed=True"
|
||||
task_id = r.json()["scan_task_id"]
|
||||
|
||||
conn = init_db(api_module.DB_PATH)
|
||||
task = m.get_task(conn, task_id)
|
||||
conn.close()
|
||||
|
||||
assert task is not None, "Sysadmin task must be created in DB"
|
||||
assert task["assigned_role"] == "sysadmin"
|
||||
assert task["category"] == "INFRA"
|
||||
|
||||
brief = task["brief"]
|
||||
brief_str = str(brief)
|
||||
assert "10.5.1.254" in brief_str, "Sysadmin brief must contain host for inventory"
|
||||
assert "pelmen" in brief_str, "Sysadmin brief must contain username for inventory"
|
||||
|
||||
|
||||
def test_sysadmin_task_brief_is_dict_not_string(client):
|
||||
"""Sysadmin task brief must be a structured dict (not raw string) for agent parsing."""
|
||||
import web.api as api_module
|
||||
from core.db import init_db
|
||||
from core import models as m
|
||||
|
||||
with patch("subprocess.Popen") as mock_popen:
|
||||
mock_popen.return_value = MagicMock(pid=99999)
|
||||
r = client.post("/api/projects/corelock/environments", json={
|
||||
"name": "brief-type-test",
|
||||
"host": "10.5.1.1",
|
||||
"username": "root",
|
||||
"is_installed": True,
|
||||
})
|
||||
|
||||
task_id = r.json()["scan_task_id"]
|
||||
conn = init_db(api_module.DB_PATH)
|
||||
task = m.get_task(conn, task_id)
|
||||
conn.close()
|
||||
|
||||
assert isinstance(task["brief"], dict), (
|
||||
f"Sysadmin task brief must be a dict, got {type(task['brief'])}"
|
||||
)
|
||||
|
||||
|
||||
def test_post_migration_create_environment_works(tmp_path):
|
||||
"""AC: After DB migration from old schema, create_environment works end-to-end."""
|
||||
import web.api as api_module
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
# Set up DB with old schema using a file-based DB (to test init_db migration path)
|
||||
old_db_path = tmp_path / "old.db"
|
||||
conn = sqlite3.connect(str(old_db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.executescript("""
|
||||
CREATE TABLE projects (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
path TEXT,
|
||||
status TEXT DEFAULT 'active',
|
||||
language TEXT DEFAULT 'ru',
|
||||
execution_mode TEXT NOT NULL DEFAULT 'review'
|
||||
);
|
||||
CREATE TABLE tasks (
|
||||
id TEXT PRIMARY KEY,
|
||||
project_id TEXT NOT NULL,
|
||||
title TEXT NOT NULL,
|
||||
status TEXT DEFAULT 'pending',
|
||||
execution_mode TEXT
|
||||
);
|
||||
CREATE TABLE project_environments (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
project_id TEXT NOT NULL REFERENCES projects(id),
|
||||
label TEXT NOT NULL,
|
||||
host TEXT NOT NULL,
|
||||
port INTEGER DEFAULT 22,
|
||||
login TEXT NOT NULL,
|
||||
auth_type TEXT NOT NULL DEFAULT 'password',
|
||||
credential TEXT,
|
||||
is_installed INTEGER NOT NULL DEFAULT 0,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE(project_id, label)
|
||||
);
|
||||
INSERT INTO projects VALUES ('corelock', 'Corelock', '/corelock', 'active', 'ru', 'review');
|
||||
""")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Switch API to use the old DB — init_db will run _migrate on it
|
||||
api_module.DB_PATH = old_db_path
|
||||
from web.api import app
|
||||
c = TestClient(app)
|
||||
|
||||
# Trigger init_db migration by making a request
|
||||
r = c.post("/api/projects/corelock/environments", json={
|
||||
"name": "prod",
|
||||
"host": "10.5.1.254",
|
||||
"username": "pelmen",
|
||||
"auth_value": "topsecret",
|
||||
})
|
||||
assert r.status_code == 201, (
|
||||
f"After migration from old schema, create_environment must return 201, got {r.status_code}: {r.text}"
|
||||
)
|
||||
assert r.json()["name"] == "prod"
|
||||
assert r.json()["username"] == "pelmen"
|
||||
Loading…
Add table
Add a link
Reference in a new issue