kin: BATON-BIZ-004-backend_dev
This commit is contained in:
parent
86a41a3b35
commit
e266b6506e
1 changed files with 0 additions and 78 deletions
|
|
@ -11,11 +11,6 @@ from backend import config, db
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Suppress httpx/httpcore transport-level logging to prevent BOT_TOKEN URL leakage.
|
|
||||||
# httpx logs request URLs (which embed the token) at DEBUG/INFO level depending on version.
|
|
||||||
logging.getLogger("httpx").setLevel(logging.WARNING)
|
|
||||||
logging.getLogger("httpcore").setLevel(logging.WARNING)
|
|
||||||
|
|
||||||
_TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}"
|
_TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}"
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -148,76 +143,3 @@ async def set_webhook(url: str, secret: str) -> None:
|
||||||
raise RuntimeError(f"setWebhook failed: {resp.text}")
|
raise RuntimeError(f"setWebhook failed: {resp.text}")
|
||||||
logger.info("Webhook registered: %s", url)
|
logger.info("Webhook registered: %s", url)
|
||||||
|
|
||||||
|
|
||||||
# v2.0 feature
|
|
||||||
class SignalAggregator:
|
|
||||||
def __init__(self, interval: int = 10) -> None:
|
|
||||||
self._interval = interval
|
|
||||||
self._buffer: list[dict] = []
|
|
||||||
self._lock = asyncio.Lock()
|
|
||||||
self._stopped = False
|
|
||||||
|
|
||||||
async def add_signal(
|
|
||||||
self,
|
|
||||||
user_uuid: str,
|
|
||||||
user_name: Optional[str],
|
|
||||||
timestamp: int,
|
|
||||||
geo: Optional[dict],
|
|
||||||
signal_id: int,
|
|
||||||
) -> None:
|
|
||||||
async with self._lock:
|
|
||||||
self._buffer.append(
|
|
||||||
{
|
|
||||||
"user_uuid": user_uuid,
|
|
||||||
"user_name": user_name,
|
|
||||||
"timestamp": timestamp,
|
|
||||||
"geo": geo,
|
|
||||||
"signal_id": signal_id,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
async def flush(self) -> None:
|
|
||||||
async with self._lock:
|
|
||||||
if not self._buffer:
|
|
||||||
return
|
|
||||||
items = self._buffer[:]
|
|
||||||
self._buffer.clear()
|
|
||||||
|
|
||||||
signal_ids = [item["signal_id"] for item in items]
|
|
||||||
timestamps = [item["timestamp"] for item in items]
|
|
||||||
ts_start = datetime.fromtimestamp(min(timestamps) / 1000, tz=timezone.utc)
|
|
||||||
ts_end = datetime.fromtimestamp(max(timestamps) / 1000, tz=timezone.utc)
|
|
||||||
t_fmt = "%H:%M:%S"
|
|
||||||
|
|
||||||
names = []
|
|
||||||
for item in items:
|
|
||||||
name = item["user_name"]
|
|
||||||
label = name if name else item["user_uuid"][:8]
|
|
||||||
names.append(label)
|
|
||||||
|
|
||||||
geo_count = sum(1 for item in items if item["geo"])
|
|
||||||
n = len(items)
|
|
||||||
|
|
||||||
text = (
|
|
||||||
f"\U0001f6a8 Получено {n} сигнал{'ов' if n != 1 else ''} "
|
|
||||||
f"[{ts_start.strftime(t_fmt)}—{ts_end.strftime(t_fmt)}]\n"
|
|
||||||
f"Пользователи: {', '.join(names)}\n"
|
|
||||||
f"\U0001f4cd С геолокацией: {geo_count} из {n}"
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
await send_message(text)
|
|
||||||
await db.save_telegram_batch(text, n, signal_ids)
|
|
||||||
# rate-limit: 1 msg/sec max (#1014)
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Failed to flush aggregator batch")
|
|
||||||
|
|
||||||
async def run(self) -> None:
|
|
||||||
while not self._stopped:
|
|
||||||
await asyncio.sleep(self._interval)
|
|
||||||
if self._buffer:
|
|
||||||
await self.flush()
|
|
||||||
|
|
||||||
def stop(self) -> None:
|
|
||||||
self._stopped = True
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue