sec: server-side email domain check + IP block on violations

Only @tutlot.com emails allowed for registration (checked server-side,
invisible to frontend inspect). Wrong domain → scary message + IP
violation tracked. 5 violations → IP permanently blocked from login
and registration. Block screen with OK button on frontend.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Gros Frumos 2026-03-21 15:58:16 +02:00
parent 47b89ded8d
commit 0562cb4e47
8 changed files with 123 additions and 30 deletions

View file

@ -18,7 +18,9 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from backend import config, db, push, telegram
from backend.middleware import (
_get_client_ip,
_verify_jwt_token,
check_ip_not_blocked,
create_auth_token,
rate_limit_auth_login,
rate_limit_auth_register,
@ -237,18 +239,35 @@ async def signal(
return SignalResponse(status="ok", signal_id=signal_id)
_ALLOWED_EMAIL_DOMAIN = "tutlot.com"
_VIOLATION_BLOCK_THRESHOLD = 5
@app.post("/api/auth/register", response_model=AuthRegisterResponse, status_code=201)
async def auth_register(
request: Request,
body: AuthRegisterRequest,
_: None = Depends(rate_limit_auth_register),
__: None = Depends(check_ip_not_blocked),
) -> AuthRegisterResponse:
# Domain verification (server-side only)
email_str = str(body.email)
domain = email_str.rsplit("@", 1)[-1].lower() if "@" in email_str else ""
if domain != _ALLOWED_EMAIL_DOMAIN:
client_ip = _get_client_ip(request)
count = await db.record_ip_violation(client_ip)
logger.warning("Domain violation from %s (attempt %d): %s", client_ip, count, email_str)
raise HTTPException(
status_code=403,
detail="Ваш IP отправлен компетентным службам и за вами уже выехали. Ожидайте.",
)
password_hash = _hash_password(body.password)
push_sub_json = (
body.push_subscription.model_dump_json() if body.push_subscription else None
)
try:
reg_id = await db.create_registration(
email=str(body.email),
email=email_str,
login=body.login,
password_hash=password_hash,
push_subscription=push_sub_json,
@ -263,7 +282,7 @@ async def auth_register(
telegram.send_registration_notification(
reg_id=reg_id,
login=body.login,
email=str(body.email),
email=email_str,
created_at=reg["created_at"] if reg else "",
)
)
@ -274,6 +293,7 @@ async def auth_register(
async def auth_login(
body: AuthLoginRequest,
_: None = Depends(rate_limit_auth_login),
__: None = Depends(check_ip_not_blocked),
) -> AuthLoginResponse:
reg = await db.get_registration_by_login_or_email(body.login_or_email)
if reg is None or not _verify_password(body.password, reg["password_hash"]):