""" Tests for backend/telegram.py: send_message, set_webhook, SignalAggregator. NOTE: respx routes must be registered INSIDE the 'with mock:' block to be intercepted properly. Registering them before entering the context does not activate the mock for new httpx.AsyncClient instances created at call time. """ from __future__ import annotations import os os.environ.setdefault("BOT_TOKEN", "test-bot-token") os.environ.setdefault("CHAT_ID", "-1001234567890") os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") import aiosqlite def _safe_aiosqlite_await(self): if not self._thread._started.is_set(): self._thread.start() return self._connect().__await__() aiosqlite.core.Connection.__await__ = _safe_aiosqlite_await # type: ignore[method-assign] import json import os as _os import tempfile from unittest.mock import AsyncMock, patch import httpx import pytest import respx from backend import config from backend.telegram import SignalAggregator, send_message, set_webhook, validate_bot_token SEND_URL = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage" WEBHOOK_URL_API = f"https://api.telegram.org/bot{config.BOT_TOKEN}/setWebhook" GET_ME_URL = f"https://api.telegram.org/bot{config.BOT_TOKEN}/getMe" # --------------------------------------------------------------------------- # validate_bot_token # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_validate_bot_token_returns_true_on_200(): """validate_bot_token returns True when getMe responds 200.""" with respx.mock(assert_all_called=False) as mock: mock.get(GET_ME_URL).mock( return_value=httpx.Response(200, json={"ok": True, "result": {"username": "batonbot"}}) ) result = await validate_bot_token() assert result is True @pytest.mark.asyncio async def 
test_validate_bot_token_returns_false_on_401(caplog): """validate_bot_token returns False and logs ERROR when getMe responds 401.""" import logging with respx.mock(assert_all_called=False) as mock: mock.get(GET_ME_URL).mock( return_value=httpx.Response(401, json={"ok": False, "description": "Unauthorized"}) ) with caplog.at_level(logging.ERROR, logger="backend.telegram"): result = await validate_bot_token() assert result is False assert any("401" in record.message for record in caplog.records) @pytest.mark.asyncio async def test_validate_bot_token_returns_false_on_network_error(caplog): """validate_bot_token returns False and logs ERROR on network failure — never raises.""" import logging with respx.mock(assert_all_called=False) as mock: mock.get(GET_ME_URL).mock(side_effect=httpx.ConnectError("connection refused")) with caplog.at_level(logging.ERROR, logger="backend.telegram"): result = await validate_bot_token() assert result is False assert len(caplog.records) >= 1 # --------------------------------------------------------------------------- # send_message # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_send_message_calls_telegram_api(): """send_message POSTs to api.telegram.org/bot.../sendMessage.""" with respx.mock(assert_all_called=False) as mock: route = mock.post(SEND_URL).mock( return_value=httpx.Response(200, json={"ok": True}) ) await send_message("hello world") assert route.called body = json.loads(route.calls[0].request.content) assert body["chat_id"] == config.CHAT_ID assert body["text"] == "hello world" @pytest.mark.asyncio async def test_send_message_handles_429(): """On 429, send_message sleeps retry_after seconds then retries.""" retry_after = 5 responses = [ httpx.Response( 429, json={"ok": False, "parameters": {"retry_after": retry_after}}, ), httpx.Response(200, json={"ok": True}), ] with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(side_effect=responses) 
with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock) as mock_sleep: await send_message("test 429") mock_sleep.assert_any_call(retry_after) @pytest.mark.asyncio async def test_send_message_5xx_retries(): """On 5xx, send_message sleeps 30 seconds and retries once.""" responses = [ httpx.Response(500, text="Internal Server Error"), httpx.Response(200, json={"ok": True}), ] with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(side_effect=responses) with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock) as mock_sleep: await send_message("test 5xx") mock_sleep.assert_any_call(30) # --------------------------------------------------------------------------- # set_webhook # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_set_webhook_calls_correct_endpoint(): """set_webhook POSTs to setWebhook with url and secret_token.""" with respx.mock(assert_all_called=False) as mock: route = mock.post(WEBHOOK_URL_API).mock( return_value=httpx.Response(200, json={"ok": True, "result": True}) ) await set_webhook( url="https://example.com/api/webhook/telegram", secret="my-secret", ) assert route.called body = json.loads(route.calls[0].request.content) assert body["url"] == "https://example.com/api/webhook/telegram" assert body["secret_token"] == "my-secret" @pytest.mark.asyncio async def test_set_webhook_raises_on_result_false(): """set_webhook raises RuntimeError when Telegram returns result=False.""" with respx.mock(assert_all_called=False) as mock: mock.post(WEBHOOK_URL_API).mock( return_value=httpx.Response(200, json={"ok": True, "result": False}) ) with pytest.raises(RuntimeError, match="setWebhook failed"): await set_webhook(url="https://example.com/webhook", secret="s") @pytest.mark.asyncio async def test_set_webhook_raises_on_non_200(): """set_webhook raises RuntimeError on non-200 response.""" with respx.mock(assert_all_called=False) as mock: 
mock.post(WEBHOOK_URL_API).mock( return_value=httpx.Response(400, json={"ok": False}) ) with pytest.raises(RuntimeError, match="setWebhook failed"): await set_webhook(url="https://example.com/webhook", secret="s") # --------------------------------------------------------------------------- # SignalAggregator helpers # --------------------------------------------------------------------------- async def _init_db_with_tmp() -> str: """Init a temp-file DB and return its path.""" from backend import config as _cfg, db as _db path = tempfile.mktemp(suffix=".db") _cfg.DB_PATH = path await _db.init_db() return path def _cleanup(path: str) -> None: for ext in ("", "-wal", "-shm"): try: _os.unlink(path + ext) except FileNotFoundError: pass # --------------------------------------------------------------------------- # SignalAggregator tests # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_aggregator_single_signal_calls_send_message(): """Flushing an aggregator with one signal calls send_message once.""" path = await _init_db_with_tmp() try: agg = SignalAggregator(interval=9999) await agg.add_signal( user_uuid="a9900001-0000-4000-8000-000000000001", user_name="Alice", timestamp=1742478000000, geo={"lat": 55.0, "lon": 37.0, "accuracy": 10.0}, signal_id=1, ) with respx.mock(assert_all_called=False) as mock: send_route = mock.post(SEND_URL).mock( return_value=httpx.Response(200, json={"ok": True}) ) with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): await agg.flush() assert send_route.call_count == 1 finally: _cleanup(path) @pytest.mark.asyncio async def test_aggregator_multiple_signals_one_message(): """5 signals flushed at once produce exactly one send_message call.""" path = await _init_db_with_tmp() try: agg = SignalAggregator(interval=9999) for i in range(5): await agg.add_signal( 
user_uuid=f"a990000{i}-0000-4000-8000-00000000000{i}", user_name=f"User{i}", timestamp=1742478000000 + i * 1000, geo=None, signal_id=i + 1, ) with respx.mock(assert_all_called=False) as mock: send_route = mock.post(SEND_URL).mock( return_value=httpx.Response(200, json={"ok": True}) ) with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): await agg.flush() assert send_route.call_count == 1 finally: _cleanup(path) @pytest.mark.asyncio async def test_aggregator_empty_buffer_no_send(): """Flushing an empty aggregator must NOT call send_message.""" agg = SignalAggregator(interval=9999) # No routes registered — if a POST is made it will raise AllMockedAssertionError with respx.mock(assert_all_called=False) as mock: send_route = mock.post(SEND_URL).mock( return_value=httpx.Response(200, json={"ok": True}) ) await agg.flush() assert send_route.call_count == 0 @pytest.mark.asyncio async def test_aggregator_buffer_cleared_after_flush(): """After flush, the aggregator buffer is empty.""" path = await _init_db_with_tmp() try: agg = SignalAggregator(interval=9999) await agg.add_signal( user_uuid="a9900099-0000-4000-8000-000000000099", user_name="Test", timestamp=1742478000000, geo=None, signal_id=99, ) assert len(agg._buffer) == 1 with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock(return_value=httpx.Response(200, json={"ok": True})) with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): await agg.flush() assert len(agg._buffer) == 0 finally: _cleanup(path) # --------------------------------------------------------------------------- # BATON-007: 400 "chat not found" handling # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_send_message_400_chat_not_found_does_not_raise(): """400 'chat not found' must not 
raise an exception (service stays alive).""" with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock( return_value=httpx.Response( 400, json={"ok": False, "error_code": 400, "description": "Bad Request: chat not found"}, ) ) # Must not raise — service must stay alive even with wrong CHAT_ID await send_message("test") @pytest.mark.asyncio async def test_send_message_400_chat_not_found_logs_error(caplog): """400 response from Telegram must be logged as ERROR with the status code.""" import logging with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock( return_value=httpx.Response( 400, json={"ok": False, "error_code": 400, "description": "Bad Request: chat not found"}, ) ) with caplog.at_level(logging.ERROR, logger="backend.telegram"): await send_message("test chat not found") assert any("400" in record.message for record in caplog.records), ( "Expected ERROR log containing '400' but got: " + str([r.message for r in caplog.records]) ) @pytest.mark.asyncio async def test_send_message_400_breaks_after_first_attempt(): """On 400, send_message breaks immediately (no retry loop) — only one HTTP call made.""" with respx.mock(assert_all_called=False) as mock: route = mock.post(SEND_URL).mock( return_value=httpx.Response( 400, json={"ok": False, "error_code": 400, "description": "Bad Request: chat not found"}, ) ) await send_message("test no retry on 400") assert route.call_count == 1, f"Expected 1 call on 400, got {route.call_count}" @pytest.mark.asyncio async def test_send_message_all_5xx_retries_exhausted_does_not_raise(): """When all 3 attempts fail with 5xx, send_message logs error but does NOT raise.""" with respx.mock(assert_all_called=False) as mock: mock.post(SEND_URL).mock( return_value=httpx.Response(500, text="Internal Server Error") ) with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): # Must not raise — message is dropped, service stays alive await send_message("test all retries exhausted") 
@pytest.mark.asyncio
async def test_aggregator_unknown_user_shows_uuid_prefix():
    """A signal with user_name=None is rendered using the first 8 chars of its uuid."""
    db_path = await _init_db_with_tmp()
    try:
        anonymous_uuid = "abcdef1234567890"
        aggregator = SignalAggregator(interval=9999)
        await aggregator.add_signal(
            user_uuid=anonymous_uuid,
            user_name=None,
            timestamp=1742478000000,
            geo=None,
            signal_id=1,
        )

        captured: list[str] = []

        async def _record(text: str) -> None:
            # Stand-in for send_message: remember the text instead of sending it.
            captured.append(text)

        # Patchers are built up front and entered together, in the same order
        # as the original nested ``with`` statements.
        send_patch = patch("backend.telegram.send_message", side_effect=_record)
        batch_patch = patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock)
        sleep_patch = patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock)
        with send_patch, batch_patch, sleep_patch:
            await aggregator.flush()

        assert len(captured) == 1
        assert anonymous_uuid[:8] in captured[0]
    finally:
        _cleanup(db_path)