""" Tests for BATON-SEC-003: API-ключи для аутентификации /api/signal. Acceptance criteria: 1. POST /api/register возвращает api_key длиной 64 hex-символа. 2. POST /api/signal без Authorization header → 401. 3. POST /api/signal с неверным api_key → 401. 4. POST /api/signal с правильным api_key → 200. 5. Повторная регистрация генерирует новый api_key (ротация ключа). 6. Старый api_key становится недействительным после ротации. 7. Новый api_key работает после ротации. 8. SHA-256 хэш api_key сохраняется в БД, сырой ключ — нет (проверка через DB функцию). UUID notes: все UUID ниже удовлетворяют паттерну UUID v4. """ from __future__ import annotations import hashlib import os os.environ.setdefault("BOT_TOKEN", "test-bot-token") os.environ.setdefault("CHAT_ID", "-1001234567890") os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") os.environ.setdefault("ADMIN_TOKEN", "test-admin-token") import pytest from backend import db from tests.conftest import make_app_client, temp_db from backend import config # Valid UUID v4 constants _UUID_1 = "aa000001-0000-4000-8000-000000000001" _UUID_2 = "aa000002-0000-4000-8000-000000000002" _UUID_3 = "aa000003-0000-4000-8000-000000000003" _UUID_4 = "aa000004-0000-4000-8000-000000000004" _UUID_5 = "aa000005-0000-4000-8000-000000000005" _UUID_6 = "aa000006-0000-4000-8000-000000000006" _UUID_7 = "aa000007-0000-4000-8000-000000000007" _UUID_8 = "aa000008-0000-4000-8000-000000000008" _UUID_9 = "aa000009-0000-4000-8000-000000000009" _UUID_10 = "aa00000a-0000-4000-8000-00000000000a" # --------------------------------------------------------------------------- # Criterion 1 — /api/register returns api_key of correct length # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_register_returns_api_key(): """POST /api/register должен вернуть поле api_key в ответе.""" async with make_app_client() as client: resp = await client.post( "/api/register", json={"uuid": _UUID_1, "name": "Alice"}, ) assert resp.status_code == 200 assert "api_key" in resp.json() @pytest.mark.asyncio async def test_register_api_key_is_64_hex_chars(): """api_key должен быть строкой из 64 hex-символов (secrets.token_hex(32)).""" async with make_app_client() as client: resp = await client.post( "/api/register", json={"uuid": _UUID_2, "name": "Bob"}, ) api_key = resp.json()["api_key"] assert len(api_key) == 64 assert all(c in "0123456789abcdef" for c in api_key), ( f"api_key contains non-hex characters: {api_key}" ) # --------------------------------------------------------------------------- # Criterion 2 — Missing Authorization → 401 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_signal_without_auth_header_returns_401(): """POST /api/signal без Authorization header должен вернуть 401.""" async with make_app_client() as client: await client.post("/api/register", json={"uuid": _UUID_3, "name": "Charlie"}) resp = await client.post( "/api/signal", json={"user_id": _UUID_3, "timestamp": 1742478000000}, ) assert resp.status_code == 401 @pytest.mark.asyncio async def test_signal_without_bearer_scheme_returns_401(): """POST /api/signal с неверной схемой (Basic вместо Bearer) должен вернуть 401.""" async with make_app_client() as client: await client.post("/api/register", json={"uuid": _UUID_3, "name": "Charlie"}) resp = await client.post( "/api/signal", json={"user_id": _UUID_3, "timestamp": 1742478000000}, headers={"Authorization": "Basic wrongtoken"}, ) assert resp.status_code == 401 # --------------------------------------------------------------------------- # Criterion 3 — Wrong api_key → 401 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_signal_with_wrong_api_key_returns_401(): """POST /api/signal с неверным api_key должен вернуть 401.""" async with make_app_client() as client: await client.post("/api/register", json={"uuid": _UUID_4, "name": "Dave"}) resp = await client.post( "/api/signal", json={"user_id": _UUID_4, "timestamp": 1742478000000}, headers={"Authorization": "Bearer " + "0" * 64}, ) assert resp.status_code == 401 @pytest.mark.asyncio async def test_signal_with_unknown_user_returns_401(): """POST /api/signal с api_key незарегистрированного пользователя должен вернуть 401.""" async with make_app_client() as client: resp = await client.post( "/api/signal", json={"user_id": _UUID_5, "timestamp": 1742478000000}, headers={"Authorization": "Bearer " + "a" * 64}, ) assert resp.status_code == 401 # --------------------------------------------------------------------------- # Criterion 4 — Correct api_key → 200 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_signal_with_valid_api_key_returns_200(): """POST /api/signal с правильным api_key должен вернуть 200.""" async with make_app_client() as client: reg = await client.post( "/api/register", json={"uuid": _UUID_6, "name": "Eve"}, ) assert reg.status_code == 200 api_key = reg.json()["api_key"] resp = await client.post( "/api/signal", json={"user_id": _UUID_6, "timestamp": 1742478000000}, headers={"Authorization": f"Bearer {api_key}"}, ) assert resp.status_code == 200 assert resp.json()["status"] == "ok" # --------------------------------------------------------------------------- # Criterion 5-7 — Key rotation on re-register # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_re_register_produces_new_api_key(): """Повторная регистрация должна возвращать новый api_key (ротация).""" async with make_app_client() as client: r1 = await client.post("/api/register", json={"uuid": _UUID_7, "name": "Frank"}) r2 = await client.post("/api/register", json={"uuid": _UUID_7, "name": "Frank"}) assert r1.status_code == 200 assert r2.status_code == 200 # Ключи могут совпасть (очень маловероятно), но оба должны быть длиной 64 assert len(r2.json()["api_key"]) == 64 @pytest.mark.asyncio async def test_old_api_key_invalid_after_re_register(): """После повторной регистрации старый api_key не должен работать.""" async with make_app_client() as client: r1 = await client.post("/api/register", json={"uuid": _UUID_8, "name": "Grace"}) old_key = r1.json()["api_key"] # Повторная регистрация — ротация ключа r2 = await client.post("/api/register", json={"uuid": _UUID_8, "name": "Grace"}) new_key = r2.json()["api_key"] # Старый ключ больше не должен работать old_resp = await client.post( "/api/signal", json={"user_id": _UUID_8, "timestamp": 1742478000000}, headers={"Authorization": f"Bearer {old_key}"}, ) # Новый ключ должен работать new_resp = await client.post( "/api/signal", json={"user_id": _UUID_8, "timestamp": 1742478000000}, headers={"Authorization": f"Bearer {new_key}"}, ) assert old_resp.status_code == 401, "Старый ключ должен быть недействителен после ротации" assert new_resp.status_code == 200, "Новый ключ должен работать" # --------------------------------------------------------------------------- # Criterion 5 (task brief) — Token from another user → 401 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_signal_with_other_user_token_returns_401(): """POST /api/signal с токеном другого пользователя должен вернуть 401. Невозможно отправить сигнал от чужого имени даже зная UUID. """ async with make_app_client() as client: # Регистрируем двух пользователей r_a = await client.post("/api/register", json={"uuid": _UUID_9, "name": "UserA"}) r_b = await client.post("/api/register", json={"uuid": _UUID_10, "name": "UserB"}) assert r_a.status_code == 200 assert r_b.status_code == 200 api_key_a = r_a.json()["api_key"] api_key_b = r_b.json()["api_key"] # UserA пытается отправить сигнал с токеном UserB resp_a_with_b_key = await client.post( "/api/signal", json={"user_id": _UUID_9, "timestamp": 1742478000000}, headers={"Authorization": f"Bearer {api_key_b}"}, ) # UserB пытается отправить сигнал с токеном UserA resp_b_with_a_key = await client.post( "/api/signal", json={"user_id": _UUID_10, "timestamp": 1742478000000}, headers={"Authorization": f"Bearer {api_key_a}"}, ) assert resp_a_with_b_key.status_code == 401, ( "Нельзя отправить сигнал от имени UserA с токеном UserB" ) assert resp_b_with_a_key.status_code == 401, ( "Нельзя отправить сигнал от имени UserB с токеном UserA" ) # --------------------------------------------------------------------------- # Criterion 8 — SHA-256 hash is stored, not the raw key # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_api_key_hash_stored_in_db_not_raw_key(): """В БД должен храниться SHA-256 хэш api_key, а не сырой ключ.""" with temp_db(): from backend.main import app import contextlib import httpx import respx from httpx import AsyncClient, ASGITransport tg_set_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/setWebhook" send_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage" mock_router = respx.mock(assert_all_called=False) mock_router.post(tg_set_url).mock( return_value=httpx.Response(200, json={"ok": True, "result": True}) ) mock_router.post(send_url).mock( return_value=httpx.Response(200, json={"ok": True}) ) with mock_router: async with app.router.lifespan_context(app): transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver") as client: reg = await client.post( "/api/register", json={"uuid": _UUID_1, "name": "HashTest"}, ) assert reg.status_code == 200 raw_api_key = reg.json()["api_key"] # Читаем хэш из БД напрямую stored_hash = await db.get_api_key_hash_by_uuid(_UUID_1) expected_hash = hashlib.sha256(raw_api_key.encode()).hexdigest() assert stored_hash is not None, "api_key_hash должен быть в БД" assert stored_hash == expected_hash, ( "В БД должен быть SHA-256 хэш, а не сырой ключ" ) assert stored_hash != raw_api_key, "В БД не должен храниться сырой ключ"