from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional

import httpx

from backend import config, db

logger = logging.getLogger(__name__)

_TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}"

# Total number of sendMessage attempts before a message is dropped.
_MAX_ATTEMPTS = 3

# Back-off (seconds) for a 429 response that carries no usable
# ``retry_after`` hint.
_DEFAULT_RETRY_AFTER = 30

# Fixed pause (seconds) after a 5xx response before the next attempt.
_SERVER_ERROR_BACKOFF = 30


def _retry_after_seconds(resp: httpx.Response, default: float = _DEFAULT_RETRY_AFTER) -> float:
    """Return the ``parameters.retry_after`` value of *resp*, or *default*.

    The default is used when the body is not JSON, is not shaped like
    ``{"parameters": {"retry_after": <number>}}``, or holds a negative or
    non-numeric value, so that a malformed 429 body never aborts the retry
    loop in :func:`send_message`.
    """
    try:
        value = resp.json()["parameters"]["retry_after"]
    except (ValueError, KeyError, TypeError):
        # ValueError: body is not valid JSON.
        # KeyError / TypeError: a key is missing, or a level is null/not a dict.
        return default
    if isinstance(value, (int, float)) and value >= 0:
        return value
    return default


async def send_message(text: str) -> None:
    """Send *text* to the configured chat via the Telegram ``sendMessage`` method.

    Up to three attempts are made:

    * HTTP 429 - sleep ``retry_after * attempt`` seconds (30 s base when the
      response carries no usable hint), then retry.
    * HTTP 5xx - log the body, sleep 30 seconds, then retry.
    * any other non-200 status - log it and stop without retrying.
    * HTTP 200 - done.

    HTTP-level failures are only logged; this function returns normally even
    when the message was dropped. Exceptions raised by the HTTP client itself
    (there is no ``try`` around the request) propagate to the caller.
    """
    url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="sendMessage")
    payload = {"chat_id": config.CHAT_ID, "text": text}

    async with httpx.AsyncClient(timeout=10) as client:
        for attempt in range(1, _MAX_ATTEMPTS + 1):
            resp = await client.post(url, json=payload)
            status = resp.status_code

            if status == 429:
                # Linear back-off on top of the server-provided hint.
                delay = _retry_after_seconds(resp) * attempt
                logger.warning(
                    "Telegram 429, sleeping %s sec (attempt %d)", delay, attempt
                )
                # NOTE(review): this sleep also runs after the final attempt;
                # kept as-is because it may act as a throttle for the next
                # message - confirm whether that is intended.
                await asyncio.sleep(delay)
                continue

            if status >= 500:
                logger.error("Telegram 5xx: %s", resp.text)
                await asyncio.sleep(_SERVER_ERROR_BACKOFF)
                continue

            if status != 200:
                # Other non-200 statuses are not retried.
                logger.error("Telegram error %s: %s", status, resp.text)
            break
        else:
            # Loop ended without ``break``: every attempt was a 429 or a 5xx.
            logger.error(
                "Telegram send_message: all %d attempts failed, message dropped",
                _MAX_ATTEMPTS,
            )


async def set_webhook(url: str, secret: str) -> None:
    """Register *url* as the bot webhook with *secret* as its ``secret_token``.

    Raises:
        RuntimeError: if the response status is not 200 or the JSON body has
            no truthy ``result`` field.
    """
    api_url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="setWebhook")
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(
            api_url, json={"url": url, "secret_token": secret}
        )
        if resp.status_code != 200 or not resp.json().get("result"):
            raise RuntimeError(f"setWebhook failed: {resp.text}")
    logger.info("Webhook registered: %s", url)


# v2.0 feature
class SignalAggregator:
    """Buffer incoming signals and report them as one summary message.

    Signals are collected with :meth:`add_signal`. :meth:`run` wakes up every
    ``interval`` seconds and calls :meth:`flush`, which turns the buffered
    signals into a single Telegram message and records the batch via
    ``db.save_telegram_batch``.
    """

    def __init__(self, interval: int = 10) -> None:
        """Create an empty aggregator that flushes every *interval* seconds."""
        self._interval = interval
        self._buffer: list[dict] = []
        # Guards buffer appends against the copy-and-clear done in flush().
        self._lock = asyncio.Lock()
        self._stopped = False

    async def add_signal(
        self,
        user_uuid: str,
        user_name: Optional[str],
        timestamp: int,
        geo: Optional[dict],
        signal_id: int,
    ) -> None:
        """Append one signal to the buffer for the next flush.

        ``timestamp`` is later divided by 1000 before being converted to a
        ``datetime``, i.e. it is interpreted as epoch milliseconds.
        """
        async with self._lock:
            self._buffer.append(
                {
                    "user_uuid": user_uuid,
                    "user_name": user_name,
                    "timestamp": timestamp,
                    "geo": geo,
                    "signal_id": signal_id,
                }
            )

    async def flush(self) -> None:
        """Send one summary message for all buffered signals and clear the buffer.

        Does nothing when the buffer is empty. Any exception raised while
        sending or saving is logged and swallowed; the drained signals are
        not put back into the buffer.
        """
        # Take a snapshot under the lock, then release it so add_signal() is
        # not blocked while the message is being sent.
        async with self._lock:
            if not self._buffer:
                return
            items = self._buffer[:]
            self._buffer.clear()

        signal_ids = [item["signal_id"] for item in items]
        timestamps = [item["timestamp"] for item in items]
        ts_start = datetime.fromtimestamp(min(timestamps) / 1000, tz=timezone.utc)
        ts_end = datetime.fromtimestamp(max(timestamps) / 1000, tz=timezone.utc)
        t_fmt = "%H:%M:%S"

        # Show the user name when there is one, else the first 8 characters
        # of the UUID.
        names = [item["user_name"] or item["user_uuid"][:8] for item in items]
        geo_count = sum(1 for item in items if item["geo"])
        n = len(items)

        text = (
            f"\U0001f6a8 Получено {n} сигнал{'ов' if n != 1 else ''} "
            f"[{ts_start.strftime(t_fmt)}—{ts_end.strftime(t_fmt)}]\n"
            f"Пользователи: {', '.join(names)}\n"
            f"\U0001f4cd С геолокацией: {geo_count} из {n}"
        )

        try:
            # NOTE(review): send_message() returns normally after logging an
            # HTTP failure, so the batch is saved even if it was not
            # delivered - confirm this is intended.
            await send_message(text)
            await db.save_telegram_batch(text, n, signal_ids)
            # rate-limit: 1 msg/sec max (#1014)
            await asyncio.sleep(1)
        except Exception:
            logger.exception("Failed to flush aggregator batch")

    async def run(self) -> None:
        """Flush the buffer every ``interval`` seconds until :meth:`stop` is called.

        The stop flag is checked only between iterations, so the loop ends
        after the sleep (and flush, if any) in progress completes.
        """
        while not self._stopped:
            await asyncio.sleep(self._interval)
            if self._buffer:
                await self.flush()

    def stop(self) -> None:
        """Ask :meth:`run` to exit; does not flush signals still in the buffer."""
        self._stopped = True