""" Tests for BATON-005: Admin panel — user creation, password change, block/unblock, delete. Acceptance criteria: 1. Создание пользователя — пользователь появляется в БД (GET /admin/users) 2. Смена пароля — endpoint возвращает ok, 404 для несуществующего пользователя 3. Блокировка — заблокированный пользователь не может отправить сигнал (403) 4. Разблокировка — восстанавливает доступ (сигнал снова проходит) 5. Удаление — пользователь исчезает из GET /admin/users, возвращается 204 6. Защита: неавторизованный запрос к /admin/* возвращает 401 7. Отсутствие регрессии с основным функционалом """ from __future__ import annotations import os import re from pathlib import Path os.environ.setdefault("BOT_TOKEN", "test-bot-token") os.environ.setdefault("CHAT_ID", "-1001234567890") os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") os.environ.setdefault("ADMIN_TOKEN", "test-admin-token") import pytest from tests.conftest import make_app_client PROJECT_ROOT = Path(__file__).parent.parent NGINX_CONF = PROJECT_ROOT / "nginx" / "baton.conf" ADMIN_HEADERS = {"Authorization": "Bearer test-admin-token"} WRONG_HEADERS = {"Authorization": "Bearer wrong-token"} # --------------------------------------------------------------------------- # Criterion 6 — Unauthorised requests to /admin/* return 401 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_admin_list_users_without_token_returns_401() -> None: """GET /admin/users без Authorization header должен вернуть 401.""" async with make_app_client() as client: resp = await client.get("/admin/users") assert resp.status_code == 401 @pytest.mark.asyncio async def test_admin_list_users_wrong_token_returns_401() -> None: """GET /admin/users с неверным токеном должен вернуть 401.""" async with make_app_client() as client: resp = await client.get("/admin/users", headers=WRONG_HEADERS) assert resp.status_code == 401 @pytest.mark.asyncio async def test_admin_create_user_without_token_returns_401() -> None: """POST /admin/users без токена должен вернуть 401.""" async with make_app_client() as client: resp = await client.post( "/admin/users", json={"uuid": "unauth-uuid-001", "name": "Ghost"}, ) assert resp.status_code == 401 @pytest.mark.asyncio async def test_admin_set_password_without_token_returns_401() -> None: """PUT /admin/users/1/password без токена должен вернуть 401.""" async with make_app_client() as client: resp = await client.put( "/admin/users/1/password", json={"password": "newpass"}, ) assert resp.status_code == 401 @pytest.mark.asyncio async def test_admin_block_user_without_token_returns_401() -> None: """PUT /admin/users/1/block без токена должен вернуть 401.""" async with make_app_client() as client: resp = await client.put( "/admin/users/1/block", json={"is_blocked": True}, ) assert resp.status_code == 401 @pytest.mark.asyncio async def test_admin_delete_user_without_token_returns_401() -> None: """DELETE /admin/users/1 без токена должен вернуть 401.""" async with make_app_client() as client: resp = await client.delete("/admin/users/1") assert resp.status_code == 401 # --------------------------------------------------------------------------- # Criterion 1 — Create user: appears in DB # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_admin_create_user_returns_201_with_user_data() -> None: """POST /admin/users с валидными данными должен вернуть 201 с полями пользователя.""" async with make_app_client() as client: resp = await client.post( "/admin/users", json={"uuid": "create-uuid-001", "name": "Alice Admin"}, headers=ADMIN_HEADERS, ) assert resp.status_code == 201 data = resp.json() assert data["uuid"] == "create-uuid-001" assert data["name"] == "Alice Admin" assert data["id"] > 0 assert data["is_blocked"] is False @pytest.mark.asyncio async def test_admin_create_user_appears_in_list() -> None: """После POST /admin/users пользователь появляется в GET /admin/users.""" async with make_app_client() as client: await client.post( "/admin/users", json={"uuid": "create-uuid-002", "name": "Bob Admin"}, headers=ADMIN_HEADERS, ) resp = await client.get("/admin/users", headers=ADMIN_HEADERS) assert resp.status_code == 200 users = resp.json() uuids = [u["uuid"] for u in users] assert "create-uuid-002" in uuids @pytest.mark.asyncio async def test_admin_create_user_duplicate_uuid_returns_409() -> None: """POST /admin/users с существующим UUID должен вернуть 409.""" async with make_app_client() as client: await client.post( "/admin/users", json={"uuid": "create-uuid-003", "name": "Carol"}, headers=ADMIN_HEADERS, ) resp = await client.post( "/admin/users", json={"uuid": "create-uuid-003", "name": "Carol Duplicate"}, headers=ADMIN_HEADERS, ) assert resp.status_code == 409 @pytest.mark.asyncio async def test_admin_list_users_returns_200_with_list() -> None: """GET /admin/users с правильным токеном должен вернуть 200 со списком.""" async with make_app_client() as client: resp = await client.get("/admin/users", headers=ADMIN_HEADERS) assert resp.status_code == 200 assert isinstance(resp.json(), list) # --------------------------------------------------------------------------- # Criterion 2 — Password change # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_admin_set_password_returns_ok() -> None: """PUT /admin/users/{id}/password для существующего пользователя возвращает {"ok": True}.""" async with make_app_client() as client: create_resp = await client.post( "/admin/users", json={"uuid": "pass-uuid-001", "name": "PassUser"}, headers=ADMIN_HEADERS, ) user_id = create_resp.json()["id"] resp = await client.put( f"/admin/users/{user_id}/password", json={"password": "newpassword123"}, headers=ADMIN_HEADERS, ) assert resp.status_code == 200 assert resp.json() == {"ok": True} @pytest.mark.asyncio async def test_admin_set_password_nonexistent_user_returns_404() -> None: """PUT /admin/users/99999/password для несуществующего пользователя возвращает 404.""" async with make_app_client() as client: resp = await client.put( "/admin/users/99999/password", json={"password": "somepassword"}, headers=ADMIN_HEADERS, ) assert resp.status_code == 404 @pytest.mark.asyncio async def test_admin_set_password_user_still_accessible_after_change() -> None: """Пользователь остаётся доступен в GET /admin/users после смены пароля.""" async with make_app_client() as client: create_resp = await client.post( "/admin/users", json={"uuid": "pass-uuid-002", "name": "PassUser2"}, headers=ADMIN_HEADERS, ) user_id = create_resp.json()["id"] await client.put( f"/admin/users/{user_id}/password", json={"password": "updatedpass"}, headers=ADMIN_HEADERS, ) list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS) uuids = [u["uuid"] for u in list_resp.json()] assert "pass-uuid-002" in uuids # --------------------------------------------------------------------------- # Criterion 3 — Block user: blocked user cannot send signal # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_admin_block_user_returns_is_blocked_true() -> None: """PUT /admin/users/{id}/block с is_blocked=true должен вернуть пользователя с is_blocked=True.""" async with make_app_client() as client: create_resp = await client.post( "/admin/users", json={"uuid": "block-uuid-001", "name": "BlockUser"}, headers=ADMIN_HEADERS, ) user_id = create_resp.json()["id"] resp = await client.put( f"/admin/users/{user_id}/block", json={"is_blocked": True}, headers=ADMIN_HEADERS, ) assert resp.status_code == 200 assert resp.json()["is_blocked"] is True @pytest.mark.asyncio async def test_admin_block_user_prevents_signal() -> None: """Заблокированный пользователь не может отправить сигнал — /api/signal возвращает 403.""" async with make_app_client() as client: create_resp = await client.post( "/admin/users", json={"uuid": "block-uuid-002", "name": "BlockSignalUser"}, headers=ADMIN_HEADERS, ) user_id = create_resp.json()["id"] user_uuid = create_resp.json()["uuid"] await client.put( f"/admin/users/{user_id}/block", json={"is_blocked": True}, headers=ADMIN_HEADERS, ) signal_resp = await client.post( "/api/signal", json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None}, ) assert signal_resp.status_code == 403 @pytest.mark.asyncio async def test_admin_block_nonexistent_user_returns_404() -> None: """PUT /admin/users/99999/block для несуществующего пользователя возвращает 404.""" async with make_app_client() as client: resp = await client.put( "/admin/users/99999/block", json={"is_blocked": True}, headers=ADMIN_HEADERS, ) assert resp.status_code == 404 # --------------------------------------------------------------------------- # Criterion 4 — Unblock user: restores access # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_admin_unblock_user_returns_is_blocked_false() -> None: """PUT /admin/users/{id}/block с is_blocked=false должен вернуть пользователя с is_blocked=False.""" async with make_app_client() as client: create_resp = await client.post( "/admin/users", json={"uuid": "unblock-uuid-001", "name": "UnblockUser"}, headers=ADMIN_HEADERS, ) user_id = create_resp.json()["id"] await client.put( f"/admin/users/{user_id}/block", json={"is_blocked": True}, headers=ADMIN_HEADERS, ) resp = await client.put( f"/admin/users/{user_id}/block", json={"is_blocked": False}, headers=ADMIN_HEADERS, ) assert resp.status_code == 200 assert resp.json()["is_blocked"] is False @pytest.mark.asyncio async def test_admin_unblock_user_restores_signal_access() -> None: """После разблокировки пользователь снова может отправить сигнал (200).""" async with make_app_client() as client: create_resp = await client.post( "/admin/users", json={"uuid": "unblock-uuid-002", "name": "UnblockSignalUser"}, headers=ADMIN_HEADERS, ) user_id = create_resp.json()["id"] user_uuid = create_resp.json()["uuid"] # Блокируем await client.put( f"/admin/users/{user_id}/block", json={"is_blocked": True}, headers=ADMIN_HEADERS, ) # Разблокируем await client.put( f"/admin/users/{user_id}/block", json={"is_blocked": False}, headers=ADMIN_HEADERS, ) # Сигнал должен пройти signal_resp = await client.post( "/api/signal", json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None}, ) assert signal_resp.status_code == 200 assert signal_resp.json()["status"] == "ok" # --------------------------------------------------------------------------- # Criterion 5 — Delete user: disappears from DB # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_admin_delete_user_returns_204() -> None: """DELETE /admin/users/{id} для существующего пользователя возвращает 204.""" async with make_app_client() as client: create_resp = await client.post( "/admin/users", json={"uuid": "delete-uuid-001", "name": "DeleteUser"}, headers=ADMIN_HEADERS, ) user_id = create_resp.json()["id"] resp = await client.delete( f"/admin/users/{user_id}", headers=ADMIN_HEADERS, ) assert resp.status_code == 204 @pytest.mark.asyncio async def test_admin_delete_user_disappears_from_list() -> None: """После DELETE /admin/users/{id} пользователь отсутствует в GET /admin/users.""" async with make_app_client() as client: create_resp = await client.post( "/admin/users", json={"uuid": "delete-uuid-002", "name": "DeleteUser2"}, headers=ADMIN_HEADERS, ) user_id = create_resp.json()["id"] await client.delete( f"/admin/users/{user_id}", headers=ADMIN_HEADERS, ) list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS) uuids = [u["uuid"] for u in list_resp.json()] assert "delete-uuid-002" not in uuids @pytest.mark.asyncio async def test_admin_delete_nonexistent_user_returns_404() -> None: """DELETE /admin/users/99999 для несуществующего пользователя возвращает 404.""" async with make_app_client() as client: resp = await client.delete( "/admin/users/99999", headers=ADMIN_HEADERS, ) assert resp.status_code == 404 # --------------------------------------------------------------------------- # nginx config — location /admin/users block (BATON-006 fix) # --------------------------------------------------------------------------- def test_nginx_conf_has_admin_users_location_block() -> None: """nginx/baton.conf должен содержать блок location /admin/users.""" content = NGINX_CONF.read_text(encoding="utf-8") assert re.search(r"location\s+/admin/users\b", content), ( "nginx/baton.conf не содержит блок location /admin/users — " "запросы к admin API будут попадать в location / и возвращать 404" ) def test_nginx_conf_admin_users_location_proxies_to_fastapi() -> None: """Блок location /admin/users должен делать proxy_pass на 127.0.0.1:8000.""" content = NGINX_CONF.read_text(encoding="utf-8") admin_block = re.search( r"location\s+/admin/users\s*\{([^}]+)\}", content, re.DOTALL ) assert admin_block is not None, "Блок location /admin/users { ... } не найден" assert re.search(r"proxy_pass\s+http://127\.0\.0\.1:8000", admin_block.group(1)), ( "Блок location /admin/users не содержит proxy_pass http://127.0.0.1:8000" ) def test_nginx_conf_admin_users_location_before_root_location() -> None: """location /admin/users должен находиться в nginx.conf до location / для корректного prefix-matching.""" content = NGINX_CONF.read_text(encoding="utf-8") admin_pos = content.find("location /admin/users") root_pos = re.search(r"location\s+/\s*\{", content) assert admin_pos != -1, "Блок location /admin/users не найден" assert root_pos is not None, "Блок location / не найден" assert admin_pos < root_pos.start(), ( "location /admin/users должен быть определён ДО location / в nginx.conf" ) # --------------------------------------------------------------------------- # Criterion 7 — No regression with main functionality # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_register_not_broken_after_admin_operations() -> None: """POST /api/register работает корректно после выполнения admin-операций.""" async with make_app_client() as client: # Admin операции await client.post( "/admin/users", json={"uuid": "regress-admin-uuid-001", "name": "AdminCreated"}, headers=ADMIN_HEADERS, ) # Основной функционал resp = await client.post( "/api/register", json={"uuid": "regress-user-uuid-001", "name": "RegularUser"}, ) assert resp.status_code == 200 assert resp.json()["uuid"] == "regress-user-uuid-001" @pytest.mark.asyncio async def test_signal_from_unblocked_user_succeeds() -> None: """Незаблокированный пользователь, созданный через admin API, может отправить сигнал.""" async with make_app_client() as client: create_resp = await client.post( "/admin/users", json={"uuid": "regress-signal-uuid-001", "name": "SignalUser"}, headers=ADMIN_HEADERS, ) user_uuid = create_resp.json()["uuid"] signal_resp = await client.post( "/api/signal", json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None}, ) assert signal_resp.status_code == 200 assert signal_resp.json()["status"] == "ok"