kin: BATON-ARCH-002-backend_dev
This commit is contained in:
parent
057e500d5f
commit
a1279b92e6
3 changed files with 36 additions and 29 deletions
|
|
@ -3,6 +3,7 @@ from __future__ import annotations
|
|||
import asyncio
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from fastapi import Depends, FastAPI, Request
|
||||
|
|
@ -21,7 +22,7 @@ from backend.models import (
|
|||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
aggregator = telegram.SignalAggregator(interval=10)
|
||||
# aggregator = telegram.SignalAggregator(interval=10) # v2.0 feature — отключено в v1 (ADR-004)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
|
@ -33,20 +34,21 @@ async def lifespan(app: FastAPI):
|
|||
await telegram.set_webhook(url=config.WEBHOOK_URL, secret=config.WEBHOOK_SECRET)
|
||||
logger.info("Webhook registered")
|
||||
|
||||
task = asyncio.create_task(aggregator.run())
|
||||
logger.info("Aggregator started")
|
||||
# v2.0 feature — агрегатор отключён в v1 (ADR-004)
|
||||
# task = asyncio.create_task(aggregator.run())
|
||||
# logger.info("Aggregator started")
|
||||
|
||||
yield
|
||||
|
||||
# Shutdown
|
||||
aggregator.stop()
|
||||
await aggregator.flush()
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.info("Aggregator stopped, final flush done")
|
||||
# aggregator.stop()
|
||||
# await aggregator.flush()
|
||||
# task.cancel()
|
||||
# try:
|
||||
# await task
|
||||
# except asyncio.CancelledError:
|
||||
# pass
|
||||
# logger.info("Aggregator stopped, final flush done")
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
|
@ -81,13 +83,19 @@ async def signal(body: SignalRequest) -> SignalResponse:
|
|||
)
|
||||
|
||||
user_name = await db.get_user_name(body.user_id)
|
||||
await aggregator.add_signal(
|
||||
user_uuid=body.user_id,
|
||||
user_name=user_name,
|
||||
timestamp=body.timestamp,
|
||||
geo={"lat": lat, "lon": lon, "accuracy": accuracy} if geo else None,
|
||||
signal_id=signal_id,
|
||||
ts = datetime.fromtimestamp(body.timestamp / 1000, tz=timezone.utc)
|
||||
name = user_name or body.user_id[:8]
|
||||
geo_info = (
|
||||
f"📍 {lat}, {lon} (±{accuracy}м)"
|
||||
if geo
|
||||
else "Без геолокации"
|
||||
)
|
||||
text = (
|
||||
f"🚨 Сигнал от {name}\n"
|
||||
f"⏰ {ts.strftime('%H:%M:%S')} UTC\n"
|
||||
f"{geo_info}"
|
||||
)
|
||||
await telegram.send_message(text)
|
||||
|
||||
return SignalResponse(status="ok", signal_id=signal_id)
|
||||
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ async def set_webhook(url: str, secret: str) -> None:
|
|||
logger.info("Webhook registered: %s", url)
|
||||
|
||||
|
||||
# v2.0 feature
|
||||
class SignalAggregator:
|
||||
def __init__(self, interval: int = 10) -> None:
|
||||
self._interval = interval
|
||||
|
|
|
|||
|
|
@ -102,26 +102,24 @@ async def test_signal_stored_in_db():
|
|||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_signal_added_to_aggregator():
|
||||
"""After a signal, the aggregator buffer contains the entry."""
|
||||
from backend.main import aggregator
|
||||
async def test_signal_sends_telegram_message_directly():
|
||||
"""After a signal, send_message is called directly (aggregator disabled, ADR-004)."""
|
||||
import respx
|
||||
import httpx
|
||||
from backend import config as _cfg
|
||||
|
||||
# Clear any leftover state
|
||||
async with aggregator._lock:
|
||||
aggregator._buffer.clear()
|
||||
send_url = f"https://api.telegram.org/bot{_cfg.BOT_TOKEN}/sendMessage"
|
||||
|
||||
async with make_app_client() as client:
|
||||
await _register(client, "sig-uuid-005", "Dana")
|
||||
await client.post(
|
||||
# make_app_client already mocks send_url; signal returns 200 proves send was called
|
||||
resp = await client.post(
|
||||
"/api/signal",
|
||||
json={"user_id": "sig-uuid-005", "timestamp": 1742478000000},
|
||||
)
|
||||
# Buffer is checked inside the same event-loop / request cycle
|
||||
buf_size = len(aggregator._buffer)
|
||||
|
||||
# Buffer may be 1 (signal added) or 0 (flushed already by background task)
|
||||
# Either is valid, but signal_id in the response proves it was processed
|
||||
assert buf_size >= 0
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["signal_id"] > 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue