baton/tests/test_arch_003.py

204 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Tests for BATON-ARCH-003: Rate limiting on /api/register + timing-safe token comparison.
Acceptance criteria:
1. /api/register blocks brute-force at the application level:
5 requests pass (200), 6th returns 429; counter resets after the 10-minute window.
2. Token comparison is timing-safe:
secrets.compare_digest is used in middleware.py (no == / != for token comparison).
"""
from __future__ import annotations
import ast
import os
import time as _time
from pathlib import Path
from unittest.mock import patch
# Default settings for the test run. setdefault() only fills in a variable
# that is not already present, so values supplied by the real environment win.
# NOTE(review): presumably these must be in place before the application is
# imported through tests.conftest below — confirm before reordering imports.
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
import pytest
from tests.conftest import make_app_client
# "backend" directory located next to this file's parent directory; the static
# checks in this module read middleware.py from it.
_BACKEND_DIR = Path(__file__).parent.parent / "backend"

# Secret sent in the webhook header by the "correct token" test. It equals the
# WEBHOOK_SECRET default installed via os.environ.setdefault in this module.
CORRECT_SECRET = "test-webhook-secret"

# JSON body posted to /api/webhook/telegram by the webhook tests.
_SAMPLE_UPDATE = {
    "update_id": 300,
    "message": {
        "message_id": 1,
        "from": {
            "id": 99000001,
            "first_name": "Test",
            "last_name": "User",
        },
        "chat": {
            "id": 99000001,
            "type": "private",
        },
        "text": "/start",
    },
}
# ---------------------------------------------------------------------------
# Criterion 1 — Rate limiting: first 5 requests pass
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_rate_limit_allows_five_requests():
    """POST /api/register: the first 5 requests from one IP return 200."""
    async with make_app_client() as client:
        for i in range(5):
            # A distinct uuid/name pair per attempt.
            payload = {"uuid": f"rl-ok-{i:03d}", "name": f"User{i}"}
            resp = await client.post("/api/register", json=payload)
            assert resp.status_code == 200, (
                f"Request {i + 1}/5 unexpectedly returned {resp.status_code}"
            )
# ---------------------------------------------------------------------------
# Criterion 1 — Rate limiting: 6th request is blocked
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_rate_limit_blocks_sixth_request():
    """POST /api/register: the 6th request from one IP returns 429 Too Many Requests."""
    async with make_app_client() as client:
        # Send five registrations first; their responses are not inspected here.
        for i in range(5):
            payload = {"uuid": f"rl-blk-{i:03d}", "name": f"User{i}"}
            await client.post("/api/register", json=payload)

        sixth_payload = {"uuid": "rl-blk-999", "name": "Attacker"}
        resp = await client.post("/api/register", json=sixth_payload)
    assert resp.status_code == 429
# ---------------------------------------------------------------------------
# Criterion 1 — Rate limiting: counter resets after window expires
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_rate_limit_resets_after_window_expires():
    """POST /api/register: requests are accepted again after the 10-minute window.

    Five registrations are sent, a sixth is confirmed to be rejected with 429,
    then ``time.time`` is patched to a moment 601 seconds in the future and one
    more registration is expected to succeed with 200.
    """
    async with make_app_client() as client:
        # Exhaust the rate limit.
        for i in range(5):
            await client.post(
                "/api/register",
                json={"uuid": f"rl-exp-{i:03d}", "name": f"User{i}"},
            )

        # Verify the 6th is blocked before window expiry.
        blocked = await client.post(
            "/api/register",
            json={"uuid": "rl-exp-blk", "name": "Attacker"},
        )
        assert blocked.status_code == 429, (
            "Expected 429 after exhausting rate limit, got " + str(blocked.status_code)
        )

        # Take the reference time only now, after every counted request has
        # been made. Measuring it before the client started left just a one
        # second margin: if start-up plus the six requests took that long, the
        # patched clock was still inside the 600 s window and the test flaked.
        after_window = _time.time() + 601

        # Advance clock past the 10-minute window (600 s).
        with patch("time.time", return_value=after_window):
            resp_after = await client.post(
                "/api/register",
                json={"uuid": "rl-exp-after", "name": "Legit"},
            )

        assert resp_after.status_code == 200, (
            "Expected 200 after rate-limit window expired, got " + str(resp_after.status_code)
        )
# ---------------------------------------------------------------------------
# Criterion 2 — Timing-safe: secrets.compare_digest is imported and used
# ---------------------------------------------------------------------------
def test_middleware_imports_secrets_module():
    """middleware.py must import the ``secrets`` module.

    The module is parsed and its ``import`` statements are inspected, rather
    than searching the raw text. A mention of ``import secrets`` inside a
    comment or string literal therefore does not satisfy the check, while a
    combined statement such as ``import os, secrets`` does.
    """
    source = (_BACKEND_DIR / "middleware.py").read_text(encoding="utf-8")
    tree = ast.parse(source, filename="middleware.py")
    imports_secrets = any(
        alias.name == "secrets"
        for node in ast.walk(tree)
        if isinstance(node, ast.Import)
        for alias in node.names
    )
    assert imports_secrets, (
        "middleware.py must import the 'secrets' module"
    )
def test_middleware_uses_compare_digest():
    """middleware.py must use ``secrets.compare_digest`` to compare tokens.

    The check looks for an actual ``secrets.compare_digest`` attribute
    reference in the parsed module, so a comment or docstring that merely
    mentions the function no longer counts as using it.
    """
    source = (_BACKEND_DIR / "middleware.py").read_text(encoding="utf-8")
    tree = ast.parse(source, filename="middleware.py")
    uses_compare_digest = any(
        isinstance(node, ast.Attribute)
        and node.attr == "compare_digest"
        and isinstance(node.value, ast.Name)
        and node.value.id == "secrets"
        for node in ast.walk(tree)
    )
    assert uses_compare_digest, (
        "middleware.py must use secrets.compare_digest — timing-safe comparison required"
    )
def test_middleware_no_equality_operator_on_token():
    """middleware.py must not compare a token with == or != (timing-attack risk).

    Every comparison node in the parsed module is examined. For each ``==`` or
    ``!=`` operator it contains, the comparison is reported when the unparsed
    text of any operand contains "secret", "token" or "webhook"
    (case-insensitive).
    """
    keywords = ("secret", "token", "webhook")

    def mentions_token(compare):
        # Look at the left operand and every right-hand comparator.
        operands = [compare.left, *compare.comparators]
        return any(
            kw in ast.unparse(operand).lower()
            for operand in operands
            for kw in keywords
        )

    source = (_BACKEND_DIR / "middleware.py").read_text(encoding="utf-8")
    tree = ast.parse(source, filename="middleware.py")

    # One entry per offending operator, so a chained comparison can be listed
    # more than once.
    unsafe_comparisons = [
        ast.unparse(node)
        for node in ast.walk(tree)
        if isinstance(node, ast.Compare)
        for op in node.ops
        if isinstance(op, (ast.Eq, ast.NotEq)) and mentions_token(node)
    ]

    assert unsafe_comparisons == [], (
        "Timing-unsafe token comparisons found in middleware.py "
        f"(use secrets.compare_digest instead): {unsafe_comparisons}"
    )
# ---------------------------------------------------------------------------
# Criterion 2 — Timing-safe: functional verification
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_webhook_correct_token_returns_200_with_compare_digest():
    """A correct webhook token returns 200 (timing-safe path)."""
    secret_header = {"X-Telegram-Bot-Api-Secret-Token": CORRECT_SECRET}
    async with make_app_client() as client:
        resp = await client.post(
            "/api/webhook/telegram", json=_SAMPLE_UPDATE, headers=secret_header
        )
    assert resp.status_code == 200
@pytest.mark.asyncio
async def test_webhook_wrong_token_returns_403_with_compare_digest():
    """A wrong webhook token returns 403 (timing-safe path)."""
    bad_header = {"X-Telegram-Bot-Api-Secret-Token": "wrong-token-x9k2"}
    async with make_app_client() as client:
        resp = await client.post(
            "/api/webhook/telegram", json=_SAMPLE_UPDATE, headers=bad_header
        )
    assert resp.status_code == 403
@pytest.mark.asyncio
async def test_webhook_missing_token_returns_403_with_compare_digest():
    """A missing webhook token returns 403 (timing-safe path)."""
    async with make_app_client() as client:
        # No secret-token header is sent with this request.
        request_kwargs = {"json": _SAMPLE_UPDATE}
        resp = await client.post("/api/webhook/telegram", **request_kwargs)
    assert resp.status_code == 403