""" Tests for backend/telegram.py: send_message, set_webhook, SignalAggregator. NOTE: respx routes must be registered INSIDE the 'with mock:' block to be intercepted properly. Registering them before entering the context does not activate the mock for new httpx.AsyncClient instances created at call time. """ from __future__ import annotations import os os.environ.setdefault("BOT_TOKEN", "test-bot-token") os.environ.setdefault("CHAT_ID", "-1001234567890") os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") import aiosqlite def _safe_aiosqlite_await(self): if not self._thread._started.is_set(): self._thread.start() return self._connect().__await__() aiosqlite.core.Connection.__await__ = _safe_aiosqlite_await # type: ignore[method-assign] import json import os as _os import tempfile from unittest.mock import AsyncMock, patch import httpx import pytest import respx from backend import config from backend.telegram import SignalAggregator, send_message, set_webhook SEND_URL = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage" WEBHOOK_URL_API = f"https://api.telegram.org/bot{config.BOT_TOKEN}/setWebhook" # --------------------------------------------------------------------------- # send_message # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_send_message_calls_telegram_api(): """send_message POSTs to api.telegram.org/bot.../sendMessage.""" with respx.mock(assert_all_called=False) as mock: route = mock.post(SEND_URL).mock( return_value=httpx.Response(200, json={"ok": True}) ) await send_message("hello world") assert route.called body = json.loads(route.calls[0].request.content) assert body["chat_id"] == config.CHAT_ID assert body["text"] == "hello world" @pytest.mark.asyncio async def test_send_message_handles_429(): """On 429, send_message sleeps retry_after seconds then retries.""" retry_after = 5 responses = [ httpx.Response( 429, json={"ok": False, "parameters": {"retry_after": retry_after}}, ), httpx.Response(200, json={"ok": True}), ] with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(side_effect=responses) with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock) as mock_sleep: await send_message("test 429") mock_sleep.assert_any_call(retry_after) @pytest.mark.asyncio async def test_send_message_5xx_retries(): """On 5xx, send_message sleeps 30 seconds and retries once.""" responses = [ httpx.Response(500, text="Internal Server Error"), httpx.Response(200, json={"ok": True}), ] with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(side_effect=responses) with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock) as mock_sleep: await send_message("test 5xx") mock_sleep.assert_any_call(30) # --------------------------------------------------------------------------- # set_webhook # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_set_webhook_calls_correct_endpoint(): """set_webhook POSTs to setWebhook with url and secret_token.""" with respx.mock(assert_all_called=False) as mock: route = mock.post(WEBHOOK_URL_API).mock( return_value=httpx.Response(200, json={"ok": True, "result": True}) ) await set_webhook( url="https://example.com/api/webhook/telegram", secret="my-secret", ) assert route.called body = json.loads(route.calls[0].request.content) assert body["url"] == "https://example.com/api/webhook/telegram" assert body["secret_token"] == "my-secret" @pytest.mark.asyncio async def test_set_webhook_raises_on_result_false(): """set_webhook raises RuntimeError when Telegram returns result=False.""" with respx.mock(assert_all_called=False) as mock: mock.post(WEBHOOK_URL_API).mock( return_value=httpx.Response(200, json={"ok": True, "result": False}) ) with pytest.raises(RuntimeError, match="setWebhook failed"): await set_webhook(url="https://example.com/webhook", secret="s") @pytest.mark.asyncio async def test_set_webhook_raises_on_non_200(): """set_webhook raises RuntimeError on non-200 response.""" with respx.mock(assert_all_called=False) as mock: mock.post(WEBHOOK_URL_API).mock( return_value=httpx.Response(400, json={"ok": False}) ) with pytest.raises(RuntimeError, match="setWebhook failed"): await set_webhook(url="https://example.com/webhook", secret="s") # --------------------------------------------------------------------------- # SignalAggregator helpers # --------------------------------------------------------------------------- async def _init_db_with_tmp() -> str: """Init a temp-file DB and return its path.""" from backend import config as _cfg, db as _db path = tempfile.mktemp(suffix=".db") _cfg.DB_PATH = path await _db.init_db() return path def _cleanup(path: str) -> None: for ext in ("", "-wal", "-shm"): try: _os.unlink(path + ext) except FileNotFoundError: pass # --------------------------------------------------------------------------- # SignalAggregator tests # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_aggregator_single_signal_calls_send_message(): """Flushing an aggregator with one signal calls send_message once.""" path = await _init_db_with_tmp() try: agg = SignalAggregator(interval=9999) await agg.add_signal( user_uuid="agg-uuid-001", user_name="Alice", timestamp=1742478000000, geo={"lat": 55.0, "lon": 37.0, "accuracy": 10.0}, signal_id=1, ) with respx.mock(assert_all_called=False) as mock: send_route = mock.post(SEND_URL).mock( return_value=httpx.Response(200, json={"ok": True}) ) with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): await agg.flush() assert send_route.call_count == 1 finally: _cleanup(path) @pytest.mark.asyncio async def test_aggregator_multiple_signals_one_message(): """5 signals flushed at once produce exactly one send_message call.""" path = await _init_db_with_tmp() try: agg = SignalAggregator(interval=9999) for i in range(5): await agg.add_signal( user_uuid=f"agg-uuid-{i:03d}", user_name=f"User{i}", timestamp=1742478000000 + i * 1000, geo=None, signal_id=i + 1, ) with respx.mock(assert_all_called=False) as mock: send_route = mock.post(SEND_URL).mock( return_value=httpx.Response(200, json={"ok": True}) ) with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): await agg.flush() assert send_route.call_count == 1 finally: _cleanup(path) @pytest.mark.asyncio async def test_aggregator_empty_buffer_no_send(): """Flushing an empty aggregator must NOT call send_message.""" agg = SignalAggregator(interval=9999) # No routes registered — if a POST is made it will raise AllMockedAssertionError with respx.mock(assert_all_called=False) as mock: send_route = mock.post(SEND_URL).mock( return_value=httpx.Response(200, json={"ok": True}) ) await agg.flush() assert send_route.call_count == 0 @pytest.mark.asyncio async def test_aggregator_buffer_cleared_after_flush(): """After flush, the aggregator buffer is empty.""" path = await _init_db_with_tmp() try: agg = SignalAggregator(interval=9999) await agg.add_signal( user_uuid="agg-uuid-clr", user_name="Test", timestamp=1742478000000, geo=None, signal_id=99, ) assert len(agg._buffer) == 1 with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(return_value=httpx.Response(200, json={"ok": True})) with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): await agg.flush() assert len(agg._buffer) == 0 finally: _cleanup(path) @pytest.mark.asyncio async def test_aggregator_unknown_user_shows_uuid_prefix(): """If user_name is None, the message shows first 8 chars of uuid.""" path = await _init_db_with_tmp() try: agg = SignalAggregator(interval=9999) test_uuid = "abcdef1234567890" await agg.add_signal( user_uuid=test_uuid, user_name=None, timestamp=1742478000000, geo=None, signal_id=1, ) sent_texts: list[str] = [] async def _fake_send(text: str) -> None: sent_texts.append(text) with patch("backend.telegram.send_message", side_effect=_fake_send): with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): await agg.flush() assert len(sent_texts) == 1 assert test_uuid[:8] in sent_texts[0] finally: _cleanup(path)