from __future__ import annotations import secrets from typing import Optional from fastapi import Depends, Header, HTTPException, Request from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from backend import config, db _bearer = HTTPBearer(auto_error=False) _RATE_LIMIT = 5 _RATE_WINDOW = 600 # 10 minutes _SIGNAL_RATE_LIMIT = 10 _SIGNAL_RATE_WINDOW = 60 # 1 minute _AUTH_REGISTER_RATE_LIMIT = 3 _AUTH_REGISTER_RATE_WINDOW = 600 # 10 minutes def _get_client_ip(request: Request) -> str: return ( request.headers.get("X-Real-IP") or request.headers.get("X-Forwarded-For", "").split(",")[0].strip() or (request.client.host if request.client else "unknown") ) async def verify_webhook_secret( x_telegram_bot_api_secret_token: str = Header(default=""), ) -> None: if not secrets.compare_digest( x_telegram_bot_api_secret_token, config.WEBHOOK_SECRET ): raise HTTPException(status_code=403, detail="Forbidden") async def verify_admin_token( credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer), ) -> None: if credentials is None or not secrets.compare_digest( credentials.credentials, config.ADMIN_TOKEN ): raise HTTPException(status_code=401, detail="Unauthorized") async def rate_limit_register(request: Request) -> None: key = f"reg:{_get_client_ip(request)}" count = await db.rate_limit_increment(key, _RATE_WINDOW) if count > _RATE_LIMIT: raise HTTPException(status_code=429, detail="Too Many Requests") async def rate_limit_signal(request: Request) -> None: key = f"sig:{_get_client_ip(request)}" count = await db.rate_limit_increment(key, _SIGNAL_RATE_WINDOW) if count > _SIGNAL_RATE_LIMIT: raise HTTPException(status_code=429, detail="Too Many Requests") async def rate_limit_auth_register(request: Request) -> None: key = f"authreg:{_get_client_ip(request)}" count = await db.rate_limit_increment(key, _AUTH_REGISTER_RATE_WINDOW) if count > _AUTH_REGISTER_RATE_LIMIT: raise HTTPException(status_code=429, detail="Too Many Requests")