From a8d53fa47bf4b41e35d123e1f748bffe7cf2afcd Mon Sep 17 00:00:00 2001 From: Gros Frumos Date: Fri, 20 Mar 2026 23:50:54 +0200 Subject: [PATCH] =?UTF-8?q?kin:=20BATON-005=20=D0=A1=D0=B4=D0=B5=D0=BB?= =?UTF-8?q?=D0=B0=D1=82=D1=8C=20=D0=B0=D0=B4=D0=BC=D0=B8=D0=BD=D0=BA=D1=83?= =?UTF-8?q?=20=D0=B4=D0=BB=D1=8F=20=D0=B7=D0=B0=D0=B2=D0=B5=D0=B4=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D1=8F=20=D0=BF=D0=BE=D0=BB=D1=8C=D0=B7=D0=BE=D0=B2?= =?UTF-8?q?=D0=B0=D1=82=D0=B5=D0=BB=D0=B5=D0=B9=20=D1=81=D0=BE=20=D1=81?= =?UTF-8?q?=D0=BC=D0=B5=D0=BD=D0=BE=D0=B9=20=D0=BF=D0=B0=D1=80=D0=BE=D0=BB?= =?UTF-8?q?=D1=8F,=20=D0=B1=D0=BB=D0=BE=D0=BA=D0=B8=D1=80=D0=BE=D0=B2?= =?UTF-8?q?=D0=BA=D0=BE=D0=B9=20=D0=B8=20=D1=83=D0=B4=D0=B0=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=D0=BC=20=D0=BF=D0=BE=D0=BB=D1=8C=D0=B7=D0=BE?= =?UTF-8?q?=D0=B2=D0=B0=D1=82=D0=B5=D0=BB=D0=B5=D0=B9.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_baton_005.py | 487 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 487 insertions(+) create mode 100644 tests/test_baton_005.py diff --git a/tests/test_baton_005.py b/tests/test_baton_005.py new file mode 100644 index 0000000..1e43810 --- /dev/null +++ b/tests/test_baton_005.py @@ -0,0 +1,487 @@ +""" +Tests for BATON-005: Admin panel — user creation, password change, block/unblock, delete. + +Acceptance criteria: +1. Создание пользователя — пользователь появляется в БД (GET /admin/users) +2. Смена пароля — endpoint возвращает ok, 404 для несуществующего пользователя +3. Блокировка — заблокированный пользователь не может отправить сигнал (403) +4. Разблокировка — восстанавливает доступ (сигнал снова проходит) +5. Удаление — пользователь исчезает из GET /admin/users, возвращается 204 +6. Защита: неавторизованный запрос к /admin/* возвращает 401 +7. Отсутствие регрессии с основным функционалом +""" +from __future__ import annotations + +import os +import re +from pathlib import Path + +os.environ.setdefault("BOT_TOKEN", "test-bot-token") +os.environ.setdefault("CHAT_ID", "-1001234567890") +os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") +os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") +os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") +os.environ.setdefault("ADMIN_TOKEN", "test-admin-token") + +import pytest + +from tests.conftest import make_app_client + +PROJECT_ROOT = Path(__file__).parent.parent +NGINX_CONF = PROJECT_ROOT / "nginx" / "baton.conf" + +ADMIN_HEADERS = {"Authorization": "Bearer test-admin-token"} +WRONG_HEADERS = {"Authorization": "Bearer wrong-token"} + + +# --------------------------------------------------------------------------- +# Criterion 6 — Unauthorised requests to /admin/* return 401 +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_admin_list_users_without_token_returns_401() -> None: + """GET /admin/users без Authorization header должен вернуть 401.""" + async with make_app_client() as client: + resp = await client.get("/admin/users") + assert resp.status_code == 401 + + +@pytest.mark.asyncio +async def test_admin_list_users_wrong_token_returns_401() -> None: + """GET /admin/users с неверным токеном должен вернуть 401.""" + async with make_app_client() as client: + resp = await client.get("/admin/users", headers=WRONG_HEADERS) + assert resp.status_code == 401 + + +@pytest.mark.asyncio +async def test_admin_create_user_without_token_returns_401() -> None: + """POST /admin/users без токена должен вернуть 401.""" + async with make_app_client() as client: + resp = await client.post( + "/admin/users", + json={"uuid": "unauth-uuid-001", "name": "Ghost"}, + ) + assert resp.status_code == 401 + + +@pytest.mark.asyncio +async def test_admin_set_password_without_token_returns_401() -> None: + """PUT /admin/users/1/password без токена должен вернуть 401.""" + async with make_app_client() as client: + resp = await client.put( + "/admin/users/1/password", + json={"password": "newpass"}, + ) + assert resp.status_code == 401 + + +@pytest.mark.asyncio +async def test_admin_block_user_without_token_returns_401() -> None: + """PUT /admin/users/1/block без токена должен вернуть 401.""" + async with make_app_client() as client: + resp = await client.put( + "/admin/users/1/block", + json={"is_blocked": True}, + ) + assert resp.status_code == 401 + + +@pytest.mark.asyncio +async def test_admin_delete_user_without_token_returns_401() -> None: + """DELETE /admin/users/1 без токена должен вернуть 401.""" + async with make_app_client() as client: + resp = await client.delete("/admin/users/1") + assert resp.status_code == 401 + + +# --------------------------------------------------------------------------- +# Criterion 1 — Create user: appears in DB +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_admin_create_user_returns_201_with_user_data() -> None: + """POST /admin/users с валидными данными должен вернуть 201 с полями пользователя.""" + async with make_app_client() as client: + resp = await client.post( + "/admin/users", + json={"uuid": "create-uuid-001", "name": "Alice Admin"}, + headers=ADMIN_HEADERS, + ) + assert resp.status_code == 201 + data = resp.json() + assert data["uuid"] == "create-uuid-001" + assert data["name"] == "Alice Admin" + assert data["id"] > 0 + assert data["is_blocked"] is False + + +@pytest.mark.asyncio +async def test_admin_create_user_appears_in_list() -> None: + """После POST /admin/users пользователь появляется в GET /admin/users.""" + async with make_app_client() as client: + await client.post( + "/admin/users", + json={"uuid": "create-uuid-002", "name": "Bob Admin"}, + headers=ADMIN_HEADERS, + ) + resp = await client.get("/admin/users", headers=ADMIN_HEADERS) + + assert resp.status_code == 200 + users = resp.json() + uuids = [u["uuid"] for u in users] + assert "create-uuid-002" in uuids + + +@pytest.mark.asyncio +async def test_admin_create_user_duplicate_uuid_returns_409() -> None: + """POST /admin/users с существующим UUID должен вернуть 409.""" + async with make_app_client() as client: + await client.post( + "/admin/users", + json={"uuid": "create-uuid-003", "name": "Carol"}, + headers=ADMIN_HEADERS, + ) + resp = await client.post( + "/admin/users", + json={"uuid": "create-uuid-003", "name": "Carol Duplicate"}, + headers=ADMIN_HEADERS, + ) + assert resp.status_code == 409 + + +@pytest.mark.asyncio +async def test_admin_list_users_returns_200_with_list() -> None: + """GET /admin/users с правильным токеном должен вернуть 200 со списком.""" + async with make_app_client() as client: + resp = await client.get("/admin/users", headers=ADMIN_HEADERS) + assert resp.status_code == 200 + assert isinstance(resp.json(), list) + + +# --------------------------------------------------------------------------- +# Criterion 2 — Password change +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_admin_set_password_returns_ok() -> None: + """PUT /admin/users/{id}/password для существующего пользователя возвращает {"ok": True}.""" + async with make_app_client() as client: + create_resp = await client.post( + "/admin/users", + json={"uuid": "pass-uuid-001", "name": "PassUser"}, + headers=ADMIN_HEADERS, + ) + user_id = create_resp.json()["id"] + + resp = await client.put( + f"/admin/users/{user_id}/password", + json={"password": "newpassword123"}, + headers=ADMIN_HEADERS, + ) + assert resp.status_code == 200 + assert resp.json() == {"ok": True} + + +@pytest.mark.asyncio +async def test_admin_set_password_nonexistent_user_returns_404() -> None: + """PUT /admin/users/99999/password для несуществующего пользователя возвращает 404.""" + async with make_app_client() as client: + resp = await client.put( + "/admin/users/99999/password", + json={"password": "somepassword"}, + headers=ADMIN_HEADERS, + ) + assert resp.status_code == 404 + + +@pytest.mark.asyncio +async def test_admin_set_password_user_still_accessible_after_change() -> None: + """Пользователь остаётся доступен в GET /admin/users после смены пароля.""" + async with make_app_client() as client: + create_resp = await client.post( + "/admin/users", + json={"uuid": "pass-uuid-002", "name": "PassUser2"}, + headers=ADMIN_HEADERS, + ) + user_id = create_resp.json()["id"] + + await client.put( + f"/admin/users/{user_id}/password", + json={"password": "updatedpass"}, + headers=ADMIN_HEADERS, + ) + + list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS) + + uuids = [u["uuid"] for u in list_resp.json()] + assert "pass-uuid-002" in uuids + + +# --------------------------------------------------------------------------- +# Criterion 3 — Block user: blocked user cannot send signal +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_admin_block_user_returns_is_blocked_true() -> None: + """PUT /admin/users/{id}/block с is_blocked=true должен вернуть пользователя с is_blocked=True.""" + async with make_app_client() as client: + create_resp = await client.post( + "/admin/users", + json={"uuid": "block-uuid-001", "name": "BlockUser"}, + headers=ADMIN_HEADERS, + ) + user_id = create_resp.json()["id"] + + resp = await client.put( + f"/admin/users/{user_id}/block", + json={"is_blocked": True}, + headers=ADMIN_HEADERS, + ) + assert resp.status_code == 200 + assert resp.json()["is_blocked"] is True + + +@pytest.mark.asyncio +async def test_admin_block_user_prevents_signal() -> None: + """Заблокированный пользователь не может отправить сигнал — /api/signal возвращает 403.""" + async with make_app_client() as client: + create_resp = await client.post( + "/admin/users", + json={"uuid": "block-uuid-002", "name": "BlockSignalUser"}, + headers=ADMIN_HEADERS, + ) + user_id = create_resp.json()["id"] + user_uuid = create_resp.json()["uuid"] + + await client.put( + f"/admin/users/{user_id}/block", + json={"is_blocked": True}, + headers=ADMIN_HEADERS, + ) + + signal_resp = await client.post( + "/api/signal", + json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None}, + ) + assert signal_resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_admin_block_nonexistent_user_returns_404() -> None: + """PUT /admin/users/99999/block для несуществующего пользователя возвращает 404.""" + async with make_app_client() as client: + resp = await client.put( + "/admin/users/99999/block", + json={"is_blocked": True}, + headers=ADMIN_HEADERS, + ) + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# Criterion 4 — Unblock user: restores access +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_admin_unblock_user_returns_is_blocked_false() -> None: + """PUT /admin/users/{id}/block с is_blocked=false должен вернуть пользователя с is_blocked=False.""" + async with make_app_client() as client: + create_resp = await client.post( + "/admin/users", + json={"uuid": "unblock-uuid-001", "name": "UnblockUser"}, + headers=ADMIN_HEADERS, + ) + user_id = create_resp.json()["id"] + + await client.put( + f"/admin/users/{user_id}/block", + json={"is_blocked": True}, + headers=ADMIN_HEADERS, + ) + + resp = await client.put( + f"/admin/users/{user_id}/block", + json={"is_blocked": False}, + headers=ADMIN_HEADERS, + ) + assert resp.status_code == 200 + assert resp.json()["is_blocked"] is False + + +@pytest.mark.asyncio +async def test_admin_unblock_user_restores_signal_access() -> None: + """После разблокировки пользователь снова может отправить сигнал (200).""" + async with make_app_client() as client: + create_resp = await client.post( + "/admin/users", + json={"uuid": "unblock-uuid-002", "name": "UnblockSignalUser"}, + headers=ADMIN_HEADERS, + ) + user_id = create_resp.json()["id"] + user_uuid = create_resp.json()["uuid"] + + # Блокируем + await client.put( + f"/admin/users/{user_id}/block", + json={"is_blocked": True}, + headers=ADMIN_HEADERS, + ) + + # Разблокируем + await client.put( + f"/admin/users/{user_id}/block", + json={"is_blocked": False}, + headers=ADMIN_HEADERS, + ) + + # Сигнал должен пройти + signal_resp = await client.post( + "/api/signal", + json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None}, + ) + assert signal_resp.status_code == 200 + assert signal_resp.json()["status"] == "ok" + + +# --------------------------------------------------------------------------- +# Criterion 5 — Delete user: disappears from DB +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_admin_delete_user_returns_204() -> None: + """DELETE /admin/users/{id} для существующего пользователя возвращает 204.""" + async with make_app_client() as client: + create_resp = await client.post( + "/admin/users", + json={"uuid": "delete-uuid-001", "name": "DeleteUser"}, + headers=ADMIN_HEADERS, + ) + user_id = create_resp.json()["id"] + + resp = await client.delete( + f"/admin/users/{user_id}", + headers=ADMIN_HEADERS, + ) + assert resp.status_code == 204 + + +@pytest.mark.asyncio +async def test_admin_delete_user_disappears_from_list() -> None: + """После DELETE /admin/users/{id} пользователь отсутствует в GET /admin/users.""" + async with make_app_client() as client: + create_resp = await client.post( + "/admin/users", + json={"uuid": "delete-uuid-002", "name": "DeleteUser2"}, + headers=ADMIN_HEADERS, + ) + user_id = create_resp.json()["id"] + + await client.delete( + f"/admin/users/{user_id}", + headers=ADMIN_HEADERS, + ) + + list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS) + + uuids = [u["uuid"] for u in list_resp.json()] + assert "delete-uuid-002" not in uuids + + +@pytest.mark.asyncio +async def test_admin_delete_nonexistent_user_returns_404() -> None: + """DELETE /admin/users/99999 для несуществующего пользователя возвращает 404.""" + async with make_app_client() as client: + resp = await client.delete( + "/admin/users/99999", + headers=ADMIN_HEADERS, + ) + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# nginx config — location /admin/users block (BATON-006 fix) +# --------------------------------------------------------------------------- + + +def test_nginx_conf_has_admin_users_location_block() -> None: + """nginx/baton.conf должен содержать блок location /admin/users.""" + content = NGINX_CONF.read_text(encoding="utf-8") + assert re.search(r"location\s+/admin/users\b", content), ( + "nginx/baton.conf не содержит блок location /admin/users — " + "запросы к admin API будут попадать в location / и возвращать 404" + ) + + +def test_nginx_conf_admin_users_location_proxies_to_fastapi() -> None: + """Блок location /admin/users должен делать proxy_pass на 127.0.0.1:8000.""" + content = NGINX_CONF.read_text(encoding="utf-8") + admin_block = re.search( + r"location\s+/admin/users\s*\{([^}]+)\}", content, re.DOTALL + ) + assert admin_block is not None, "Блок location /admin/users { ... } не найден" + assert re.search(r"proxy_pass\s+http://127\.0\.0\.1:8000", admin_block.group(1)), ( + "Блок location /admin/users не содержит proxy_pass http://127.0.0.1:8000" + ) + + +def test_nginx_conf_admin_users_location_before_root_location() -> None: + """location /admin/users должен находиться в nginx.conf до location / для корректного prefix-matching.""" + content = NGINX_CONF.read_text(encoding="utf-8") + admin_pos = content.find("location /admin/users") + root_pos = re.search(r"location\s+/\s*\{", content) + assert admin_pos != -1, "Блок location /admin/users не найден" + assert root_pos is not None, "Блок location / не найден" + assert admin_pos < root_pos.start(), ( + "location /admin/users должен быть определён ДО location / в nginx.conf" + ) + + +# --------------------------------------------------------------------------- +# Criterion 7 — No regression with main functionality +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_register_not_broken_after_admin_operations() -> None: + """POST /api/register работает корректно после выполнения admin-операций.""" + async with make_app_client() as client: + # Admin операции + await client.post( + "/admin/users", + json={"uuid": "regress-admin-uuid-001", "name": "AdminCreated"}, + headers=ADMIN_HEADERS, + ) + + # Основной функционал + resp = await client.post( + "/api/register", + json={"uuid": "regress-user-uuid-001", "name": "RegularUser"}, + ) + assert resp.status_code == 200 + assert resp.json()["uuid"] == "regress-user-uuid-001" + + +@pytest.mark.asyncio +async def test_signal_from_unblocked_user_succeeds() -> None: + """Незаблокированный пользователь, созданный через admin API, может отправить сигнал.""" + async with make_app_client() as client: + create_resp = await client.post( + "/admin/users", + json={"uuid": "regress-signal-uuid-001", "name": "SignalUser"}, + headers=ADMIN_HEADERS, + ) + user_uuid = create_resp.json()["uuid"] + + signal_resp = await client.post( + "/api/signal", + json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None}, + ) + assert signal_resp.status_code == 200 + assert signal_resp.json()["status"] == "ok"