2026-03-20 20:44:00 +02:00
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
|
import logging
|
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
|
|
import httpx
|
|
|
|
|
|
|
|
|
|
|
|
from backend import config, db
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
_TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}"
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-03-21 08:54:07 +02:00
|
|
|
|
async def validate_bot_token() -> bool:
|
|
|
|
|
|
"""Validate BOT_TOKEN by calling getMe. Logs ERROR if invalid. Never raises."""
|
|
|
|
|
|
url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="getMe")
|
|
|
|
|
|
async with httpx.AsyncClient(timeout=10) as client:
|
|
|
|
|
|
try:
|
|
|
|
|
|
resp = await client.get(url)
|
|
|
|
|
|
if resp.status_code == 200:
|
|
|
|
|
|
bot_name = resp.json().get("result", {}).get("username", "?")
|
|
|
|
|
|
logger.info("Telegram token valid, bot: @%s", bot_name)
|
|
|
|
|
|
return True
|
|
|
|
|
|
logger.error(
|
|
|
|
|
|
"BOT_TOKEN invalid — getMe returned %s: %s", resp.status_code, resp.text
|
|
|
|
|
|
)
|
|
|
|
|
|
return False
|
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
|
logger.error("BOT_TOKEN validation failed (network): %s", exc)
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-03-20 20:44:00 +02:00
|
|
|
|
async def send_message(text: str) -> None:
|
|
|
|
|
|
url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="sendMessage")
|
|
|
|
|
|
async with httpx.AsyncClient(timeout=10) as client:
|
2026-03-21 07:39:41 +02:00
|
|
|
|
for attempt in range(3):
|
2026-03-20 20:44:00 +02:00
|
|
|
|
resp = await client.post(url, json={"chat_id": config.CHAT_ID, "text": text})
|
|
|
|
|
|
if resp.status_code == 429:
|
|
|
|
|
|
retry_after = resp.json().get("parameters", {}).get("retry_after", 30)
|
2026-03-21 07:39:41 +02:00
|
|
|
|
sleep = retry_after * (attempt + 1)
|
|
|
|
|
|
logger.warning("Telegram 429, sleeping %s sec (attempt %d)", sleep, attempt + 1)
|
|
|
|
|
|
await asyncio.sleep(sleep)
|
2026-03-20 20:44:00 +02:00
|
|
|
|
continue
|
|
|
|
|
|
if resp.status_code >= 500:
|
|
|
|
|
|
logger.error("Telegram 5xx: %s", resp.text)
|
|
|
|
|
|
await asyncio.sleep(30)
|
2026-03-21 07:39:41 +02:00
|
|
|
|
continue
|
2026-03-20 20:44:00 +02:00
|
|
|
|
elif resp.status_code != 200:
|
|
|
|
|
|
logger.error("Telegram error %s: %s", resp.status_code, resp.text)
|
|
|
|
|
|
break
|
2026-03-21 07:39:41 +02:00
|
|
|
|
else:
|
|
|
|
|
|
logger.error("Telegram send_message: all 3 attempts failed, message dropped")
|
2026-03-20 20:44:00 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def set_webhook(url: str, secret: str) -> None:
|
|
|
|
|
|
api_url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="setWebhook")
|
|
|
|
|
|
async with httpx.AsyncClient(timeout=10) as client:
|
|
|
|
|
|
resp = await client.post(
|
|
|
|
|
|
api_url, json={"url": url, "secret_token": secret}
|
|
|
|
|
|
)
|
|
|
|
|
|
if resp.status_code != 200 or not resp.json().get("result"):
|
|
|
|
|
|
raise RuntimeError(f"setWebhook failed: {resp.text}")
|
|
|
|
|
|
logger.info("Webhook registered: %s", url)
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-03-20 20:50:31 +02:00
|
|
|
|
# v2.0 feature
|
2026-03-20 20:44:00 +02:00
|
|
|
|
class SignalAggregator:
|
|
|
|
|
|
def __init__(self, interval: int = 10) -> None:
|
|
|
|
|
|
self._interval = interval
|
|
|
|
|
|
self._buffer: list[dict] = []
|
|
|
|
|
|
self._lock = asyncio.Lock()
|
|
|
|
|
|
self._stopped = False
|
|
|
|
|
|
|
|
|
|
|
|
async def add_signal(
|
|
|
|
|
|
self,
|
|
|
|
|
|
user_uuid: str,
|
|
|
|
|
|
user_name: Optional[str],
|
|
|
|
|
|
timestamp: int,
|
|
|
|
|
|
geo: Optional[dict],
|
|
|
|
|
|
signal_id: int,
|
|
|
|
|
|
) -> None:
|
|
|
|
|
|
async with self._lock:
|
|
|
|
|
|
self._buffer.append(
|
|
|
|
|
|
{
|
|
|
|
|
|
"user_uuid": user_uuid,
|
|
|
|
|
|
"user_name": user_name,
|
|
|
|
|
|
"timestamp": timestamp,
|
|
|
|
|
|
"geo": geo,
|
|
|
|
|
|
"signal_id": signal_id,
|
|
|
|
|
|
}
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
async def flush(self) -> None:
|
|
|
|
|
|
async with self._lock:
|
|
|
|
|
|
if not self._buffer:
|
|
|
|
|
|
return
|
|
|
|
|
|
items = self._buffer[:]
|
|
|
|
|
|
self._buffer.clear()
|
|
|
|
|
|
|
|
|
|
|
|
signal_ids = [item["signal_id"] for item in items]
|
|
|
|
|
|
timestamps = [item["timestamp"] for item in items]
|
|
|
|
|
|
ts_start = datetime.fromtimestamp(min(timestamps) / 1000, tz=timezone.utc)
|
|
|
|
|
|
ts_end = datetime.fromtimestamp(max(timestamps) / 1000, tz=timezone.utc)
|
|
|
|
|
|
t_fmt = "%H:%M:%S"
|
|
|
|
|
|
|
|
|
|
|
|
names = []
|
|
|
|
|
|
for item in items:
|
|
|
|
|
|
name = item["user_name"]
|
|
|
|
|
|
label = name if name else item["user_uuid"][:8]
|
|
|
|
|
|
names.append(label)
|
|
|
|
|
|
|
|
|
|
|
|
geo_count = sum(1 for item in items if item["geo"])
|
|
|
|
|
|
n = len(items)
|
|
|
|
|
|
|
|
|
|
|
|
text = (
|
|
|
|
|
|
f"\U0001f6a8 Получено {n} сигнал{'ов' if n != 1 else ''} "
|
|
|
|
|
|
f"[{ts_start.strftime(t_fmt)}—{ts_end.strftime(t_fmt)}]\n"
|
|
|
|
|
|
f"Пользователи: {', '.join(names)}\n"
|
|
|
|
|
|
f"\U0001f4cd С геолокацией: {geo_count} из {n}"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
await send_message(text)
|
|
|
|
|
|
await db.save_telegram_batch(text, n, signal_ids)
|
|
|
|
|
|
# rate-limit: 1 msg/sec max (#1014)
|
|
|
|
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
logger.exception("Failed to flush aggregator batch")
|
|
|
|
|
|
|
|
|
|
|
|
async def run(self) -> None:
|
|
|
|
|
|
while not self._stopped:
|
|
|
|
|
|
await asyncio.sleep(self._interval)
|
|
|
|
|
|
if self._buffer:
|
|
|
|
|
|
await self.flush()
|
|
|
|
|
|
|
|
|
|
|
|
def stop(self) -> None:
|
|
|
|
|
|
self._stopped = True
|