2026-03-16 20:55:01 +02:00
|
|
|
"""Tests for KIN-BIZ-007: Fernet encryption of credentials in project_environments.
|
|
|
|
|
|
|
|
|
|
Acceptance criteria:
|
|
|
|
|
1. Roundtrip: _encrypt_auth → _decrypt_auth returns the original string.
|
|
|
|
|
2. Migration: b64:-prefixed record is auto-re-encrypted on read; decrypt returns plaintext.
|
|
|
|
|
3. Missing KIN_SECRET_KEY → scan endpoint returns 503 (not 500).
|
|
|
|
|
4. Runner path: get_environment() returns decrypted plaintext auth_value.
|
|
|
|
|
5. Old _obfuscate_auth / _deobfuscate_auth are not present anywhere.
|
|
|
|
|
|
|
|
|
|
Decision #214: patch на consuming-модуле, не на defining.
|
|
|
|
|
Decision #215: использовать mock.assert_called_once().
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import base64
|
|
|
|
|
import os
|
|
|
|
|
import pytest
|
|
|
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
|
|
|
|
|
|
from core.db import init_db
|
|
|
|
|
from core import models
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def conn():
    """Yield a brand-new in-memory database connection; close it after the test."""
    db = init_db(db_path=":memory:")
    yield db
    # Teardown: runs once the test using this fixture has finished.
    db.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def conn_with_project(conn):
    """Return the in-memory connection after seeding it with the ``testproj`` project."""
    project_id, project_name, project_path = "testproj", "Test Project", "/test"
    models.create_project(conn, project_id, project_name, project_path)
    return conn
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def scan_client(tmp_path):
    """TestClient with project + environment pre-created. Returns (client, env_id).

    Points ``web.api.DB_PATH`` at a per-test database file under ``tmp_path``
    (the previous value is not restored), creates the ``scanproj`` project and
    a ``prod`` environment without credentials through the HTTP API.

    Fails with an explicit "Setup failed" assertion if the environment could
    not be created, instead of an opaque ``KeyError`` on the response body.
    """
    import web.api as api_module

    api_module.DB_PATH = tmp_path / "scan_biz007.db"
    from web.api import app
    from fastapi.testclient import TestClient

    c = TestClient(app)
    c.post("/api/projects", json={"id": "scanproj", "name": "Scan Project", "path": "/scan"})
    r = c.post("/api/projects/scanproj/environments", json={
        "name": "prod", "host": "10.0.0.1", "username": "root",
    })
    # Surface a broken setup immediately; every dependent test needs a valid id.
    assert r.status_code == 201, f"Setup failed: {r.text}"
    env_id = r.json()["id"]
    return c, env_id
|
|
|
|
|
|
|
|
|
|
|
2026-03-16 20:58:44 +02:00
|
|
|
@pytest.fixture
def env_client(tmp_path):
    """Provide a TestClient whose database already contains the ``envproj`` project.

    ``web.api.DB_PATH`` is redirected to a per-test file under ``tmp_path``.
    """
    import web.api as api_module

    api_module.DB_PATH = tmp_path / "env_biz007.db"
    from web.api import app
    from fastapi.testclient import TestClient

    client = TestClient(app)
    project_payload = {"id": "envproj", "name": "Env Project", "path": "/env"}
    client.post("/api/projects", json=project_payload)
    return client
|
|
|
|
|
|
|
|
|
|
|
2026-03-16 20:55:01 +02:00
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# AC1: Roundtrip — _encrypt_auth → _decrypt_auth returns original string
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def test_encrypt_decrypt_roundtrip_returns_original(conn):
    """AC1: decrypting a freshly encrypted value yields the exact input string."""
    secret = "my_super_secret_password"
    token = models._encrypt_auth(secret)
    assert models._decrypt_auth(token) == secret
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_encrypt_produces_different_value_than_plaintext(conn):
    """AC1: the stored form differs from the plaintext and is not a legacy b64: value."""
    secret = "plain_secret"
    token = models._encrypt_auth(secret)
    assert token != secret
    uses_legacy_prefix = token.startswith("b64:")
    assert not uses_legacy_prefix
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_encrypt_two_calls_produce_different_tokens(conn):
    """AC1: encrypting the same value twice gives distinct tokens that both decrypt back."""
    password = "same_password"
    first_token = models._encrypt_auth(password)
    second_token = models._encrypt_auth(password)

    # The two ciphertexts are expected to differ even though the input is equal.
    assert first_token != second_token

    # Each token must still decrypt to the shared original value.
    for token in (first_token, second_token):
        assert models._decrypt_auth(token) == password
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_encrypt_raises_runtime_error_when_no_key(monkeypatch):
    """AC1: with KIN_SECRET_KEY unset, _encrypt_auth raises RuntimeError naming the variable."""
    monkeypatch.delenv("KIN_SECRET_KEY", raising=False)
    expect_missing_key_error = pytest.raises(RuntimeError, match="KIN_SECRET_KEY")
    with expect_missing_key_error:
        models._encrypt_auth("any_value")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_decrypt_fernet_token_without_key_returns_raw_not_plaintext(monkeypatch):
    """AC1: once KIN_SECRET_KEY is removed, _decrypt_auth no longer yields the plaintext."""
    secret = "secret"
    # Encrypt while the key is still available, then drop it before decrypting.
    token = models._encrypt_auth(secret)
    monkeypatch.delenv("KIN_SECRET_KEY", raising=False)

    recovered = models._decrypt_auth(token)

    # Whatever comes back, it must not be the original secret.
    assert recovered != secret
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# AC2: Migration — b64: record auto-re-encrypted on read
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def test_decrypt_auth_handles_b64_prefix_without_db(conn):
    """AC2: a legacy b64:-prefixed value is decoded by _decrypt_auth without DB arguments."""
    legacy_secret = "legacy_password"
    encoded = base64.b64encode(legacy_secret.encode()).decode()
    stored_value = "b64:" + encoded
    assert models._decrypt_auth(stored_value) == legacy_secret
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_decrypt_auth_b64_rewrites_db_when_conn_provided(conn_with_project):
    """AC2: passing conn and env_id to _decrypt_auth replaces the stored b64: value in the DB."""
    db = conn_with_project
    secret = "legacy_pass_123"
    legacy_value = "b64:" + base64.b64encode(secret.encode()).decode()

    # Seed a row directly so the legacy encoding bypasses create_environment().
    insert_sql = """INSERT INTO project_environments
           (project_id, name, host, port, username, auth_type, auth_value, is_installed)
           VALUES ('testproj', 'legacy', 'host.example.com', 22, 'root', 'password', ?, 0)"""
    cursor = db.execute(insert_sql, (legacy_value,))
    db.commit()
    row_id = cursor.lastrowid

    # Decrypting with conn + env_id is what should trigger the re-encryption.
    recovered = models._decrypt_auth(legacy_value, conn=db, env_id=row_id)
    assert recovered == secret

    # The persisted value must no longer carry the legacy prefix.
    row = db.execute(
        "SELECT auth_value FROM project_environments WHERE id = ?", (row_id,)
    ).fetchone()
    persisted = row["auth_value"]
    assert not persisted.startswith("b64:"), (
        "After migration, b64: prefix must be replaced by a Fernet token"
    )
    # The rewritten value must itself decrypt to the original secret.
    assert models._decrypt_auth(persisted) == secret
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_environment_migrates_b64_and_returns_plaintext(conn_with_project):
    """AC2: get_environment() returns plaintext for a b64: row and rewrites the stored value."""
    db = conn_with_project
    secret = "old_secret"
    legacy_value = "b64:" + base64.b64encode(secret.encode()).decode()

    # Insert the legacy-encoded row by hand.
    insert_sql = """INSERT INTO project_environments
           (project_id, name, host, port, username, auth_type, auth_value, is_installed)
           VALUES ('testproj', 'legacy2', 'host2.example.com', 22, 'root', 'password', ?, 0)"""
    cursor = db.execute(insert_sql, (legacy_value,))
    db.commit()
    row_id = cursor.lastrowid

    env = models.get_environment(db, row_id)

    assert env["auth_value"] == secret, (
        f"get_environment must return plaintext after b64 migration, got: {env['auth_value']!r}"
    )

    # The row in the database must have lost its b64: prefix.
    row = db.execute(
        "SELECT auth_value FROM project_environments WHERE id = ?", (row_id,)
    ).fetchone()
    persisted = row["auth_value"]
    assert not persisted.startswith("b64:"), (
        "DB must contain Fernet token after get_environment migrates b64: record"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_environment_second_read_after_migration_still_decrypts(conn_with_project):
    """AC2: two consecutive get_environment() reads of a b64: row both return the plaintext."""
    db = conn_with_project
    secret = "migrated_secret"
    legacy_value = "b64:" + base64.b64encode(secret.encode()).decode()

    insert_sql = """INSERT INTO project_environments
           (project_id, name, host, port, username, auth_type, auth_value, is_installed)
           VALUES ('testproj', 'legacy3', 'host3.example.com', 22, 'root', 'password', ?, 0)"""
    cursor = db.execute(insert_sql, (legacy_value,))
    db.commit()
    row_id = cursor.lastrowid

    # Read 1 sees the legacy value (and migrates it); read 2 sees the migrated value.
    for _ in range(2):
        env = models.get_environment(db, row_id)
        assert env["auth_value"] == secret
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# AC3: Missing KIN_SECRET_KEY → scan endpoint returns 503 (not 500)
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def test_scan_endpoint_returns_503_when_kin_secret_key_missing(scan_client, monkeypatch):
    """AC3: with KIN_SECRET_KEY unset, POST .../environments/{id}/scan answers 503."""
    client, env_id = scan_client
    monkeypatch.delenv("KIN_SECRET_KEY", raising=False)

    scan_url = f"/api/projects/scanproj/environments/{env_id}/scan"
    response = client.post(scan_url)

    assert response.status_code == 503, (
        f"scan must return 503 when KIN_SECRET_KEY is missing, got {response.status_code}: {response.text}"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_scan_endpoint_returns_503_not_500(scan_client, monkeypatch):
    """AC3: a missing key is reported as 503 (misconfiguration), never as 500 (code bug)."""
    client, env_id = scan_client
    monkeypatch.delenv("KIN_SECRET_KEY", raising=False)

    response = client.post(f"/api/projects/scanproj/environments/{env_id}/scan")
    status = response.status_code

    assert status != 500, "Missing KIN_SECRET_KEY must produce 503, not 500"
    assert status == 503
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_scan_endpoint_returns_202_when_key_present(scan_client):
    """AC3: scan endpoint returns 202 when KIN_SECRET_KEY is correctly set.

    ``subprocess.Popen`` is replaced by a mock so no real process is started.
    Per Decision #215 the mock is also verified, so the test fails if the
    endpoint answers 202 without ever going through the patched ``Popen``.
    """
    client, env_id = scan_client
    with patch("subprocess.Popen") as mock_popen:
        mock_popen.return_value = MagicMock(pid=12345)
        r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan")
    assert r.status_code == 202
    # NOTE(review): assumes the endpoint launches exactly one process per scan request.
    mock_popen.assert_called_once()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# AC4: Runner path — get_environment() returns decrypted plaintext auth_value
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def test_get_environment_returns_decrypted_auth_value(conn_with_project):
    """AC4: a credential saved via create_environment() is returned as plaintext by get_environment()."""
    db = conn_with_project
    secret = "runner_secret_42"

    created = models.create_environment(
        db, "testproj", "runner-env", "10.0.0.10", "root", auth_value=secret
    )
    fetched = models.get_environment(db, created["id"])

    assert fetched["auth_value"] == secret, (
        f"get_environment must return plaintext auth_value, got: {fetched['auth_value']!r}"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_environment_auth_value_is_not_fernet_token(conn_with_project):
    """AC4: the DB column holds a non-plaintext value while get_environment() returns the plaintext."""
    db = conn_with_project
    secret = "real_password_xyz"
    created = models.create_environment(
        db, "testproj", "fernet-check", "10.0.0.11", "user", auth_value=secret
    )
    row_id = created["id"]

    # Raw column value: must not be the plaintext itself.
    row = db.execute(
        "SELECT auth_value FROM project_environments WHERE id = ?", (row_id,)
    ).fetchone()
    assert row["auth_value"] != secret, "DB must store encrypted value, not plaintext"

    # Value seen through the model API: must be the plaintext.
    assert models.get_environment(db, row_id)["auth_value"] == secret
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_environment_returns_none_auth_value_when_not_set(conn_with_project):
    """AC4: an environment created with auth_value=None reads back with auth_value None."""
    db = conn_with_project
    created = models.create_environment(
        db, "testproj", "no-cred", "10.0.0.12", "user", auth_value=None
    )
    fetched = models.get_environment(db, created["id"])
    assert fetched["auth_value"] is None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_environment_hides_auth_value_in_return(conn_with_project):
    """AC4: the dict returned by create_environment() carries auth_value=None even when one was given."""
    db = conn_with_project
    created = models.create_environment(
        db, "testproj", "hidden-cred", "10.0.0.13", "user", auth_value="secret"
    )
    returned_credential = created["auth_value"]
    assert returned_credential is None, (
        "create_environment must return auth_value=None for API safety"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# AC5: Old _obfuscate_auth / _deobfuscate_auth are not present anywhere
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def test_obfuscate_auth_not_in_core_models():
    """AC5: core.models exposes no attribute named _obfuscate_auth."""
    import core.models as core_models

    still_defined = hasattr(core_models, "_obfuscate_auth")
    assert not still_defined, (
        "_obfuscate_auth must be removed from core.models — use _encrypt_auth instead"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_deobfuscate_auth_not_in_core_models():
    """AC5: core.models exposes no attribute named _deobfuscate_auth."""
    import core.models as core_models

    still_defined = hasattr(core_models, "_deobfuscate_auth")
    assert not still_defined, (
        "_deobfuscate_auth must be removed from core.models — use _decrypt_auth instead"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_obfuscate_auth_not_imported_in_web_api():
    """AC5: web.api has no module-level attribute named _obfuscate_auth."""
    import web.api as web_api

    still_present = hasattr(web_api, "_obfuscate_auth")
    assert not still_present, (
        "_obfuscate_auth must not appear in web.api"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_deobfuscate_auth_not_imported_in_web_api():
    """AC5: web.api has no module-level attribute named _deobfuscate_auth."""
    import web.api as web_api

    still_present = hasattr(web_api, "_deobfuscate_auth")
    assert not still_present, (
        "_deobfuscate_auth must not appear in web.api"
    )
|
2026-03-16 20:58:44 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# AC6 (KIN-095): ModuleNotFoundError for cryptography → 503, not 500
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def test_create_environment_returns_503_when_cryptography_not_installed(env_client):
    """AC6: POST /environments answers 503 with a 'cryptography' detail when encryption cannot import."""
    missing_dependency = ModuleNotFoundError("No module named 'cryptography'")
    payload = {
        "name": "creds-env", "host": "10.0.0.20", "username": "root",
        "auth_type": "password", "auth_value": "secret",
    }

    # Simulate the absent package by making the encryption helper raise.
    with patch("core.models._encrypt_auth", side_effect=missing_dependency):
        response = env_client.post("/api/projects/envproj/environments", json=payload)

    assert response.status_code == 503, (
        f"create_environment must return 503 when cryptography is missing, got {response.status_code}: {response.text}"
    )
    detail = response.json()["detail"]
    assert "cryptography" in detail.lower()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_environment_returns_503_not_500_for_missing_cryptography(env_client):
    """AC6: 500 must NOT be returned when cryptography package is absent.

    The request omits ``auth_type`` but supplies ``auth_value``; the encryption
    helper is patched to raise ``ModuleNotFoundError``. Besides ruling out 500,
    the status is pinned to 503 so that an unrelated failure (e.g. 404 or 422)
    cannot make this test pass.
    """
    client = env_client
    with patch("core.models._encrypt_auth", side_effect=ModuleNotFoundError("No module named 'cryptography'")):
        r = client.post("/api/projects/envproj/environments", json={
            "name": "creds-env2", "host": "10.0.0.21", "username": "root",
            "auth_value": "secret2",
        })
    assert r.status_code != 500, "Missing cryptography must produce 503, not 500"
    # Mirror the scan-endpoint test: "not 500" alone is too weak a guarantee.
    assert r.status_code == 503
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_patch_environment_returns_503_when_cryptography_not_installed(env_client):
    """AC6: PATCH /environments/{id} answers 503 with a 'cryptography' detail when encryption cannot import."""
    # Setup: create the environment without a credential, so creation needs no encryption.
    setup_payload = {"name": "patch-env", "host": "10.0.0.22", "username": "root"}
    created = env_client.post("/api/projects/envproj/environments", json=setup_payload)
    assert created.status_code == 201, f"Setup failed: {created.text}"
    env_id = created.json()["id"]

    missing_dependency = ModuleNotFoundError("No module named 'cryptography'")
    with patch("core.models._encrypt_auth", side_effect=missing_dependency):
        response = env_client.patch(
            f"/api/projects/envproj/environments/{env_id}",
            json={"auth_value": "new_secret"},
        )

    assert response.status_code == 503, (
        f"patch_environment must return 503 when cryptography is missing, got {response.status_code}: {response.text}"
    )
    detail = response.json()["detail"]
    assert "cryptography" in detail.lower()
|