kin: BATON-ARCH-012 Добавить WEBHOOK_ENABLED флаг для локальной разработки

This commit is contained in:
Gros Frumos 2026-03-20 21:03:45 +02:00
parent 69d01ac3a6
commit 0f8ecdfc49
5 changed files with 434 additions and 0 deletions

204
tests/test_arch_003.py Normal file
View file

@ -0,0 +1,204 @@
"""
Tests for BATON-ARCH-003: Rate limiting on /api/register + timing-safe token comparison.
Acceptance criteria:
1. /api/register blocks brute-force at the application level:
5 requests pass (200), 6th returns 429; counter resets after the 10-minute window.
2. Token comparison is timing-safe:
secrets.compare_digest is used in middleware.py (no == / != for token comparison).
"""
from __future__ import annotations
import ast
import os
import time as _time
from pathlib import Path
from unittest.mock import patch
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
import pytest
from tests.conftest import make_app_client
_BACKEND_DIR = Path(__file__).parent.parent / "backend"
CORRECT_SECRET = "test-webhook-secret"
_SAMPLE_UPDATE = {
"update_id": 300,
"message": {
"message_id": 1,
"from": {"id": 99000001, "first_name": "Test", "last_name": "User"},
"chat": {"id": 99000001, "type": "private"},
"text": "/start",
},
}
# ---------------------------------------------------------------------------
# Criterion 1 — Rate limiting: first 5 requests pass
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_rate_limit_allows_five_requests():
"""POST /api/register: первые 5 запросов с одного IP возвращают 200."""
async with make_app_client() as client:
for i in range(5):
resp = await client.post(
"/api/register",
json={"uuid": f"rl-ok-{i:03d}", "name": f"User{i}"},
)
assert resp.status_code == 200, (
f"Request {i + 1}/5 unexpectedly returned {resp.status_code}"
)
# ---------------------------------------------------------------------------
# Criterion 1 — Rate limiting: 6th request is blocked
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_rate_limit_blocks_sixth_request():
"""POST /api/register: 6-й запрос с одного IP возвращает 429 Too Many Requests."""
async with make_app_client() as client:
for i in range(5):
await client.post(
"/api/register",
json={"uuid": f"rl-blk-{i:03d}", "name": f"User{i}"},
)
resp = await client.post(
"/api/register",
json={"uuid": "rl-blk-999", "name": "Attacker"},
)
assert resp.status_code == 429
# ---------------------------------------------------------------------------
# Criterion 1 — Rate limiting: counter resets after window expires
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_rate_limit_resets_after_window_expires():
"""POST /api/register: после истечения 10-минутного окна запросы снова принимаются."""
base_time = _time.time()
async with make_app_client() as client:
# Exhaust the rate limit
for i in range(5):
await client.post(
"/api/register",
json={"uuid": f"rl-exp-{i:03d}", "name": f"User{i}"},
)
# Verify the 6th is blocked before window expiry
blocked = await client.post(
"/api/register",
json={"uuid": "rl-exp-blk", "name": "Attacker"},
)
assert blocked.status_code == 429, (
"Expected 429 after exhausting rate limit, got " + str(blocked.status_code)
)
# Advance clock past the 10-minute window (600 s)
with patch("time.time", return_value=base_time + 601):
resp_after = await client.post(
"/api/register",
json={"uuid": "rl-exp-after", "name": "Legit"},
)
assert resp_after.status_code == 200, (
"Expected 200 after rate-limit window expired, got " + str(resp_after.status_code)
)
# ---------------------------------------------------------------------------
# Criterion 2 — Timing-safe: secrets.compare_digest is imported and used
# ---------------------------------------------------------------------------
def test_middleware_imports_secrets_module():
"""middleware.py должен импортировать модуль secrets."""
source = (_BACKEND_DIR / "middleware.py").read_text(encoding="utf-8")
assert "import secrets" in source, (
"middleware.py must import the 'secrets' module"
)
def test_middleware_uses_compare_digest():
"""middleware.py должен использовать secrets.compare_digest для сравнения токенов."""
source = (_BACKEND_DIR / "middleware.py").read_text(encoding="utf-8")
assert "secrets.compare_digest" in source, (
"middleware.py must use secrets.compare_digest — timing-safe comparison required"
)
def test_middleware_no_equality_operator_on_token():
"""middleware.py не должен сравнивать токен через == или != (уязвимость к timing-атаке)."""
source = (_BACKEND_DIR / "middleware.py").read_text(encoding="utf-8")
tree = ast.parse(source, filename="middleware.py")
unsafe_comparisons = []
for node in ast.walk(tree):
if isinstance(node, ast.Compare):
for op in node.ops:
if isinstance(op, (ast.Eq, ast.NotEq)):
all_nodes = [node.left] + node.comparators
node_texts = [ast.unparse(n) for n in all_nodes]
if any(
kw in txt.lower()
for txt in node_texts
for kw in ("secret", "token", "webhook")
):
unsafe_comparisons.append(ast.unparse(node))
assert unsafe_comparisons == [], (
"Timing-unsafe token comparisons found in middleware.py "
f"(use secrets.compare_digest instead): {unsafe_comparisons}"
)
# ---------------------------------------------------------------------------
# Criterion 2 — Timing-safe: functional verification
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_webhook_correct_token_returns_200_with_compare_digest():
"""Правильный токен webhook возвращает 200 (timing-safe путь)."""
async with make_app_client() as client:
resp = await client.post(
"/api/webhook/telegram",
json=_SAMPLE_UPDATE,
headers={"X-Telegram-Bot-Api-Secret-Token": CORRECT_SECRET},
)
assert resp.status_code == 200
@pytest.mark.asyncio
async def test_webhook_wrong_token_returns_403_with_compare_digest():
"""Неверный токен webhook возвращает 403 (timing-safe путь)."""
async with make_app_client() as client:
resp = await client.post(
"/api/webhook/telegram",
json=_SAMPLE_UPDATE,
headers={"X-Telegram-Bot-Api-Secret-Token": "wrong-token-x9k2"},
)
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_webhook_missing_token_returns_403_with_compare_digest():
"""Отсутствующий токен webhook возвращает 403 (timing-safe путь)."""
async with make_app_client() as client:
resp = await client.post(
"/api/webhook/telegram",
json=_SAMPLE_UPDATE,
)
assert resp.status_code == 403

61
tests/test_arch_012.py Normal file
View file

@ -0,0 +1,61 @@
"""
Tests for BATON-ARCH-012: WEBHOOK_ENABLED flag for local development.
Acceptance criteria:
1. When WEBHOOK_ENABLED=False set_webhook is NOT called during lifespan startup
(mock set_webhook.call_count == 0).
2. When WEBHOOK_ENABLED=True (default) set_webhook IS called exactly once.
"""
from __future__ import annotations
import os
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
from unittest.mock import AsyncMock, patch
import pytest
from tests.conftest import temp_db
# ---------------------------------------------------------------------------
# Criterion 1 — WEBHOOK_ENABLED=False: set_webhook must NOT be called
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_lifespan_webhook_disabled_set_webhook_not_called():
"""При WEBHOOK_ENABLED=False вызов set_webhook не должен происходить (call_count == 0)."""
from backend.main import app
with temp_db():
with patch("backend.telegram.set_webhook", new_callable=AsyncMock) as mock_set_webhook:
with patch("backend.config.WEBHOOK_ENABLED", False):
async with app.router.lifespan_context(app):
pass
assert mock_set_webhook.call_count == 0
# ---------------------------------------------------------------------------
# Criterion 2 — WEBHOOK_ENABLED=True (default): set_webhook must be called
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_lifespan_webhook_enabled_set_webhook_called_once():
"""При WEBHOOK_ENABLED=True вызов set_webhook должен произойти ровно один раз."""
from backend.main import app
with temp_db():
with patch("backend.telegram.set_webhook", new_callable=AsyncMock) as mock_set_webhook:
with patch("backend.config.WEBHOOK_ENABLED", True):
async with app.router.lifespan_context(app):
pass
assert mock_set_webhook.call_count == 1