kin: KIN-BIZ-007 Post-MVP: шифрование credentials в project_environments через Fernet
This commit is contained in:
parent
c0d67e4c22
commit
8ebc6f1111
3 changed files with 342 additions and 6 deletions
327
tests/test_kin_biz_007_fernet.py
Normal file
327
tests/test_kin_biz_007_fernet.py
Normal file
|
|
@ -0,0 +1,327 @@
|
|||
"""Tests for KIN-BIZ-007: Fernet encryption of credentials in project_environments.
|
||||
|
||||
Acceptance criteria:
|
||||
1. Roundtrip: _encrypt_auth → _decrypt_auth returns the original string.
|
||||
2. Migration: b64:-prefixed record is auto-re-encrypted on read; decrypt returns plaintext.
|
||||
3. Missing KIN_SECRET_KEY → scan endpoint returns 503 (not 500).
|
||||
4. Runner path: get_environment() returns decrypted plaintext auth_value.
|
||||
5. Old _obfuscate_auth / _deobfuscate_auth are not present anywhere.
|
||||
|
||||
Decision #214: patch на consuming-модуле, не на defining.
|
||||
Decision #215: использовать mock.assert_called_once().
|
||||
"""
|
||||
|
||||
import base64
|
||||
import os
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from core.db import init_db
|
||||
from core import models
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def conn():
|
||||
"""Fresh in-memory DB for each test."""
|
||||
c = init_db(db_path=":memory:")
|
||||
yield c
|
||||
c.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def conn_with_project(conn):
|
||||
"""In-memory DB with a test project."""
|
||||
models.create_project(conn, "testproj", "Test Project", "/test")
|
||||
return conn
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scan_client(tmp_path):
|
||||
"""TestClient with project + environment pre-created. Returns (client, env_id)."""
|
||||
import web.api as api_module
|
||||
api_module.DB_PATH = tmp_path / "scan_biz007.db"
|
||||
from web.api import app
|
||||
from fastapi.testclient import TestClient
|
||||
c = TestClient(app)
|
||||
c.post("/api/projects", json={"id": "scanproj", "name": "Scan Project", "path": "/scan"})
|
||||
r = c.post("/api/projects/scanproj/environments", json={
|
||||
"name": "prod", "host": "10.0.0.1", "username": "root",
|
||||
})
|
||||
env_id = r.json()["id"]
|
||||
return c, env_id
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC1: Roundtrip — _encrypt_auth → _decrypt_auth returns original string
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_encrypt_decrypt_roundtrip_returns_original(conn):
|
||||
"""AC1: encrypt → decrypt returns the exact original plaintext."""
|
||||
original = "my_super_secret_password"
|
||||
encrypted = models._encrypt_auth(original)
|
||||
decrypted = models._decrypt_auth(encrypted)
|
||||
assert decrypted == original
|
||||
|
||||
|
||||
def test_encrypt_produces_different_value_than_plaintext(conn):
|
||||
"""AC1: encrypted value is not the original (Fernet token, not plaintext)."""
|
||||
original = "plain_secret"
|
||||
encrypted = models._encrypt_auth(original)
|
||||
assert encrypted != original
|
||||
assert not encrypted.startswith("b64:")
|
||||
|
||||
|
||||
def test_encrypt_two_calls_produce_different_tokens(conn):
|
||||
"""AC1: Fernet uses random IV — two encryptions of same value differ but both decrypt correctly."""
|
||||
value = "same_password"
|
||||
enc1 = models._encrypt_auth(value)
|
||||
enc2 = models._encrypt_auth(value)
|
||||
# Encrypted forms must differ due to Fernet IV randomness
|
||||
assert enc1 != enc2
|
||||
# Both must decrypt to original
|
||||
assert models._decrypt_auth(enc1) == value
|
||||
assert models._decrypt_auth(enc2) == value
|
||||
|
||||
|
||||
def test_encrypt_raises_runtime_error_when_no_key(monkeypatch):
|
||||
"""AC1: _encrypt_auth raises RuntimeError when KIN_SECRET_KEY is absent."""
|
||||
monkeypatch.delenv("KIN_SECRET_KEY", raising=False)
|
||||
with pytest.raises(RuntimeError, match="KIN_SECRET_KEY"):
|
||||
models._encrypt_auth("any_value")
|
||||
|
||||
|
||||
def test_decrypt_fernet_token_without_key_returns_raw_not_plaintext(monkeypatch):
|
||||
"""AC1: _decrypt_auth without key cannot recover plaintext — returns stored token, not original."""
|
||||
original = "secret"
|
||||
encrypted = models._encrypt_auth(original)
|
||||
monkeypatch.delenv("KIN_SECRET_KEY", raising=False)
|
||||
result = models._decrypt_auth(encrypted)
|
||||
# Without the key we cannot get the plaintext back
|
||||
assert result != original
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC2: Migration — b64: record auto-re-encrypted on read
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_decrypt_auth_handles_b64_prefix_without_db(conn):
|
||||
"""AC2: _decrypt_auth decodes b64:-prefixed value (no DB needed for the decode itself)."""
|
||||
plaintext = "legacy_password"
|
||||
b64_stored = "b64:" + base64.b64encode(plaintext.encode()).decode()
|
||||
decrypted = models._decrypt_auth(b64_stored)
|
||||
assert decrypted == plaintext
|
||||
|
||||
|
||||
def test_decrypt_auth_b64_rewrites_db_when_conn_provided(conn_with_project):
|
||||
"""AC2: _decrypt_auth with conn+env_id re-encrypts b64: value in DB on read."""
|
||||
conn = conn_with_project
|
||||
plaintext = "legacy_pass_123"
|
||||
b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode()
|
||||
|
||||
cur = conn.execute(
|
||||
"""INSERT INTO project_environments
|
||||
(project_id, name, host, port, username, auth_type, auth_value, is_installed)
|
||||
VALUES ('testproj', 'legacy', 'host.example.com', 22, 'root', 'password', ?, 0)""",
|
||||
(b64_value,),
|
||||
)
|
||||
conn.commit()
|
||||
env_id = cur.lastrowid
|
||||
|
||||
# Call decrypt with conn+env_id — must trigger re-encryption
|
||||
decrypted = models._decrypt_auth(b64_value, conn=conn, env_id=env_id)
|
||||
assert decrypted == plaintext
|
||||
|
||||
# DB must now have Fernet token, not b64:
|
||||
stored_after = conn.execute(
|
||||
"SELECT auth_value FROM project_environments WHERE id = ?", (env_id,)
|
||||
).fetchone()["auth_value"]
|
||||
assert not stored_after.startswith("b64:"), (
|
||||
"After migration, b64: prefix must be replaced by a Fernet token"
|
||||
)
|
||||
# And the new token must decrypt correctly
|
||||
assert models._decrypt_auth(stored_after) == plaintext
|
||||
|
||||
|
||||
def test_get_environment_migrates_b64_and_returns_plaintext(conn_with_project):
|
||||
"""AC2: get_environment() transparently migrates b64: values and returns plaintext auth_value."""
|
||||
conn = conn_with_project
|
||||
plaintext = "old_secret"
|
||||
b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode()
|
||||
|
||||
cur = conn.execute(
|
||||
"""INSERT INTO project_environments
|
||||
(project_id, name, host, port, username, auth_type, auth_value, is_installed)
|
||||
VALUES ('testproj', 'legacy2', 'host2.example.com', 22, 'root', 'password', ?, 0)""",
|
||||
(b64_value,),
|
||||
)
|
||||
conn.commit()
|
||||
env_id = cur.lastrowid
|
||||
|
||||
env = models.get_environment(conn, env_id)
|
||||
|
||||
assert env["auth_value"] == plaintext, (
|
||||
f"get_environment must return plaintext after b64 migration, got: {env['auth_value']!r}"
|
||||
)
|
||||
|
||||
# DB must be updated: b64: replaced by Fernet token
|
||||
stored_after = conn.execute(
|
||||
"SELECT auth_value FROM project_environments WHERE id = ?", (env_id,)
|
||||
).fetchone()["auth_value"]
|
||||
assert not stored_after.startswith("b64:"), (
|
||||
"DB must contain Fernet token after get_environment migrates b64: record"
|
||||
)
|
||||
|
||||
|
||||
def test_get_environment_second_read_after_migration_still_decrypts(conn_with_project):
|
||||
"""AC2: After b64 migration, subsequent get_environment calls still return plaintext."""
|
||||
conn = conn_with_project
|
||||
plaintext = "migrated_secret"
|
||||
b64_value = "b64:" + base64.b64encode(plaintext.encode()).decode()
|
||||
|
||||
cur = conn.execute(
|
||||
"""INSERT INTO project_environments
|
||||
(project_id, name, host, port, username, auth_type, auth_value, is_installed)
|
||||
VALUES ('testproj', 'legacy3', 'host3.example.com', 22, 'root', 'password', ?, 0)""",
|
||||
(b64_value,),
|
||||
)
|
||||
conn.commit()
|
||||
env_id = cur.lastrowid
|
||||
|
||||
# First read: triggers migration
|
||||
env1 = models.get_environment(conn, env_id)
|
||||
assert env1["auth_value"] == plaintext
|
||||
|
||||
# Second read: now reads Fernet token (post-migration)
|
||||
env2 = models.get_environment(conn, env_id)
|
||||
assert env2["auth_value"] == plaintext
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC3: Missing KIN_SECRET_KEY → scan endpoint returns 503 (not 500)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_scan_endpoint_returns_503_when_kin_secret_key_missing(scan_client, monkeypatch):
|
||||
"""AC3: POST /environments/{id}/scan returns 503 when KIN_SECRET_KEY is not set."""
|
||||
client, env_id = scan_client
|
||||
monkeypatch.delenv("KIN_SECRET_KEY", raising=False)
|
||||
r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan")
|
||||
assert r.status_code == 503, (
|
||||
f"scan must return 503 when KIN_SECRET_KEY is missing, got {r.status_code}: {r.text}"
|
||||
)
|
||||
|
||||
|
||||
def test_scan_endpoint_returns_503_not_500(scan_client, monkeypatch):
|
||||
"""AC3: HTTP 503 (misconfiguration) must be returned, not 500 (code bug)."""
|
||||
client, env_id = scan_client
|
||||
monkeypatch.delenv("KIN_SECRET_KEY", raising=False)
|
||||
r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan")
|
||||
assert r.status_code != 500, "Missing KIN_SECRET_KEY must produce 503, not 500"
|
||||
assert r.status_code == 503
|
||||
|
||||
|
||||
def test_scan_endpoint_returns_202_when_key_present(scan_client):
|
||||
"""AC3: scan endpoint returns 202 when KIN_SECRET_KEY is correctly set."""
|
||||
client, env_id = scan_client
|
||||
with patch("subprocess.Popen") as mock_popen:
|
||||
mock_popen.return_value = MagicMock(pid=12345)
|
||||
r = client.post(f"/api/projects/scanproj/environments/{env_id}/scan")
|
||||
assert r.status_code == 202
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC4: Runner path — get_environment() returns decrypted plaintext auth_value
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_get_environment_returns_decrypted_auth_value(conn_with_project):
|
||||
"""AC4: get_environment() returns plaintext, not the Fernet token stored in DB."""
|
||||
conn = conn_with_project
|
||||
plaintext = "runner_secret_42"
|
||||
env = models.create_environment(
|
||||
conn, "testproj", "runner-env", "10.0.0.10", "root",
|
||||
auth_value=plaintext,
|
||||
)
|
||||
env_id = env["id"]
|
||||
|
||||
fetched = models.get_environment(conn, env_id)
|
||||
assert fetched["auth_value"] == plaintext, (
|
||||
f"get_environment must return plaintext auth_value, got: {fetched['auth_value']!r}"
|
||||
)
|
||||
|
||||
|
||||
def test_get_environment_auth_value_is_not_fernet_token(conn_with_project):
|
||||
"""AC4: auth_value from get_environment is decrypted (not a Fernet base64 token)."""
|
||||
conn = conn_with_project
|
||||
plaintext = "real_password_xyz"
|
||||
env = models.create_environment(
|
||||
conn, "testproj", "fernet-check", "10.0.0.11", "user",
|
||||
auth_value=plaintext,
|
||||
)
|
||||
|
||||
# Verify DB stores encrypted (not plaintext)
|
||||
raw_stored = conn.execute(
|
||||
"SELECT auth_value FROM project_environments WHERE id = ?", (env["id"],)
|
||||
).fetchone()["auth_value"]
|
||||
assert raw_stored != plaintext, "DB must store encrypted value, not plaintext"
|
||||
|
||||
# get_environment must return decrypted plaintext
|
||||
fetched = models.get_environment(conn, env["id"])
|
||||
assert fetched["auth_value"] == plaintext
|
||||
|
||||
|
||||
def test_get_environment_returns_none_auth_value_when_not_set(conn_with_project):
|
||||
"""AC4: get_environment() returns auth_value=None when no credential was stored."""
|
||||
conn = conn_with_project
|
||||
env = models.create_environment(
|
||||
conn, "testproj", "no-cred", "10.0.0.12", "user",
|
||||
auth_value=None,
|
||||
)
|
||||
fetched = models.get_environment(conn, env["id"])
|
||||
assert fetched["auth_value"] is None
|
||||
|
||||
|
||||
def test_create_environment_hides_auth_value_in_return(conn_with_project):
|
||||
"""AC4: create_environment() returns auth_value=None — plaintext only via get_environment."""
|
||||
conn = conn_with_project
|
||||
env = models.create_environment(
|
||||
conn, "testproj", "hidden-cred", "10.0.0.13", "user",
|
||||
auth_value="secret",
|
||||
)
|
||||
assert env["auth_value"] is None, (
|
||||
"create_environment must return auth_value=None for API safety"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AC5: Old _obfuscate_auth / _deobfuscate_auth are not present anywhere
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_obfuscate_auth_not_in_core_models():
|
||||
"""AC5: _obfuscate_auth must not exist in core.models (fully removed)."""
|
||||
import core.models as m
|
||||
assert not hasattr(m, "_obfuscate_auth"), (
|
||||
"_obfuscate_auth must be removed from core.models — use _encrypt_auth instead"
|
||||
)
|
||||
|
||||
|
||||
def test_deobfuscate_auth_not_in_core_models():
|
||||
"""AC5: _deobfuscate_auth must not exist in core.models (fully removed)."""
|
||||
import core.models as m
|
||||
assert not hasattr(m, "_deobfuscate_auth"), (
|
||||
"_deobfuscate_auth must be removed from core.models — use _decrypt_auth instead"
|
||||
)
|
||||
|
||||
|
||||
def test_obfuscate_auth_not_imported_in_web_api():
|
||||
"""AC5: _obfuscate_auth must not be imported or defined in web.api."""
|
||||
import web.api as api_mod
|
||||
assert not hasattr(api_mod, "_obfuscate_auth"), (
|
||||
"_obfuscate_auth must not appear in web.api"
|
||||
)
|
||||
|
||||
|
||||
def test_deobfuscate_auth_not_imported_in_web_api():
|
||||
"""AC5: _deobfuscate_auth must not be imported or defined in web.api."""
|
||||
import web.api as api_mod
|
||||
assert not hasattr(api_mod, "_deobfuscate_auth"), (
|
||||
"_deobfuscate_auth must not appear in web.api"
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue