baton/backend/telegram.py

224 lines
8.5 KiB
Python
Raw Normal View History

2026-03-20 20:44:00 +02:00
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional
import httpx
from backend import config, db
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Suppress httpx/httpcore transport-level logging to prevent BOT_TOKEN URL leakage.
# httpx logs request URLs (which embed the token) at DEBUG/INFO level depending on version.
# Raising both loggers to WARNING silences those DEBUG/INFO records.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
2026-03-20 20:44:00 +02:00
# URL template for Telegram Bot API calls; filled in with the bot token and the
# API method name. The formatted URL contains the token, so it must never be logged.
_TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}"
def _mask_token(token: str) -> str:
    """Build a log-safe stand-in for the bot token.

    Only the final four characters are exposed, prefixed with ``***``. An
    empty token, or one shorter than four characters, is replaced entirely
    by a fixed redaction marker.
    """
    if token and len(token) >= 4:
        return "***" + token[-4:]
    return "***REDACTED***"
async def validate_bot_token() -> bool:
    """Validate BOT_TOKEN by calling getMe. Logs ERROR if invalid. Never raises.

    Returns:
        True when getMe answers with HTTP 200, False for any other status or
        when the request (including client setup/teardown) fails.
    """
    url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="getMe")
    # The try wraps the whole client lifecycle, not just the request, so that
    # a failure while creating or closing the client cannot escape and break
    # the "never raises" contract.
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(url)
            if resp.status_code == 200:
                bot_name = resp.json().get("result", {}).get("username", "?")
                logger.info("Telegram token valid, bot: @%s", bot_name)
                return True
            logger.error(
                "BOT_TOKEN invalid — getMe returned %s: %s", resp.status_code, resp.text
            )
            return False
    except Exception as exc:
        # Do not log `exc` directly — it may contain the API URL with the token
        # embedded (httpx includes request URL in some exception types/versions).
        logger.error(
            "BOT_TOKEN validation failed (network error): %s — token ends with %s",
            type(exc).__name__,
            _mask_token(config.BOT_TOKEN),
        )
        return False
2026-03-20 20:44:00 +02:00
def _retry_after_seconds(resp: "httpx.Response", default: int = 30) -> float:
    """Return ``parameters.retry_after`` from a 429 response body, or *default*.

    Falls back to *default* when the body is not JSON, the field is missing,
    or its value is not a non-negative number.
    """
    try:
        value = resp.json()["parameters"]["retry_after"]
    except (ValueError, KeyError, TypeError):
        return default
    if isinstance(value, bool) or not isinstance(value, (int, float)) or value < 0:
        return default
    return value


async def send_message(text: str) -> None:
    """Send *text* to ``config.CHAT_ID``, retrying on rate limits and server errors.

    Up to three attempts are made:

    * HTTP 429 — wait ``retry_after`` seconds (30 when the response does not
      provide a usable value) multiplied by the attempt number, then retry.
    * HTTP 5xx — log the response, wait 30 seconds, then retry.
    * Any other non-200 status — log the response and stop without retrying.

    No wait is performed after the final attempt, since nothing would follow
    it. When every attempt ends in 429/5xx the message is dropped and an
    error is logged.

    Exceptions raised by the HTTP client are not caught here and propagate to
    the caller.
    """
    max_attempts = 3
    url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="sendMessage")
    payload = {"chat_id": config.CHAT_ID, "text": text}
    async with httpx.AsyncClient(timeout=10) as client:
        for attempt in range(1, max_attempts + 1):
            is_last = attempt == max_attempts
            resp = await client.post(url, json=payload)
            if resp.status_code == 429:
                if is_last:
                    logger.warning(
                        "Telegram 429 on final attempt %d, giving up", attempt
                    )
                else:
                    # Back off longer on each successive rate-limit response.
                    delay = _retry_after_seconds(resp) * attempt
                    logger.warning(
                        "Telegram 429, sleeping %s sec (attempt %d)", delay, attempt
                    )
                    await asyncio.sleep(delay)
                continue
            if resp.status_code >= 500:
                logger.error("Telegram 5xx: %s", resp.text)
                if not is_last:
                    await asyncio.sleep(30)
                continue
            if resp.status_code != 200:
                logger.error("Telegram error %s: %s", resp.status_code, resp.text)
            # Success, or an error that retrying will not fix.
            break
        else:
            # The loop ran out of attempts without reaching `break`.
            logger.error(
                "Telegram send_message: all %d attempts failed, message dropped",
                max_attempts,
            )
2026-03-20 20:44:00 +02:00
2026-03-21 09:19:50 +02:00
async def send_registration_notification(
    reg_id: int, login: str, email: str, created_at: str
) -> None:
    """Notify the admin chat about a new registration request.

    The message lists the login, e-mail and creation date and carries an
    inline keyboard with two buttons whose callback data is
    ``approve:<reg_id>`` and ``reject:<reg_id>``.

    Best-effort: every failure is logged and swallowed; nothing is raised.
    """
    endpoint = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="sendMessage")
    message = (
        f"📋 Новая заявка на регистрацию\n\n"
        f"Login: {login}\nEmail: {email}\nДата: {created_at}"
    )
    approve_button = {"text": "✅ Одобрить", "callback_data": f"approve:{reg_id}"}
    reject_button = {"text": "❌ Отклонить", "callback_data": f"reject:{reg_id}"}
    keyboard = {"inline_keyboard": [[approve_button, reject_button]]}
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            payload = {
                "chat_id": config.ADMIN_CHAT_ID,
                "text": message,
                "reply_markup": keyboard,
            }
            resp = await client.post(endpoint, json=payload)
            if resp.status_code == 200:
                return
            logger.error(
                "send_registration_notification failed %s: %s",
                resp.status_code,
                resp.text,
            )
    except Exception as exc:
        # Log only the exception class: the exception text may include the
        # request URL, which embeds BOT_TOKEN.
        logger.error("send_registration_notification error: %s", type(exc).__name__)
2026-03-21 09:19:50 +02:00
async def answer_callback_query(callback_query_id: str) -> None:
    """Acknowledge a Telegram callback query via ``answerCallbackQuery``.

    Best-effort: a non-200 response or any exception is logged and swallowed.
    """
    endpoint = _TELEGRAM_API.format(
        token=config.BOT_TOKEN, method="answerCallbackQuery"
    )
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            payload = {"callback_query_id": callback_query_id}
            resp = await client.post(endpoint, json=payload)
            if resp.status_code == 200:
                return
            logger.error(
                "answerCallbackQuery failed %s: %s", resp.status_code, resp.text
            )
    except Exception as exc:
        # Log only the exception class: the exception text may include the
        # request URL, which embeds BOT_TOKEN.
        logger.error("answerCallbackQuery error: %s", type(exc).__name__)
2026-03-21 09:19:50 +02:00
async def edit_message_text(chat_id: str | int, message_id: int, text: str) -> None:
    """Replace the text of an existing Telegram message via ``editMessageText``.

    Best-effort: a non-200 response or any exception is logged and swallowed.
    """
    endpoint = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="editMessageText")
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            payload = {"chat_id": chat_id, "message_id": message_id, "text": text}
            resp = await client.post(endpoint, json=payload)
            if resp.status_code == 200:
                return
            logger.error("editMessageText failed %s: %s", resp.status_code, resp.text)
    except Exception as exc:
        # Log only the exception class: the exception text may include the
        # request URL, which embeds BOT_TOKEN.
        logger.error("editMessageText error: %s", type(exc).__name__)
2026-03-21 09:19:50 +02:00
2026-03-20 20:44:00 +02:00
async def set_webhook(url: str, secret: str) -> None:
    """Register *url* as the bot's webhook, with *secret* as its secret token.

    Raises:
        RuntimeError: if Telegram answers with a non-200 status, or with a
            200 whose JSON body has a falsy ``result`` field.
    """
    endpoint = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="setWebhook")
    body = {"url": url, "secret_token": secret}
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(endpoint, json=body)
        # The body is only parsed when the status is 200 (short-circuit).
        accepted = resp.status_code == 200 and bool(resp.json().get("result"))
        if not accepted:
            raise RuntimeError(f"setWebhook failed: {resp.text}")
    logger.info("Webhook registered: %s", url)
2026-03-20 20:50:31 +02:00
# v2.0 feature
2026-03-20 20:44:00 +02:00
class SignalAggregator:
    """Buffer incoming signals and report them to Telegram one batch at a time.

    Signals are queued with :meth:`add_signal`. :meth:`run` wakes up every
    ``interval`` seconds and calls :meth:`flush`, which sends a single summary
    message for everything buffered and records the batch through
    ``db.save_telegram_batch``.
    """

    def __init__(self, interval: int = 10) -> None:
        """Create an empty aggregator whose :meth:`run` loop ticks every *interval* seconds."""
        self._interval = interval
        self._buffer: list[dict] = []
        # Guards every read-modify-write of _buffer in add_signal/flush.
        self._lock = asyncio.Lock()
        self._stopped = False

    async def add_signal(
        self,
        user_uuid: str,
        user_name: Optional[str],
        timestamp: int,
        geo: Optional[dict],
        signal_id: int,
    ) -> None:
        """Queue one signal for the next batch.

        *timestamp* is treated as epoch milliseconds when the batch is
        formatted; *geo* only needs to be truthy to count as "has location".
        """
        async with self._lock:
            self._buffer.append(
                {
                    "user_uuid": user_uuid,
                    "user_name": user_name,
                    "timestamp": timestamp,
                    "geo": geo,
                    "signal_id": signal_id,
                }
            )

    @staticmethod
    def _format_batch(items: list[dict]) -> str:
        """Render the summary message for a non-empty list of buffered signals."""
        timestamps = [item["timestamp"] for item in items]
        # Timestamps are in milliseconds; the time range is shown in UTC.
        ts_start = datetime.fromtimestamp(min(timestamps) / 1000, tz=timezone.utc)
        ts_end = datetime.fromtimestamp(max(timestamps) / 1000, tz=timezone.utc)
        t_fmt = "%H:%M:%S"
        # Users without a name are shown by the first 8 characters of their UUID.
        names = [item["user_name"] or item["user_uuid"][:8] for item in items]
        geo_count = sum(1 for item in items if item["geo"])
        n = len(items)
        return (
            f"\U0001f6a8 Получено {n} сигнал{'ов' if n != 1 else ''} "
            # En dash between the two times; without a separator they ran
            # together as e.g. "12:00:0012:00:05".
            f"[{ts_start.strftime(t_fmt)}\u2013{ts_end.strftime(t_fmt)}]\n"
            f"Пользователи: {', '.join(names)}\n"
            f"\U0001f4cd С геолокацией: {geo_count} из {n}"
        )

    async def flush(self) -> None:
        """Send everything buffered as one message and record the batch.

        Does nothing when the buffer is empty. The buffer is emptied before
        sending, so a batch whose send/save fails is logged and not retried.
        """
        async with self._lock:
            if not self._buffer:
                return
            items = self._buffer[:]
            self._buffer.clear()
        signal_ids = [item["signal_id"] for item in items]
        text = self._format_batch(items)
        try:
            await send_message(text)
            await db.save_telegram_batch(text, len(items), signal_ids)
            # rate-limit: 1 msg/sec max (#1014)
            await asyncio.sleep(1)
        except Exception:
            logger.exception("Failed to flush aggregator batch")

    async def run(self) -> None:
        """Flush the buffer every ``interval`` seconds until :meth:`stop` is called."""
        while not self._stopped:
            await asyncio.sleep(self._interval)
            if self._buffer:
                await self.flush()

    def stop(self) -> None:
        """Ask :meth:`run` to exit; the flag is checked once per loop iteration."""
        self._stopped = True