""" Tests for BATON-BIZ-001: Login mechanism for approved users (dual-layer: AST + httpx functional). Acceptance criteria: 1. Успешный login по login-полю → 200 + token 2. Успешный login по email-полю → 200 + token 3. Неверный пароль → 401 (без раскрытия причины) 4. Статус pending → 403 с читаемым сообщением 5. Статус rejected → 403 с читаемым сообщением 6. Rate limit — 6-й запрос подряд → 429 7. Guard middleware возвращает 401 без токена 8. Guard middleware пропускает валидный токен Additional: error message uniformity, PBKDF2 verification. """ from __future__ import annotations import os os.environ.setdefault("BOT_TOKEN", "test-bot-token") os.environ.setdefault("CHAT_ID", "-1001234567890") os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") os.environ.setdefault("ADMIN_TOKEN", "test-admin-token") os.environ.setdefault("ADMIN_CHAT_ID", "5694335584") import pytest from fastapi import HTTPException from fastapi.security import HTTPAuthorizationCredentials from backend import db from backend.middleware import create_auth_token, verify_auth_token from tests.conftest import make_app_client # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- async def _register_auth(client, email: str, login: str, password: str) -> int: """Register via /api/auth/register, return registration id.""" resp = await client.post( "/api/auth/register", json={"email": email, "login": login, "password": password}, ) assert resp.status_code == 201, f"auth/register failed: {resp.text}" reg = await db.get_registration_by_login_or_email(login) assert reg is not None return reg["id"] async def _approve(reg_id: int) -> None: await db.update_registration_status(reg_id, "approved") async def _reject(reg_id: int) -> None: await db.update_registration_status(reg_id, "rejected") # --------------------------------------------------------------------------- # Criterion 1 — Успешный login по login-полю # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_login_by_login_field_returns_200_with_token(): """Approved user can login using their login field → 200 + token.""" async with make_app_client() as client: reg_id = await _register_auth(client, "alice@example.com", "alice", "password123") await _approve(reg_id) resp = await client.post( "/api/auth/login", json={"login_or_email": "alice", "password": "password123"}, ) assert resp.status_code == 200 data = resp.json() assert "token" in data assert data["login"] == "alice" @pytest.mark.asyncio async def test_login_by_login_field_token_is_non_empty_string(): """Token returned for approved login user is a non-empty string.""" async with make_app_client() as client: reg_id = await _register_auth(client, "alice2@example.com", "alice2", "password123") await _approve(reg_id) resp = await client.post( "/api/auth/login", json={"login_or_email": "alice2", "password": "password123"}, ) assert isinstance(resp.json()["token"], str) assert len(resp.json()["token"]) > 0 # --------------------------------------------------------------------------- # Criterion 2 — Успешный login по email-полю # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_login_by_email_field_returns_200_with_token(): """Approved user can login using their email field → 200 + token.""" async with make_app_client() as client: reg_id = await _register_auth(client, "bob@example.com", "bobuser", "securepass1") await _approve(reg_id) resp = await client.post( "/api/auth/login", json={"login_or_email": "bob@example.com", "password": "securepass1"}, ) assert resp.status_code == 200 data = resp.json() assert "token" in data assert data["login"] == "bobuser" @pytest.mark.asyncio async def test_login_by_email_field_token_login_matches_registration(): """Token response login field matches the login set during registration.""" async with make_app_client() as client: reg_id = await _register_auth(client, "bob2@example.com", "bob2user", "securepass1") await _approve(reg_id) resp = await client.post( "/api/auth/login", json={"login_or_email": "bob2@example.com", "password": "securepass1"}, ) assert resp.json()["login"] == "bob2user" # --------------------------------------------------------------------------- # Criterion 3 — Неверный пароль → 401 без раскрытия причины # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_wrong_password_returns_401(): """Wrong password returns 401 with generic message (no detail about which field failed).""" async with make_app_client() as client: reg_id = await _register_auth(client, "carol@example.com", "carol", "correctpass1") await _approve(reg_id) resp = await client.post( "/api/auth/login", json={"login_or_email": "carol", "password": "wrongpassword"}, ) assert resp.status_code == 401 assert "Неверный логин или пароль" in resp.json()["detail"] @pytest.mark.asyncio async def test_nonexistent_user_returns_401_same_message_as_wrong_password(): """Non-existent login returns same 401 message as wrong password (prevents user enumeration).""" async with make_app_client() as client: resp = await client.post( "/api/auth/login", json={"login_or_email": "doesnotexist", "password": "anypassword"}, ) assert resp.status_code == 401 assert "Неверный логин или пароль" in resp.json()["detail"] # --------------------------------------------------------------------------- # Criterion 4 — Статус pending → 403 с читаемым сообщением # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pending_user_login_returns_403(): """User with pending status gets 403.""" async with make_app_client() as client: await _register_auth(client, "dave@example.com", "dave", "password123") # Status is 'pending' by default — no approval step resp = await client.post( "/api/auth/login", json={"login_or_email": "dave", "password": "password123"}, ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_pending_user_login_403_message_is_human_readable(): """403 message for pending user contains readable Russian text about the waiting status.""" async with make_app_client() as client: await _register_auth(client, "dave2@example.com", "dave2", "password123") resp = await client.post( "/api/auth/login", json={"login_or_email": "dave2", "password": "password123"}, ) assert "ожидает" in resp.json()["detail"] # --------------------------------------------------------------------------- # Criterion 5 — Статус rejected → 403 с читаемым сообщением # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_rejected_user_login_returns_403(): """User with rejected status gets 403.""" async with make_app_client() as client: reg_id = await _register_auth(client, "eve@example.com", "evegirl", "password123") await _reject(reg_id) resp = await client.post( "/api/auth/login", json={"login_or_email": "evegirl", "password": "password123"}, ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_rejected_user_login_403_message_is_human_readable(): """403 message for rejected user contains readable Russian text about rejection.""" async with make_app_client() as client: reg_id = await _register_auth(client, "eve2@example.com", "eve2girl", "password123") await _reject(reg_id) resp = await client.post( "/api/auth/login", json={"login_or_email": "eve2girl", "password": "password123"}, ) assert "отклонена" in resp.json()["detail"] # --------------------------------------------------------------------------- # Criterion 6 — Rate limit: 6-й запрос подряд → 429 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_rate_limit_triggers_on_sixth_login_attempt(): """Login rate limit (5 per window) triggers 429 exactly on the 6th request.""" async with make_app_client() as client: statuses = [] for _ in range(6): resp = await client.post( "/api/auth/login", json={"login_or_email": "nouser_rl", "password": "nopass"}, headers={"X-Real-IP": "10.99.99.1"}, ) statuses.append(resp.status_code) # First 5 attempts pass rate limit (user not found → 401) assert all(s == 401 for s in statuses[:5]), ( f"Первые 5 попыток должны быть 401, получили: {statuses[:5]}" ) # 6th attempt hits rate limit assert statuses[5] == 429, ( f"6-я попытка должна быть 429, получили: {statuses[5]}" ) @pytest.mark.asyncio async def test_rate_limit_fifth_attempt_still_passes(): """5th login attempt is still allowed (rate limit triggers only on 6th).""" async with make_app_client() as client: for i in range(4): await client.post( "/api/auth/login", json={"login_or_email": "nouser_rl2", "password": "nopass"}, headers={"X-Real-IP": "10.99.99.2"}, ) resp = await client.post( "/api/auth/login", json={"login_or_email": "nouser_rl2", "password": "nopass"}, headers={"X-Real-IP": "10.99.99.2"}, ) assert resp.status_code == 401, ( f"5-я попытка должна пройти rate limit и вернуть 401, получили: {resp.status_code}" ) # --------------------------------------------------------------------------- # Criterion 7 — Guard middleware: 401 без токена # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_verify_auth_token_raises_401_when_credentials_is_none(): """verify_auth_token raises HTTPException 401 when no credentials provided.""" with pytest.raises(HTTPException) as exc_info: await verify_auth_token(credentials=None) assert exc_info.value.status_code == 401 @pytest.mark.asyncio async def test_verify_auth_token_raises_401_for_malformed_token(): """verify_auth_token raises HTTPException 401 for a malformed/invalid token.""" creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials="not.a.valid.jwt") with pytest.raises(HTTPException) as exc_info: await verify_auth_token(credentials=creds) assert exc_info.value.status_code == 401 # --------------------------------------------------------------------------- # Criterion 8 — Guard middleware: валидный токен пропускается # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_verify_auth_token_returns_payload_for_valid_token(): """verify_auth_token returns decoded JWT payload for a valid signed token.""" token = create_auth_token(reg_id=42, login="testuser") creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token) payload = await verify_auth_token(credentials=creds) assert payload["sub"] == "42" assert payload["login"] == "testuser" @pytest.mark.asyncio async def test_verify_auth_token_payload_contains_expected_fields(): """Payload returned by verify_auth_token contains sub, login, iat, exp fields.""" token = create_auth_token(reg_id=7, login="inspector") creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token) payload = await verify_auth_token(credentials=creds) for field in ("sub", "login", "iat", "exp"): assert field in payload, f"Поле '{field}' отсутствует в payload" # --------------------------------------------------------------------------- # Additional: PBKDF2 correctness — verify_password timing-safe # --------------------------------------------------------------------------- def test_hash_and_verify_password_returns_true_for_correct_password(): """_hash_password + _verify_password: correct password returns True.""" from backend.main import _hash_password, _verify_password stored = _hash_password("mysecretpass") assert _verify_password("mysecretpass", stored) is True def test_hash_and_verify_password_returns_false_for_wrong_password(): """_hash_password + _verify_password: wrong password returns False.""" from backend.main import _hash_password, _verify_password stored = _hash_password("mysecretpass") assert _verify_password("wrongpassword", stored) is False def test_verify_password_returns_false_for_malformed_hash(): """_verify_password returns False (not exception) for a malformed hash string.""" from backend.main import _verify_password assert _verify_password("anypassword", "not-a-valid-hash") is False