baton/backend/telegram.py
2026-03-21 09:24:31 +02:00

220 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional
import httpx
from backend import config, db
logger = logging.getLogger(__name__)

# Every Bot API URL embeds BOT_TOKEN, and httpx/httpcore write request URLs
# into their own log records (at DEBUG or INFO, depending on version).
# Raising both transport loggers to WARNING keeps the token out of the logs.
for _transport_logger_name in ("httpx", "httpcore"):
    logging.getLogger(_transport_logger_name).setLevel(logging.WARNING)
del _transport_logger_name

# Bot API URL template; filled in with the bot token and the API method name.
_TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}"
def _mask_token(token: str) -> str:
    """Build a log-safe stand-in for the bot token.

    Only the last four characters are ever exposed. An empty token, or one
    shorter than four characters, is replaced by a fixed placeholder.
    """
    if token and len(token) >= 4:
        return "***" + token[-4:]
    return "***REDACTED***"
async def validate_bot_token() -> bool:
    """Check BOT_TOKEN by calling the Bot API ``getMe`` method.

    Returns:
        True when the API answers with HTTP 200, False otherwise. Both a
        non-200 answer and any exception raised while performing or reading
        the request are logged at ERROR level and reported as False.
    """
    get_me_url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="getMe")
    async with httpx.AsyncClient(timeout=10) as client:
        try:
            resp = await client.get(get_me_url)
            if resp.status_code != 200:
                logger.error(
                    "BOT_TOKEN invalid — getMe returned %s: %s",
                    resp.status_code,
                    resp.text,
                )
                return False

            payload = resp.json()
            username = payload.get("result", {}).get("username", "?")
            logger.info("Telegram token valid, bot: @%s", username)
            return True
        except Exception as exc:
            # Only the exception type is logged: the exception text may carry
            # the request URL, and that URL contains the token.
            masked = _mask_token(config.BOT_TOKEN)
            logger.error(
                "BOT_TOKEN validation failed (network error): %s — token ends with %s",
                type(exc).__name__,
                masked,
            )
            return False
def _retry_after_seconds(resp: "httpx.Response", default: float = 30) -> float:
    """Read ``parameters.retry_after`` from a 429 response body.

    Falls back to *default* when the body is not JSON, is not shaped like a
    Bot API error object, or carries a non-numeric ``retry_after``.
    """
    try:
        value = resp.json().get("parameters", {}).get("retry_after", default)
    except (ValueError, AttributeError):
        # ValueError: body is not valid JSON. AttributeError: the JSON is not
        # an object, or "parameters" is not an object.
        return default
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return default
    return value


async def send_message(text: str) -> None:
    """Send *text* to ``config.CHAT_ID`` via ``sendMessage``, with retries.

    Up to three attempts are made:

    * HTTP 429 - wait ``retry_after * attempt`` seconds, then retry.
    * HTTP 5xx - wait 30 seconds, then retry.
    * any other status - stop; a non-200 status is logged as an error.

    No wait is performed after the last attempt, since nothing would be
    retried afterwards. When all attempts are used up the message is dropped
    and an error is logged.

    Exceptions raised by the HTTP client itself are not caught here and
    propagate to the caller.
    """
    url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="sendMessage")
    max_attempts = 3
    async with httpx.AsyncClient(timeout=10) as client:
        for attempt in range(1, max_attempts + 1):
            resp = await client.post(url, json={"chat_id": config.CHAT_ID, "text": text})
            is_last = attempt == max_attempts

            if resp.status_code == 429:
                delay = _retry_after_seconds(resp) * attempt
                if is_last:
                    logger.warning(
                        "Telegram 429 on final attempt %d, not retrying", attempt
                    )
                else:
                    logger.warning(
                        "Telegram 429, sleeping %s sec (attempt %d)", delay, attempt
                    )
            elif resp.status_code >= 500:
                logger.error("Telegram 5xx: %s", resp.text)
                delay = 30
            else:
                # Success, or a client error that retrying will not fix.
                if resp.status_code != 200:
                    logger.error("Telegram error %s: %s", resp.status_code, resp.text)
                break

            if not is_last:
                await asyncio.sleep(delay)
        else:
            # Reached only when no attempt hit the `break` above.
            logger.error("Telegram send_message: all 3 attempts failed, message dropped")
async def send_registration_notification(
    reg_id: int, login: str, email: str, created_at: str
) -> None:
    """Notify the admin chat about a new registration request.

    The message is sent to ``config.ADMIN_CHAT_ID`` with two inline buttons
    whose callback data is ``approve:<reg_id>`` and ``reject:<reg_id>``.

    Errors from the HTTP call are logged and swallowed; a non-200 response is
    logged as an error.
    """
    url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="sendMessage")
    text = (
        f"📋 Новая заявка на регистрацию\n\n"
        f"Login: {login}\nEmail: {email}\nДата: {created_at}"
    )
    reply_markup = {
        "inline_keyboard": [[
            {"text": "✅ Одобрить", "callback_data": f"approve:{reg_id}"},
            {"text": "❌ Отклонить", "callback_data": f"reject:{reg_id}"},
        ]]
    }
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(
                url,
                json={
                    "chat_id": config.ADMIN_CHAT_ID,
                    "text": text,
                    "reply_markup": reply_markup,
                },
            )
            if resp.status_code != 200:
                logger.error(
                    "send_registration_notification failed %s: %s",
                    resp.status_code,
                    resp.text,
                )
    except Exception as exc:
        # Log only the exception type: the exception text may contain the
        # request URL, which embeds BOT_TOKEN.
        logger.error(
            "send_registration_notification error: %s", type(exc).__name__
        )
async def answer_callback_query(callback_query_id: str) -> None:
    """Acknowledge a callback query via ``answerCallbackQuery``.

    Errors from the HTTP call are logged and swallowed; a non-200 response is
    logged as an error.
    """
    url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="answerCallbackQuery")
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(url, json={"callback_query_id": callback_query_id})
            if resp.status_code != 200:
                logger.error("answerCallbackQuery failed %s: %s", resp.status_code, resp.text)
    except Exception as exc:
        # Log only the exception type: the exception text may contain the
        # request URL, which embeds BOT_TOKEN.
        logger.error("answerCallbackQuery error: %s", type(exc).__name__)
async def edit_message_text(chat_id: str | int, message_id: int, text: str) -> None:
    """Replace the text of an existing message via ``editMessageText``.

    Errors from the HTTP call are logged and swallowed; a non-200 response is
    logged as an error.
    """
    url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="editMessageText")
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(
                url, json={"chat_id": chat_id, "message_id": message_id, "text": text}
            )
            if resp.status_code != 200:
                logger.error("editMessageText failed %s: %s", resp.status_code, resp.text)
    except Exception as exc:
        # Log only the exception type: the exception text may contain the
        # request URL, which embeds BOT_TOKEN.
        logger.error("editMessageText error: %s", type(exc).__name__)
async def set_webhook(url: str, secret: str) -> None:
    """Register *url* as the bot's webhook via ``setWebhook``.

    *secret* is sent as the ``secret_token`` field of the request.

    Raises:
        RuntimeError: if the response status is not 200, or the JSON body
            carries no truthy ``result``.
    """
    endpoint = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="setWebhook")
    payload = {"url": url, "secret_token": secret}
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(endpoint, json=payload)
        # The body is parsed only when the status is 200 (short-circuit).
        accepted = resp.status_code == 200 and bool(resp.json().get("result"))
        if not accepted:
            raise RuntimeError(f"setWebhook failed: {resp.text}")
    logger.info("Webhook registered: %s", url)
# v2.0 feature
class SignalAggregator:
    """Collect signals in memory and send them to Telegram as one summary.

    ``add_signal`` appends to an internal buffer; ``run`` wakes up every
    ``interval`` seconds and calls ``flush`` when the buffer is non-empty.
    ``stop`` only ends the ``run`` loop; it does not flush signals that are
    still buffered.
    """

    def __init__(self, interval: int = 10) -> None:
        self._interval = interval  # seconds slept between checks in run()
        self._buffer: list[dict] = []
        self._lock = asyncio.Lock()  # guards _buffer in add_signal / flush
        self._stopped = False

    async def add_signal(
        self,
        user_uuid: str,
        user_name: Optional[str],
        timestamp: int,
        geo: Optional[dict],
        signal_id: int,
    ) -> None:
        """Append one signal to the buffer (under the lock)."""
        async with self._lock:
            self._buffer.append(
                {
                    "user_uuid": user_uuid,
                    "user_name": user_name,
                    "timestamp": timestamp,
                    "geo": geo,
                    "signal_id": signal_id,
                }
            )

    @staticmethod
    def _format_batch(items: list[dict]) -> str:
        """Build the summary message for a non-empty list of buffered signals.

        Timestamps are interpreted as milliseconds since the epoch and shown
        as a UTC ``HH:MM:SS`` range. A user without a name is shown by the
        first eight characters of the UUID.
        """
        timestamps = [item["timestamp"] for item in items]
        ts_start = datetime.fromtimestamp(min(timestamps) / 1000, tz=timezone.utc)
        ts_end = datetime.fromtimestamp(max(timestamps) / 1000, tz=timezone.utc)
        t_fmt = "%H:%M:%S"
        names = [
            item["user_name"] if item["user_name"] else item["user_uuid"][:8]
            for item in items
        ]
        geo_count = sum(1 for item in items if item["geo"])
        n = len(items)
        # \u2013 (en dash) separates the two times; without a separator the
        # range was rendered as one run of digits, e.g. "12:00:0012:00:05".
        return (
            f"\U0001f6a8 Получено {n} сигнал{'ов' if n != 1 else ''} "
            f"[{ts_start.strftime(t_fmt)}\u2013{ts_end.strftime(t_fmt)}]\n"
            f"Пользователи: {', '.join(names)}\n"
            f"\U0001f4cd С геолокацией: {geo_count} из {n}"
        )

    async def flush(self) -> None:
        """Send everything currently buffered as one message and record it.

        The buffer is drained under the lock; sending happens after the lock
        is released. Any exception from sending or saving is logged and
        swallowed, and the drained signals are not put back.
        """
        async with self._lock:
            if not self._buffer:
                return
            items = self._buffer[:]
            self._buffer.clear()

        signal_ids = [item["signal_id"] for item in items]
        text = self._format_batch(items)
        try:
            await send_message(text)
            await db.save_telegram_batch(text, len(items), signal_ids)
            # rate-limit: 1 msg/sec max (#1014)
            await asyncio.sleep(1)
        except Exception:
            logger.exception("Failed to flush aggregator batch")

    async def run(self) -> None:
        """Flush the buffer every ``interval`` seconds until ``stop`` is called."""
        while not self._stopped:
            await asyncio.sleep(self._interval)
            if self._buffer:
                await self.flush()

    def stop(self) -> None:
        """Ask ``run`` to exit after its current iteration."""
        self._stopped = True