# baton/tests/test_arch_003.py

"""
Tests for BATON-ARCH-003: Rate limiting on /api/register + timing-safe token comparison.

Acceptance criteria:
1. /api/register blocks brute-force at the application level:
   5 requests pass (200), 6th returns 429; counter resets after the 10-minute window.
2. Token comparison is timing-safe:
   secrets.compare_digest is used in middleware.py (no == / != for token comparison).

UUID notes: RegisterRequest.uuid requires a valid UUID v4 pattern.
All UUID constants below satisfy this constraint.
"""
from __future__ import annotations
import ast
import os
import time as _time
from pathlib import Path
from unittest.mock import patch
# Default values for the environment variables used by this test module.
# `setdefault` keeps any value that is already exported (developer shell, CI).
# NOTE(review): these are set before `tests.conftest` is imported below,
# presumably because the backend reads its configuration at import time — confirm.
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
import pytest
from tests.conftest import make_app_client
# Directory with the backend sources: <parent of this tests dir>/backend.
_BACKEND_DIR = Path(__file__).parent.parent / "backend"

# Secret sent in the webhook header by the "correct token" test; equal to the
# WEBHOOK_SECRET default registered in os.environ at the top of this module.
CORRECT_SECRET = "test-webhook-secret"

# Update payload posted to /api/webhook/telegram by the webhook tests.
_SAMPLE_UPDATE = {
    "update_id": 300,
    "message": {
        "message_id": 1,
        "from": {"id": 99000001, "first_name": "Test", "last_name": "User"},
        "chat": {"id": 99000001, "type": "private"},
        "text": "/start",
    },
}

# Valid UUID v4 constants for rate-limit tests
# Pattern: [0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}
# Each test uses its own prefix (d0 / d1 / d2) so the UUIDs never overlap.
_UUIDS_OK = [
    f"d0{i:06d}-0000-4000-8000-000000000001"
    for i in range(10)
]
_UUIDS_BLK = [
    f"d1{i:06d}-0000-4000-8000-000000000001"
    for i in range(10)
]
_UUIDS_EXP = [
    f"d2{i:06d}-0000-4000-8000-000000000001"
    for i in range(10)
]

# UUIDs for the single extra request sent after a limit has been exhausted.
_UUID_BLK_999 = "d1000999-0000-4000-8000-000000000001"
_UUID_EXP_BLK = "d2000999-0000-4000-8000-000000000001"
_UUID_EXP_AFTER = "d2001000-0000-4000-8000-000000000001"
# ---------------------------------------------------------------------------
# Criterion 1 — Rate limiting: first 5 requests pass
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_rate_limit_allows_five_requests():
    """POST /api/register: the first 5 requests from one IP each return 200."""
    async with make_app_client() as client:
        # Every request uses a distinct UUID from the list.
        for i, uuid in enumerate(_UUIDS_OK[:5]):
            resp = await client.post(
                "/api/register",
                json={"uuid": uuid, "name": f"User{i}"},
            )
            assert resp.status_code == 200, (
                f"Request {i + 1}/5 unexpectedly returned {resp.status_code}"
            )
# ---------------------------------------------------------------------------
# Criterion 1 — Rate limiting: 6th request is blocked
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_rate_limit_blocks_sixth_request():
    """POST /api/register: the 6th request from one IP returns 429 Too Many Requests."""
    async with make_app_client() as client:
        # Use up the five allowed requests; their responses are not inspected here.
        for i, uuid in enumerate(_UUIDS_BLK[:5]):
            await client.post(
                "/api/register",
                json={"uuid": uuid, "name": f"User{i}"},
            )
        resp = await client.post(
            "/api/register",
            json={"uuid": _UUID_BLK_999, "name": "Attacker"},
        )
    assert resp.status_code == 429
# ---------------------------------------------------------------------------
# Criterion 1 — Rate limiting: counter resets after window expires
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_rate_limit_resets_after_window_expires():
    """POST /api/register: requests are accepted again once the 10-minute window has elapsed."""
    async with make_app_client() as client:
        # Exhaust the rate limit
        for i, uuid in enumerate(_UUIDS_EXP[:5]):
            await client.post(
                "/api/register",
                json={"uuid": uuid, "name": f"User{i}"},
            )

        # Verify the 6th is blocked before window expiry
        blocked = await client.post(
            "/api/register",
            json={"uuid": _UUID_EXP_BLK, "name": "Attacker"},
        )
        assert blocked.status_code == 429, (
            "Expected 429 after exhausting rate limit, got " + str(blocked.status_code)
        )

        # Anchor the simulated clock AFTER the requests above: no request was made
        # later than base_time, so base_time + 601 is past the 600 s window
        # regardless of how long the requests themselves took to run.
        base_time = _time.time()

        # Advance clock past the 10-minute window (600 s)
        with patch("time.time", return_value=base_time + 601):
            resp_after = await client.post(
                "/api/register",
                json={"uuid": _UUID_EXP_AFTER, "name": "Legit"},
            )

    assert resp_after.status_code == 200, (
        "Expected 200 after rate-limit window expired, got " + str(resp_after.status_code)
    )
# ---------------------------------------------------------------------------
# Criterion 2 — Timing-safe: secrets.compare_digest is imported and used
# ---------------------------------------------------------------------------
def test_middleware_imports_secrets_module():
    """middleware.py must import the ``secrets`` module."""
    middleware_path = _BACKEND_DIR / "middleware.py"
    middleware_code = middleware_path.read_text(encoding="utf-8")
    # Plain substring check on the file text.
    assert "import secrets" in middleware_code, (
        "middleware.py must import the 'secrets' module"
    )
def test_middleware_uses_compare_digest():
    """middleware.py must use ``secrets.compare_digest`` to compare tokens."""
    middleware_path = _BACKEND_DIR / "middleware.py"
    middleware_code = middleware_path.read_text(encoding="utf-8")
    # Plain substring check on the file text.
    assert "secrets.compare_digest" in middleware_code, (
        "middleware.py must use secrets.compare_digest — timing-safe comparison required"
    )
def test_middleware_no_equality_operator_on_token():
    """middleware.py must not compare a token with == or != (timing-attack risk).

    Parses middleware.py and walks every comparison expression. A comparison
    is reported when it contains an ``==`` or ``!=`` operator and the source
    text of any operand mentions "secret", "token" or "webhook"
    (case-insensitive). Each offending comparison is reported once, even when
    it chains several equality operators.
    """
    source = (_BACKEND_DIR / "middleware.py").read_text(encoding="utf-8")
    tree = ast.parse(source, filename="middleware.py")
    sensitive_keywords = ("secret", "token", "webhook")

    unsafe_comparisons = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Compare):
            continue
        # Only ==/!= are of interest; `is`, `in`, `<` etc. are ignored.
        if not any(isinstance(op, (ast.Eq, ast.NotEq)) for op in node.ops):
            continue
        operand_texts = [
            ast.unparse(operand).lower()
            for operand in (node.left, *node.comparators)
        ]
        if any(kw in text for text in operand_texts for kw in sensitive_keywords):
            unsafe_comparisons.append(ast.unparse(node))

    assert unsafe_comparisons == [], (
        "Timing-unsafe token comparisons found in middleware.py "
        f"(use secrets.compare_digest instead): {unsafe_comparisons}"
    )
# ---------------------------------------------------------------------------
# Criterion 2 — Timing-safe: functional verification
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_webhook_correct_token_returns_200_with_compare_digest():
    """A webhook request with the correct secret token returns 200 (timing-safe path)."""
    secret_header = {"X-Telegram-Bot-Api-Secret-Token": CORRECT_SECRET}
    async with make_app_client() as http:
        response = await http.post(
            "/api/webhook/telegram",
            headers=secret_header,
            json=_SAMPLE_UPDATE,
        )
    assert response.status_code == 200
@pytest.mark.asyncio
async def test_webhook_wrong_token_returns_403_with_compare_digest():
    """A webhook request with a wrong secret token returns 403 (timing-safe path)."""
    bad_header = {"X-Telegram-Bot-Api-Secret-Token": "wrong-token-x9k2"}
    async with make_app_client() as http:
        response = await http.post(
            "/api/webhook/telegram",
            headers=bad_header,
            json=_SAMPLE_UPDATE,
        )
    assert response.status_code == 403
@pytest.mark.asyncio
async def test_webhook_missing_token_returns_403_with_compare_digest():
    """A webhook request without the secret-token header returns 403 (timing-safe path)."""
    endpoint = "/api/webhook/telegram"
    async with make_app_client() as http:
        # No secret-token header is passed with this request.
        response = await http.post(endpoint, json=_SAMPLE_UPDATE)
    assert response.status_code == 403