baton/backend/main.py
2026-03-20 23:39:28 +02:00

217 lines
7.1 KiB
Python

from __future__ import annotations
import asyncio
import hashlib
import logging
import os
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any
import httpx
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from backend import config, db, telegram
from backend.middleware import rate_limit_register, verify_admin_token, verify_webhook_secret
from backend.models import (
AdminBlockRequest,
AdminCreateUserRequest,
AdminSetPasswordRequest,
RegisterRequest,
RegisterResponse,
SignalRequest,
SignalResponse,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _hash_password(password: str) -> str:
"""Hash a password using PBKDF2-HMAC-SHA256 (stdlib, no external deps).
Stored format: ``<salt_hex>:<dk_hex>``
"""
salt = os.urandom(16)
dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 260_000)
return f"{salt.hex()}:{dk.hex()}"
# aggregator = telegram.SignalAggregator(interval=10) # v2.0 feature — отключено в v1 (ADR-004)
_KEEPALIVE_INTERVAL = 600 # 10 минут
async def _keep_alive_loop(app_url: str) -> None:
"""Периодически пингует /health чтобы предотвратить cold start на бесплатных хостингах."""
health_url = f"{app_url.rstrip('/')}/health"
async with httpx.AsyncClient(timeout=10.0) as client:
while True:
await asyncio.sleep(_KEEPALIVE_INTERVAL)
try:
resp = await client.get(health_url)
logger.info("Keep-alive ping %s%d", health_url, resp.status_code)
except Exception as exc:
logger.warning("Keep-alive ping failed: %s", exc)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
app.state.rate_counters = {}
await db.init_db()
logger.info("Database initialized")
if config.WEBHOOK_ENABLED:
await telegram.set_webhook(url=config.WEBHOOK_URL, secret=config.WEBHOOK_SECRET)
logger.info("Webhook registered")
# v2.0 feature — агрегатор отключён в v1 (ADR-004)
# task = asyncio.create_task(aggregator.run())
# logger.info("Aggregator started")
keepalive_task: asyncio.Task | None = None
if config.APP_URL:
keepalive_task = asyncio.create_task(_keep_alive_loop(config.APP_URL))
logger.info("Keep-alive task started (target: %s/health)", config.APP_URL)
else:
logger.info("APP_URL not set — keep-alive disabled")
yield
# Shutdown
if keepalive_task is not None:
keepalive_task.cancel()
try:
await keepalive_task
except asyncio.CancelledError:
pass
logger.info("Keep-alive task stopped")
# aggregator.stop()
# await aggregator.flush()
# task.cancel()
# try:
# await task
# except asyncio.CancelledError:
# pass
# logger.info("Aggregator stopped, final flush done")
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=[config.FRONTEND_ORIGIN],
allow_methods=["POST"],
allow_headers=["Content-Type"],
)
@app.get("/health")
async def health() -> dict[str, Any]:
return {"status": "ok", "timestamp": int(time.time())}
@app.post("/api/register", response_model=RegisterResponse)
async def register(body: RegisterRequest, _: None = Depends(rate_limit_register)) -> RegisterResponse:
result = await db.register_user(uuid=body.uuid, name=body.name)
return RegisterResponse(user_id=result["user_id"], uuid=result["uuid"])
@app.post("/api/signal", response_model=SignalResponse)
async def signal(body: SignalRequest) -> SignalResponse:
if await db.is_user_blocked(body.user_id):
raise HTTPException(status_code=403, detail="User is blocked")
geo = body.geo
lat = geo.lat if geo else None
lon = geo.lon if geo else None
accuracy = geo.accuracy if geo else None
signal_id = await db.save_signal(
user_uuid=body.user_id,
timestamp=body.timestamp,
lat=lat,
lon=lon,
accuracy=accuracy,
)
user_name = await db.get_user_name(body.user_id)
ts = datetime.fromtimestamp(body.timestamp / 1000, tz=timezone.utc)
name = user_name or body.user_id[:8]
geo_info = (
f"📍 {lat}, {lon}{accuracy}м)"
if geo
else "Без геолокации"
)
text = (
f"🚨 Сигнал от {name}\n"
f"{ts.strftime('%H:%M:%S')} UTC\n"
f"{geo_info}"
)
await telegram.send_message(text)
return SignalResponse(status="ok", signal_id=signal_id)
@app.get("/admin/users", dependencies=[Depends(verify_admin_token)])
async def admin_list_users() -> list[dict]:
return await db.admin_list_users()
@app.post("/admin/users", status_code=201, dependencies=[Depends(verify_admin_token)])
async def admin_create_user(body: AdminCreateUserRequest) -> dict:
password_hash = _hash_password(body.password) if body.password else None
result = await db.admin_create_user(body.uuid, body.name, password_hash)
if result is None:
raise HTTPException(status_code=409, detail="User with this UUID already exists")
return result
@app.put("/admin/users/{user_id}/password", dependencies=[Depends(verify_admin_token)])
async def admin_set_password(user_id: int, body: AdminSetPasswordRequest) -> dict:
changed = await db.admin_set_password(user_id, _hash_password(body.password))
if not changed:
raise HTTPException(status_code=404, detail="User not found")
return {"ok": True}
@app.put("/admin/users/{user_id}/block", dependencies=[Depends(verify_admin_token)])
async def admin_block_user(user_id: int, body: AdminBlockRequest) -> dict:
changed = await db.admin_set_blocked(user_id, body.is_blocked)
if not changed:
raise HTTPException(status_code=404, detail="User not found")
user = await db.admin_get_user_by_id(user_id)
return user # type: ignore[return-value]
@app.delete("/admin/users/{user_id}", status_code=204, dependencies=[Depends(verify_admin_token)])
async def admin_delete_user(user_id: int) -> None:
deleted = await db.admin_delete_user(user_id)
if not deleted:
raise HTTPException(status_code=404, detail="User not found")
@app.post("/api/webhook/telegram")
async def webhook_telegram(
request: Request,
_: None = Depends(verify_webhook_secret),
) -> dict[str, Any]:
update = await request.json()
message = update.get("message", {})
text = message.get("text", "")
if text.startswith("/start"):
tg_user = message.get("from", {})
tg_user_id = str(tg_user.get("id", ""))
first_name = tg_user.get("first_name", "")
last_name = tg_user.get("last_name", "")
name = (first_name + " " + last_name).strip() or tg_user_id
if tg_user_id:
await db.register_user(uuid=tg_user_id, name=name)
logger.info("Telegram /start: registered user %s", tg_user_id)
return {"ok": True}