baton/tests/conftest.py
2026-03-21 09:34:21 +02:00

132 lines
5.5 KiB
Python

"""
Shared fixtures for the baton backend test suite.
IMPORTANT: Environment variables and the aiosqlite monkey-patch must be
applied before any backend module is imported. This module is loaded first
by pytest and all assignments happen at module-level.
Python 3.14 incompatibility with aiosqlite <= 0.22.1:
Connection.__await__ unconditionally calls self._thread.start().
When 'async with await conn' is used, the thread is already running by
the time __aenter__ tries to start it again → RuntimeError.
The monkey-patch below guards the start so threads are only started once.
"""
from __future__ import annotations
import os
# ── 1. Env vars — must precede all backend imports ──────────────────────────
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
os.environ.setdefault("ADMIN_CHAT_ID", "5694335584")
# ── 2. aiosqlite monkey-patch ────────────────────────────────────────────────
import aiosqlite
def _safe_aiosqlite_await(self): # type: ignore[override]
"""Start the worker thread only if it has not been started yet."""
if not self._thread._started.is_set():
self._thread.start()
return self._connect().__await__()
aiosqlite.core.Connection.__await__ = _safe_aiosqlite_await # type: ignore[method-assign]
# ── 3. Normal imports ────────────────────────────────────────────────────────
import tempfile
import contextlib
from typing import AsyncGenerator
import httpx
import pytest
import pytest_asyncio
import respx
from httpx import AsyncClient, ASGITransport
from backend import config
# ── 4. DB-path helper ────────────────────────────────────────────────────────
@contextlib.contextmanager
def temp_db():
"""Context manager that sets config.DB_PATH to a temp file and cleans up."""
path = tempfile.mktemp(suffix=".db")
original = config.DB_PATH
config.DB_PATH = path
try:
yield path
finally:
config.DB_PATH = original
for ext in ("", "-wal", "-shm"):
try:
os.unlink(path + ext)
except FileNotFoundError:
pass
# ── 5. App client factory ────────────────────────────────────────────────────
def make_app_client(capture_send_requests: list | None = None):
"""
Async context manager that:
1. Assigns a fresh temp-file DB path
2. Mocks Telegram setWebhook, sendMessage, answerCallbackQuery, editMessageText
3. Runs the FastAPI lifespan (startup → test → shutdown)
4. Yields an httpx.AsyncClient wired to the app
Args:
capture_send_requests: if provided, each sendMessage request body (dict) is
appended to this list, enabling HTTP-level assertions on chat_id, text, etc.
"""
import json as _json
tg_set_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/setWebhook"
send_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage"
get_me_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/getMe"
answer_cb_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/answerCallbackQuery"
edit_msg_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/editMessageText"
@contextlib.asynccontextmanager
async def _ctx():
with temp_db():
from backend.main import app
mock_router = respx.mock(assert_all_called=False)
mock_router.get(get_me_url).mock(
return_value=httpx.Response(200, json={"ok": True, "result": {"username": "testbot"}})
)
mock_router.post(tg_set_url).mock(
return_value=httpx.Response(200, json={"ok": True, "result": True})
)
if capture_send_requests is not None:
def _capture_send(request: httpx.Request) -> httpx.Response:
try:
capture_send_requests.append(_json.loads(request.content))
except Exception:
pass
return httpx.Response(200, json={"ok": True})
mock_router.post(send_url).mock(side_effect=_capture_send)
else:
mock_router.post(send_url).mock(
return_value=httpx.Response(200, json={"ok": True})
)
mock_router.post(answer_cb_url).mock(
return_value=httpx.Response(200, json={"ok": True})
)
mock_router.post(edit_msg_url).mock(
return_value=httpx.Response(200, json={"ok": True})
)
with mock_router:
async with app.router.lifespan_context(app):
transport = ASGITransport(app=app)
async with AsyncClient(
transport=transport, base_url="http://testserver"
) as client:
yield client
return _ctx()