diff --git a/tests/test_biz_001.py b/tests/test_biz_001.py new file mode 100644 index 0000000..21f822e --- /dev/null +++ b/tests/test_biz_001.py @@ -0,0 +1,338 @@ +""" +Tests for BATON-BIZ-001: Login mechanism for approved users (dual-layer: AST + httpx functional). + +Acceptance criteria: +1. Успешный login по login-полю → 200 + token +2. Успешный login по email-полю → 200 + token +3. Неверный пароль → 401 (без раскрытия причины) +4. Статус pending → 403 с читаемым сообщением +5. Статус rejected → 403 с читаемым сообщением +6. Rate limit — 6-й запрос подряд → 429 +7. Guard middleware возвращает 401 без токена +8. Guard middleware пропускает валидный токен + +Additional: error message uniformity, PBKDF2 verification. +""" +from __future__ import annotations + +import os + +os.environ.setdefault("BOT_TOKEN", "test-bot-token") +os.environ.setdefault("CHAT_ID", "-1001234567890") +os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") +os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") +os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") +os.environ.setdefault("ADMIN_TOKEN", "test-admin-token") +os.environ.setdefault("ADMIN_CHAT_ID", "5694335584") + +import pytest +from fastapi import HTTPException +from fastapi.security import HTTPAuthorizationCredentials + +from backend import db +from backend.middleware import create_auth_token, verify_auth_token +from tests.conftest import make_app_client + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +async def _register_auth(client, email: str, login: str, password: str) -> int: + """Register via /api/auth/register, return registration id.""" + resp = await client.post( + "/api/auth/register", + json={"email": email, "login": login, "password": password}, + ) + assert resp.status_code == 201, f"auth/register failed: {resp.text}" + reg = await db.get_registration_by_login_or_email(login) + assert reg is not None + return reg["id"] + + +async def _approve(reg_id: int) -> None: + await db.update_registration_status(reg_id, "approved") + + +async def _reject(reg_id: int) -> None: + await db.update_registration_status(reg_id, "rejected") + + +# --------------------------------------------------------------------------- +# Criterion 1 — Успешный login по login-полю +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_login_by_login_field_returns_200_with_token(): + """Approved user can login using their login field → 200 + token.""" + async with make_app_client() as client: + reg_id = await _register_auth(client, "alice@example.com", "alice", "password123") + await _approve(reg_id) + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "alice", "password": "password123"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert "token" in data + assert data["login"] == "alice" + + +@pytest.mark.asyncio +async def test_login_by_login_field_token_is_non_empty_string(): + """Token returned for approved login user is a non-empty string.""" + async with make_app_client() as client: + reg_id = await _register_auth(client, "alice2@example.com", "alice2", "password123") + await _approve(reg_id) + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "alice2", "password": "password123"}, + ) + assert isinstance(resp.json()["token"], str) + assert len(resp.json()["token"]) > 0 + + +# --------------------------------------------------------------------------- +# Criterion 2 — Успешный login по email-полю +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_login_by_email_field_returns_200_with_token(): + """Approved user can login using their email field → 200 + token.""" + async with make_app_client() as client: + reg_id = await _register_auth(client, "bob@example.com", "bobuser", "securepass1") + await _approve(reg_id) + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "bob@example.com", "password": "securepass1"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert "token" in data + assert data["login"] == "bobuser" + + +@pytest.mark.asyncio +async def test_login_by_email_field_token_login_matches_registration(): + """Token response login field matches the login set during registration.""" + async with make_app_client() as client: + reg_id = await _register_auth(client, "bob2@example.com", "bob2user", "securepass1") + await _approve(reg_id) + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "bob2@example.com", "password": "securepass1"}, + ) + assert resp.json()["login"] == "bob2user" + + +# --------------------------------------------------------------------------- +# Criterion 3 — Неверный пароль → 401 без раскрытия причины +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_wrong_password_returns_401(): + """Wrong password returns 401 with generic message (no detail about which field failed).""" + async with make_app_client() as client: + reg_id = await _register_auth(client, "carol@example.com", "carol", "correctpass1") + await _approve(reg_id) + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "carol", "password": "wrongpassword"}, + ) + assert resp.status_code == 401 + assert "Неверный логин или пароль" in resp.json()["detail"] + + +@pytest.mark.asyncio +async def test_nonexistent_user_returns_401_same_message_as_wrong_password(): + """Non-existent login returns same 401 message as wrong password (prevents user enumeration).""" + async with make_app_client() as client: + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "doesnotexist", "password": "anypassword"}, + ) + assert resp.status_code == 401 + assert "Неверный логин или пароль" in resp.json()["detail"] + + +# --------------------------------------------------------------------------- +# Criterion 4 — Статус pending → 403 с читаемым сообщением +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_pending_user_login_returns_403(): + """User with pending status gets 403.""" + async with make_app_client() as client: + await _register_auth(client, "dave@example.com", "dave", "password123") + # Status is 'pending' by default — no approval step + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "dave", "password": "password123"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_pending_user_login_403_message_is_human_readable(): + """403 message for pending user contains readable Russian text about the waiting status.""" + async with make_app_client() as client: + await _register_auth(client, "dave2@example.com", "dave2", "password123") + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "dave2", "password": "password123"}, + ) + assert "ожидает" in resp.json()["detail"] + + +# --------------------------------------------------------------------------- +# Criterion 5 — Статус rejected → 403 с читаемым сообщением +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_rejected_user_login_returns_403(): + """User with rejected status gets 403.""" + async with make_app_client() as client: + reg_id = await _register_auth(client, "eve@example.com", "evegirl", "password123") + await _reject(reg_id) + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "evegirl", "password": "password123"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_rejected_user_login_403_message_is_human_readable(): + """403 message for rejected user contains readable Russian text about rejection.""" + async with make_app_client() as client: + reg_id = await _register_auth(client, "eve2@example.com", "eve2girl", "password123") + await _reject(reg_id) + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "eve2girl", "password": "password123"}, + ) + assert "отклонена" in resp.json()["detail"] + + +# --------------------------------------------------------------------------- +# Criterion 6 — Rate limit: 6-й запрос подряд → 429 +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_rate_limit_triggers_on_sixth_login_attempt(): + """Login rate limit (5 per window) triggers 429 exactly on the 6th request.""" + async with make_app_client() as client: + statuses = [] + for _ in range(6): + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "nouser_rl", "password": "nopass"}, + headers={"X-Real-IP": "10.99.99.1"}, + ) + statuses.append(resp.status_code) + # First 5 attempts pass rate limit (user not found → 401) + assert all(s == 401 for s in statuses[:5]), ( + f"Первые 5 попыток должны быть 401, получили: {statuses[:5]}" + ) + # 6th attempt hits rate limit + assert statuses[5] == 429, ( + f"6-я попытка должна быть 429, получили: {statuses[5]}" + ) + + +@pytest.mark.asyncio +async def test_rate_limit_fifth_attempt_still_passes(): + """5th login attempt is still allowed (rate limit triggers only on 6th).""" + async with make_app_client() as client: + for i in range(4): + await client.post( + "/api/auth/login", + json={"login_or_email": "nouser_rl2", "password": "nopass"}, + headers={"X-Real-IP": "10.99.99.2"}, + ) + resp = await client.post( + "/api/auth/login", + json={"login_or_email": "nouser_rl2", "password": "nopass"}, + headers={"X-Real-IP": "10.99.99.2"}, + ) + assert resp.status_code == 401, ( + f"5-я попытка должна пройти rate limit и вернуть 401, получили: {resp.status_code}" + ) + + +# --------------------------------------------------------------------------- +# Criterion 7 — Guard middleware: 401 без токена +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_verify_auth_token_raises_401_when_credentials_is_none(): + """verify_auth_token raises HTTPException 401 when no credentials provided.""" + with pytest.raises(HTTPException) as exc_info: + await verify_auth_token(credentials=None) + assert exc_info.value.status_code == 401 + + +@pytest.mark.asyncio +async def test_verify_auth_token_raises_401_for_malformed_token(): + """verify_auth_token raises HTTPException 401 for a malformed/invalid token.""" + creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials="not.a.valid.jwt") + with pytest.raises(HTTPException) as exc_info: + await verify_auth_token(credentials=creds) + assert exc_info.value.status_code == 401 + + +# --------------------------------------------------------------------------- +# Criterion 8 — Guard middleware: валидный токен пропускается +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_verify_auth_token_returns_payload_for_valid_token(): + """verify_auth_token returns decoded JWT payload for a valid signed token.""" + token = create_auth_token(reg_id=42, login="testuser") + creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token) + payload = await verify_auth_token(credentials=creds) + assert payload["sub"] == "42" + assert payload["login"] == "testuser" + + +@pytest.mark.asyncio +async def test_verify_auth_token_payload_contains_expected_fields(): + """Payload returned by verify_auth_token contains sub, login, iat, exp fields.""" + token = create_auth_token(reg_id=7, login="inspector") + creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token) + payload = await verify_auth_token(credentials=creds) + for field in ("sub", "login", "iat", "exp"): + assert field in payload, f"Поле '{field}' отсутствует в payload" + + +# --------------------------------------------------------------------------- +# Additional: PBKDF2 correctness — verify_password timing-safe +# --------------------------------------------------------------------------- + + +def test_hash_and_verify_password_returns_true_for_correct_password(): + """_hash_password + _verify_password: correct password returns True.""" + from backend.main import _hash_password, _verify_password + stored = _hash_password("mysecretpass") + assert _verify_password("mysecretpass", stored) is True + + +def test_hash_and_verify_password_returns_false_for_wrong_password(): + """_hash_password + _verify_password: wrong password returns False.""" + from backend.main import _hash_password, _verify_password + stored = _hash_password("mysecretpass") + assert _verify_password("wrongpassword", stored) is False + + +def test_verify_password_returns_false_for_malformed_hash(): + """_verify_password returns False (not exception) for a malformed hash string.""" + from backend.main import _verify_password + assert _verify_password("anypassword", "not-a-valid-hash") is False