baton/backend/main.py
Gros Frumos 268fb62bf3 feat: test signal via avatar/indicator tap on main screen
Tapping user avatar or network indicator sends a test signal with
geo data. Backend formats it as "Тест от username" (🧪) instead of
"Сигнал" (🚨). Only active after login on main screen.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 16:06:02 +02:00

434 lines
15 KiB
Python

from __future__ import annotations
import asyncio
import hashlib
import hmac
import logging
import os
import secrets
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, Optional
import httpx
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from backend import config, db, push, telegram
from backend.middleware import (
_get_client_ip,
_verify_jwt_token,
check_ip_not_blocked,
create_auth_token,
rate_limit_auth_login,
rate_limit_auth_register,
rate_limit_register,
rate_limit_signal,
verify_admin_token,
verify_webhook_secret,
)
from backend.models import (
AdminBlockRequest,
AdminCreateUserRequest,
AdminSetPasswordRequest,
AuthLoginRequest,
AuthLoginResponse,
AuthRegisterRequest,
AuthRegisterResponse,
RegisterRequest,
RegisterResponse,
SignalRequest,
SignalResponse,
)
_api_key_bearer = HTTPBearer(auto_error=False)
logging.basicConfig(level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
def _hash_api_key(key: str) -> str:
"""SHA-256 хэш для API-ключа (без соли — для быстрого сравнения)."""
return hashlib.sha256(key.encode()).hexdigest()
def _hash_password(password: str) -> str:
"""Hash a password using PBKDF2-HMAC-SHA256 (stdlib, no external deps).
Stored format: ``<salt_hex>:<dk_hex>``
"""
salt = os.urandom(16)
dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 260_000)
return f"{salt.hex()}:{dk.hex()}"
def _verify_password(password: str, stored_hash: str) -> bool:
"""Verify a password against a stored PBKDF2-HMAC-SHA256 hash (salt_hex:dk_hex)."""
try:
salt_hex, dk_hex = stored_hash.split(":", 1)
salt = bytes.fromhex(salt_hex)
expected_dk = bytes.fromhex(dk_hex)
actual_dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 260_000)
return hmac.compare_digest(actual_dk, expected_dk)
except Exception:
return False
# aggregator = telegram.SignalAggregator(interval=10) # v2.0 feature — отключено в v1 (ADR-004)
_KEEPALIVE_INTERVAL = 600 # 10 минут
async def _keep_alive_loop(app_url: str) -> None:
"""Периодически пингует /health чтобы предотвратить cold start на бесплатных хостингах."""
health_url = f"{app_url.rstrip('/')}/health"
async with httpx.AsyncClient(timeout=10.0) as client:
while True:
await asyncio.sleep(_KEEPALIVE_INTERVAL)
try:
resp = await client.get(health_url)
logger.info("Keep-alive ping %s%d", health_url, resp.status_code)
except Exception as exc:
logger.warning("Keep-alive ping failed: %s", exc)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
await db.init_db()
logger.info("Database initialized")
if not await telegram.validate_bot_token():
logger.error(
"CRITICAL: BOT_TOKEN is invalid — Telegram delivery is broken. Update .env and restart."
)
if config.WEBHOOK_ENABLED:
await telegram.set_webhook(url=config.WEBHOOK_URL, secret=config.WEBHOOK_SECRET)
logger.info("Webhook registered")
# v2.0 feature — агрегатор отключён в v1 (ADR-004)
# task = asyncio.create_task(aggregator.run())
# logger.info("Aggregator started")
keepalive_task: asyncio.Task | None = None
if config.APP_URL:
keepalive_task = asyncio.create_task(_keep_alive_loop(config.APP_URL))
logger.info("Keep-alive task started (target: %s/health)", config.APP_URL)
else:
logger.info("APP_URL not set — keep-alive disabled")
yield
# Shutdown
if keepalive_task is not None:
keepalive_task.cancel()
try:
await keepalive_task
except asyncio.CancelledError:
pass
logger.info("Keep-alive task stopped")
# aggregator.stop()
# await aggregator.flush()
# task.cancel()
# try:
# await task
# except asyncio.CancelledError:
# pass
# logger.info("Aggregator stopped, final flush done")
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=[config.FRONTEND_ORIGIN],
allow_methods=["GET", "HEAD", "OPTIONS", "POST"],
allow_headers=["Content-Type", "Authorization"],
)
@app.get("/health")
@app.get("/api/health")
async def health() -> dict[str, Any]:
return {"status": "ok"}
@app.get("/api/vapid-public-key")
@app.get("/api/push/public-key")
async def vapid_public_key() -> dict[str, str]:
return {"vapid_public_key": config.VAPID_PUBLIC_KEY}
@app.post("/api/register", response_model=RegisterResponse)
async def register(body: RegisterRequest, _: None = Depends(rate_limit_register)) -> RegisterResponse:
api_key = secrets.token_hex(32)
result = await db.register_user(uuid=body.uuid, name=body.name, api_key_hash=_hash_api_key(api_key))
return RegisterResponse(user_id=result["user_id"], uuid=result["uuid"], api_key=api_key)
@app.post("/api/signal", response_model=SignalResponse)
async def signal(
body: SignalRequest,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(_api_key_bearer),
_: None = Depends(rate_limit_signal),
) -> SignalResponse:
if credentials is None:
raise HTTPException(status_code=401, detail="Unauthorized")
user_identifier: str = ""
user_name: str = ""
# Try JWT auth first (new registration flow)
jwt_payload = None
try:
jwt_payload = _verify_jwt_token(credentials.credentials)
except Exception:
pass
if jwt_payload is not None:
reg_id = int(jwt_payload["sub"])
reg = await db.get_registration(reg_id)
if reg is None or reg["status"] != "approved":
raise HTTPException(status_code=401, detail="Unauthorized")
user_identifier = reg["login"]
user_name = reg["login"]
else:
# Legacy api_key auth
if not body.user_id:
raise HTTPException(status_code=401, detail="Unauthorized")
key_hash = _hash_api_key(credentials.credentials)
stored_hash = await db.get_api_key_hash_by_uuid(body.user_id)
if stored_hash is None or not secrets.compare_digest(key_hash, stored_hash):
raise HTTPException(status_code=401, detail="Unauthorized")
if await db.is_user_blocked(body.user_id):
raise HTTPException(status_code=403, detail="User is blocked")
user_identifier = body.user_id
user_name = await db.get_user_name(body.user_id) or body.user_id[:8]
geo = body.geo
lat = geo.lat if geo else None
lon = geo.lon if geo else None
accuracy = geo.accuracy if geo else None
signal_id = await db.save_signal(
user_uuid=user_identifier,
timestamp=body.timestamp,
lat=lat,
lon=lon,
accuracy=accuracy,
)
ts = datetime.fromtimestamp(body.timestamp / 1000, tz=timezone.utc)
geo_info = (
f"📍 <a href=\"https://maps.google.com/maps?q={lat},{lon}\">{lat}, {lon}</a> (±{accuracy:.0f}м)"
if geo
else "Гео нету"
)
if body.is_test:
text = (
f"🧪 Тест от {user_name}\n"
f"{ts.strftime('%H:%M:%S')} UTC\n"
f"{geo_info}"
)
else:
text = (
f"🚨 Сигнал от {user_name}\n"
f"{ts.strftime('%H:%M:%S')} UTC\n"
f"{geo_info}"
)
asyncio.create_task(telegram.send_message(text))
return SignalResponse(status="ok", signal_id=signal_id)
_ALLOWED_EMAIL_DOMAIN = "tutlot.com"
_VIOLATION_BLOCK_THRESHOLD = 5
@app.post("/api/auth/register", response_model=AuthRegisterResponse, status_code=201)
async def auth_register(
request: Request,
body: AuthRegisterRequest,
_: None = Depends(rate_limit_auth_register),
__: None = Depends(check_ip_not_blocked),
) -> AuthRegisterResponse:
# Domain verification (server-side only)
email_str = str(body.email)
domain = email_str.rsplit("@", 1)[-1].lower() if "@" in email_str else ""
if domain != _ALLOWED_EMAIL_DOMAIN:
client_ip = _get_client_ip(request)
count = await db.record_ip_violation(client_ip)
logger.warning("Domain violation from %s (attempt %d): %s", client_ip, count, email_str)
raise HTTPException(
status_code=403,
detail="Ваш IP отправлен компетентным службам и за вами уже выехали. Ожидайте.",
)
password_hash = _hash_password(body.password)
push_sub_json = (
body.push_subscription.model_dump_json() if body.push_subscription else None
)
try:
reg_id = await db.create_registration(
email=email_str,
login=body.login,
password_hash=password_hash,
push_subscription=push_sub_json,
)
except Exception as exc:
# aiosqlite.IntegrityError on email/login UNIQUE conflict
if "UNIQUE" in str(exc) or "unique" in str(exc).lower():
raise HTTPException(status_code=409, detail="Email или логин уже существует")
raise
reg = await db.get_registration(reg_id)
asyncio.create_task(
telegram.send_registration_notification(
reg_id=reg_id,
login=body.login,
email=email_str,
created_at=reg["created_at"] if reg else "",
)
)
return AuthRegisterResponse(status="pending", message="Заявка отправлена на рассмотрение")
@app.post("/api/auth/login", response_model=AuthLoginResponse)
async def auth_login(
body: AuthLoginRequest,
_: None = Depends(rate_limit_auth_login),
__: None = Depends(check_ip_not_blocked),
) -> AuthLoginResponse:
reg = await db.get_registration_by_login_or_email(body.login_or_email)
if reg is None or not _verify_password(body.password, reg["password_hash"]):
raise HTTPException(status_code=401, detail="Неверный логин или пароль")
if reg["status"] == "pending":
raise HTTPException(status_code=403, detail="Ваша заявка ожидает рассмотрения")
if reg["status"] == "rejected":
raise HTTPException(status_code=403, detail="Ваша заявка отклонена")
if reg["status"] != "approved":
raise HTTPException(status_code=403, detail="Доступ запрещён")
token = create_auth_token(reg["id"], reg["login"])
return AuthLoginResponse(token=token, login=reg["login"])
async def _handle_callback_query(cb: dict) -> None:
"""Process approve/reject callback from admin Telegram inline buttons."""
data = cb.get("data", "")
callback_query_id = cb.get("id", "")
message = cb.get("message", {})
chat_id = message.get("chat", {}).get("id")
message_id = message.get("message_id")
if ":" not in data:
return
action, reg_id_str = data.split(":", 1)
try:
reg_id = int(reg_id_str)
except ValueError:
return
reg = await db.get_registration(reg_id)
if reg is None:
await telegram.answer_callback_query(callback_query_id)
return
if action == "approve":
updated = await db.update_registration_status(reg_id, "approved")
if not updated:
# Already processed (not pending) — ack the callback and stop
await telegram.answer_callback_query(callback_query_id)
return
if chat_id and message_id:
await telegram.edit_message_text(
chat_id, message_id, f"✅ Пользователь {reg['login']} одобрен"
)
if reg["push_subscription"]:
asyncio.create_task(
push.send_push(
reg["push_subscription"],
"Baton",
"Ваша регистрация одобрена!",
)
)
elif action == "reject":
updated = await db.update_registration_status(reg_id, "rejected")
if not updated:
await telegram.answer_callback_query(callback_query_id)
return
if chat_id and message_id:
await telegram.edit_message_text(
chat_id, message_id, f"❌ Пользователь {reg['login']} отклонён"
)
await telegram.answer_callback_query(callback_query_id)
@app.get("/admin/users", dependencies=[Depends(verify_admin_token)])
async def admin_list_users() -> list[dict]:
return await db.admin_list_users()
@app.post("/admin/users", status_code=201, dependencies=[Depends(verify_admin_token)])
async def admin_create_user(body: AdminCreateUserRequest) -> dict:
password_hash = _hash_password(body.password) if body.password else None
result = await db.admin_create_user(body.uuid, body.name, password_hash)
if result is None:
raise HTTPException(status_code=409, detail="User with this UUID already exists")
return result
@app.put("/admin/users/{user_id}/password", dependencies=[Depends(verify_admin_token)])
async def admin_set_password(user_id: int, body: AdminSetPasswordRequest) -> dict:
changed = await db.admin_set_password(user_id, _hash_password(body.password))
if not changed:
raise HTTPException(status_code=404, detail="User not found")
return {"ok": True}
@app.put("/admin/users/{user_id}/block", dependencies=[Depends(verify_admin_token)])
async def admin_block_user(user_id: int, body: AdminBlockRequest) -> dict:
changed = await db.admin_set_blocked(user_id, body.is_blocked)
if not changed:
raise HTTPException(status_code=404, detail="User not found")
user = await db.admin_get_user_by_id(user_id)
return user # type: ignore[return-value]
@app.delete("/admin/users/{user_id}", status_code=204, dependencies=[Depends(verify_admin_token)])
async def admin_delete_user(user_id: int) -> None:
deleted = await db.admin_delete_user(user_id)
if not deleted:
raise HTTPException(status_code=404, detail="User not found")
@app.post("/api/webhook/telegram")
async def webhook_telegram(
request: Request,
_: None = Depends(verify_webhook_secret),
) -> dict[str, Any]:
update = await request.json()
# Handle inline button callback queries (approve/reject registration)
callback_query = update.get("callback_query")
if callback_query:
await _handle_callback_query(callback_query)
return {"ok": True}
message = update.get("message", {})
text = message.get("text", "")
if text.startswith("/start"):
tg_user = message.get("from", {})
tg_user_id = str(tg_user.get("id", ""))
first_name = tg_user.get("first_name", "")
last_name = tg_user.get("last_name", "")
name = (first_name + " " + last_name).strip() or tg_user_id
if tg_user_id:
await db.register_user(uuid=tg_user_id, name=name)
logger.info("Telegram /start: registered user %s", tg_user_id)
return {"ok": True}