baton/tests/test_sec_003.py

298 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Tests for BATON-SEC-003: API-ключи для аутентификации /api/signal.
Acceptance criteria:
1. POST /api/register возвращает api_key длиной 64 hex-символа.
2. POST /api/signal без Authorization header → 401.
3. POST /api/signal с неверным api_key → 401.
4. POST /api/signal с правильным api_key → 200.
5. Повторная регистрация генерирует новый api_key (ротация ключа).
6. Старый api_key становится недействительным после ротации.
7. Новый api_key работает после ротации.
8. SHA-256 хэш api_key сохраняется в БД, сырой ключ — нет (проверка через DB функцию).
UUID notes: все UUID ниже удовлетворяют паттерну UUID v4.
"""
from __future__ import annotations
import hashlib
import os
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
import pytest
from backend import db
from tests.conftest import make_app_client, temp_db
from backend import config
# Valid UUID v4 constants
_UUID_1 = "aa000001-0000-4000-8000-000000000001"
_UUID_2 = "aa000002-0000-4000-8000-000000000002"
_UUID_3 = "aa000003-0000-4000-8000-000000000003"
_UUID_4 = "aa000004-0000-4000-8000-000000000004"
_UUID_5 = "aa000005-0000-4000-8000-000000000005"
_UUID_6 = "aa000006-0000-4000-8000-000000000006"
_UUID_7 = "aa000007-0000-4000-8000-000000000007"
_UUID_8 = "aa000008-0000-4000-8000-000000000008"
_UUID_9 = "aa000009-0000-4000-8000-000000000009"
_UUID_10 = "aa00000a-0000-4000-8000-00000000000a"
# ---------------------------------------------------------------------------
# Criterion 1 — /api/register returns api_key of correct length
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_returns_api_key():
"""POST /api/register должен вернуть поле api_key в ответе."""
async with make_app_client() as client:
resp = await client.post(
"/api/register",
json={"uuid": _UUID_1, "name": "Alice"},
)
assert resp.status_code == 200
assert "api_key" in resp.json()
@pytest.mark.asyncio
async def test_register_api_key_is_64_hex_chars():
"""api_key должен быть строкой из 64 hex-символов (secrets.token_hex(32))."""
async with make_app_client() as client:
resp = await client.post(
"/api/register",
json={"uuid": _UUID_2, "name": "Bob"},
)
api_key = resp.json()["api_key"]
assert len(api_key) == 64
assert all(c in "0123456789abcdef" for c in api_key), (
f"api_key contains non-hex characters: {api_key}"
)
# ---------------------------------------------------------------------------
# Criterion 2 — Missing Authorization → 401
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_signal_without_auth_header_returns_401():
"""POST /api/signal без Authorization header должен вернуть 401."""
async with make_app_client() as client:
await client.post("/api/register", json={"uuid": _UUID_3, "name": "Charlie"})
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_3, "timestamp": 1742478000000},
)
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_signal_without_bearer_scheme_returns_401():
"""POST /api/signal с неверной схемой (Basic вместо Bearer) должен вернуть 401."""
async with make_app_client() as client:
await client.post("/api/register", json={"uuid": _UUID_3, "name": "Charlie"})
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_3, "timestamp": 1742478000000},
headers={"Authorization": "Basic wrongtoken"},
)
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# Criterion 3 — Wrong api_key → 401
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_signal_with_wrong_api_key_returns_401():
"""POST /api/signal с неверным api_key должен вернуть 401."""
async with make_app_client() as client:
await client.post("/api/register", json={"uuid": _UUID_4, "name": "Dave"})
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_4, "timestamp": 1742478000000},
headers={"Authorization": "Bearer " + "0" * 64},
)
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_signal_with_unknown_user_returns_401():
"""POST /api/signal с api_key незарегистрированного пользователя должен вернуть 401."""
async with make_app_client() as client:
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_5, "timestamp": 1742478000000},
headers={"Authorization": "Bearer " + "a" * 64},
)
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# Criterion 4 — Correct api_key → 200
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_signal_with_valid_api_key_returns_200():
"""POST /api/signal с правильным api_key должен вернуть 200."""
async with make_app_client() as client:
reg = await client.post(
"/api/register",
json={"uuid": _UUID_6, "name": "Eve"},
)
assert reg.status_code == 200
api_key = reg.json()["api_key"]
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_6, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {api_key}"},
)
assert resp.status_code == 200
assert resp.json()["status"] == "ok"
# ---------------------------------------------------------------------------
# Criterion 5-7 — Key rotation on re-register
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_re_register_produces_new_api_key():
"""Повторная регистрация должна возвращать новый api_key (ротация)."""
async with make_app_client() as client:
r1 = await client.post("/api/register", json={"uuid": _UUID_7, "name": "Frank"})
r2 = await client.post("/api/register", json={"uuid": _UUID_7, "name": "Frank"})
assert r1.status_code == 200
assert r2.status_code == 200
# Ключи могут совпасть (очень маловероятно), но оба должны быть длиной 64
assert len(r2.json()["api_key"]) == 64
@pytest.mark.asyncio
async def test_old_api_key_invalid_after_re_register():
"""После повторной регистрации старый api_key не должен работать."""
async with make_app_client() as client:
r1 = await client.post("/api/register", json={"uuid": _UUID_8, "name": "Grace"})
old_key = r1.json()["api_key"]
# Повторная регистрация — ротация ключа
r2 = await client.post("/api/register", json={"uuid": _UUID_8, "name": "Grace"})
new_key = r2.json()["api_key"]
# Старый ключ больше не должен работать
old_resp = await client.post(
"/api/signal",
json={"user_id": _UUID_8, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {old_key}"},
)
# Новый ключ должен работать
new_resp = await client.post(
"/api/signal",
json={"user_id": _UUID_8, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {new_key}"},
)
assert old_resp.status_code == 401, "Старый ключ должен быть недействителен после ротации"
assert new_resp.status_code == 200, "Новый ключ должен работать"
# ---------------------------------------------------------------------------
# Criterion 5 (task brief) — Token from another user → 401
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_signal_with_other_user_token_returns_401():
"""POST /api/signal с токеном другого пользователя должен вернуть 401.
Невозможно отправить сигнал от чужого имени даже зная UUID.
"""
async with make_app_client() as client:
# Регистрируем двух пользователей
r_a = await client.post("/api/register", json={"uuid": _UUID_9, "name": "UserA"})
r_b = await client.post("/api/register", json={"uuid": _UUID_10, "name": "UserB"})
assert r_a.status_code == 200
assert r_b.status_code == 200
api_key_a = r_a.json()["api_key"]
api_key_b = r_b.json()["api_key"]
# UserA пытается отправить сигнал с токеном UserB
resp_a_with_b_key = await client.post(
"/api/signal",
json={"user_id": _UUID_9, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {api_key_b}"},
)
# UserB пытается отправить сигнал с токеном UserA
resp_b_with_a_key = await client.post(
"/api/signal",
json={"user_id": _UUID_10, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {api_key_a}"},
)
assert resp_a_with_b_key.status_code == 401, (
"Нельзя отправить сигнал от имени UserA с токеном UserB"
)
assert resp_b_with_a_key.status_code == 401, (
"Нельзя отправить сигнал от имени UserB с токеном UserA"
)
# ---------------------------------------------------------------------------
# Criterion 8 — SHA-256 hash is stored, not the raw key
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_api_key_hash_stored_in_db_not_raw_key():
"""В БД должен храниться SHA-256 хэш api_key, а не сырой ключ."""
with temp_db():
from backend.main import app
import contextlib
import httpx
import respx
from httpx import AsyncClient, ASGITransport
tg_set_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/setWebhook"
send_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage"
mock_router = respx.mock(assert_all_called=False)
mock_router.post(tg_set_url).mock(
return_value=httpx.Response(200, json={"ok": True, "result": True})
)
mock_router.post(send_url).mock(
return_value=httpx.Response(200, json={"ok": True})
)
with mock_router:
async with app.router.lifespan_context(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://testserver") as client:
reg = await client.post(
"/api/register",
json={"uuid": _UUID_1, "name": "HashTest"},
)
assert reg.status_code == 200
raw_api_key = reg.json()["api_key"]
# Читаем хэш из БД напрямую
stored_hash = await db.get_api_key_hash_by_uuid(_UUID_1)
expected_hash = hashlib.sha256(raw_api_key.encode()).hexdigest()
assert stored_hash is not None, "api_key_hash должен быть в БД"
assert stored_hash == expected_hash, (
"В БД должен быть SHA-256 хэш, а не сырой ключ"
)
assert stored_hash != raw_api_key, "В БД не должен храниться сырой ключ"