338 lines
14 KiB
Python
338 lines
14 KiB
Python
"""
|
||
Tests for BATON-BIZ-001: Login mechanism for approved users (dual-layer: AST + httpx functional).
|
||
|
||
Acceptance criteria:
|
||
1. Успешный login по login-полю → 200 + token
|
||
2. Успешный login по email-полю → 200 + token
|
||
3. Неверный пароль → 401 (без раскрытия причины)
|
||
4. Статус pending → 403 с читаемым сообщением
|
||
5. Статус rejected → 403 с читаемым сообщением
|
||
6. Rate limit — 6-й запрос подряд → 429
|
||
7. Guard middleware возвращает 401 без токена
|
||
8. Guard middleware пропускает валидный токен
|
||
|
||
Additional: error message uniformity, PBKDF2 verification.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
|
||
# Fallback configuration for the test run. It is installed before the
# ``backend`` imports further down; presumably the backend reads these
# variables at import time — confirm against backend config.
# ``setdefault`` keeps any value that is already present in the environment.
_TEST_ENV_DEFAULTS = {
    "BOT_TOKEN": "test-bot-token",
    "CHAT_ID": "-1001234567890",
    "WEBHOOK_SECRET": "test-webhook-secret",
    "WEBHOOK_URL": "https://example.com/api/webhook/telegram",
    "FRONTEND_ORIGIN": "http://localhost:3000",
    "ADMIN_TOKEN": "test-admin-token",
    "ADMIN_CHAT_ID": "5694335584",
}
for _env_name, _env_value in _TEST_ENV_DEFAULTS.items():
    os.environ.setdefault(_env_name, _env_value)
|
||
|
||
import pytest
|
||
from fastapi import HTTPException
|
||
from fastapi.security import HTTPAuthorizationCredentials
|
||
|
||
from backend import db
|
||
from backend.middleware import create_auth_token, verify_auth_token
|
||
from tests.conftest import make_app_client
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def _register_auth(client, email: str, login: str, password: str) -> int:
    """Create a registration through the HTTP API and return its id.

    Posts the credentials to ``/api/auth/register`` and expects a 201
    response, then looks the record up with
    ``db.get_registration_by_login_or_email`` using ``login``.

    Raises:
        AssertionError: if the endpoint does not answer 201, or the
            registration cannot be found afterwards.
    """
    credentials = {"email": email, "login": login, "password": password}
    response = await client.post("/api/auth/register", json=credentials)
    assert response.status_code == 201, f"auth/register failed: {response.text}"

    registration = await db.get_registration_by_login_or_email(login)
    assert registration is not None
    return registration["id"]
|
||
|
||
|
||
async def _approve(reg_id: int) -> None:
    """Set the status of registration ``reg_id`` to ``"approved"`` via the db layer."""
    new_status = "approved"
    await db.update_registration_status(reg_id, new_status)
|
||
|
||
|
||
async def _reject(reg_id: int) -> None:
    """Set the status of registration ``reg_id`` to ``"rejected"`` via the db layer."""
    new_status = "rejected"
    await db.update_registration_status(reg_id, new_status)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 1 — Успешный login по login-полю
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_login_by_login_field_returns_200_with_token():
    """Criterion 1: an approved user logging in by login name gets 200 and a token.

    The response body must contain a ``token`` key and echo the login.
    """
    async with make_app_client() as client:
        registration_id = await _register_auth(
            client, "alice@example.com", "alice", "password123"
        )
        await _approve(registration_id)

        credentials = {"login_or_email": "alice", "password": "password123"}
        response = await client.post("/api/auth/login", json=credentials)

        assert response.status_code == 200
        body = response.json()
        assert "token" in body
        assert body["login"] == "alice"
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_login_by_login_field_token_is_non_empty_string():
    """The token issued to an approved user is a non-empty ``str``.

    The status code is checked first so that a failed login is reported as
    a status mismatch (with the response text) rather than as a ``KeyError``
    on the missing ``"token"`` key.
    """
    async with make_app_client() as client:
        registration_id = await _register_auth(
            client, "alice2@example.com", "alice2", "password123"
        )
        await _approve(registration_id)

        response = await client.post(
            "/api/auth/login",
            json={"login_or_email": "alice2", "password": "password123"},
        )

        assert response.status_code == 200, response.text
        # Parse the body once instead of once per assertion.
        token = response.json()["token"]
        assert isinstance(token, str)
        assert len(token) > 0
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 2 — Успешный login по email-полю
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_login_by_email_field_returns_200_with_token():
    """Criterion 2: an approved user logging in by e-mail gets 200 and a token.

    The ``login`` in the response must be the registered login name, not
    the e-mail address that was used to authenticate.
    """
    async with make_app_client() as client:
        registration_id = await _register_auth(
            client, "bob@example.com", "bobuser", "securepass1"
        )
        await _approve(registration_id)

        credentials = {"login_or_email": "bob@example.com", "password": "securepass1"}
        response = await client.post("/api/auth/login", json=credentials)

        assert response.status_code == 200
        body = response.json()
        assert "token" in body
        assert body["login"] == "bobuser"
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_login_by_email_field_token_login_matches_registration():
    """Logging in by e-mail returns the login chosen at registration time.

    The status code is checked first so that a failed login is reported as
    a status mismatch (with the response text) rather than as a ``KeyError``
    on the missing ``"login"`` key.
    """
    async with make_app_client() as client:
        registration_id = await _register_auth(
            client, "bob2@example.com", "bob2user", "securepass1"
        )
        await _approve(registration_id)

        response = await client.post(
            "/api/auth/login",
            json={"login_or_email": "bob2@example.com", "password": "securepass1"},
        )

        assert response.status_code == 200, response.text
        assert response.json()["login"] == "bob2user"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 3 — Неверный пароль → 401 без раскрытия причины
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_wrong_password_returns_401():
    """Criterion 3: a wrong password for an approved user yields 401.

    The ``detail`` must carry the generic "invalid login or password"
    message, which does not say which of the two was wrong.
    """
    async with make_app_client() as client:
        registration_id = await _register_auth(
            client, "carol@example.com", "carol", "correctpass1"
        )
        await _approve(registration_id)

        bad_credentials = {"login_or_email": "carol", "password": "wrongpassword"}
        response = await client.post("/api/auth/login", json=bad_credentials)

        assert response.status_code == 401
        detail = response.json()["detail"]
        assert "Неверный логин или пароль" in detail
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_nonexistent_user_returns_401_same_message_as_wrong_password():
    """An unknown login yields 401 with the generic credentials message.

    This is the same text the wrong-password test expects, so the response
    does not reveal whether the account exists.
    """
    async with make_app_client() as client:
        unknown_credentials = {"login_or_email": "doesnotexist", "password": "anypassword"}
        response = await client.post("/api/auth/login", json=unknown_credentials)

        assert response.status_code == 401
        detail = response.json()["detail"]
        assert "Неверный логин или пароль" in detail
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 4 — Статус pending → 403 с читаемым сообщением
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_pending_user_login_returns_403():
    """Criterion 4: a registration that was never approved cannot log in (403)."""
    async with make_app_client() as client:
        # No approve/reject call follows: the registration keeps its initial
        # status, which the original author notes is 'pending'.
        await _register_auth(client, "dave@example.com", "dave", "password123")

        credentials = {"login_or_email": "dave", "password": "password123"}
        response = await client.post("/api/auth/login", json=credentials)

        assert response.status_code == 403
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_pending_user_login_403_message_is_human_readable():
    """The rejection detail for a pending user contains the word "ожидает".

    The registration is deliberately left without an approval step.
    """
    async with make_app_client() as client:
        await _register_auth(client, "dave2@example.com", "dave2", "password123")

        credentials = {"login_or_email": "dave2", "password": "password123"}
        response = await client.post("/api/auth/login", json=credentials)

        detail = response.json()["detail"]
        assert "ожидает" in detail
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 5 — Статус rejected → 403 с читаемым сообщением
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_rejected_user_login_returns_403():
    """Criterion 5: a user whose registration was rejected cannot log in (403)."""
    async with make_app_client() as client:
        registration_id = await _register_auth(
            client, "eve@example.com", "evegirl", "password123"
        )
        await _reject(registration_id)

        credentials = {"login_or_email": "evegirl", "password": "password123"}
        response = await client.post("/api/auth/login", json=credentials)

        assert response.status_code == 403
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_rejected_user_login_403_message_is_human_readable():
    """The rejection detail for a rejected user contains the word "отклонена"."""
    async with make_app_client() as client:
        registration_id = await _register_auth(
            client, "eve2@example.com", "eve2girl", "password123"
        )
        await _reject(registration_id)

        credentials = {"login_or_email": "eve2girl", "password": "password123"}
        response = await client.post("/api/auth/login", json=credentials)

        detail = response.json()["detail"]
        assert "отклонена" in detail
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 6 — Rate limit: 6-й запрос подряд → 429
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_rate_limit_triggers_on_sixth_login_attempt():
    """Criterion 6: six consecutive login attempts give 401 five times, then 429.

    All requests carry the same ``X-Real-IP`` header; presumably the rate
    limiter keys on it — confirm against the middleware.
    """
    async with make_app_client() as client:
        payload = {"login_or_email": "nouser_rl", "password": "nopass"}
        origin = {"X-Real-IP": "10.99.99.1"}

        # Requests are issued strictly one after another; only the status
        # code of each response is kept.
        observed = [
            (await client.post("/api/auth/login", json=payload, headers=origin)).status_code
            for _ in range(6)
        ]
        allowed, blocked = observed[:5], observed[5]

        # The first five attempts get past the limiter (unknown user → 401).
        assert all(code == 401 for code in allowed), (
            f"Первые 5 попыток должны быть 401, получили: {allowed}"
        )
        # The sixth attempt is the one that trips the limiter.
        assert blocked == 429, (
            f"6-я попытка должна быть 429, получили: {blocked}"
        )
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_rate_limit_fifth_attempt_still_passes():
    """The fifth consecutive login attempt is not rate-limited yet.

    Four warm-up requests are sent first and their responses are ignored.
    The fifth request must still reach credential checking and return 401
    (unknown user) rather than 429. All requests share one ``X-Real-IP``
    value; presumably the limiter keys on it — confirm against the middleware.
    """
    async with make_app_client() as client:
        # Defined once so the warm-up and the measured request cannot drift apart.
        payload = {"login_or_email": "nouser_rl2", "password": "nopass"}
        origin = {"X-Real-IP": "10.99.99.2"}

        for _ in range(4):
            await client.post("/api/auth/login", json=payload, headers=origin)

        response = await client.post("/api/auth/login", json=payload, headers=origin)

        assert response.status_code == 401, (
            f"5-я попытка должна пройти rate limit и вернуть 401, получили: {response.status_code}"
        )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 7 — Guard middleware: 401 без токена
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_verify_auth_token_raises_401_when_credentials_is_none():
    """Criterion 7: calling the guard with ``credentials=None`` raises HTTPException 401."""
    with pytest.raises(HTTPException) as raised:
        await verify_auth_token(credentials=None)

    # Inspect the captured exception after the ``with`` block has exited.
    error = raised.value
    assert error.status_code == 401
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_verify_auth_token_raises_401_for_malformed_token():
    """A bearer value that is not a valid token makes the guard raise HTTPException 401."""
    bogus_bearer = HTTPAuthorizationCredentials(
        scheme="Bearer",
        credentials="not.a.valid.jwt",
    )

    with pytest.raises(HTTPException) as raised:
        await verify_auth_token(credentials=bogus_bearer)

    # Inspect the captured exception after the ``with`` block has exited.
    error = raised.value
    assert error.status_code == 401
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Criterion 8 — Guard middleware: валидный токен пропускается
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_verify_auth_token_returns_payload_for_valid_token():
    """Criterion 8: a token from ``create_auth_token`` is accepted by the guard.

    The returned payload carries the registration id as the string ``"42"``
    under ``sub`` and the login under ``login``.
    """
    issued = create_auth_token(reg_id=42, login="testuser")
    bearer = HTTPAuthorizationCredentials(scheme="Bearer", credentials=issued)

    claims = await verify_auth_token(credentials=bearer)

    assert claims["sub"] == "42"
    assert claims["login"] == "testuser"
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_verify_auth_token_payload_contains_expected_fields():
    """The payload of a verified token exposes ``sub``, ``login``, ``iat`` and ``exp``."""
    issued = create_auth_token(reg_id=7, login="inspector")
    bearer = HTTPAuthorizationCredentials(scheme="Bearer", credentials=issued)

    claims = await verify_auth_token(credentials=bearer)

    required_claims = ("sub", "login", "iat", "exp")
    for claim in required_claims:
        assert claim in claims, f"Поле '{claim}' отсутствует в payload"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Additional: PBKDF2 correctness — verify_password timing-safe
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def test_hash_and_verify_password_returns_true_for_correct_password():
    """A hash from ``_hash_password`` verifies as ``True`` against the same password."""
    from backend.main import _hash_password, _verify_password

    password = "mysecretpass"
    password_hash = _hash_password(password)

    assert _verify_password(password, password_hash) is True
|
||
|
||
|
||
def test_hash_and_verify_password_returns_false_for_wrong_password():
    """A hash from ``_hash_password`` verifies as ``False`` against a different password."""
    from backend.main import _hash_password, _verify_password

    password_hash = _hash_password("mysecretpass")
    outcome = _verify_password("wrongpassword", password_hash)

    assert outcome is False
|
||
|
||
|
||
def test_verify_password_returns_false_for_malformed_hash():
    """``_verify_password`` returns ``False`` instead of raising on a malformed stored hash."""
    from backend.main import _verify_password

    malformed_hash = "not-a-valid-hash"
    outcome = _verify_password("anypassword", malformed_hash)

    assert outcome is False
|