from __future__ import annotations import asyncio import hashlib import logging import os import secrets from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import Any, Optional import httpx from fastapi import Depends, FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from backend import config, db, push, telegram from backend.middleware import rate_limit_auth_register, rate_limit_register, rate_limit_signal, verify_admin_token, verify_webhook_secret from backend.models import ( AdminBlockRequest, AdminCreateUserRequest, AdminSetPasswordRequest, AuthRegisterRequest, AuthRegisterResponse, RegisterRequest, RegisterResponse, SignalRequest, SignalResponse, ) _api_key_bearer = HTTPBearer(auto_error=False) logging.basicConfig(level=logging.INFO) logging.getLogger("httpx").setLevel(logging.WARNING) logger = logging.getLogger(__name__) def _hash_api_key(key: str) -> str: """SHA-256 хэш для API-ключа (без соли — для быстрого сравнения).""" return hashlib.sha256(key.encode()).hexdigest() def _hash_password(password: str) -> str: """Hash a password using PBKDF2-HMAC-SHA256 (stdlib, no external deps). Stored format: ``:`` """ salt = os.urandom(16) dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 260_000) return f"{salt.hex()}:{dk.hex()}" # aggregator = telegram.SignalAggregator(interval=10) # v2.0 feature — отключено в v1 (ADR-004) _KEEPALIVE_INTERVAL = 600 # 10 минут async def _keep_alive_loop(app_url: str) -> None: """Периодически пингует /health чтобы предотвратить cold start на бесплатных хостингах.""" health_url = f"{app_url.rstrip('/')}/health" async with httpx.AsyncClient(timeout=10.0) as client: while True: await asyncio.sleep(_KEEPALIVE_INTERVAL) try: resp = await client.get(health_url) logger.info("Keep-alive ping %s → %d", health_url, resp.status_code) except Exception as exc: logger.warning("Keep-alive ping failed: %s", exc) @asynccontextmanager async def lifespan(app: FastAPI): # Startup await db.init_db() logger.info("Database initialized") if not await telegram.validate_bot_token(): logger.error( "CRITICAL: BOT_TOKEN is invalid — Telegram delivery is broken. Update .env and restart." ) if config.WEBHOOK_ENABLED: await telegram.set_webhook(url=config.WEBHOOK_URL, secret=config.WEBHOOK_SECRET) logger.info("Webhook registered") # v2.0 feature — агрегатор отключён в v1 (ADR-004) # task = asyncio.create_task(aggregator.run()) # logger.info("Aggregator started") keepalive_task: asyncio.Task | None = None if config.APP_URL: keepalive_task = asyncio.create_task(_keep_alive_loop(config.APP_URL)) logger.info("Keep-alive task started (target: %s/health)", config.APP_URL) else: logger.info("APP_URL not set — keep-alive disabled") yield # Shutdown if keepalive_task is not None: keepalive_task.cancel() try: await keepalive_task except asyncio.CancelledError: pass logger.info("Keep-alive task stopped") # aggregator.stop() # await aggregator.flush() # task.cancel() # try: # await task # except asyncio.CancelledError: # pass # logger.info("Aggregator stopped, final flush done") app = FastAPI(lifespan=lifespan) app.add_middleware( CORSMiddleware, allow_origins=[config.FRONTEND_ORIGIN], allow_methods=["POST"], allow_headers=["Content-Type", "Authorization"], ) @app.get("/health") @app.get("/api/health") async def health() -> dict[str, Any]: return {"status": "ok"} @app.post("/api/register", response_model=RegisterResponse) async def register(body: RegisterRequest, _: None = Depends(rate_limit_register)) -> RegisterResponse: api_key = secrets.token_hex(32) result = await db.register_user(uuid=body.uuid, name=body.name, api_key_hash=_hash_api_key(api_key)) return RegisterResponse(user_id=result["user_id"], uuid=result["uuid"], api_key=api_key) @app.post("/api/signal", response_model=SignalResponse) async def signal( body: SignalRequest, credentials: Optional[HTTPAuthorizationCredentials] = Depends(_api_key_bearer), _: None = Depends(rate_limit_signal), ) -> SignalResponse: if credentials is None: raise HTTPException(status_code=401, detail="Unauthorized") key_hash = _hash_api_key(credentials.credentials) stored_hash = await db.get_api_key_hash_by_uuid(body.user_id) if stored_hash is None or not secrets.compare_digest(key_hash, stored_hash): raise HTTPException(status_code=401, detail="Unauthorized") if await db.is_user_blocked(body.user_id): raise HTTPException(status_code=403, detail="User is blocked") geo = body.geo lat = geo.lat if geo else None lon = geo.lon if geo else None accuracy = geo.accuracy if geo else None signal_id = await db.save_signal( user_uuid=body.user_id, timestamp=body.timestamp, lat=lat, lon=lon, accuracy=accuracy, ) user_name = await db.get_user_name(body.user_id) ts = datetime.fromtimestamp(body.timestamp / 1000, tz=timezone.utc) name = user_name or body.user_id[:8] geo_info = ( f"📍 {lat}, {lon} (±{accuracy}м)" if geo else "Без геолокации" ) text = ( f"🚨 Сигнал от {name}\n" f"⏰ {ts.strftime('%H:%M:%S')} UTC\n" f"{geo_info}" ) asyncio.create_task(telegram.send_message(text)) return SignalResponse(status="ok", signal_id=signal_id) @app.post("/api/auth/register", response_model=AuthRegisterResponse, status_code=201) async def auth_register( body: AuthRegisterRequest, _: None = Depends(rate_limit_auth_register), ) -> AuthRegisterResponse: password_hash = _hash_password(body.password) push_sub_json = ( body.push_subscription.model_dump_json() if body.push_subscription else None ) try: reg_id = await db.create_registration( email=str(body.email), login=body.login, password_hash=password_hash, push_subscription=push_sub_json, ) except Exception as exc: # aiosqlite.IntegrityError on email/login UNIQUE conflict if "UNIQUE" in str(exc) or "unique" in str(exc).lower(): raise HTTPException(status_code=409, detail="Email или логин уже существует") raise reg = await db.get_registration(reg_id) asyncio.create_task( telegram.send_registration_notification( reg_id=reg_id, login=body.login, email=str(body.email), created_at=reg["created_at"] if reg else "", ) ) return AuthRegisterResponse(status="pending", message="Заявка отправлена на рассмотрение") async def _handle_callback_query(cb: dict) -> None: """Process approve/reject callback from admin Telegram inline buttons.""" data = cb.get("data", "") callback_query_id = cb.get("id", "") message = cb.get("message", {}) chat_id = message.get("chat", {}).get("id") message_id = message.get("message_id") if ":" not in data: return action, reg_id_str = data.split(":", 1) try: reg_id = int(reg_id_str) except ValueError: return reg = await db.get_registration(reg_id) if reg is None: await telegram.answer_callback_query(callback_query_id) return if action == "approve": updated = await db.update_registration_status(reg_id, "approved") if not updated: # Already processed (not pending) — ack the callback and stop await telegram.answer_callback_query(callback_query_id) return if chat_id and message_id: await telegram.edit_message_text( chat_id, message_id, f"✅ Пользователь {reg['login']} одобрен" ) if reg["push_subscription"]: asyncio.create_task( push.send_push( reg["push_subscription"], "Baton", "Ваша регистрация одобрена!", ) ) elif action == "reject": updated = await db.update_registration_status(reg_id, "rejected") if not updated: await telegram.answer_callback_query(callback_query_id) return if chat_id and message_id: await telegram.edit_message_text( chat_id, message_id, f"❌ Пользователь {reg['login']} отклонён" ) await telegram.answer_callback_query(callback_query_id) @app.get("/admin/users", dependencies=[Depends(verify_admin_token)]) async def admin_list_users() -> list[dict]: return await db.admin_list_users() @app.post("/admin/users", status_code=201, dependencies=[Depends(verify_admin_token)]) async def admin_create_user(body: AdminCreateUserRequest) -> dict: password_hash = _hash_password(body.password) if body.password else None result = await db.admin_create_user(body.uuid, body.name, password_hash) if result is None: raise HTTPException(status_code=409, detail="User with this UUID already exists") return result @app.put("/admin/users/{user_id}/password", dependencies=[Depends(verify_admin_token)]) async def admin_set_password(user_id: int, body: AdminSetPasswordRequest) -> dict: changed = await db.admin_set_password(user_id, _hash_password(body.password)) if not changed: raise HTTPException(status_code=404, detail="User not found") return {"ok": True} @app.put("/admin/users/{user_id}/block", dependencies=[Depends(verify_admin_token)]) async def admin_block_user(user_id: int, body: AdminBlockRequest) -> dict: changed = await db.admin_set_blocked(user_id, body.is_blocked) if not changed: raise HTTPException(status_code=404, detail="User not found") user = await db.admin_get_user_by_id(user_id) return user # type: ignore[return-value] @app.delete("/admin/users/{user_id}", status_code=204, dependencies=[Depends(verify_admin_token)]) async def admin_delete_user(user_id: int) -> None: deleted = await db.admin_delete_user(user_id) if not deleted: raise HTTPException(status_code=404, detail="User not found") @app.post("/api/webhook/telegram") async def webhook_telegram( request: Request, _: None = Depends(verify_webhook_secret), ) -> dict[str, Any]: update = await request.json() # Handle inline button callback queries (approve/reject registration) callback_query = update.get("callback_query") if callback_query: await _handle_callback_query(callback_query) return {"ok": True} message = update.get("message", {}) text = message.get("text", "") if text.startswith("/start"): tg_user = message.get("from", {}) tg_user_id = str(tg_user.get("id", "")) first_name = tg_user.get("first_name", "") last_name = tg_user.get("last_name", "") name = (first_name + " " + last_name).strip() or tg_user_id if tg_user_id: await db.register_user(uuid=tg_user_id, name=name) logger.info("Telegram /start: registered user %s", tg_user_id) return {"ok": True}