57 lines
1.7 KiB
Python
57 lines
1.7 KiB
Python
from __future__ import annotations
|
|
|
|
import secrets
|
|
from typing import Optional
|
|
|
|
from fastapi import Depends, Header, HTTPException, Request
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
|
|
from backend import config, db
|
|
|
|
_bearer = HTTPBearer(auto_error=False)
|
|
|
|
_RATE_LIMIT = 5
|
|
_RATE_WINDOW = 600 # 10 minutes
|
|
|
|
_SIGNAL_RATE_LIMIT = 10
|
|
_SIGNAL_RATE_WINDOW = 60 # 1 minute
|
|
|
|
|
|
def _get_client_ip(request: Request) -> str:
|
|
return (
|
|
request.headers.get("X-Real-IP")
|
|
or request.headers.get("X-Forwarded-For", "").split(",")[0].strip()
|
|
or (request.client.host if request.client else "unknown")
|
|
)
|
|
|
|
|
|
async def verify_webhook_secret(
|
|
x_telegram_bot_api_secret_token: str = Header(default=""),
|
|
) -> None:
|
|
if not secrets.compare_digest(
|
|
x_telegram_bot_api_secret_token, config.WEBHOOK_SECRET
|
|
):
|
|
raise HTTPException(status_code=403, detail="Forbidden")
|
|
|
|
|
|
async def verify_admin_token(
|
|
credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer),
|
|
) -> None:
|
|
if credentials is None or not secrets.compare_digest(
|
|
credentials.credentials, config.ADMIN_TOKEN
|
|
):
|
|
raise HTTPException(status_code=401, detail="Unauthorized")
|
|
|
|
|
|
async def rate_limit_register(request: Request) -> None:
|
|
key = f"reg:{_get_client_ip(request)}"
|
|
count = await db.rate_limit_increment(key, _RATE_WINDOW)
|
|
if count > _RATE_LIMIT:
|
|
raise HTTPException(status_code=429, detail="Too Many Requests")
|
|
|
|
|
|
async def rate_limit_signal(request: Request) -> None:
|
|
key = f"sig:{_get_client_ip(request)}"
|
|
count = await db.rate_limit_increment(key, _SIGNAL_RATE_WINDOW)
|
|
if count > _SIGNAL_RATE_LIMIT:
|
|
raise HTTPException(status_code=429, detail="Too Many Requests")
|