"""
Tests for BATON-SEC-002:
1. _get_client_ip() extracts real IP from X-Real-IP / X-Forwarded-For headers.
2. POST /api/signal returns 429 when the per-IP rate limit is exceeded.
3. Rate counters for register and signal are independent (separate key namespaces).

UUID notes: RegisterRequest.uuid and SignalRequest.user_id both require a valid
UUID v4 pattern (^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$).
All constants below satisfy this constraint.
"""
from __future__ import annotations

import os

# Environment defaults are installed before the backend/test imports below.
# NOTE(review): presumably the backend reads these at import time, which is
# why the imports deliberately come after this block — confirm before
# reordering.
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")

import pytest
from starlette.requests import Request

from backend.middleware import _get_client_ip
from tests.conftest import make_app_client

# ── Valid UUID v4 constants ──────────────────────────────────────────────────
# Pattern: xxxxxxxx-xxxx-4xxx-[89ab]xxx-xxxxxxxxxxxx  (all hex chars)

_UUID_SIG_RL = "a0000001-0000-4000-8000-000000000001"    # rate-limit 429 test
_UUID_SIG_OK = "a0000002-0000-4000-8000-000000000002"    # first-10-allowed test
_UUID_IND_SIG = "a0000003-0000-4000-8000-000000000003"   # independence (exhaust signal)
_UUID_IND_SIG2 = "a0000033-0000-4000-8000-000000000033"  # second register after exhaust
_UUID_IND_REG = "a0000004-0000-4000-8000-000000000004"   # independence (exhaust register)
_UUID_IP_A = "a0000005-0000-4000-8000-000000000005"      # per-IP isolation, user A
_UUID_IP_B = "a0000006-0000-4000-8000-000000000006"      # per-IP isolation, user B


# ── Helpers ─────────────────────────────────────────────────────────────────


def _make_request(headers: dict | None = None, client_host: str = "127.0.0.1") -> Request:
    """Build a minimal Starlette Request with given headers and remote address.

    Args:
        headers: Header name -> value mapping; names are lower-cased and both
            parts are encoded to bytes, as the ASGI scope expects. ``None``
            means no headers.
        client_host: Host placed in the scope's ``client`` tuple (the port is
            fixed at 12345).

    Returns:
        A ``Request`` wrapping the hand-built HTTP scope.
    """
    scope = {
        "type": "http",
        "method": "POST",
        "path": "/",
        "headers": [
            (k.lower().encode(), v.encode())
            for k, v in (headers or {}).items()
        ],
        "client": (client_host, 12345),
    }
    return Request(scope)


# ── Unit: _get_client_ip ────────────────────────────────────────────────────


def test_get_client_ip_returns_x_real_ip_when_present():
    """X-Real-IP header is returned as-is (highest priority)."""
    req = _make_request({"X-Real-IP": "203.0.113.10"}, client_host="127.0.0.1")
    assert _get_client_ip(req) == "203.0.113.10"


def test_get_client_ip_ignores_client_host_when_x_real_ip_set():
    """When X-Real-IP is present, client.host (127.0.0.1) must NOT be returned."""
    req = _make_request({"X-Real-IP": "10.20.30.40"}, client_host="127.0.0.1")
    client_ip = _get_client_ip(req)
    assert client_ip != "127.0.0.1"
    # A bare "!=" would also accept any other wrong value (e.g. "unknown"),
    # so pin the result to the header value as well.
    assert client_ip == "10.20.30.40"


def test_get_client_ip_uses_x_forwarded_for_when_no_x_real_ip():
    """X-Forwarded-For is used when X-Real-IP is absent."""
    req = _make_request({"X-Forwarded-For": "198.51.100.5"}, client_host="127.0.0.1")
    assert _get_client_ip(req) == "198.51.100.5"


def test_get_client_ip_x_forwarded_for_returns_first_ip_in_chain():
    """When X-Forwarded-For contains a chain, only the first (original) IP is returned."""
    req = _make_request(
        {"X-Forwarded-For": "192.0.2.1, 10.0.0.1, 172.16.0.1"},
        client_host="127.0.0.1",
    )
    assert _get_client_ip(req) == "192.0.2.1"


def test_get_client_ip_x_real_ip_takes_priority_over_x_forwarded_for():
    """X-Real-IP beats X-Forwarded-For when both headers are present."""
    req = _make_request(
        {"X-Real-IP": "1.1.1.1", "X-Forwarded-For": "2.2.2.2"},
        client_host="127.0.0.1",
    )
    assert _get_client_ip(req) == "1.1.1.1"


def test_get_client_ip_falls_back_to_client_host_when_no_proxy_headers():
    """Without proxy headers, client.host is returned."""
    req = _make_request(client_host="203.0.113.99")
    assert _get_client_ip(req) == "203.0.113.99"


def test_get_client_ip_returns_unknown_when_no_client_and_no_headers():
    """If no proxy headers and client is None, 'unknown' is returned."""
    # Built by hand because _make_request always supplies a client tuple.
    scope = {
        "type": "http",
        "method": "POST",
        "path": "/",
        "headers": [],
        "client": None,
    }
    req = Request(scope)
    assert _get_client_ip(req) == "unknown"


# ── Integration: signal rate limit (429) ────────────────────────────────────


@pytest.mark.asyncio
async def test_signal_rate_limit_returns_429_after_10_requests():
    """POST /api/signal returns 429 on the 11th request from the same IP."""
    async with make_app_client() as client:
        await client.post("/api/register", json={"uuid": _UUID_SIG_RL, "name": "RL"})

        payload = {"user_id": _UUID_SIG_RL, "timestamp": 1742478000000}
        ip_hdrs = {"X-Real-IP": "5.5.5.5"}

        statuses = []
        for _ in range(11):
            r = await client.post("/api/signal", json=payload, headers=ip_hdrs)
            statuses.append(r.status_code)

        assert statuses[-1] == 429, f"Expected 429 on 11th request, got {statuses}"


@pytest.mark.asyncio
async def test_signal_first_10_requests_are_allowed():
    """First 10 POST /api/signal requests from the same IP must all return 200."""
    async with make_app_client() as client:
        await client.post("/api/register", json={"uuid": _UUID_SIG_OK, "name": "OK"})

        payload = {"user_id": _UUID_SIG_OK, "timestamp": 1742478000000}
        ip_hdrs = {"X-Real-IP": "6.6.6.6"}

        statuses = []
        for _ in range(10):
            r = await client.post("/api/signal", json=payload, headers=ip_hdrs)
            statuses.append(r.status_code)

        assert all(s == 200 for s in statuses), (
            f"Some request(s) before limit returned non-200: {statuses}"
        )


# ── Integration: independence of register and signal rate limits ─────────────


@pytest.mark.asyncio
async def test_signal_rate_limit_does_not_affect_register_counter():
    """
    Exhausting the signal rate limit (11 requests) must NOT cause /api/register
    to return 429 — the counters use different keys ('sig:IP' vs 'IP').
    """
    async with make_app_client() as client:
        ip_hdrs = {"X-Real-IP": "7.7.7.7"}

        # Register a user (increments register counter, key='7.7.7.7', count=1)
        r_reg = await client.post(
            "/api/register",
            json={"uuid": _UUID_IND_SIG, "name": "Ind"},
            headers=ip_hdrs,
        )
        assert r_reg.status_code == 200

        # Exhaust signal rate limit for same IP (11 requests, key='sig:7.7.7.7')
        payload = {"user_id": _UUID_IND_SIG, "timestamp": 1742478000000}
        signal_statuses = []
        for _ in range(11):
            r_sig = await client.post("/api/signal", json=payload, headers=ip_hdrs)
            signal_statuses.append(r_sig.status_code)

        # Precondition: the signal limit must really have been hit, otherwise
        # the independence check below would pass vacuously.
        assert signal_statuses[-1] == 429, (
            f"Signal limit was not exhausted, statuses: {signal_statuses}"
        )

        # Register counter is still at 1 — must allow another registration
        r_reg2 = await client.post(
            "/api/register",
            json={"uuid": _UUID_IND_SIG2, "name": "Ind2"},
            headers=ip_hdrs,
        )

        assert r_reg2.status_code == 200, (
            f"Register returned {r_reg2.status_code} — "
            "signal exhaustion incorrectly bled into register counter"
        )


@pytest.mark.asyncio
async def test_register_rate_limit_does_not_affect_signal_counter():
    """
    Exhausting the register rate limit (6 requests → 6th returns 429) must NOT
    prevent subsequent /api/signal requests from the same IP.
    """
    async with make_app_client() as client:
        ip_hdrs = {"X-Real-IP": "8.8.8.8"}

        # First register succeeds and creates the user we'll signal later
        r0 = await client.post(
            "/api/register",
            json={"uuid": _UUID_IND_REG, "name": "Reg"},
            headers=ip_hdrs,
        )
        assert r0.status_code == 200

        # Send 5 more register requests from the same IP to exhaust the limit
        # (register limit = 5/600s, so request #6 → 429)
        register_statuses = []
        for _ in range(5):
            r_reg = await client.post(
                "/api/register",
                json={"uuid": _UUID_IND_REG, "name": "Reg"},
                headers=ip_hdrs,
            )
            register_statuses.append(r_reg.status_code)

        # Precondition: the register limit must really have been hit, otherwise
        # the independence check below would pass vacuously.
        assert register_statuses[-1] == 429, (
            f"Register limit was not exhausted, statuses: {register_statuses}"
        )

        # Signal must still succeed — signal counter (key='sig:8.8.8.8') is still 0
        r_sig = await client.post(
            "/api/signal",
            json={"user_id": _UUID_IND_REG, "timestamp": 1742478000000},
            headers=ip_hdrs,
        )

        assert r_sig.status_code == 200, (
            f"Signal returned {r_sig.status_code} — "
            "register exhaustion incorrectly bled into signal counter"
        )


# ── Integration: signal rate limit is per-IP ─────────────────────────────────


@pytest.mark.asyncio
async def test_signal_rate_limit_is_per_ip_different_ips_are_independent():
    """
    Rate limit counters are per-IP — exhausting for IP A must not block IP B.
    """
    async with make_app_client() as client:
        await client.post("/api/register", json={"uuid": _UUID_IP_A, "name": "IPA"})
        await client.post("/api/register", json={"uuid": _UUID_IP_B, "name": "IPB"})

        # Exhaust rate limit for IP A (11 requests → 11th is 429)
        ip_a_statuses = []
        for _ in range(11):
            r_a = await client.post(
                "/api/signal",
                json={"user_id": _UUID_IP_A, "timestamp": 1742478000000},
                headers={"X-Real-IP": "11.11.11.11"},
            )
            ip_a_statuses.append(r_a.status_code)

        # Precondition: IP A must really be blocked, otherwise the isolation
        # check below would pass vacuously.
        assert ip_a_statuses[-1] == 429, (
            f"IP A limit was not exhausted, statuses: {ip_a_statuses}"
        )

        # IP B should still be allowed (independent counter)
        r = await client.post(
            "/api/signal",
            json={"user_id": _UUID_IP_B, "timestamp": 1742478000000},
            headers={"X-Real-IP": "22.22.22.22"},
        )

        assert r.status_code == 200, f"IP B was incorrectly blocked: {r.status_code}"