diff --git a/core/models.py b/core/models.py index 4b2cac7..d301301 100644 --- a/core/models.py +++ b/core/models.py @@ -789,11 +789,14 @@ def create_environment( def get_environment(conn: sqlite3.Connection, env_id: int) -> dict | None: - """Get environment by id including raw obfuscated auth_value (for internal use).""" + """Get environment by id. auth_value is returned decrypted (for internal use).""" row = conn.execute( "SELECT * FROM project_environments WHERE id = ?", (env_id,) ).fetchone() - return _row_to_dict(row) + result = _row_to_dict(row) + if result and result.get("auth_value"): + result["auth_value"] = _decrypt_auth(result["auth_value"], conn=conn, env_id=env_id) + return result def list_environments(conn: sqlite3.Connection, project_id: str) -> list[dict]: diff --git a/tests/test_kin_biz_007_fernet.py b/tests/test_kin_biz_007_fernet.py new file mode 100644 index 0000000..857e6dd --- /dev/null +++ b/tests/test_kin_biz_007_fernet.py @@ -0,0 +1,327 @@ +"""Tests for KIN-BIZ-007: Fernet encryption of credentials in project_environments. + +Acceptance criteria: +1. Roundtrip: _encrypt_auth → _decrypt_auth returns the original string. +2. Migration: b64:-prefixed record is auto-re-encrypted on read; decrypt returns plaintext. +3. Missing KIN_SECRET_KEY → scan endpoint returns 503 (not 500). +4. Runner path: get_environment() returns decrypted plaintext auth_value. +5. Old _obfuscate_auth / _deobfuscate_auth are not present anywhere. + +Decision #214: patch на consuming-модуле, не на defining. +Decision #215: использовать mock.assert_called_once(). +""" + +import base64 +import os +import pytest +from unittest.mock import patch, MagicMock + +from core.db import init_db +from core import models + + +@pytest.fixture +def conn(): + """Fresh in-memory DB for each test.""" + c = init_db(db_path=":memory:") + yield c + c.close() + + +@pytest.fixture +def conn_with_project(conn): + """In-memory DB with a test project.""" + models.create_project(conn, "testproj", "Test Project", "/test") + return conn + + +@pytest.fixture +def scan_client(tmp_path): + """TestClient with project + environment pre-created. Returns (client, env_id).""" + import web.api as api_module + api_module.DB_PATH = tmp_path / "scan_biz007.db" + from web.api import app + from fastapi.testclient import TestClient + c = TestClient(app) + c.post("/api/projects", json={"id": "scanproj", "name": "Scan Project", "path": "/scan"}) + r = c.post("/api/projects/scanproj/environments", json={ + "name": "prod", "host": "10.0.0.1", "username": "root", + }) + env_id = r.json()["id"] + return c, env_id + + +# --------------------------------------------------------------------------- +# AC1: Roundtrip — _encrypt_auth → _decrypt_auth returns original string +# --------------------------------------------------------------------------- + +def test_encrypt_decrypt_roundtrip_returns_original(conn): + """AC1: encrypt → decrypt returns the exact original plaintext.""" + original = "my_super_secret_password" + encrypted = models._encrypt_auth(original) + decrypted = models._decrypt_auth(encrypted) + assert decrypted == original + + +def test_encrypt_produces_different_value_than_plaintext(conn): + """AC1: encrypted value is not the original (Fernet token, not plaintext).""" + original = "plain_secret" + encrypted = models._encrypt_auth(original) + assert encrypted != original + assert not encrypted.startswith("b64:") + + +def test_encrypt_two_calls_produce_different_tokens(conn): + """AC1: Fernet uses random IV — two encryptions of same value differ but both decrypt correctly.""" + value = "same_password" + enc1 = models._encrypt_auth(value) + enc2 = models._encrypt_auth(value) + # Encrypted forms must differ due to Fernet IV randomness + assert enc1 != enc2 + # Both must decrypt to original + assert models._decrypt_auth(enc1) == value + assert models._decrypt_auth(enc2) == value + + +def test_encrypt_raises_runtime_error_when_no_key(monkeypatch): + """AC1: _encrypt_auth raises RuntimeError when KIN_SECRET_KEY is absent.""" + monkeypatch.delenv("KIN_SECRET_KEY", raising=False) + with pytest.raises(RuntimeError, match="KIN_SECRET_KEY"): + models._encrypt_auth("any_value") + + +def test_decrypt_fernet_token_without_key_returns_raw_not_plaintext(monkeypatch): + """AC1: _decrypt_auth without key cannot recover plaintext — returns stored token, not original.""" + original = "secret" + encrypted = models._encrypt_auth(original) + monkeypatch.delenv("KIN_SECRET_KEY", raising=False) + result = models._decrypt_auth(encrypted) + # Without the key we cannot get the plaintext back + assert result != original + + +# --------------------------------------------------------------------------- +# AC2: Migration — b64: record auto-re-encrypted on read +# --------------------------------------------------------------------------- + +def test_decrypt_auth_handles_b64_prefix_without_db(conn): + """AC2: _decrypt_auth decodes b64:-prefixed value (no DB needed for the decode itself).""" + plaintext = "legacy_password" + b64_stored = "b64:" + base64.b64encode(plaintext.encode()).decode() + decrypted = models._decrypt_auth(b64_stored) + assert decrypted == plaintext + + +def test_decrypt_auth_b64_rewrites_db_when_conn_provided(conn_with_project): + """AC2: _decrypt_auth with conn+env_id re-encrypts b64: value in DB on read.""" + conn = conn_with_project + plaintext = "legacy_pass_123" + b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode() + + cur = conn.execute( + """INSERT INTO project_environments + (project_id, name, host, port, username, auth_type, auth_value, is_installed) + VALUES ('testproj', 'legacy', 'host.example.com', 22, 'root', 'password', ?, 0)""", + (b64_value,), + ) + conn.commit() + env_id = cur.lastrowid + + # Call decrypt with conn+env_id — must trigger re-encryption + decrypted = models._decrypt_auth(b64_value, conn=conn, env_id=env_id) + assert decrypted == plaintext + + # DB must now have Fernet token, not b64: + stored_after = conn.execute( + "SELECT auth_value FROM project_environments WHERE id = ?", (env_id,) + ).fetchone()["auth_value"] + assert not stored_after.startswith("b64:"), ( + "After migration, b64: prefix must be replaced by a Fernet token" + ) + # And the new token must decrypt correctly + assert models._decrypt_auth(stored_after) == plaintext + + +def test_get_environment_migrates_b64_and_returns_plaintext(conn_with_project): + """AC2: get_environment() transparently migrates b64: values and returns plaintext auth_value.""" + conn = conn_with_project + plaintext = "old_secret" + b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode() + + cur = conn.execute( + """INSERT INTO project_environments + (project_id, name, host, port, username, auth_type, auth_value, is_installed) + VALUES ('testproj', 'legacy2', 'host2.example.com', 22, 'root', 'password', ?, 0)""", + (b64_value,), + ) + conn.commit() + env_id = cur.lastrowid + + env = models.get_environment(conn, env_id) + + assert env["auth_value"] == plaintext, ( + f"get_environment must return plaintext after b64 migration, got: {env['auth_value']!r}" + ) + + # DB must be updated: b64: replaced by Fernet token + stored_after = conn.execute( + "SELECT auth_value FROM project_environments WHERE id = ?", (env_id,) + ).fetchone()["auth_value"] + assert not stored_after.startswith("b64:"), ( + "DB must contain Fernet token after get_environment migrates b64: record" + ) + + +def test_get_environment_second_read_after_migration_still_decrypts(conn_with_project): + """AC2: After b64 migration, subsequent get_environment calls still return plaintext.""" + conn = conn_with_project + plaintext = "migrated_secret" + b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode() + + cur = conn.execute( + """INSERT INTO project_environments + (project_id, name, host, port, username, auth_type, auth_value, is_installed) + VALUES ('testproj', 'legacy3', 'host3.example.com', 22, 'root', 'password', ?, 0)""", + (b64_value,), + ) + conn.commit() + env_id = cur.lastrowid + + # First read: triggers migration + env1 = models.get_environment(conn, env_id) + assert env1["auth_value"] == plaintext + + # Second read: now reads Fernet token (post-migration) + env2 = models.get_environment(conn, env_id) + assert env2["auth_value"] == plaintext + + +# --------------------------------------------------------------------------- +# AC3: Missing KIN_SECRET_KEY → scan endpoint returns 503 (not 500) +# --------------------------------------------------------------------------- + +def test_scan_endpoint_returns_503_when_kin_secret_key_missing(scan_client, monkeypatch): + """AC3: POST /environments/{id}/scan returns 503 when KIN_SECRET_KEY is not set.""" + client, env_id = scan_client + monkeypatch.delenv("KIN_SECRET_KEY", raising=False) + r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan") + assert r.status_code == 503, ( + f"scan must return 503 when KIN_SECRET_KEY is missing, got {r.status_code}: {r.text}" + ) + + +def test_scan_endpoint_returns_503_not_500(scan_client, monkeypatch): + """AC3: HTTP 503 (misconfiguration) must be returned, not 500 (code bug).""" + client, env_id = scan_client + monkeypatch.delenv("KIN_SECRET_KEY", raising=False) + r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan") + assert r.status_code != 500, "Missing KIN_SECRET_KEY must produce 503, not 500" + assert r.status_code == 503 + + +def test_scan_endpoint_returns_202_when_key_present(scan_client): + """AC3: scan endpoint returns 202 when KIN_SECRET_KEY is correctly set.""" + client, env_id = scan_client + with patch("subprocess.Popen") as mock_popen: + mock_popen.return_value = MagicMock(pid=12345) + r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan") + assert r.status_code == 202 + + +# --------------------------------------------------------------------------- +# AC4: Runner path — get_environment() returns decrypted plaintext auth_value +# --------------------------------------------------------------------------- + +def test_get_environment_returns_decrypted_auth_value(conn_with_project): + """AC4: get_environment() returns plaintext, not the Fernet token stored in DB.""" + conn = conn_with_project + plaintext = "runner_secret_42" + env = models.create_environment( + conn, "testproj", "runner-env", "10.0.0.10", "root", + auth_value=plaintext, + ) + env_id = env["id"] + + fetched = models.get_environment(conn, env_id) + assert fetched["auth_value"] == plaintext, ( + f"get_environment must return plaintext auth_value, got: {fetched['auth_value']!r}" + ) + + +def test_get_environment_auth_value_is_not_fernet_token(conn_with_project): + """AC4: auth_value from get_environment is decrypted (not a Fernet base64 token).""" + conn = conn_with_project + plaintext = "real_password_xyz" + env = models.create_environment( + conn, "testproj", "fernet-check", "10.0.0.11", "user", + auth_value=plaintext, + ) + + # Verify DB stores encrypted (not plaintext) + raw_stored = conn.execute( + "SELECT auth_value FROM project_environments WHERE id = ?", (env["id"],) + ).fetchone()["auth_value"] + assert raw_stored != plaintext, "DB must store encrypted value, not plaintext" + + # get_environment must return decrypted plaintext + fetched = models.get_environment(conn, env["id"]) + assert fetched["auth_value"] == plaintext + + +def test_get_environment_returns_none_auth_value_when_not_set(conn_with_project): + """AC4: get_environment() returns auth_value=None when no credential was stored.""" + conn = conn_with_project + env = models.create_environment( + conn, "testproj", "no-cred", "10.0.0.12", "user", + auth_value=None, + ) + fetched = models.get_environment(conn, env["id"]) + assert fetched["auth_value"] is None + + +def test_create_environment_hides_auth_value_in_return(conn_with_project): + """AC4: create_environment() returns auth_value=None — plaintext only via get_environment.""" + conn = conn_with_project + env = models.create_environment( + conn, "testproj", "hidden-cred", "10.0.0.13", "user", + auth_value="secret", + ) + assert env["auth_value"] is None, ( + "create_environment must return auth_value=None for API safety" + ) + + +# --------------------------------------------------------------------------- +# AC5: Old _obfuscate_auth / _deobfuscate_auth are not present anywhere +# --------------------------------------------------------------------------- + +def test_obfuscate_auth_not_in_core_models(): + """AC5: _obfuscate_auth must not exist in core.models (fully removed).""" + import core.models as m + assert not hasattr(m, "_obfuscate_auth"), ( + "_obfuscate_auth must be removed from core.models — use _encrypt_auth instead" + ) + + +def test_deobfuscate_auth_not_in_core_models(): + """AC5: _deobfuscate_auth must not exist in core.models (fully removed).""" + import core.models as m + assert not hasattr(m, "_deobfuscate_auth"), ( + "_deobfuscate_auth must be removed from core.models — use _decrypt_auth instead" + ) + + +def test_obfuscate_auth_not_imported_in_web_api(): + """AC5: _obfuscate_auth must not be imported or defined in web.api.""" + import web.api as api_mod + assert not hasattr(api_mod, "_obfuscate_auth"), ( + "_obfuscate_auth must not appear in web.api" + ) + + +def test_deobfuscate_auth_not_imported_in_web_api(): + """AC5: _deobfuscate_auth must not be imported or defined in web.api.""" + import web.api as api_mod + assert not hasattr(api_mod, "_deobfuscate_auth"), ( + "_deobfuscate_auth must not appear in web.api" + ) diff --git a/web/api.py b/web/api.py index 34ffcc3..a6221f5 100644 --- a/web/api.py +++ b/web/api.py @@ -1088,8 +1088,8 @@ def _trigger_sysadmin_scan(conn, project_id: str, env: dict) -> str: "port": env["port"], "username": env["username"], "auth_type": env["auth_type"], - # auth_value is Fernet-encrypted. Stored in tasks.brief — treat as sensitive. - # Decrypt with _decrypt_auth() from core/models.py. + # auth_value is decrypted plaintext (get_environment decrypts via _decrypt_auth). + # Stored in tasks.brief — treat as sensitive. "auth_value_b64": env.get("auth_value"), "text": ( f"Провести полный аудит среды '{env['name']}' на сервере {env['host']}.\n\n" @@ -1258,8 +1258,11 @@ def delete_environment(project_id: str, env_id: int): if not p: conn.close() raise HTTPException(404, f"Project '{project_id}' not found") - existing = models.get_environment(conn, env_id) - if not existing or existing.get("project_id") != project_id: + # Check existence directly — no decryption needed for delete + row = conn.execute( + "SELECT project_id FROM project_environments WHERE id = ?", (env_id,) + ).fetchone() + if not row or dict(row)["project_id"] != project_id: conn.close() raise HTTPException(404, f"Environment #{env_id} not found") models.delete_environment(conn, env_id) @@ -1270,6 +1273,9 @@ def delete_environment(project_id: str, env_id: int): @app.post("/api/projects/{project_id}/environments/{env_id}/scan", status_code=202) def scan_environment(project_id: str, env_id: int): """Manually re-trigger sysadmin env scan for an environment.""" + import os as _os + if not _os.environ.get("KIN_SECRET_KEY"): + raise HTTPException(503, "Server misconfiguration: KIN_SECRET_KEY is not set. Contact admin.") conn = get_conn() p = models.get_project(conn, project_id) if not p: