baton/tests/test_baton_005.py
2026-03-21 09:26:57 +02:00

527 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Tests for BATON-005: Admin panel — user creation, password change, block/unblock, delete.
Acceptance criteria:
1. Создание пользователя — пользователь появляется в БД (GET /admin/users)
2. Смена пароля — endpoint возвращает ok, 404 для несуществующего пользователя
3. Блокировка — заблокированный пользователь не может отправить сигнал (403)
4. Разблокировка — восстанавливает доступ (сигнал снова проходит)
5. Удаление — пользователь исчезает из GET /admin/users, возвращается 204
6. Защита: неавторизованный запрос к /admin/* возвращает 401
7. Отсутствие регрессии с основным функционалом
BATON-SEC-003: POST /api/signal now requires Authorization: Bearer <api_key>.
Tests 3 and 4 (block/unblock + signal) use /api/register to obtain an api_key,
then admin block/unblock the user by their DB id.
"""
from __future__ import annotations
import os
import re
from pathlib import Path
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
import pytest
from tests.conftest import make_app_client
PROJECT_ROOT = Path(__file__).parent.parent
NGINX_CONF = PROJECT_ROOT / "nginx" / "baton.conf"
ADMIN_HEADERS = {"Authorization": "Bearer test-admin-token"}
WRONG_HEADERS = {"Authorization": "Bearer wrong-token"}
# Valid UUID v4 for signal-related tests (registered via /api/register)
_UUID_BLOCK = "f0000001-0000-4000-8000-000000000001"
_UUID_UNBLOCK = "f0000002-0000-4000-8000-000000000002"
_UUID_SIG_OK = "f0000003-0000-4000-8000-000000000003"
# Valid UUID v4 for admin-only tests (POST /admin/users, no /api/register call)
_UUID_ADM_UNAUTH = "e0000000-0000-4000-8000-000000000000"
_UUID_ADM_CREATE_1 = "e0000001-0000-4000-8000-000000000001"
_UUID_ADM_CREATE_2 = "e0000002-0000-4000-8000-000000000002"
_UUID_ADM_CREATE_3 = "e0000003-0000-4000-8000-000000000003"
_UUID_ADM_PASS_1 = "e0000004-0000-4000-8000-000000000004"
_UUID_ADM_PASS_2 = "e0000005-0000-4000-8000-000000000005"
_UUID_ADM_BLOCK = "e0000006-0000-4000-8000-000000000006"
_UUID_ADM_UNBLOCK = "e0000007-0000-4000-8000-000000000007"
_UUID_ADM_DELETE_1 = "e0000008-0000-4000-8000-000000000008"
_UUID_ADM_DELETE_2 = "e0000009-0000-4000-8000-000000000009"
_UUID_ADM_REGRESS = "e000000a-0000-4000-8000-000000000010"
# ---------------------------------------------------------------------------
# Criterion 6 — Unauthorised requests to /admin/* return 401
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_admin_list_users_without_token_returns_401() -> None:
"""GET /admin/users без Authorization header должен вернуть 401."""
async with make_app_client() as client:
resp = await client.get("/admin/users")
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_admin_list_users_wrong_token_returns_401() -> None:
"""GET /admin/users с неверным токеном должен вернуть 401."""
async with make_app_client() as client:
resp = await client.get("/admin/users", headers=WRONG_HEADERS)
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_admin_create_user_without_token_returns_401() -> None:
"""POST /admin/users без токена должен вернуть 401."""
async with make_app_client() as client:
resp = await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_UNAUTH, "name": "Ghost"},
)
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_admin_set_password_without_token_returns_401() -> None:
"""PUT /admin/users/1/password без токена должен вернуть 401."""
async with make_app_client() as client:
resp = await client.put(
"/admin/users/1/password",
json={"password": "newpass"},
)
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_admin_block_user_without_token_returns_401() -> None:
"""PUT /admin/users/1/block без токена должен вернуть 401."""
async with make_app_client() as client:
resp = await client.put(
"/admin/users/1/block",
json={"is_blocked": True},
)
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_admin_delete_user_without_token_returns_401() -> None:
"""DELETE /admin/users/1 без токена должен вернуть 401."""
async with make_app_client() as client:
resp = await client.delete("/admin/users/1")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# Criterion 1 — Create user: appears in DB
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_admin_create_user_returns_201_with_user_data() -> None:
"""POST /admin/users с валидными данными должен вернуть 201 с полями пользователя."""
async with make_app_client() as client:
resp = await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_CREATE_1, "name": "Alice Admin"},
headers=ADMIN_HEADERS,
)
assert resp.status_code == 201
data = resp.json()
assert data["uuid"] == _UUID_ADM_CREATE_1
assert data["name"] == "Alice Admin"
assert data["id"] > 0
assert data["is_blocked"] is False
@pytest.mark.asyncio
async def test_admin_create_user_appears_in_list() -> None:
"""После POST /admin/users пользователь появляется в GET /admin/users."""
async with make_app_client() as client:
await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_CREATE_2, "name": "Bob Admin"},
headers=ADMIN_HEADERS,
)
resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
assert resp.status_code == 200
users = resp.json()
uuids = [u["uuid"] for u in users]
assert _UUID_ADM_CREATE_2 in uuids
@pytest.mark.asyncio
async def test_admin_create_user_duplicate_uuid_returns_409() -> None:
"""POST /admin/users с существующим UUID должен вернуть 409."""
async with make_app_client() as client:
await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_CREATE_3, "name": "Carol"},
headers=ADMIN_HEADERS,
)
resp = await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_CREATE_3, "name": "Carol Duplicate"},
headers=ADMIN_HEADERS,
)
assert resp.status_code == 409
@pytest.mark.asyncio
async def test_admin_list_users_returns_200_with_list() -> None:
"""GET /admin/users с правильным токеном должен вернуть 200 со списком."""
async with make_app_client() as client:
resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
assert resp.status_code == 200
assert isinstance(resp.json(), list)
# ---------------------------------------------------------------------------
# Criterion 2 — Password change
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_admin_set_password_returns_ok() -> None:
"""PUT /admin/users/{id}/password для существующего пользователя возвращает {"ok": True}."""
async with make_app_client() as client:
create_resp = await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_PASS_1, "name": "PassUser"},
headers=ADMIN_HEADERS,
)
user_id = create_resp.json()["id"]
resp = await client.put(
f"/admin/users/{user_id}/password",
json={"password": "newpassword123"},
headers=ADMIN_HEADERS,
)
assert resp.status_code == 200
assert resp.json() == {"ok": True}
@pytest.mark.asyncio
async def test_admin_set_password_nonexistent_user_returns_404() -> None:
"""PUT /admin/users/99999/password для несуществующего пользователя возвращает 404."""
async with make_app_client() as client:
resp = await client.put(
"/admin/users/99999/password",
json={"password": "somepassword"},
headers=ADMIN_HEADERS,
)
assert resp.status_code == 404
@pytest.mark.asyncio
async def test_admin_set_password_user_still_accessible_after_change() -> None:
"""Пользователь остаётся доступен в GET /admin/users после смены пароля."""
async with make_app_client() as client:
create_resp = await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_PASS_2, "name": "PassUser2"},
headers=ADMIN_HEADERS,
)
user_id = create_resp.json()["id"]
await client.put(
f"/admin/users/{user_id}/password",
json={"password": "updatedpass"},
headers=ADMIN_HEADERS,
)
list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
uuids = [u["uuid"] for u in list_resp.json()]
assert _UUID_ADM_PASS_2 in uuids
# ---------------------------------------------------------------------------
# Criterion 3 — Block user: blocked user cannot send signal
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_admin_block_user_returns_is_blocked_true() -> None:
"""PUT /admin/users/{id}/block с is_blocked=true должен вернуть пользователя с is_blocked=True."""
async with make_app_client() as client:
create_resp = await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_BLOCK, "name": "BlockUser"},
headers=ADMIN_HEADERS,
)
user_id = create_resp.json()["id"]
resp = await client.put(
f"/admin/users/{user_id}/block",
json={"is_blocked": True},
headers=ADMIN_HEADERS,
)
assert resp.status_code == 200
assert resp.json()["is_blocked"] is True
@pytest.mark.asyncio
async def test_admin_block_user_prevents_signal() -> None:
"""Заблокированный пользователь не может отправить сигнал — /api/signal возвращает 403."""
async with make_app_client() as client:
# Регистрируем через /api/register чтобы получить api_key
reg_resp = await client.post(
"/api/register",
json={"uuid": _UUID_BLOCK, "name": "BlockSignalUser"},
)
assert reg_resp.status_code == 200
api_key = reg_resp.json()["api_key"]
user_uuid = reg_resp.json()["uuid"]
# Находим ID пользователя
users_resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
user = next(u for u in users_resp.json() if u["uuid"] == user_uuid)
user_id = user["id"]
# Блокируем
await client.put(
f"/admin/users/{user_id}/block",
json={"is_blocked": True},
headers=ADMIN_HEADERS,
)
# Заблокированный пользователь должен получить 403
signal_resp = await client.post(
"/api/signal",
json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None},
headers={"Authorization": f"Bearer {api_key}"},
)
assert signal_resp.status_code == 403
@pytest.mark.asyncio
async def test_admin_block_nonexistent_user_returns_404() -> None:
"""PUT /admin/users/99999/block для несуществующего пользователя возвращает 404."""
async with make_app_client() as client:
resp = await client.put(
"/admin/users/99999/block",
json={"is_blocked": True},
headers=ADMIN_HEADERS,
)
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# Criterion 4 — Unblock user: restores access
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_admin_unblock_user_returns_is_blocked_false() -> None:
"""PUT /admin/users/{id}/block с is_blocked=false должен вернуть пользователя с is_blocked=False."""
async with make_app_client() as client:
create_resp = await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_UNBLOCK, "name": "UnblockUser"},
headers=ADMIN_HEADERS,
)
user_id = create_resp.json()["id"]
await client.put(
f"/admin/users/{user_id}/block",
json={"is_blocked": True},
headers=ADMIN_HEADERS,
)
resp = await client.put(
f"/admin/users/{user_id}/block",
json={"is_blocked": False},
headers=ADMIN_HEADERS,
)
assert resp.status_code == 200
assert resp.json()["is_blocked"] is False
@pytest.mark.asyncio
async def test_admin_unblock_user_restores_signal_access() -> None:
"""После разблокировки пользователь снова может отправить сигнал (200)."""
async with make_app_client() as client:
# Регистрируем через /api/register чтобы получить api_key
reg_resp = await client.post(
"/api/register",
json={"uuid": _UUID_UNBLOCK, "name": "UnblockSignalUser"},
)
assert reg_resp.status_code == 200
api_key = reg_resp.json()["api_key"]
user_uuid = reg_resp.json()["uuid"]
# Находим ID
users_resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
user = next(u for u in users_resp.json() if u["uuid"] == user_uuid)
user_id = user["id"]
# Блокируем
await client.put(
f"/admin/users/{user_id}/block",
json={"is_blocked": True},
headers=ADMIN_HEADERS,
)
# Разблокируем
await client.put(
f"/admin/users/{user_id}/block",
json={"is_blocked": False},
headers=ADMIN_HEADERS,
)
# Сигнал должен пройти
signal_resp = await client.post(
"/api/signal",
json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None},
headers={"Authorization": f"Bearer {api_key}"},
)
assert signal_resp.status_code == 200
assert signal_resp.json()["status"] == "ok"
# ---------------------------------------------------------------------------
# Criterion 5 — Delete user: disappears from DB
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_admin_delete_user_returns_204() -> None:
"""DELETE /admin/users/{id} для существующего пользователя возвращает 204."""
async with make_app_client() as client:
create_resp = await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_DELETE_1, "name": "DeleteUser"},
headers=ADMIN_HEADERS,
)
user_id = create_resp.json()["id"]
resp = await client.delete(
f"/admin/users/{user_id}",
headers=ADMIN_HEADERS,
)
assert resp.status_code == 204
@pytest.mark.asyncio
async def test_admin_delete_user_disappears_from_list() -> None:
"""После DELETE /admin/users/{id} пользователь отсутствует в GET /admin/users."""
async with make_app_client() as client:
create_resp = await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_DELETE_2, "name": "DeleteUser2"},
headers=ADMIN_HEADERS,
)
user_id = create_resp.json()["id"]
await client.delete(
f"/admin/users/{user_id}",
headers=ADMIN_HEADERS,
)
list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
uuids = [u["uuid"] for u in list_resp.json()]
assert _UUID_ADM_DELETE_2 not in uuids
@pytest.mark.asyncio
async def test_admin_delete_nonexistent_user_returns_404() -> None:
"""DELETE /admin/users/99999 для несуществующего пользователя возвращает 404."""
async with make_app_client() as client:
resp = await client.delete(
"/admin/users/99999",
headers=ADMIN_HEADERS,
)
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# nginx config — location /admin/users block (BATON-006 fix)
# ---------------------------------------------------------------------------
def test_nginx_conf_has_admin_users_location_block() -> None:
"""nginx/baton.conf должен содержать блок location /admin/users."""
content = NGINX_CONF.read_text(encoding="utf-8")
assert re.search(r"location\s+/admin/users\b", content), (
"nginx/baton.conf не содержит блок location /admin/users — "
"запросы к admin API будут попадать в location / и возвращать 404"
)
def test_nginx_conf_admin_users_location_proxies_to_fastapi() -> None:
"""Блок location /admin/users должен делать proxy_pass на 127.0.0.1:8000."""
content = NGINX_CONF.read_text(encoding="utf-8")
admin_block = re.search(
r"location\s+/admin/users\s*\{([^}]+)\}", content, re.DOTALL
)
assert admin_block is not None, "Блок location /admin/users { ... } не найден"
assert re.search(r"proxy_pass\s+http://127\.0\.0\.1:8000", admin_block.group(1)), (
"Блок location /admin/users не содержит proxy_pass http://127.0.0.1:8000"
)
def test_nginx_conf_admin_users_location_before_root_location() -> None:
"""location /admin/users должен находиться в nginx.conf до location / для корректного prefix-matching."""
content = NGINX_CONF.read_text(encoding="utf-8")
admin_pos = content.find("location /admin/users")
root_pos = re.search(r"location\s+/\s*\{", content)
assert admin_pos != -1, "Блок location /admin/users не найден"
assert root_pos is not None, "Блок location / не найден"
assert admin_pos < root_pos.start(), (
"location /admin/users должен быть определён ДО location / в nginx.conf"
)
# ---------------------------------------------------------------------------
# Criterion 7 — No regression with main functionality
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_not_broken_after_admin_operations() -> None:
"""POST /api/register работает корректно после выполнения admin-операций."""
async with make_app_client() as client:
# Admin операции
await client.post(
"/admin/users",
json={"uuid": _UUID_ADM_REGRESS, "name": "AdminCreated"},
headers=ADMIN_HEADERS,
)
# Основной функционал
resp = await client.post(
"/api/register",
json={"uuid": _UUID_SIG_OK, "name": "RegularUser"},
)
assert resp.status_code == 200
assert resp.json()["uuid"] == _UUID_SIG_OK
@pytest.mark.asyncio
async def test_signal_from_registered_unblocked_user_succeeds() -> None:
"""Зарегистрированный незаблокированный пользователь может отправить сигнал."""
async with make_app_client() as client:
reg_resp = await client.post(
"/api/register",
json={"uuid": _UUID_SIG_OK, "name": "SignalUser"},
)
assert reg_resp.status_code == 200
api_key = reg_resp.json()["api_key"]
user_uuid = reg_resp.json()["uuid"]
signal_resp = await client.post(
"/api/signal",
json={"user_id": user_uuid, "timestamp": 1700000000000, "geo": None},
headers={"Authorization": f"Bearer {api_key}"},
)
assert signal_resp.status_code == 200
assert signal_resp.json()["status"] == "ok"