514 lines
20 KiB
Python
514 lines
20 KiB
Python
"""
|
||
Tests for BATON-005: Admin panel — user creation, password change, block/unblock, delete.
|
||
|
||
Acceptance criteria:
|
||
1. Создание пользователя — пользователь появляется в БД (GET /admin/users)
|
||
2. Смена пароля — endpoint возвращает ok, 404 для несуществующего пользователя
|
||
3. Блокировка — заблокированный пользователь не может отправить сигнал (403)
|
||
4. Разблокировка — восстанавливает доступ (сигнал снова проходит)
|
||
5. Удаление — пользователь исчезает из GET /admin/users, возвращается 204
|
||
6. Защита: неавторизованный запрос к /admin/* возвращает 401
|
||
7. Отсутствие регрессии с основным функционалом
|
||
|
||
BATON-SEC-003: POST /api/signal now requires Authorization: Bearer <api_key>.
|
||
Tests 3 and 4 (block/unblock + signal) use /api/register to obtain an api_key,
|
||
then admin block/unblock the user by their DB id.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import re
|
||
from pathlib import Path
|
||
|
||
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
|
||
os.environ.setdefault("CHAT_ID", "-1001234567890")
|
||
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
|
||
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
|
||
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
|
||
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
|
||
|
||
import pytest
|
||
|
||
from tests.conftest import make_app_client
|
||
|
||
PROJECT_ROOT = Path(__file__).parent.parent
|
||
NGINX_CONF = PROJECT_ROOT / "nginx" / "baton.conf"
|
||
|
||
ADMIN_HEADERS = {"Authorization": "Bearer test-admin-token"}
|
||
WRONG_HEADERS = {"Authorization": "Bearer wrong-token"}
|
||
|
||
# Valid UUID v4 for signal-related tests (registered via /api/register)
|
||
_UUID_BLOCK = "f0000001-0000-4000-8000-000000000001"
|
||
_UUID_UNBLOCK = "f0000002-0000-4000-8000-000000000002"
|
||
_UUID_SIG_OK = "f0000003-0000-4000-8000-000000000003"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 6 — Unauthorised requests to /admin/* return 401
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_list_users_without_token_returns_401() -> None:
|
||
"""GET /admin/users без Authorization header должен вернуть 401."""
|
||
async with make_app_client() as client:
|
||
resp = await client.get("/admin/users")
|
||
assert resp.status_code == 401
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_list_users_wrong_token_returns_401() -> None:
|
||
"""GET /admin/users с неверным токеном должен вернуть 401."""
|
||
async with make_app_client() as client:
|
||
resp = await client.get("/admin/users", headers=WRONG_HEADERS)
|
||
assert resp.status_code == 401
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_create_user_without_token_returns_401() -> None:
|
||
"""POST /admin/users без токена должен вернуть 401."""
|
||
async with make_app_client() as client:
|
||
resp = await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "unauth-uuid-001", "name": "Ghost"},
|
||
)
|
||
assert resp.status_code == 401
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_set_password_without_token_returns_401() -> None:
|
||
"""PUT /admin/users/1/password без токена должен вернуть 401."""
|
||
async with make_app_client() as client:
|
||
resp = await client.put(
|
||
"/admin/users/1/password",
|
||
json={"password": "newpass"},
|
||
)
|
||
assert resp.status_code == 401
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_block_user_without_token_returns_401() -> None:
|
||
"""PUT /admin/users/1/block без токена должен вернуть 401."""
|
||
async with make_app_client() as client:
|
||
resp = await client.put(
|
||
"/admin/users/1/block",
|
||
json={"is_blocked": True},
|
||
)
|
||
assert resp.status_code == 401
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_delete_user_without_token_returns_401() -> None:
|
||
"""DELETE /admin/users/1 без токена должен вернуть 401."""
|
||
async with make_app_client() as client:
|
||
resp = await client.delete("/admin/users/1")
|
||
assert resp.status_code == 401
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 1 — Create user: appears in DB
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_create_user_returns_201_with_user_data() -> None:
|
||
"""POST /admin/users с валидными данными должен вернуть 201 с полями пользователя."""
|
||
async with make_app_client() as client:
|
||
resp = await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "create-uuid-001", "name": "Alice Admin"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
assert resp.status_code == 201
|
||
data = resp.json()
|
||
assert data["uuid"] == "create-uuid-001"
|
||
assert data["name"] == "Alice Admin"
|
||
assert data["id"] > 0
|
||
assert data["is_blocked"] is False
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_create_user_appears_in_list() -> None:
|
||
"""После POST /admin/users пользователь появляется в GET /admin/users."""
|
||
async with make_app_client() as client:
|
||
await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "create-uuid-002", "name": "Bob Admin"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
|
||
|
||
assert resp.status_code == 200
|
||
users = resp.json()
|
||
uuids = [u["uuid"] for u in users]
|
||
assert "create-uuid-002" in uuids
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_create_user_duplicate_uuid_returns_409() -> None:
|
||
"""POST /admin/users с существующим UUID должен вернуть 409."""
|
||
async with make_app_client() as client:
|
||
await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "create-uuid-003", "name": "Carol"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
resp = await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "create-uuid-003", "name": "Carol Duplicate"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
assert resp.status_code == 409
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_list_users_returns_200_with_list() -> None:
|
||
"""GET /admin/users с правильным токеном должен вернуть 200 со списком."""
|
||
async with make_app_client() as client:
|
||
resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
|
||
assert resp.status_code == 200
|
||
assert isinstance(resp.json(), list)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 2 — Password change
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_set_password_returns_ok() -> None:
|
||
"""PUT /admin/users/{id}/password для существующего пользователя возвращает {"ok": True}."""
|
||
async with make_app_client() as client:
|
||
create_resp = await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "pass-uuid-001", "name": "PassUser"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
user_id = create_resp.json()["id"]
|
||
|
||
resp = await client.put(
|
||
f"/admin/users/{user_id}/password",
|
||
json={"password": "newpassword123"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
assert resp.status_code == 200
|
||
assert resp.json() == {"ok": True}
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_set_password_nonexistent_user_returns_404() -> None:
|
||
"""PUT /admin/users/99999/password для несуществующего пользователя возвращает 404."""
|
||
async with make_app_client() as client:
|
||
resp = await client.put(
|
||
"/admin/users/99999/password",
|
||
json={"password": "somepassword"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
assert resp.status_code == 404
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_set_password_user_still_accessible_after_change() -> None:
|
||
"""Пользователь остаётся доступен в GET /admin/users после смены пароля."""
|
||
async with make_app_client() as client:
|
||
create_resp = await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "pass-uuid-002", "name": "PassUser2"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
user_id = create_resp.json()["id"]
|
||
|
||
await client.put(
|
||
f"/admin/users/{user_id}/password",
|
||
json={"password": "updatedpass"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
|
||
list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
|
||
|
||
uuids = [u["uuid"] for u in list_resp.json()]
|
||
assert "pass-uuid-002" in uuids
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 3 — Block user: blocked user cannot send signal
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_block_user_returns_is_blocked_true() -> None:
|
||
"""PUT /admin/users/{id}/block с is_blocked=true должен вернуть пользователя с is_blocked=True."""
|
||
async with make_app_client() as client:
|
||
create_resp = await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "block-uuid-001", "name": "BlockUser"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
user_id = create_resp.json()["id"]
|
||
|
||
resp = await client.put(
|
||
f"/admin/users/{user_id}/block",
|
||
json={"is_blocked": True},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
assert resp.status_code == 200
|
||
assert resp.json()["is_blocked"] is True
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_block_user_prevents_signal() -> None:
|
||
"""Заблокированный пользователь не может отправить сигнал — /api/signal возвращает 403."""
|
||
async with make_app_client() as client:
|
||
# Регистрируем через /api/register чтобы получить api_key
|
||
reg_resp = await client.post(
|
||
"/api/register",
|
||
json={"uuid": _UUID_BLOCK, "name": "BlockSignalUser"},
|
||
)
|
||
assert reg_resp.status_code == 200
|
||
api_key = reg_resp.json()["api_key"]
|
||
user_uuid = reg_resp.json()["uuid"]
|
||
|
||
# Находим ID пользователя
|
||
users_resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
|
||
user = next(u for u in users_resp.json() if u["uuid"] == user_uuid)
|
||
user_id = user["id"]
|
||
|
||
# Блокируем
|
||
await client.put(
|
||
f"/admin/users/{user_id}/block",
|
||
json={"is_blocked": True},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
|
||
# Заблокированный пользователь должен получить 403
|
||
signal_resp = await client.post(
|
||
"/api/signal",
|
||
json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None},
|
||
headers={"Authorization": f"Bearer {api_key}"},
|
||
)
|
||
assert signal_resp.status_code == 403
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_block_nonexistent_user_returns_404() -> None:
|
||
"""PUT /admin/users/99999/block для несуществующего пользователя возвращает 404."""
|
||
async with make_app_client() as client:
|
||
resp = await client.put(
|
||
"/admin/users/99999/block",
|
||
json={"is_blocked": True},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
assert resp.status_code == 404
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 4 — Unblock user: restores access
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_unblock_user_returns_is_blocked_false() -> None:
|
||
"""PUT /admin/users/{id}/block с is_blocked=false должен вернуть пользователя с is_blocked=False."""
|
||
async with make_app_client() as client:
|
||
create_resp = await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "unblock-uuid-001", "name": "UnblockUser"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
user_id = create_resp.json()["id"]
|
||
|
||
await client.put(
|
||
f"/admin/users/{user_id}/block",
|
||
json={"is_blocked": True},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
|
||
resp = await client.put(
|
||
f"/admin/users/{user_id}/block",
|
||
json={"is_blocked": False},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
assert resp.status_code == 200
|
||
assert resp.json()["is_blocked"] is False
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_unblock_user_restores_signal_access() -> None:
|
||
"""После разблокировки пользователь снова может отправить сигнал (200)."""
|
||
async with make_app_client() as client:
|
||
# Регистрируем через /api/register чтобы получить api_key
|
||
reg_resp = await client.post(
|
||
"/api/register",
|
||
json={"uuid": _UUID_UNBLOCK, "name": "UnblockSignalUser"},
|
||
)
|
||
assert reg_resp.status_code == 200
|
||
api_key = reg_resp.json()["api_key"]
|
||
user_uuid = reg_resp.json()["uuid"]
|
||
|
||
# Находим ID
|
||
users_resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
|
||
user = next(u for u in users_resp.json() if u["uuid"] == user_uuid)
|
||
user_id = user["id"]
|
||
|
||
# Блокируем
|
||
await client.put(
|
||
f"/admin/users/{user_id}/block",
|
||
json={"is_blocked": True},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
|
||
# Разблокируем
|
||
await client.put(
|
||
f"/admin/users/{user_id}/block",
|
||
json={"is_blocked": False},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
|
||
# Сигнал должен пройти
|
||
signal_resp = await client.post(
|
||
"/api/signal",
|
||
json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None},
|
||
headers={"Authorization": f"Bearer {api_key}"},
|
||
)
|
||
assert signal_resp.status_code == 200
|
||
assert signal_resp.json()["status"] == "ok"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 5 — Delete user: disappears from DB
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_delete_user_returns_204() -> None:
|
||
"""DELETE /admin/users/{id} для существующего пользователя возвращает 204."""
|
||
async with make_app_client() as client:
|
||
create_resp = await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "delete-uuid-001", "name": "DeleteUser"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
user_id = create_resp.json()["id"]
|
||
|
||
resp = await client.delete(
|
||
f"/admin/users/{user_id}",
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
assert resp.status_code == 204
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_delete_user_disappears_from_list() -> None:
|
||
"""После DELETE /admin/users/{id} пользователь отсутствует в GET /admin/users."""
|
||
async with make_app_client() as client:
|
||
create_resp = await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "delete-uuid-002", "name": "DeleteUser2"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
user_id = create_resp.json()["id"]
|
||
|
||
await client.delete(
|
||
f"/admin/users/{user_id}",
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
|
||
list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
|
||
|
||
uuids = [u["uuid"] for u in list_resp.json()]
|
||
assert "delete-uuid-002" not in uuids
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_admin_delete_nonexistent_user_returns_404() -> None:
|
||
"""DELETE /admin/users/99999 для несуществующего пользователя возвращает 404."""
|
||
async with make_app_client() as client:
|
||
resp = await client.delete(
|
||
"/admin/users/99999",
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
assert resp.status_code == 404
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# nginx config — location /admin/users block (BATON-006 fix)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def test_nginx_conf_has_admin_users_location_block() -> None:
|
||
"""nginx/baton.conf должен содержать блок location /admin/users."""
|
||
content = NGINX_CONF.read_text(encoding="utf-8")
|
||
assert re.search(r"location\s+/admin/users\b", content), (
|
||
"nginx/baton.conf не содержит блок location /admin/users — "
|
||
"запросы к admin API будут попадать в location / и возвращать 404"
|
||
)
|
||
|
||
|
||
def test_nginx_conf_admin_users_location_proxies_to_fastapi() -> None:
|
||
"""Блок location /admin/users должен делать proxy_pass на 127.0.0.1:8000."""
|
||
content = NGINX_CONF.read_text(encoding="utf-8")
|
||
admin_block = re.search(
|
||
r"location\s+/admin/users\s*\{([^}]+)\}", content, re.DOTALL
|
||
)
|
||
assert admin_block is not None, "Блок location /admin/users { ... } не найден"
|
||
assert re.search(r"proxy_pass\s+http://127\.0\.0\.1:8000", admin_block.group(1)), (
|
||
"Блок location /admin/users не содержит proxy_pass http://127.0.0.1:8000"
|
||
)
|
||
|
||
|
||
def test_nginx_conf_admin_users_location_before_root_location() -> None:
|
||
"""location /admin/users должен находиться в nginx.conf до location / для корректного prefix-matching."""
|
||
content = NGINX_CONF.read_text(encoding="utf-8")
|
||
admin_pos = content.find("location /admin/users")
|
||
root_pos = re.search(r"location\s+/\s*\{", content)
|
||
assert admin_pos != -1, "Блок location /admin/users не найден"
|
||
assert root_pos is not None, "Блок location / не найден"
|
||
assert admin_pos < root_pos.start(), (
|
||
"location /admin/users должен быть определён ДО location / в nginx.conf"
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 7 — No regression with main functionality
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_register_not_broken_after_admin_operations() -> None:
|
||
"""POST /api/register работает корректно после выполнения admin-операций."""
|
||
async with make_app_client() as client:
|
||
# Admin операции
|
||
await client.post(
|
||
"/admin/users",
|
||
json={"uuid": "regress-admin-uuid-001", "name": "AdminCreated"},
|
||
headers=ADMIN_HEADERS,
|
||
)
|
||
|
||
# Основной функционал
|
||
resp = await client.post(
|
||
"/api/register",
|
||
json={"uuid": _UUID_SIG_OK, "name": "RegularUser"},
|
||
)
|
||
assert resp.status_code == 200
|
||
assert resp.json()["uuid"] == _UUID_SIG_OK
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_signal_from_registered_unblocked_user_succeeds() -> None:
|
||
"""Зарегистрированный незаблокированный пользователь может отправить сигнал."""
|
||
async with make_app_client() as client:
|
||
reg_resp = await client.post(
|
||
"/api/register",
|
||
json={"uuid": _UUID_SIG_OK, "name": "SignalUser"},
|
||
)
|
||
assert reg_resp.status_code == 200
|
||
api_key = reg_resp.json()["api_key"]
|
||
user_uuid = reg_resp.json()["uuid"]
|
||
|
||
signal_resp = await client.post(
|
||
"/api/signal",
|
||
json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None},
|
||
headers={"Authorization": f"Bearer {api_key}"},
|
||
)
|
||
assert signal_resp.status_code == 200
|
||
assert signal_resp.json()["status"] == "ok"
|