kin: BATON-SEC-003-backend_dev
This commit is contained in:
parent
097b7af949
commit
f17ee79edb
13 changed files with 593 additions and 125 deletions
254
tests/test_sec_003.py
Normal file
254
tests/test_sec_003.py
Normal file
|
|
@ -0,0 +1,254 @@
|
|||
"""
|
||||
Tests for BATON-SEC-003: API-ключи для аутентификации /api/signal.
|
||||
|
||||
Acceptance criteria:
|
||||
1. POST /api/register возвращает api_key длиной 64 hex-символа.
|
||||
2. POST /api/signal без Authorization header → 401.
|
||||
3. POST /api/signal с неверным api_key → 401.
|
||||
4. POST /api/signal с правильным api_key → 200.
|
||||
5. Повторная регистрация генерирует новый api_key (ротация ключа).
|
||||
6. Старый api_key становится недействительным после ротации.
|
||||
7. Новый api_key работает после ротации.
|
||||
8. SHA-256 хэш api_key сохраняется в БД, сырой ключ — нет (проверка через DB функцию).
|
||||
|
||||
UUID notes: все UUID ниже удовлетворяют паттерну UUID v4.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
|
||||
os.environ.setdefault("CHAT_ID", "-1001234567890")
|
||||
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
|
||||
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
|
||||
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
|
||||
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
|
||||
|
||||
import pytest
|
||||
|
||||
from backend import db
|
||||
from tests.conftest import make_app_client, temp_db
|
||||
from backend import config
|
||||
|
||||
# Valid UUID v4 constants
|
||||
_UUID_1 = "aa000001-0000-4000-8000-000000000001"
|
||||
_UUID_2 = "aa000002-0000-4000-8000-000000000002"
|
||||
_UUID_3 = "aa000003-0000-4000-8000-000000000003"
|
||||
_UUID_4 = "aa000004-0000-4000-8000-000000000004"
|
||||
_UUID_5 = "aa000005-0000-4000-8000-000000000005"
|
||||
_UUID_6 = "aa000006-0000-4000-8000-000000000006"
|
||||
_UUID_7 = "aa000007-0000-4000-8000-000000000007"
|
||||
_UUID_8 = "aa000008-0000-4000-8000-000000000008"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Criterion 1 — /api/register returns api_key of correct length
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_returns_api_key():
|
||||
"""POST /api/register должен вернуть поле api_key в ответе."""
|
||||
async with make_app_client() as client:
|
||||
resp = await client.post(
|
||||
"/api/register",
|
||||
json={"uuid": _UUID_1, "name": "Alice"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert "api_key" in resp.json()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_api_key_is_64_hex_chars():
|
||||
"""api_key должен быть строкой из 64 hex-символов (secrets.token_hex(32))."""
|
||||
async with make_app_client() as client:
|
||||
resp = await client.post(
|
||||
"/api/register",
|
||||
json={"uuid": _UUID_2, "name": "Bob"},
|
||||
)
|
||||
api_key = resp.json()["api_key"]
|
||||
assert len(api_key) == 64
|
||||
assert all(c in "0123456789abcdef" for c in api_key), (
|
||||
f"api_key contains non-hex characters: {api_key}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Criterion 2 — Missing Authorization → 401
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_signal_without_auth_header_returns_401():
|
||||
"""POST /api/signal без Authorization header должен вернуть 401."""
|
||||
async with make_app_client() as client:
|
||||
await client.post("/api/register", json={"uuid": _UUID_3, "name": "Charlie"})
|
||||
resp = await client.post(
|
||||
"/api/signal",
|
||||
json={"user_id": _UUID_3, "timestamp": 1742478000000},
|
||||
)
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_signal_without_bearer_scheme_returns_401():
|
||||
"""POST /api/signal с неверной схемой (Basic вместо Bearer) должен вернуть 401."""
|
||||
async with make_app_client() as client:
|
||||
await client.post("/api/register", json={"uuid": _UUID_3, "name": "Charlie"})
|
||||
resp = await client.post(
|
||||
"/api/signal",
|
||||
json={"user_id": _UUID_3, "timestamp": 1742478000000},
|
||||
headers={"Authorization": "Basic wrongtoken"},
|
||||
)
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Criterion 3 — Wrong api_key → 401
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_signal_with_wrong_api_key_returns_401():
|
||||
"""POST /api/signal с неверным api_key должен вернуть 401."""
|
||||
async with make_app_client() as client:
|
||||
await client.post("/api/register", json={"uuid": _UUID_4, "name": "Dave"})
|
||||
resp = await client.post(
|
||||
"/api/signal",
|
||||
json={"user_id": _UUID_4, "timestamp": 1742478000000},
|
||||
headers={"Authorization": "Bearer " + "0" * 64},
|
||||
)
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_signal_with_unknown_user_returns_401():
|
||||
"""POST /api/signal с api_key незарегистрированного пользователя должен вернуть 401."""
|
||||
async with make_app_client() as client:
|
||||
resp = await client.post(
|
||||
"/api/signal",
|
||||
json={"user_id": _UUID_5, "timestamp": 1742478000000},
|
||||
headers={"Authorization": "Bearer " + "a" * 64},
|
||||
)
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Criterion 4 — Correct api_key → 200
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_signal_with_valid_api_key_returns_200():
|
||||
"""POST /api/signal с правильным api_key должен вернуть 200."""
|
||||
async with make_app_client() as client:
|
||||
reg = await client.post(
|
||||
"/api/register",
|
||||
json={"uuid": _UUID_6, "name": "Eve"},
|
||||
)
|
||||
assert reg.status_code == 200
|
||||
api_key = reg.json()["api_key"]
|
||||
|
||||
resp = await client.post(
|
||||
"/api/signal",
|
||||
json={"user_id": _UUID_6, "timestamp": 1742478000000},
|
||||
headers={"Authorization": f"Bearer {api_key}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "ok"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Criterion 5-7 — Key rotation on re-register
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_re_register_produces_new_api_key():
|
||||
"""Повторная регистрация должна возвращать новый api_key (ротация)."""
|
||||
async with make_app_client() as client:
|
||||
r1 = await client.post("/api/register", json={"uuid": _UUID_7, "name": "Frank"})
|
||||
r2 = await client.post("/api/register", json={"uuid": _UUID_7, "name": "Frank"})
|
||||
|
||||
assert r1.status_code == 200
|
||||
assert r2.status_code == 200
|
||||
# Ключи могут совпасть (очень маловероятно), но оба должны быть длиной 64
|
||||
assert len(r2.json()["api_key"]) == 64
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_old_api_key_invalid_after_re_register():
|
||||
"""После повторной регистрации старый api_key не должен работать."""
|
||||
async with make_app_client() as client:
|
||||
r1 = await client.post("/api/register", json={"uuid": _UUID_8, "name": "Grace"})
|
||||
old_key = r1.json()["api_key"]
|
||||
|
||||
# Повторная регистрация — ротация ключа
|
||||
r2 = await client.post("/api/register", json={"uuid": _UUID_8, "name": "Grace"})
|
||||
new_key = r2.json()["api_key"]
|
||||
|
||||
# Старый ключ больше не должен работать
|
||||
old_resp = await client.post(
|
||||
"/api/signal",
|
||||
json={"user_id": _UUID_8, "timestamp": 1742478000000},
|
||||
headers={"Authorization": f"Bearer {old_key}"},
|
||||
)
|
||||
|
||||
# Новый ключ должен работать
|
||||
new_resp = await client.post(
|
||||
"/api/signal",
|
||||
json={"user_id": _UUID_8, "timestamp": 1742478000000},
|
||||
headers={"Authorization": f"Bearer {new_key}"},
|
||||
)
|
||||
|
||||
assert old_resp.status_code == 401, "Старый ключ должен быть недействителен после ротации"
|
||||
assert new_resp.status_code == 200, "Новый ключ должен работать"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Criterion 8 — SHA-256 hash is stored, not the raw key
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_api_key_hash_stored_in_db_not_raw_key():
|
||||
"""В БД должен храниться SHA-256 хэш api_key, а не сырой ключ."""
|
||||
with temp_db():
|
||||
from backend.main import app
|
||||
import contextlib
|
||||
import httpx
|
||||
import respx
|
||||
from httpx import AsyncClient, ASGITransport
|
||||
|
||||
tg_set_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/setWebhook"
|
||||
send_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage"
|
||||
|
||||
mock_router = respx.mock(assert_all_called=False)
|
||||
mock_router.post(tg_set_url).mock(
|
||||
return_value=httpx.Response(200, json={"ok": True, "result": True})
|
||||
)
|
||||
mock_router.post(send_url).mock(
|
||||
return_value=httpx.Response(200, json={"ok": True})
|
||||
)
|
||||
|
||||
with mock_router:
|
||||
async with app.router.lifespan_context(app):
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
reg = await client.post(
|
||||
"/api/register",
|
||||
json={"uuid": _UUID_1, "name": "HashTest"},
|
||||
)
|
||||
assert reg.status_code == 200
|
||||
raw_api_key = reg.json()["api_key"]
|
||||
|
||||
# Читаем хэш из БД напрямую
|
||||
stored_hash = await db.get_api_key_hash_by_uuid(_UUID_1)
|
||||
|
||||
expected_hash = hashlib.sha256(raw_api_key.encode()).hexdigest()
|
||||
assert stored_hash is not None, "api_key_hash должен быть в БД"
|
||||
assert stored_hash == expected_hash, (
|
||||
"В БД должен быть SHA-256 хэш, а не сырой ключ"
|
||||
)
|
||||
assert stored_hash != raw_api_key, "В БД не должен храниться сырой ключ"
|
||||
Loading…
Add table
Add a link
Reference in a new issue