""" Tests for BATON-ARCH-003: Rate limiting on /api/register + timing-safe token comparison. Acceptance criteria: 1. /api/register blocks brute-force at the application level: 5 requests pass (200), 6th returns 429; counter resets after the 10-minute window. 2. Token comparison is timing-safe: secrets.compare_digest is used in middleware.py (no == / != for token comparison). """ from __future__ import annotations import ast import os import time as _time from pathlib import Path from unittest.mock import patch os.environ.setdefault("BOT_TOKEN", "test-bot-token") os.environ.setdefault("CHAT_ID", "-1001234567890") os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") import pytest from tests.conftest import make_app_client _BACKEND_DIR = Path(__file__).parent.parent / "backend" CORRECT_SECRET = "test-webhook-secret" _SAMPLE_UPDATE = { "update_id": 300, "message": { "message_id": 1, "from": {"id": 99000001, "first_name": "Test", "last_name": "User"}, "chat": {"id": 99000001, "type": "private"}, "text": "/start", }, } # --------------------------------------------------------------------------- # Criterion 1 — Rate limiting: first 5 requests pass # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_register_rate_limit_allows_five_requests(): """POST /api/register: первые 5 запросов с одного IP возвращают 200.""" async with make_app_client() as client: for i in range(5): resp = await client.post( "/api/register", json={"uuid": f"rl-ok-{i:03d}", "name": f"User{i}"}, ) assert resp.status_code == 200, ( f"Request {i + 1}/5 unexpectedly returned {resp.status_code}" ) # --------------------------------------------------------------------------- # Criterion 1 — Rate limiting: 6th request is blocked # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_register_rate_limit_blocks_sixth_request(): """POST /api/register: 6-й запрос с одного IP возвращает 429 Too Many Requests.""" async with make_app_client() as client: for i in range(5): await client.post( "/api/register", json={"uuid": f"rl-blk-{i:03d}", "name": f"User{i}"}, ) resp = await client.post( "/api/register", json={"uuid": "rl-blk-999", "name": "Attacker"}, ) assert resp.status_code == 429 # --------------------------------------------------------------------------- # Criterion 1 — Rate limiting: counter resets after window expires # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_register_rate_limit_resets_after_window_expires(): """POST /api/register: после истечения 10-минутного окна запросы снова принимаются.""" base_time = _time.time() async with make_app_client() as client: # Exhaust the rate limit for i in range(5): await client.post( "/api/register", json={"uuid": f"rl-exp-{i:03d}", "name": f"User{i}"}, ) # Verify the 6th is blocked before window expiry blocked = await client.post( "/api/register", json={"uuid": "rl-exp-blk", "name": "Attacker"}, ) assert blocked.status_code == 429, ( "Expected 429 after exhausting rate limit, got " + str(blocked.status_code) ) # Advance clock past the 10-minute window (600 s) with patch("time.time", return_value=base_time + 601): resp_after = await client.post( "/api/register", json={"uuid": "rl-exp-after", "name": "Legit"}, ) assert resp_after.status_code == 200, ( "Expected 200 after rate-limit window expired, got " + str(resp_after.status_code) ) # --------------------------------------------------------------------------- # Criterion 2 — Timing-safe: secrets.compare_digest is imported and used # --------------------------------------------------------------------------- def test_middleware_imports_secrets_module(): """middleware.py должен импортировать модуль secrets.""" source = (_BACKEND_DIR / "middleware.py").read_text(encoding="utf-8") assert "import secrets" in source, ( "middleware.py must import the 'secrets' module" ) def test_middleware_uses_compare_digest(): """middleware.py должен использовать secrets.compare_digest для сравнения токенов.""" source = (_BACKEND_DIR / "middleware.py").read_text(encoding="utf-8") assert "secrets.compare_digest" in source, ( "middleware.py must use secrets.compare_digest — timing-safe comparison required" ) def test_middleware_no_equality_operator_on_token(): """middleware.py не должен сравнивать токен через == или != (уязвимость к timing-атаке).""" source = (_BACKEND_DIR / "middleware.py").read_text(encoding="utf-8") tree = ast.parse(source, filename="middleware.py") unsafe_comparisons = [] for node in ast.walk(tree): if isinstance(node, ast.Compare): for op in node.ops: if isinstance(op, (ast.Eq, ast.NotEq)): all_nodes = [node.left] + node.comparators node_texts = [ast.unparse(n) for n in all_nodes] if any( kw in txt.lower() for txt in node_texts for kw in ("secret", "token", "webhook") ): unsafe_comparisons.append(ast.unparse(node)) assert unsafe_comparisons == [], ( "Timing-unsafe token comparisons found in middleware.py " f"(use secrets.compare_digest instead): {unsafe_comparisons}" ) # --------------------------------------------------------------------------- # Criterion 2 — Timing-safe: functional verification # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_webhook_correct_token_returns_200_with_compare_digest(): """Правильный токен webhook возвращает 200 (timing-safe путь).""" async with make_app_client() as client: resp = await client.post( "/api/webhook/telegram", json=_SAMPLE_UPDATE, headers={"X-Telegram-Bot-Api-Secret-Token": CORRECT_SECRET}, ) assert resp.status_code == 200 @pytest.mark.asyncio async def test_webhook_wrong_token_returns_403_with_compare_digest(): """Неверный токен webhook возвращает 403 (timing-safe путь).""" async with make_app_client() as client: resp = await client.post( "/api/webhook/telegram", json=_SAMPLE_UPDATE, headers={"X-Telegram-Bot-Api-Secret-Token": "wrong-token-x9k2"}, ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_webhook_missing_token_returns_403_with_compare_digest(): """Отсутствующий токен webhook возвращает 403 (timing-safe путь).""" async with make_app_client() as client: resp = await client.post( "/api/webhook/telegram", json=_SAMPLE_UPDATE, ) assert resp.status_code == 403