"""Tests for KIN-BIZ-007: Fernet encryption of credentials in project_environments. Acceptance criteria: 1. Roundtrip: _encrypt_auth → _decrypt_auth returns the original string. 2. Migration: b64:-prefixed record is auto-re-encrypted on read; decrypt returns plaintext. 3. Missing KIN_SECRET_KEY → scan endpoint returns 503 (not 500). 4. Runner path: get_environment() returns decrypted plaintext auth_value. 5. Old _obfuscate_auth / _deobfuscate_auth are not present anywhere. Decision #214: patch на consuming-модуле, не на defining. Decision #215: использовать mock.assert_called_once(). """ import base64 import os import pytest from unittest.mock import patch, MagicMock from core.db import init_db from core import models @pytest.fixture def conn(): """Fresh in-memory DB for each test.""" c = init_db(db_path=":memory:") yield c c.close() @pytest.fixture def conn_with_project(conn): """In-memory DB with a test project.""" models.create_project(conn, "testproj", "Test Project", "/test") return conn @pytest.fixture def scan_client(tmp_path): """TestClient with project + environment pre-created. Returns (client, env_id).""" import web.api as api_module api_module.DB_PATH = tmp_path / "scan_biz007.db" from web.api import app from fastapi.testclient import TestClient c = TestClient(app) c.post("/api/projects", json={"id": "scanproj", "name": "Scan Project", "path": "/scan"}) r = c.post("/api/projects/scanproj/environments", json={ "name": "prod", "host": "10.0.0.1", "username": "root", }) env_id = r.json()["id"] return c, env_id @pytest.fixture def env_client(tmp_path): """TestClient with just a project pre-created. Returns client.""" import web.api as api_module api_module.DB_PATH = tmp_path / "env_biz007.db" from web.api import app from fastapi.testclient import TestClient c = TestClient(app) c.post("/api/projects", json={"id": "envproj", "name": "Env Project", "path": "/env"}) return c # --------------------------------------------------------------------------- # AC1: Roundtrip — _encrypt_auth → _decrypt_auth returns original string # --------------------------------------------------------------------------- def test_encrypt_decrypt_roundtrip_returns_original(conn): """AC1: encrypt → decrypt returns the exact original plaintext.""" original = "my_super_secret_password" encrypted = models._encrypt_auth(original) decrypted = models._decrypt_auth(encrypted) assert decrypted == original def test_encrypt_produces_different_value_than_plaintext(conn): """AC1: encrypted value is not the original (Fernet token, not plaintext).""" original = "plain_secret" encrypted = models._encrypt_auth(original) assert encrypted != original assert not encrypted.startswith("b64:") def test_encrypt_two_calls_produce_different_tokens(conn): """AC1: Fernet uses random IV — two encryptions of same value differ but both decrypt correctly.""" value = "same_password" enc1 = models._encrypt_auth(value) enc2 = models._encrypt_auth(value) # Encrypted forms must differ due to Fernet IV randomness assert enc1 != enc2 # Both must decrypt to original assert models._decrypt_auth(enc1) == value assert models._decrypt_auth(enc2) == value def test_encrypt_raises_runtime_error_when_no_key(monkeypatch): """AC1: _encrypt_auth raises RuntimeError when KIN_SECRET_KEY is absent.""" monkeypatch.delenv("KIN_SECRET_KEY", raising=False) with pytest.raises(RuntimeError, match="KIN_SECRET_KEY"): models._encrypt_auth("any_value") def test_decrypt_fernet_token_without_key_returns_raw_not_plaintext(monkeypatch): """AC1: _decrypt_auth without key cannot recover plaintext — returns stored token, not original.""" original = "secret" encrypted = models._encrypt_auth(original) monkeypatch.delenv("KIN_SECRET_KEY", raising=False) result = models._decrypt_auth(encrypted) # Without the key we cannot get the plaintext back assert result != original # --------------------------------------------------------------------------- # AC2: Migration — b64: record auto-re-encrypted on read # --------------------------------------------------------------------------- def test_decrypt_auth_handles_b64_prefix_without_db(conn): """AC2: _decrypt_auth decodes b64:-prefixed value (no DB needed for the decode itself).""" plaintext = "legacy_password" b64_stored = "b64:" + base64.b64encode(plaintext.encode()).decode() decrypted = models._decrypt_auth(b64_stored) assert decrypted == plaintext def test_decrypt_auth_b64_rewrites_db_when_conn_provided(conn_with_project): """AC2: _decrypt_auth with conn+env_id re-encrypts b64: value in DB on read.""" conn = conn_with_project plaintext = "legacy_pass_123" b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode() cur = conn.execute( """INSERT INTO project_environments (project_id, name, host, port, username, auth_type, auth_value, is_installed) VALUES ('testproj', 'legacy', 'host.example.com', 22, 'root', 'password', ?, 0)""", (b64_value,), ) conn.commit() env_id = cur.lastrowid # Call decrypt with conn+env_id — must trigger re-encryption decrypted = models._decrypt_auth(b64_value, conn=conn, env_id=env_id) assert decrypted == plaintext # DB must now have Fernet token, not b64: stored_after = conn.execute( "SELECT auth_value FROM project_environments WHERE id = ?", (env_id,) ).fetchone()["auth_value"] assert not stored_after.startswith("b64:"), ( "After migration, b64: prefix must be replaced by a Fernet token" ) # And the new token must decrypt correctly assert models._decrypt_auth(stored_after) == plaintext def test_get_environment_migrates_b64_and_returns_plaintext(conn_with_project): """AC2: get_environment() transparently migrates b64: values and returns plaintext auth_value.""" conn = conn_with_project plaintext = "old_secret" b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode() cur = conn.execute( """INSERT INTO project_environments (project_id, name, host, port, username, auth_type, auth_value, is_installed) VALUES ('testproj', 'legacy2', 'host2.example.com', 22, 'root', 'password', ?, 0)""", (b64_value,), ) conn.commit() env_id = cur.lastrowid env = models.get_environment(conn, env_id) assert env["auth_value"] == plaintext, ( f"get_environment must return plaintext after b64 migration, got: {env['auth_value']!r}" ) # DB must be updated: b64: replaced by Fernet token stored_after = conn.execute( "SELECT auth_value FROM project_environments WHERE id = ?", (env_id,) ).fetchone()["auth_value"] assert not stored_after.startswith("b64:"), ( "DB must contain Fernet token after get_environment migrates b64: record" ) def test_get_environment_second_read_after_migration_still_decrypts(conn_with_project): """AC2: After b64 migration, subsequent get_environment calls still return plaintext.""" conn = conn_with_project plaintext = "migrated_secret" b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode() cur = conn.execute( """INSERT INTO project_environments (project_id, name, host, port, username, auth_type, auth_value, is_installed) VALUES ('testproj', 'legacy3', 'host3.example.com', 22, 'root', 'password', ?, 0)""", (b64_value,), ) conn.commit() env_id = cur.lastrowid # First read: triggers migration env1 = models.get_environment(conn, env_id) assert env1["auth_value"] == plaintext # Second read: now reads Fernet token (post-migration) env2 = models.get_environment(conn, env_id) assert env2["auth_value"] == plaintext # --------------------------------------------------------------------------- # AC3: Missing KIN_SECRET_KEY → scan endpoint returns 503 (not 500) # --------------------------------------------------------------------------- def test_scan_endpoint_returns_503_when_kin_secret_key_missing(scan_client, monkeypatch): """AC3: POST /environments/{id}/scan returns 503 when KIN_SECRET_KEY is not set.""" client, env_id = scan_client monkeypatch.delenv("KIN_SECRET_KEY", raising=False) r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan") assert r.status_code == 503, ( f"scan must return 503 when KIN_SECRET_KEY is missing, got {r.status_code}: {r.text}" ) def test_scan_endpoint_returns_503_not_500(scan_client, monkeypatch): """AC3: HTTP 503 (misconfiguration) must be returned, not 500 (code bug).""" client, env_id = scan_client monkeypatch.delenv("KIN_SECRET_KEY", raising=False) r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan") assert r.status_code != 500, "Missing KIN_SECRET_KEY must produce 503, not 500" assert r.status_code == 503 def test_scan_endpoint_returns_202_when_key_present(scan_client): """AC3: scan endpoint returns 202 when KIN_SECRET_KEY is correctly set.""" client, env_id = scan_client with patch("subprocess.Popen") as mock_popen: mock_popen.return_value = MagicMock(pid=12345) r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan") assert r.status_code == 202 # --------------------------------------------------------------------------- # AC4: Runner path — get_environment() returns decrypted plaintext auth_value # --------------------------------------------------------------------------- def test_get_environment_returns_decrypted_auth_value(conn_with_project): """AC4: get_environment() returns plaintext, not the Fernet token stored in DB.""" conn = conn_with_project plaintext = "runner_secret_42" env = models.create_environment( conn, "testproj", "runner-env", "10.0.0.10", "root", auth_value=plaintext, ) env_id = env["id"] fetched = models.get_environment(conn, env_id) assert fetched["auth_value"] == plaintext, ( f"get_environment must return plaintext auth_value, got: {fetched['auth_value']!r}" ) def test_get_environment_auth_value_is_not_fernet_token(conn_with_project): """AC4: auth_value from get_environment is decrypted (not a Fernet base64 token).""" conn = conn_with_project plaintext = "real_password_xyz" env = models.create_environment( conn, "testproj", "fernet-check", "10.0.0.11", "user", auth_value=plaintext, ) # Verify DB stores encrypted (not plaintext) raw_stored = conn.execute( "SELECT auth_value FROM project_environments WHERE id = ?", (env["id"],) ).fetchone()["auth_value"] assert raw_stored != plaintext, "DB must store encrypted value, not plaintext" # get_environment must return decrypted plaintext fetched = models.get_environment(conn, env["id"]) assert fetched["auth_value"] == plaintext def test_get_environment_returns_none_auth_value_when_not_set(conn_with_project): """AC4: get_environment() returns auth_value=None when no credential was stored.""" conn = conn_with_project env = models.create_environment( conn, "testproj", "no-cred", "10.0.0.12", "user", auth_value=None, ) fetched = models.get_environment(conn, env["id"]) assert fetched["auth_value"] is None def test_create_environment_hides_auth_value_in_return(conn_with_project): """AC4: create_environment() returns auth_value=None — plaintext only via get_environment.""" conn = conn_with_project env = models.create_environment( conn, "testproj", "hidden-cred", "10.0.0.13", "user", auth_value="secret", ) assert env["auth_value"] is None, ( "create_environment must return auth_value=None for API safety" ) # --------------------------------------------------------------------------- # AC5: Old _obfuscate_auth / _deobfuscate_auth are not present anywhere # --------------------------------------------------------------------------- def test_obfuscate_auth_not_in_core_models(): """AC5: _obfuscate_auth must not exist in core.models (fully removed).""" import core.models as m assert not hasattr(m, "_obfuscate_auth"), ( "_obfuscate_auth must be removed from core.models — use _encrypt_auth instead" ) def test_deobfuscate_auth_not_in_core_models(): """AC5: _deobfuscate_auth must not exist in core.models (fully removed).""" import core.models as m assert not hasattr(m, "_deobfuscate_auth"), ( "_deobfuscate_auth must be removed from core.models — use _decrypt_auth instead" ) def test_obfuscate_auth_not_imported_in_web_api(): """AC5: _obfuscate_auth must not be imported or defined in web.api.""" import web.api as api_mod assert not hasattr(api_mod, "_obfuscate_auth"), ( "_obfuscate_auth must not appear in web.api" ) def test_deobfuscate_auth_not_imported_in_web_api(): """AC5: _deobfuscate_auth must not be imported or defined in web.api.""" import web.api as api_mod assert not hasattr(api_mod, "_deobfuscate_auth"), ( "_deobfuscate_auth must not appear in web.api" ) # --------------------------------------------------------------------------- # AC6 (KIN-095): ModuleNotFoundError for cryptography → 503, not 500 # --------------------------------------------------------------------------- def test_create_environment_returns_503_when_cryptography_not_installed(env_client): """AC6: POST /environments returns 503 when cryptography package missing (not 500).""" client = env_client with patch("core.models._encrypt_auth", side_effect=ModuleNotFoundError("No module named 'cryptography'")): r = client.post("/api/projects/envproj/environments", json={ "name": "creds-env", "host": "10.0.0.20", "username": "root", "auth_type": "password", "auth_value": "secret", }) assert r.status_code == 503, ( f"create_environment must return 503 when cryptography is missing, got {r.status_code}: {r.text}" ) assert "cryptography" in r.json()["detail"].lower() def test_create_environment_returns_503_not_500_for_missing_cryptography(env_client): """AC6: 500 must NOT be returned when cryptography package is absent.""" client = env_client with patch("core.models._encrypt_auth", side_effect=ModuleNotFoundError("No module named 'cryptography'")): r = client.post("/api/projects/envproj/environments", json={ "name": "creds-env2", "host": "10.0.0.21", "username": "root", "auth_value": "secret2", }) assert r.status_code != 500, "Missing cryptography must produce 503, not 500" def test_patch_environment_returns_503_when_cryptography_not_installed(env_client): """AC6: PATCH /environments/{id} returns 503 when cryptography package missing.""" client = env_client # Create env without auth_value so no encryption at create time r = client.post("/api/projects/envproj/environments", json={ "name": "patch-env", "host": "10.0.0.22", "username": "root", }) assert r.status_code == 201, f"Setup failed: {r.text}" env_id = r.json()["id"] with patch("core.models._encrypt_auth", side_effect=ModuleNotFoundError("No module named 'cryptography'")): r = client.patch(f"/api/projects/envproj/environments/{env_id}", json={ "auth_value": "new_secret", }) assert r.status_code == 503, ( f"patch_environment must return 503 when cryptography is missing, got {r.status_code}: {r.text}" ) assert "cryptography" in r.json()["detail"].lower()