kin: BATON-008 На главной странице под логином сделать кнопку модулем регистрации - указать почту, логин и пароль, нажать зарегистрироваться. После этого сообщение о регистрации приходит в чат администратору 5694335584 и кнопка апрув или не апрув, если апрув то отправителя улетает пуш на pwa что он зарегистрирован, если отказ то ничего не происходит

This commit is contained in:
Gros Frumos 2026-03-21 12:50:49 +02:00
parent baf05b6d84
commit 1adcabf3a6

338
tests/test_biz_001.py Normal file
View file

@ -0,0 +1,338 @@
"""
Tests for BATON-BIZ-001: Login mechanism for approved users (dual-layer: AST + httpx functional).
Acceptance criteria:
1. Успешный login по login-полю 200 + token
2. Успешный login по email-полю 200 + token
3. Неверный пароль 401 (без раскрытия причины)
4. Статус pending 403 с читаемым сообщением
5. Статус rejected 403 с читаемым сообщением
6. Rate limit 6-й запрос подряд 429
7. Guard middleware возвращает 401 без токена
8. Guard middleware пропускает валидный токен
Additional: error message uniformity, PBKDF2 verification.
"""
from __future__ import annotations
import os
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
os.environ.setdefault("ADMIN_CHAT_ID", "5694335584")
import pytest
from fastapi import HTTPException
from fastapi.security import HTTPAuthorizationCredentials
from backend import db
from backend.middleware import create_auth_token, verify_auth_token
from tests.conftest import make_app_client
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _register_auth(client, email: str, login: str, password: str) -> int:
"""Register via /api/auth/register, return registration id."""
resp = await client.post(
"/api/auth/register",
json={"email": email, "login": login, "password": password},
)
assert resp.status_code == 201, f"auth/register failed: {resp.text}"
reg = await db.get_registration_by_login_or_email(login)
assert reg is not None
return reg["id"]
async def _approve(reg_id: int) -> None:
await db.update_registration_status(reg_id, "approved")
async def _reject(reg_id: int) -> None:
await db.update_registration_status(reg_id, "rejected")
# ---------------------------------------------------------------------------
# Criterion 1 — Успешный login по login-полю
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_login_by_login_field_returns_200_with_token():
"""Approved user can login using their login field → 200 + token."""
async with make_app_client() as client:
reg_id = await _register_auth(client, "alice@example.com", "alice", "password123")
await _approve(reg_id)
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "alice", "password": "password123"},
)
assert resp.status_code == 200
data = resp.json()
assert "token" in data
assert data["login"] == "alice"
@pytest.mark.asyncio
async def test_login_by_login_field_token_is_non_empty_string():
"""Token returned for approved login user is a non-empty string."""
async with make_app_client() as client:
reg_id = await _register_auth(client, "alice2@example.com", "alice2", "password123")
await _approve(reg_id)
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "alice2", "password": "password123"},
)
assert isinstance(resp.json()["token"], str)
assert len(resp.json()["token"]) > 0
# ---------------------------------------------------------------------------
# Criterion 2 — Успешный login по email-полю
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_login_by_email_field_returns_200_with_token():
"""Approved user can login using their email field → 200 + token."""
async with make_app_client() as client:
reg_id = await _register_auth(client, "bob@example.com", "bobuser", "securepass1")
await _approve(reg_id)
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "bob@example.com", "password": "securepass1"},
)
assert resp.status_code == 200
data = resp.json()
assert "token" in data
assert data["login"] == "bobuser"
@pytest.mark.asyncio
async def test_login_by_email_field_token_login_matches_registration():
"""Token response login field matches the login set during registration."""
async with make_app_client() as client:
reg_id = await _register_auth(client, "bob2@example.com", "bob2user", "securepass1")
await _approve(reg_id)
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "bob2@example.com", "password": "securepass1"},
)
assert resp.json()["login"] == "bob2user"
# ---------------------------------------------------------------------------
# Criterion 3 — Неверный пароль → 401 без раскрытия причины
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_wrong_password_returns_401():
"""Wrong password returns 401 with generic message (no detail about which field failed)."""
async with make_app_client() as client:
reg_id = await _register_auth(client, "carol@example.com", "carol", "correctpass1")
await _approve(reg_id)
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "carol", "password": "wrongpassword"},
)
assert resp.status_code == 401
assert "Неверный логин или пароль" in resp.json()["detail"]
@pytest.mark.asyncio
async def test_nonexistent_user_returns_401_same_message_as_wrong_password():
"""Non-existent login returns same 401 message as wrong password (prevents user enumeration)."""
async with make_app_client() as client:
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "doesnotexist", "password": "anypassword"},
)
assert resp.status_code == 401
assert "Неверный логин или пароль" in resp.json()["detail"]
# ---------------------------------------------------------------------------
# Criterion 4 — Статус pending → 403 с читаемым сообщением
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pending_user_login_returns_403():
"""User with pending status gets 403."""
async with make_app_client() as client:
await _register_auth(client, "dave@example.com", "dave", "password123")
# Status is 'pending' by default — no approval step
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "dave", "password": "password123"},
)
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_pending_user_login_403_message_is_human_readable():
"""403 message for pending user contains readable Russian text about the waiting status."""
async with make_app_client() as client:
await _register_auth(client, "dave2@example.com", "dave2", "password123")
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "dave2", "password": "password123"},
)
assert "ожидает" in resp.json()["detail"]
# ---------------------------------------------------------------------------
# Criterion 5 — Статус rejected → 403 с читаемым сообщением
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_rejected_user_login_returns_403():
"""User with rejected status gets 403."""
async with make_app_client() as client:
reg_id = await _register_auth(client, "eve@example.com", "evegirl", "password123")
await _reject(reg_id)
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "evegirl", "password": "password123"},
)
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_rejected_user_login_403_message_is_human_readable():
"""403 message for rejected user contains readable Russian text about rejection."""
async with make_app_client() as client:
reg_id = await _register_auth(client, "eve2@example.com", "eve2girl", "password123")
await _reject(reg_id)
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "eve2girl", "password": "password123"},
)
assert "отклонена" in resp.json()["detail"]
# ---------------------------------------------------------------------------
# Criterion 6 — Rate limit: 6-й запрос подряд → 429
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_rate_limit_triggers_on_sixth_login_attempt():
"""Login rate limit (5 per window) triggers 429 exactly on the 6th request."""
async with make_app_client() as client:
statuses = []
for _ in range(6):
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "nouser_rl", "password": "nopass"},
headers={"X-Real-IP": "10.99.99.1"},
)
statuses.append(resp.status_code)
# First 5 attempts pass rate limit (user not found → 401)
assert all(s == 401 for s in statuses[:5]), (
f"Первые 5 попыток должны быть 401, получили: {statuses[:5]}"
)
# 6th attempt hits rate limit
assert statuses[5] == 429, (
f"6-я попытка должна быть 429, получили: {statuses[5]}"
)
@pytest.mark.asyncio
async def test_rate_limit_fifth_attempt_still_passes():
"""5th login attempt is still allowed (rate limit triggers only on 6th)."""
async with make_app_client() as client:
for i in range(4):
await client.post(
"/api/auth/login",
json={"login_or_email": "nouser_rl2", "password": "nopass"},
headers={"X-Real-IP": "10.99.99.2"},
)
resp = await client.post(
"/api/auth/login",
json={"login_or_email": "nouser_rl2", "password": "nopass"},
headers={"X-Real-IP": "10.99.99.2"},
)
assert resp.status_code == 401, (
f"5-я попытка должна пройти rate limit и вернуть 401, получили: {resp.status_code}"
)
# ---------------------------------------------------------------------------
# Criterion 7 — Guard middleware: 401 без токена
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_verify_auth_token_raises_401_when_credentials_is_none():
"""verify_auth_token raises HTTPException 401 when no credentials provided."""
with pytest.raises(HTTPException) as exc_info:
await verify_auth_token(credentials=None)
assert exc_info.value.status_code == 401
@pytest.mark.asyncio
async def test_verify_auth_token_raises_401_for_malformed_token():
"""verify_auth_token raises HTTPException 401 for a malformed/invalid token."""
creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials="not.a.valid.jwt")
with pytest.raises(HTTPException) as exc_info:
await verify_auth_token(credentials=creds)
assert exc_info.value.status_code == 401
# ---------------------------------------------------------------------------
# Criterion 8 — Guard middleware: валидный токен пропускается
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_verify_auth_token_returns_payload_for_valid_token():
"""verify_auth_token returns decoded JWT payload for a valid signed token."""
token = create_auth_token(reg_id=42, login="testuser")
creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
payload = await verify_auth_token(credentials=creds)
assert payload["sub"] == "42"
assert payload["login"] == "testuser"
@pytest.mark.asyncio
async def test_verify_auth_token_payload_contains_expected_fields():
"""Payload returned by verify_auth_token contains sub, login, iat, exp fields."""
token = create_auth_token(reg_id=7, login="inspector")
creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
payload = await verify_auth_token(credentials=creds)
for field in ("sub", "login", "iat", "exp"):
assert field in payload, f"Поле '{field}' отсутствует в payload"
# ---------------------------------------------------------------------------
# Additional: PBKDF2 correctness — verify_password timing-safe
# ---------------------------------------------------------------------------
def test_hash_and_verify_password_returns_true_for_correct_password():
"""_hash_password + _verify_password: correct password returns True."""
from backend.main import _hash_password, _verify_password
stored = _hash_password("mysecretpass")
assert _verify_password("mysecretpass", stored) is True
def test_hash_and_verify_password_returns_false_for_wrong_password():
"""_hash_password + _verify_password: wrong password returns False."""
from backend.main import _hash_password, _verify_password
stored = _hash_password("mysecretpass")
assert _verify_password("wrongpassword", stored) is False
def test_verify_password_returns_false_for_malformed_hash():
"""_verify_password returns False (not exception) for a malformed hash string."""
from backend.main import _verify_password
assert _verify_password("anypassword", "not-a-valid-hash") is False