baton/backend/middleware.py

68 lines
2.1 KiB
Python
Raw Permalink Normal View History

2026-03-20 20:44:00 +02:00
from __future__ import annotations
import secrets
2026-03-20 23:39:28 +02:00
from typing import Optional
2026-03-20 23:39:28 +02:00
from fastapi import Depends, Header, HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
2026-03-20 20:44:00 +02:00
2026-03-21 07:56:44 +02:00
from backend import config, db
2026-03-20 20:44:00 +02:00
2026-03-20 23:39:28 +02:00
# Bearer-token extractor injected into verify_admin_token. With
# auto_error=False the scheme hands the dependency None when no usable
# credentials are present, so verify_admin_token produces the 401 itself.
_bearer = HTTPBearer(auto_error=False)
# Threshold and window applied by rate_limit_register (keys prefixed "reg:").
_RATE_LIMIT = 5
_RATE_WINDOW = 600 # 10 minutes
2026-03-21 07:36:33 +02:00
# Threshold and window applied by rate_limit_signal (keys prefixed "sig:"):
# a counter value above the limit is answered with HTTP 429.
_SIGNAL_RATE_LIMIT = 10
_SIGNAL_RATE_WINDOW = 60 # 1 minute
2026-03-21 09:19:50 +02:00
# Threshold and window applied by rate_limit_auth_register (keys prefixed
# "authreg:"): a counter value above the limit is answered with HTTP 429.
_AUTH_REGISTER_RATE_LIMIT = 3
_AUTH_REGISTER_RATE_WINDOW = 600 # 10 minutes
2026-03-21 07:36:33 +02:00
def _get_client_ip(request: Request) -> str:
    """Return the address used to identify the caller of *request*.

    Sources are tried in order and the first non-empty one wins:

    1. the ``X-Real-IP`` header, returned as received;
    2. the first comma-separated entry of ``X-Forwarded-For``, stripped of
       surrounding whitespace;
    3. ``request.client.host`` when the request carries client info;
    4. the literal string ``"unknown"``.

    NOTE(review): both headers are taken at face value. This presumably
    relies on a reverse proxy overwriting them before the request reaches
    the app — confirm, since otherwise a caller can pick its own
    rate-limit key.
    """
    real_ip = request.headers.get("X-Real-IP")
    if real_ip:
        return real_ip

    forwarded_for = request.headers.get("X-Forwarded-For", "")
    first_hop = forwarded_for.split(",")[0].strip()
    if first_hop:
        return first_hop

    if request.client:
        return request.client.host
    return "unknown"
2026-03-20 20:44:00 +02:00
async def verify_webhook_secret(
    x_telegram_bot_api_secret_token: str = Header(default=""),
) -> None:
    """Reject webhook calls whose secret-token header does not match.

    FastAPI dependency: the header value is compared with
    ``config.WEBHOOK_SECRET`` using a constant-time comparison.

    Args:
        x_telegram_bot_api_secret_token: Value of the secret-token request
            header; an empty string when the header is absent.

    Raises:
        HTTPException: 403 ("Forbidden") when the value does not equal the
            configured secret.

    NOTE(review): if ``config.WEBHOOK_SECRET`` were ever empty, a request
    without the header would pass this check — confirm the config layer
    refuses an empty secret.
    """
    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters. Comparing the UTF-8 bytes keeps the check
    # constant-time and turns such a header into a plain 403 instead of an
    # unhandled server error.
    supplied = x_telegram_bot_api_secret_token.encode("utf-8")
    expected = config.WEBHOOK_SECRET.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(status_code=403, detail="Forbidden")
2026-03-20 23:39:28 +02:00
async def verify_admin_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer),
) -> None:
    """Reject requests that do not carry the admin bearer token.

    FastAPI dependency: the token extracted by the ``_bearer`` scheme is
    compared with ``config.ADMIN_TOKEN`` using a constant-time comparison.

    Args:
        credentials: Credentials produced by the ``_bearer`` scheme, or
            ``None`` when the scheme found none.

    Raises:
        HTTPException: 401 ("Unauthorized") when credentials are missing or
            the token does not equal the configured admin token.
    """
    if credentials is None:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters. Comparing the UTF-8 bytes keeps the check
    # constant-time and turns such a token into a plain 401 instead of an
    # unhandled server error.
    supplied = credentials.credentials.encode("utf-8")
    expected = config.ADMIN_TOKEN.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Unauthorized")
async def rate_limit_register(request: Request) -> None:
    """Throttle registration requests per client address.

    Bumps the counter stored under ``"reg:<client ip>"`` through
    ``db.rate_limit_increment`` with a window of ``_RATE_WINDOW``.
    The returned value is presumably the number of hits recorded for that
    key inside the window — confirm against ``backend.db``.

    Raises:
        HTTPException: 429 ("Too Many Requests") when the returned count is
            greater than ``_RATE_LIMIT``.
    """
    client_ip = _get_client_ip(request)
    hits = await db.rate_limit_increment(f"reg:{client_ip}", _RATE_WINDOW)
    over_limit = hits > _RATE_LIMIT
    if over_limit:
        raise HTTPException(status_code=429, detail="Too Many Requests")
2026-03-21 07:36:33 +02:00
async def rate_limit_signal(request: Request) -> None:
    """Throttle signal requests per client address.

    Bumps the counter stored under ``"sig:<client ip>"`` through
    ``db.rate_limit_increment`` with a window of ``_SIGNAL_RATE_WINDOW``.
    The returned value is presumably the number of hits recorded for that
    key inside the window — confirm against ``backend.db``.

    Raises:
        HTTPException: 429 ("Too Many Requests") when the returned count is
            greater than ``_SIGNAL_RATE_LIMIT``.
    """
    client_ip = _get_client_ip(request)
    hits = await db.rate_limit_increment(
        f"sig:{client_ip}", _SIGNAL_RATE_WINDOW
    )
    over_limit = hits > _SIGNAL_RATE_LIMIT
    if over_limit:
        raise HTTPException(status_code=429, detail="Too Many Requests")
2026-03-21 09:19:50 +02:00
async def rate_limit_auth_register(request: Request) -> None:
    """Throttle auth-registration requests per client address.

    Bumps the counter stored under ``"authreg:<client ip>"`` through
    ``db.rate_limit_increment`` with a window of
    ``_AUTH_REGISTER_RATE_WINDOW``. The returned value is presumably the
    number of hits recorded for that key inside the window — confirm
    against ``backend.db``.

    Raises:
        HTTPException: 429 ("Too Many Requests") when the returned count is
            greater than ``_AUTH_REGISTER_RATE_LIMIT``.
    """
    client_ip = _get_client_ip(request)
    hits = await db.rate_limit_increment(
        f"authreg:{client_ip}", _AUTH_REGISTER_RATE_WINDOW
    )
    over_limit = hits > _AUTH_REGISTER_RATE_LIMIT
    if over_limit:
        raise HTTPException(status_code=429, detail="Too Many Requests")