baton/tests/test_sec_003.py

255 lines
10 KiB
Python
Raw Normal View History

2026-03-21 08:12:01 +02:00
"""
Tests for BATON-SEC-003: API-ключи для аутентификации /api/signal.
Acceptance criteria:
1. POST /api/register возвращает api_key длиной 64 hex-символа.
2. POST /api/signal без Authorization header 401.
3. POST /api/signal с неверным api_key 401.
4. POST /api/signal с правильным api_key 200.
5. Повторная регистрация генерирует новый api_key (ротация ключа).
6. Старый api_key становится недействительным после ротации.
7. Новый api_key работает после ротации.
8. SHA-256 хэш api_key сохраняется в БД, сырой ключ нет (проверка через DB функцию).
UUID notes: все UUID ниже удовлетворяют паттерну UUID v4.
"""
from __future__ import annotations
import hashlib
import os
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
import pytest
from backend import db
from tests.conftest import make_app_client, temp_db
from backend import config
# Valid UUID v4 constants
_UUID_1 = "aa000001-0000-4000-8000-000000000001"
_UUID_2 = "aa000002-0000-4000-8000-000000000002"
_UUID_3 = "aa000003-0000-4000-8000-000000000003"
_UUID_4 = "aa000004-0000-4000-8000-000000000004"
_UUID_5 = "aa000005-0000-4000-8000-000000000005"
_UUID_6 = "aa000006-0000-4000-8000-000000000006"
_UUID_7 = "aa000007-0000-4000-8000-000000000007"
_UUID_8 = "aa000008-0000-4000-8000-000000000008"
# ---------------------------------------------------------------------------
# Criterion 1 — /api/register returns api_key of correct length
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_returns_api_key():
"""POST /api/register должен вернуть поле api_key в ответе."""
async with make_app_client() as client:
resp = await client.post(
"/api/register",
json={"uuid": _UUID_1, "name": "Alice"},
)
assert resp.status_code == 200
assert "api_key" in resp.json()
@pytest.mark.asyncio
async def test_register_api_key_is_64_hex_chars():
"""api_key должен быть строкой из 64 hex-символов (secrets.token_hex(32))."""
async with make_app_client() as client:
resp = await client.post(
"/api/register",
json={"uuid": _UUID_2, "name": "Bob"},
)
api_key = resp.json()["api_key"]
assert len(api_key) == 64
assert all(c in "0123456789abcdef" for c in api_key), (
f"api_key contains non-hex characters: {api_key}"
)
# ---------------------------------------------------------------------------
# Criterion 2 — Missing Authorization → 401
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_signal_without_auth_header_returns_401():
"""POST /api/signal без Authorization header должен вернуть 401."""
async with make_app_client() as client:
await client.post("/api/register", json={"uuid": _UUID_3, "name": "Charlie"})
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_3, "timestamp": 1742478000000},
)
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_signal_without_bearer_scheme_returns_401():
"""POST /api/signal с неверной схемой (Basic вместо Bearer) должен вернуть 401."""
async with make_app_client() as client:
await client.post("/api/register", json={"uuid": _UUID_3, "name": "Charlie"})
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_3, "timestamp": 1742478000000},
headers={"Authorization": "Basic wrongtoken"},
)
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# Criterion 3 — Wrong api_key → 401
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_signal_with_wrong_api_key_returns_401():
"""POST /api/signal с неверным api_key должен вернуть 401."""
async with make_app_client() as client:
await client.post("/api/register", json={"uuid": _UUID_4, "name": "Dave"})
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_4, "timestamp": 1742478000000},
headers={"Authorization": "Bearer " + "0" * 64},
)
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_signal_with_unknown_user_returns_401():
"""POST /api/signal с api_key незарегистрированного пользователя должен вернуть 401."""
async with make_app_client() as client:
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_5, "timestamp": 1742478000000},
headers={"Authorization": "Bearer " + "a" * 64},
)
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# Criterion 4 — Correct api_key → 200
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_signal_with_valid_api_key_returns_200():
"""POST /api/signal с правильным api_key должен вернуть 200."""
async with make_app_client() as client:
reg = await client.post(
"/api/register",
json={"uuid": _UUID_6, "name": "Eve"},
)
assert reg.status_code == 200
api_key = reg.json()["api_key"]
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_6, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {api_key}"},
)
assert resp.status_code == 200
assert resp.json()["status"] == "ok"
# ---------------------------------------------------------------------------
# Criterion 5-7 — Key rotation on re-register
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_re_register_produces_new_api_key():
"""Повторная регистрация должна возвращать новый api_key (ротация)."""
async with make_app_client() as client:
r1 = await client.post("/api/register", json={"uuid": _UUID_7, "name": "Frank"})
r2 = await client.post("/api/register", json={"uuid": _UUID_7, "name": "Frank"})
assert r1.status_code == 200
assert r2.status_code == 200
# Ключи могут совпасть (очень маловероятно), но оба должны быть длиной 64
assert len(r2.json()["api_key"]) == 64
@pytest.mark.asyncio
async def test_old_api_key_invalid_after_re_register():
"""После повторной регистрации старый api_key не должен работать."""
async with make_app_client() as client:
r1 = await client.post("/api/register", json={"uuid": _UUID_8, "name": "Grace"})
old_key = r1.json()["api_key"]
# Повторная регистрация — ротация ключа
r2 = await client.post("/api/register", json={"uuid": _UUID_8, "name": "Grace"})
new_key = r2.json()["api_key"]
# Старый ключ больше не должен работать
old_resp = await client.post(
"/api/signal",
json={"user_id": _UUID_8, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {old_key}"},
)
# Новый ключ должен работать
new_resp = await client.post(
"/api/signal",
json={"user_id": _UUID_8, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {new_key}"},
)
assert old_resp.status_code == 401, "Старый ключ должен быть недействителен после ротации"
assert new_resp.status_code == 200, "Новый ключ должен работать"
# ---------------------------------------------------------------------------
# Criterion 8 — SHA-256 hash is stored, not the raw key
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_api_key_hash_stored_in_db_not_raw_key():
"""В БД должен храниться SHA-256 хэш api_key, а не сырой ключ."""
with temp_db():
from backend.main import app
import contextlib
import httpx
import respx
from httpx import AsyncClient, ASGITransport
tg_set_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/setWebhook"
send_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage"
mock_router = respx.mock(assert_all_called=False)
mock_router.post(tg_set_url).mock(
return_value=httpx.Response(200, json={"ok": True, "result": True})
)
mock_router.post(send_url).mock(
return_value=httpx.Response(200, json={"ok": True})
)
with mock_router:
async with app.router.lifespan_context(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://testserver") as client:
reg = await client.post(
"/api/register",
json={"uuid": _UUID_1, "name": "HashTest"},
)
assert reg.status_code == 200
raw_api_key = reg.json()["api_key"]
# Читаем хэш из БД напрямую
stored_hash = await db.get_api_key_hash_by_uuid(_UUID_1)
expected_hash = hashlib.sha256(raw_api_key.encode()).hexdigest()
assert stored_hash is not None, "api_key_hash должен быть в БД"
assert stored_hash == expected_hash, (
"В БД должен быть SHA-256 хэш, а не сырой ключ"
)
assert stored_hash != raw_api_key, "В БД не должен храниться сырой ключ"