kin/tests/test_kin_biz_007_fernet.py

388 lines
16 KiB
Python

"""Tests for KIN-BIZ-007: Fernet encryption of credentials in project_environments.
Acceptance criteria:
1. Roundtrip: _encrypt_auth → _decrypt_auth returns the original string.
2. Migration: b64:-prefixed record is auto-re-encrypted on read; decrypt returns plaintext.
3. Missing KIN_SECRET_KEY → scan endpoint returns 503 (not 500).
4. Runner path: get_environment() returns decrypted plaintext auth_value.
5. Old _obfuscate_auth / _deobfuscate_auth are not present anywhere.
Decision #214: patch на consuming-модуле, не на defining.
Decision #215: использовать mock.assert_called_once().
"""
import base64
import os
import pytest
from unittest.mock import patch, MagicMock
from core.db import init_db
from core import models
@pytest.fixture
def conn():
"""Fresh in-memory DB for each test."""
c = init_db(db_path=":memory:")
yield c
c.close()
@pytest.fixture
def conn_with_project(conn):
"""In-memory DB with a test project."""
models.create_project(conn, "testproj", "Test Project", "/test")
return conn
@pytest.fixture
def scan_client(tmp_path):
"""TestClient with project + environment pre-created. Returns (client, env_id)."""
import web.api as api_module
api_module.DB_PATH = tmp_path / "scan_biz007.db"
from web.api import app
from fastapi.testclient import TestClient
c = TestClient(app)
c.post("/api/projects", json={"id": "scanproj", "name": "Scan Project", "path": "/scan"})
r = c.post("/api/projects/scanproj/environments", json={
"name": "prod", "host": "10.0.0.1", "username": "root",
})
env_id = r.json()["id"]
return c, env_id
@pytest.fixture
def env_client(tmp_path):
"""TestClient with just a project pre-created. Returns client."""
import web.api as api_module
api_module.DB_PATH = tmp_path / "env_biz007.db"
from web.api import app
from fastapi.testclient import TestClient
c = TestClient(app)
c.post("/api/projects", json={"id": "envproj", "name": "Env Project", "path": "/env"})
return c
# ---------------------------------------------------------------------------
# AC1: Roundtrip — _encrypt_auth → _decrypt_auth returns original string
# ---------------------------------------------------------------------------
def test_encrypt_decrypt_roundtrip_returns_original(conn):
"""AC1: encrypt → decrypt returns the exact original plaintext."""
original = "my_super_secret_password"
encrypted = models._encrypt_auth(original)
decrypted = models._decrypt_auth(encrypted)
assert decrypted == original
def test_encrypt_produces_different_value_than_plaintext(conn):
"""AC1: encrypted value is not the original (Fernet token, not plaintext)."""
original = "plain_secret"
encrypted = models._encrypt_auth(original)
assert encrypted != original
assert not encrypted.startswith("b64:")
def test_encrypt_two_calls_produce_different_tokens(conn):
"""AC1: Fernet uses random IV — two encryptions of same value differ but both decrypt correctly."""
value = "same_password"
enc1 = models._encrypt_auth(value)
enc2 = models._encrypt_auth(value)
# Encrypted forms must differ due to Fernet IV randomness
assert enc1 != enc2
# Both must decrypt to original
assert models._decrypt_auth(enc1) == value
assert models._decrypt_auth(enc2) == value
def test_encrypt_raises_runtime_error_when_no_key(monkeypatch):
"""AC1: _encrypt_auth raises RuntimeError when KIN_SECRET_KEY is absent."""
monkeypatch.delenv("KIN_SECRET_KEY", raising=False)
with pytest.raises(RuntimeError, match="KIN_SECRET_KEY"):
models._encrypt_auth("any_value")
def test_decrypt_fernet_token_without_key_returns_raw_not_plaintext(monkeypatch):
"""AC1: _decrypt_auth without key cannot recover plaintext — returns stored token, not original."""
original = "secret"
encrypted = models._encrypt_auth(original)
monkeypatch.delenv("KIN_SECRET_KEY", raising=False)
result = models._decrypt_auth(encrypted)
# Without the key we cannot get the plaintext back
assert result != original
# ---------------------------------------------------------------------------
# AC2: Migration — b64: record auto-re-encrypted on read
# ---------------------------------------------------------------------------
def test_decrypt_auth_handles_b64_prefix_without_db(conn):
"""AC2: _decrypt_auth decodes b64:-prefixed value (no DB needed for the decode itself)."""
plaintext = "legacy_password"
b64_stored = "b64:" + base64.b64encode(plaintext.encode()).decode()
decrypted = models._decrypt_auth(b64_stored)
assert decrypted == plaintext
def test_decrypt_auth_b64_rewrites_db_when_conn_provided(conn_with_project):
"""AC2: _decrypt_auth with conn+env_id re-encrypts b64: value in DB on read."""
conn = conn_with_project
plaintext = "legacy_pass_123"
b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode()
cur = conn.execute(
"""INSERT INTO project_environments
(project_id, name, host, port, username, auth_type, auth_value, is_installed)
VALUES ('testproj', 'legacy', 'host.example.com', 22, 'root', 'password', ?, 0)""",
(b64_value,),
)
conn.commit()
env_id = cur.lastrowid
# Call decrypt with conn+env_id — must trigger re-encryption
decrypted = models._decrypt_auth(b64_value, conn=conn, env_id=env_id)
assert decrypted == plaintext
# DB must now have Fernet token, not b64:
stored_after = conn.execute(
"SELECT auth_value FROM project_environments WHERE id = ?", (env_id,)
).fetchone()["auth_value"]
assert not stored_after.startswith("b64:"), (
"After migration, b64: prefix must be replaced by a Fernet token"
)
# And the new token must decrypt correctly
assert models._decrypt_auth(stored_after) == plaintext
def test_get_environment_migrates_b64_and_returns_plaintext(conn_with_project):
"""AC2: get_environment() transparently migrates b64: values and returns plaintext auth_value."""
conn = conn_with_project
plaintext = "old_secret"
b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode()
cur = conn.execute(
"""INSERT INTO project_environments
(project_id, name, host, port, username, auth_type, auth_value, is_installed)
VALUES ('testproj', 'legacy2', 'host2.example.com', 22, 'root', 'password', ?, 0)""",
(b64_value,),
)
conn.commit()
env_id = cur.lastrowid
env = models.get_environment(conn, env_id)
assert env["auth_value"] == plaintext, (
f"get_environment must return plaintext after b64 migration, got: {env['auth_value']!r}"
)
# DB must be updated: b64: replaced by Fernet token
stored_after = conn.execute(
"SELECT auth_value FROM project_environments WHERE id = ?", (env_id,)
).fetchone()["auth_value"]
assert not stored_after.startswith("b64:"), (
"DB must contain Fernet token after get_environment migrates b64: record"
)
def test_get_environment_second_read_after_migration_still_decrypts(conn_with_project):
"""AC2: After b64 migration, subsequent get_environment calls still return plaintext."""
conn = conn_with_project
plaintext = "migrated_secret"
b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode()
cur = conn.execute(
"""INSERT INTO project_environments
(project_id, name, host, port, username, auth_type, auth_value, is_installed)
VALUES ('testproj', 'legacy3', 'host3.example.com', 22, 'root', 'password', ?, 0)""",
(b64_value,),
)
conn.commit()
env_id = cur.lastrowid
# First read: triggers migration
env1 = models.get_environment(conn, env_id)
assert env1["auth_value"] == plaintext
# Second read: now reads Fernet token (post-migration)
env2 = models.get_environment(conn, env_id)
assert env2["auth_value"] == plaintext
# ---------------------------------------------------------------------------
# AC3: Missing KIN_SECRET_KEY → scan endpoint returns 503 (not 500)
# ---------------------------------------------------------------------------
def test_scan_endpoint_returns_503_when_kin_secret_key_missing(scan_client, monkeypatch):
"""AC3: POST /environments/{id}/scan returns 503 when KIN_SECRET_KEY is not set."""
client, env_id = scan_client
monkeypatch.delenv("KIN_SECRET_KEY", raising=False)
r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan")
assert r.status_code == 503, (
f"scan must return 503 when KIN_SECRET_KEY is missing, got {r.status_code}: {r.text}"
)
def test_scan_endpoint_returns_503_not_500(scan_client, monkeypatch):
"""AC3: HTTP 503 (misconfiguration) must be returned, not 500 (code bug)."""
client, env_id = scan_client
monkeypatch.delenv("KIN_SECRET_KEY", raising=False)
r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan")
assert r.status_code != 500, "Missing KIN_SECRET_KEY must produce 503, not 500"
assert r.status_code == 503
def test_scan_endpoint_returns_202_when_key_present(scan_client):
"""AC3: scan endpoint returns 202 when KIN_SECRET_KEY is correctly set."""
client, env_id = scan_client
with patch("subprocess.Popen") as mock_popen:
mock_popen.return_value = MagicMock(pid=12345)
r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan")
assert r.status_code == 202
# ---------------------------------------------------------------------------
# AC4: Runner path — get_environment() returns decrypted plaintext auth_value
# ---------------------------------------------------------------------------
def test_get_environment_returns_decrypted_auth_value(conn_with_project):
"""AC4: get_environment() returns plaintext, not the Fernet token stored in DB."""
conn = conn_with_project
plaintext = "runner_secret_42"
env = models.create_environment(
conn, "testproj", "runner-env", "10.0.0.10", "root",
auth_value=plaintext,
)
env_id = env["id"]
fetched = models.get_environment(conn, env_id)
assert fetched["auth_value"] == plaintext, (
f"get_environment must return plaintext auth_value, got: {fetched['auth_value']!r}"
)
def test_get_environment_auth_value_is_not_fernet_token(conn_with_project):
"""AC4: auth_value from get_environment is decrypted (not a Fernet base64 token)."""
conn = conn_with_project
plaintext = "real_password_xyz"
env = models.create_environment(
conn, "testproj", "fernet-check", "10.0.0.11", "user",
auth_value=plaintext,
)
# Verify DB stores encrypted (not plaintext)
raw_stored = conn.execute(
"SELECT auth_value FROM project_environments WHERE id = ?", (env["id"],)
).fetchone()["auth_value"]
assert raw_stored != plaintext, "DB must store encrypted value, not plaintext"
# get_environment must return decrypted plaintext
fetched = models.get_environment(conn, env["id"])
assert fetched["auth_value"] == plaintext
def test_get_environment_returns_none_auth_value_when_not_set(conn_with_project):
"""AC4: get_environment() returns auth_value=None when no credential was stored."""
conn = conn_with_project
env = models.create_environment(
conn, "testproj", "no-cred", "10.0.0.12", "user",
auth_value=None,
)
fetched = models.get_environment(conn, env["id"])
assert fetched["auth_value"] is None
def test_create_environment_hides_auth_value_in_return(conn_with_project):
"""AC4: create_environment() returns auth_value=None — plaintext only via get_environment."""
conn = conn_with_project
env = models.create_environment(
conn, "testproj", "hidden-cred", "10.0.0.13", "user",
auth_value="secret",
)
assert env["auth_value"] is None, (
"create_environment must return auth_value=None for API safety"
)
# ---------------------------------------------------------------------------
# AC5: Old _obfuscate_auth / _deobfuscate_auth are not present anywhere
# ---------------------------------------------------------------------------
def test_obfuscate_auth_not_in_core_models():
"""AC5: _obfuscate_auth must not exist in core.models (fully removed)."""
import core.models as m
assert not hasattr(m, "_obfuscate_auth"), (
"_obfuscate_auth must be removed from core.models — use _encrypt_auth instead"
)
def test_deobfuscate_auth_not_in_core_models():
"""AC5: _deobfuscate_auth must not exist in core.models (fully removed)."""
import core.models as m
assert not hasattr(m, "_deobfuscate_auth"), (
"_deobfuscate_auth must be removed from core.models — use _decrypt_auth instead"
)
def test_obfuscate_auth_not_imported_in_web_api():
"""AC5: _obfuscate_auth must not be imported or defined in web.api."""
import web.api as api_mod
assert not hasattr(api_mod, "_obfuscate_auth"), (
"_obfuscate_auth must not appear in web.api"
)
def test_deobfuscate_auth_not_imported_in_web_api():
"""AC5: _deobfuscate_auth must not be imported or defined in web.api."""
import web.api as api_mod
assert not hasattr(api_mod, "_deobfuscate_auth"), (
"_deobfuscate_auth must not appear in web.api"
)
# ---------------------------------------------------------------------------
# AC6 (KIN-095): ModuleNotFoundError for cryptography → 503, not 500
# ---------------------------------------------------------------------------
def test_create_environment_returns_503_when_cryptography_not_installed(env_client):
"""AC6: POST /environments returns 503 when cryptography package missing (not 500)."""
client = env_client
with patch("core.models._encrypt_auth", side_effect=ModuleNotFoundError("No module named 'cryptography'")):
r = client.post("/api/projects/envproj/environments", json={
"name": "creds-env", "host": "10.0.0.20", "username": "root",
"auth_type": "password", "auth_value": "secret",
})
assert r.status_code == 503, (
f"create_environment must return 503 when cryptography is missing, got {r.status_code}: {r.text}"
)
assert "cryptography" in r.json()["detail"].lower()
def test_create_environment_returns_503_not_500_for_missing_cryptography(env_client):
"""AC6: 500 must NOT be returned when cryptography package is absent."""
client = env_client
with patch("core.models._encrypt_auth", side_effect=ModuleNotFoundError("No module named 'cryptography'")):
r = client.post("/api/projects/envproj/environments", json={
"name": "creds-env2", "host": "10.0.0.21", "username": "root",
"auth_value": "secret2",
})
assert r.status_code != 500, "Missing cryptography must produce 503, not 500"
def test_patch_environment_returns_503_when_cryptography_not_installed(env_client):
"""AC6: PATCH /environments/{id} returns 503 when cryptography package missing."""
client = env_client
# Create env without auth_value so no encryption at create time
r = client.post("/api/projects/envproj/environments", json={
"name": "patch-env", "host": "10.0.0.22", "username": "root",
})
assert r.status_code == 201, f"Setup failed: {r.text}"
env_id = r.json()["id"]
with patch("core.models._encrypt_auth", side_effect=ModuleNotFoundError("No module named 'cryptography'")):
r = client.patch(f"/api/projects/envproj/environments/{env_id}", json={
"auth_value": "new_secret",
})
assert r.status_code == 503, (
f"patch_environment must return 503 when cryptography is missing, got {r.status_code}: {r.text}"
)
assert "cryptography" in r.json()["detail"].lower()