""" Regression tests for BATON-SEC-007: 1. Retry loop in telegram.py is bounded to exactly 3 attempts. 2. Exponential backoff applies correctly: sleep = retry_after * (attempt + 1). 3. POST /api/signal uses asyncio.create_task — HTTP response is not blocked by Telegram rate-limit pauses. 4. GET /health returns only {"status": "ok"} — no timestamp field. """ from __future__ import annotations import asyncio import logging import os os.environ.setdefault("BOT_TOKEN", "test-bot-token") os.environ.setdefault("CHAT_ID", "-1001234567890") os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") from unittest.mock import AsyncMock, patch import httpx import pytest import respx from backend import config from backend.telegram import send_message from tests.conftest import make_app_client SEND_URL = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage" # --------------------------------------------------------------------------- # Criterion 1 — retry loop is bounded to max 3 attempts # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_retry_loop_stops_after_3_attempts_on_all_429(): """When all 3 responses are 429, send_message makes exactly 3 HTTP requests and stops.""" responses = [ httpx.Response(429, json={"ok": False, "parameters": {"retry_after": 1}}), httpx.Response(429, json={"ok": False, "parameters": {"retry_after": 1}}), httpx.Response(429, json={"ok": False, "parameters": {"retry_after": 1}}), ] with respx.mock(assert_all_called=False) as mock: route = mock.post(SEND_URL).mock(side_effect=responses) with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): await send_message("test max 3 attempts") assert route.call_count == 3 @pytest.mark.asyncio async def test_retry_loop_does_not_make_4th_attempt_on_all_429(): """send_message must never attempt a 4th request when the first 3 all return 429.""" call_count = 0 async def _count_and_return_429(_request): nonlocal call_count call_count += 1 return httpx.Response(429, json={"ok": False, "parameters": {"retry_after": 1}}) with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(side_effect=_count_and_return_429) with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): await send_message("test no 4th attempt") assert call_count == 3 # --------------------------------------------------------------------------- # Criterion 2 — exponential backoff: sleep = retry_after * (attempt + 1) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_retry_429_first_attempt_sleeps_retry_after_times_1(): """First 429 (attempt 0): sleep duration must be retry_after * 1.""" retry_after = 7 responses = [ httpx.Response(429, json={"ok": False, "parameters": {"retry_after": retry_after}}), httpx.Response(200, json={"ok": True}), ] with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(side_effect=responses) with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock) as mock_sleep: await send_message("test attempt 0 backoff") mock_sleep.assert_called_once_with(retry_after * 1) @pytest.mark.asyncio async def test_retry_429_exponential_backoff_sleep_sequence(): """Two consecutive 429 responses produce sleep = retry_after*1 then retry_after*2.""" retry_after = 10 responses = [ httpx.Response(429, json={"ok": False, "parameters": {"retry_after": retry_after}}), httpx.Response(429, json={"ok": False, "parameters": {"retry_after": retry_after}}), httpx.Response(200, json={"ok": True}), ] with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(side_effect=responses) with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock) as mock_sleep: await send_message("test backoff sequence") sleep_args = [c.args[0] for c in mock_sleep.call_args_list] assert retry_after * 1 in sleep_args, f"Expected sleep({retry_after}) not found in {sleep_args}" assert retry_after * 2 in sleep_args, f"Expected sleep({retry_after * 2}) not found in {sleep_args}" @pytest.mark.asyncio async def test_retry_429_third_attempt_sleeps_retry_after_times_3(): """Third 429 (attempt 2): sleep duration must be retry_after * 3.""" retry_after = 5 responses = [ httpx.Response(429, json={"ok": False, "parameters": {"retry_after": retry_after}}), httpx.Response(429, json={"ok": False, "parameters": {"retry_after": retry_after}}), httpx.Response(429, json={"ok": False, "parameters": {"retry_after": retry_after}}), ] with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(side_effect=responses) with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock) as mock_sleep: await send_message("test attempt 2 backoff") sleep_args = [c.args[0] for c in mock_sleep.call_args_list] assert retry_after * 3 in sleep_args, f"Expected sleep({retry_after * 3}) not found in {sleep_args}" # --------------------------------------------------------------------------- # After exhausting all 3 attempts — error is logged # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_send_message_all_attempts_exhausted_logs_error(caplog): """After 3 failed 429 attempts, an ERROR containing 'all 3 attempts' is logged.""" responses = [ httpx.Response(429, json={"ok": False, "parameters": {"retry_after": 1}}), httpx.Response(429, json={"ok": False, "parameters": {"retry_after": 1}}), httpx.Response(429, json={"ok": False, "parameters": {"retry_after": 1}}), ] with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(side_effect=responses) with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): with caplog.at_level(logging.ERROR, logger="backend.telegram"): await send_message("test exhausted log") error_messages = [r.message for r in caplog.records if r.levelno == logging.ERROR] assert any("all 3 attempts" in m.lower() for m in error_messages), ( f"Expected 'all 3 attempts' in error logs, got: {error_messages}" ) # --------------------------------------------------------------------------- # Criterion 3 — POST /api/signal uses asyncio.create_task (non-blocking) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_signal_uses_create_task_for_telegram_send_message(): """POST /api/signal must wrap telegram.send_message in asyncio.create_task.""" with patch("backend.main.asyncio.create_task", wraps=asyncio.create_task) as mock_ct: async with make_app_client() as client: await client.post("/api/register", json={"uuid": "d4e5f6a7-b8c9-4d0e-a1b2-c3d4e5f6a7b8", "name": "CT"}) resp = await client.post( "/api/signal", json={"user_id": "d4e5f6a7-b8c9-4d0e-a1b2-c3d4e5f6a7b8", "timestamp": 1742478000000}, ) assert resp.status_code == 200 assert mock_ct.called, "asyncio.create_task was never called — send_message may have been awaited directly" @pytest.mark.asyncio async def test_signal_response_returns_before_telegram_completes(): """POST /api/signal returns 200 even when Telegram send_message is delayed.""" # Simulate a slow Telegram response. If send_message is awaited directly, # the HTTP response would be delayed until sleep completes. slow_sleep_called = False async def slow_send_message(_text: str) -> None: nonlocal slow_sleep_called slow_sleep_called = True await asyncio.sleep(9999) # would block forever if awaited with patch("backend.main.telegram.send_message", side_effect=slow_send_message): with patch("backend.main.asyncio.create_task", wraps=asyncio.create_task): async with make_app_client() as client: await client.post( "/api/register", json={"uuid": "e5f6a7b8-c9d0-4e1f-a2b3-c4d5e6f7a8b9", "name": "Slow"}, ) resp = await client.post( "/api/signal", json={ "user_id": "e5f6a7b8-c9d0-4e1f-a2b3-c4d5e6f7a8b9", "timestamp": 1742478000000, }, ) # Response must be immediate — no blocking on the 9999-second sleep assert resp.status_code == 200 # --------------------------------------------------------------------------- # Criterion 4 — GET /health exact response body (regression guard) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_health_response_is_exactly_status_ok(): """GET /health body must be exactly {"status": "ok"} — no extra fields.""" async with make_app_client() as client: response = await client.get("/health") assert response.status_code == 200 assert response.json() == {"status": "ok"} @pytest.mark.asyncio async def test_health_no_timestamp_field(): """GET /health must not expose a timestamp field (time-based fingerprinting prevention).""" async with make_app_client() as client: response = await client.get("/health") assert "timestamp" not in response.json()