diff --git a/.env.example b/.env.example index 6d8ac36..cf447e0 100644 --- a/.env.example +++ b/.env.example @@ -11,8 +11,3 @@ DB_PATH=baton.db # CORS FRONTEND_ORIGIN=https://yourdomain.com - -# VAPID Push Notifications (generate with: python -c "from py_vapid import Vapid; v=Vapid(); v.generate_keys(); print(v.public_key, v.private_key)") -VAPID_PUBLIC_KEY= -VAPID_PRIVATE_KEY= -VAPID_CLAIMS_EMAIL= diff --git a/backend/config.py b/backend/config.py index 305d05e..7535cc0 100644 --- a/backend/config.py +++ b/backend/config.py @@ -26,5 +26,3 @@ ADMIN_CHAT_ID: str = _require("ADMIN_CHAT_ID") VAPID_PRIVATE_KEY: str = os.getenv("VAPID_PRIVATE_KEY", "") VAPID_PUBLIC_KEY: str = os.getenv("VAPID_PUBLIC_KEY", "") VAPID_CLAIMS_EMAIL: str = os.getenv("VAPID_CLAIMS_EMAIL", "") -JWT_SECRET: str = os.getenv("JWT_SECRET", "") -JWT_TOKEN_EXPIRE_SECONDS: int = int(os.getenv("JWT_TOKEN_EXPIRE_SECONDS", "2592000")) # 30 days diff --git a/backend/db.py b/backend/db.py index 5e2541c..d733006 100644 --- a/backend/db.py +++ b/backend/db.py @@ -352,30 +352,6 @@ async def update_registration_status(reg_id: int, status: str) -> bool: return changed -async def get_registration_by_login_or_email(login_or_email: str) -> Optional[dict]: - async with _get_conn() as conn: - async with conn.execute( - """ - SELECT id, email, login, password_hash, status, push_subscription, created_at - FROM registrations - WHERE login = ? OR email = ? - """, - (login_or_email, login_or_email), - ) as cur: - row = await cur.fetchone() - if row is None: - return None - return { - "id": row["id"], - "email": row["email"], - "login": row["login"], - "password_hash": row["password_hash"], - "status": row["status"], - "push_subscription": row["push_subscription"], - "created_at": row["created_at"], - } - - async def save_telegram_batch( message_text: str, signals_count: int, diff --git a/backend/main.py b/backend/main.py index d064b40..7f17501 100644 --- a/backend/main.py +++ b/backend/main.py @@ -2,7 +2,6 @@ from __future__ import annotations import asyncio import hashlib -import hmac import logging import os import secrets @@ -17,21 +16,11 @@ from fastapi.responses import JSONResponse from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from backend import config, db, push, telegram -from backend.middleware import ( - create_auth_token, - rate_limit_auth_login, - rate_limit_auth_register, - rate_limit_register, - rate_limit_signal, - verify_admin_token, - verify_webhook_secret, -) +from backend.middleware import rate_limit_auth_register, rate_limit_register, rate_limit_signal, verify_admin_token, verify_webhook_secret from backend.models import ( AdminBlockRequest, AdminCreateUserRequest, AdminSetPasswordRequest, - AuthLoginRequest, - AuthLoginResponse, AuthRegisterRequest, AuthRegisterResponse, RegisterRequest, @@ -62,18 +51,6 @@ def _hash_password(password: str) -> str: dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 260_000) return f"{salt.hex()}:{dk.hex()}" -def _verify_password(password: str, stored_hash: str) -> bool: - """Verify a password against a stored PBKDF2-HMAC-SHA256 hash (salt_hex:dk_hex).""" - try: - salt_hex, dk_hex = stored_hash.split(":", 1) - salt = bytes.fromhex(salt_hex) - expected_dk = bytes.fromhex(dk_hex) - actual_dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 260_000) - return hmac.compare_digest(actual_dk, expected_dk) - except Exception: - return False - - # aggregator = telegram.SignalAggregator(interval=10) # v2.0 feature — отключено в v1 (ADR-004) _KEEPALIVE_INTERVAL = 600 # 10 минут @@ -155,12 +132,6 @@ async def health() -> dict[str, Any]: return {"status": "ok"} -@app.get("/api/vapid-public-key") -@app.get("/api/push/public-key") -async def vapid_public_key() -> dict[str, str]: - return {"vapid_public_key": config.VAPID_PUBLIC_KEY} - - @app.post("/api/register", response_model=RegisterResponse) async def register(body: RegisterRequest, _: None = Depends(rate_limit_register)) -> RegisterResponse: api_key = secrets.token_hex(32) @@ -248,24 +219,6 @@ async def auth_register( return AuthRegisterResponse(status="pending", message="Заявка отправлена на рассмотрение") -@app.post("/api/auth/login", response_model=AuthLoginResponse) -async def auth_login( - body: AuthLoginRequest, - _: None = Depends(rate_limit_auth_login), -) -> AuthLoginResponse: - reg = await db.get_registration_by_login_or_email(body.login_or_email) - if reg is None or not _verify_password(body.password, reg["password_hash"]): - raise HTTPException(status_code=401, detail="Неверный логин или пароль") - if reg["status"] == "pending": - raise HTTPException(status_code=403, detail="Ваша заявка ожидает рассмотрения") - if reg["status"] == "rejected": - raise HTTPException(status_code=403, detail="Ваша заявка отклонена") - if reg["status"] != "approved": - raise HTTPException(status_code=403, detail="Доступ запрещён") - token = create_auth_token(reg["id"], reg["login"]) - return AuthLoginResponse(token=token, login=reg["login"]) - - async def _handle_callback_query(cb: dict) -> None: """Process approve/reject callback from admin Telegram inline buttons.""" data = cb.get("data", "") diff --git a/backend/middleware.py b/backend/middleware.py index 27f1fd3..1d183a9 100644 --- a/backend/middleware.py +++ b/backend/middleware.py @@ -1,11 +1,6 @@ from __future__ import annotations -import base64 -import hashlib -import hmac -import json import secrets -import time from typing import Optional from fastapi import Depends, Header, HTTPException, Request @@ -13,12 +8,6 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from backend import config, db -# JWT secret: stable across restarts if JWT_SECRET env var is set; random per-process otherwise -_JWT_SECRET: str = config.JWT_SECRET or secrets.token_hex(32) -_JWT_HEADER_B64: str = ( - base64.urlsafe_b64encode(b'{"alg":"HS256","typ":"JWT"}').rstrip(b"=").decode() -) - _bearer = HTTPBearer(auto_error=False) _RATE_LIMIT = 5 @@ -76,74 +65,3 @@ async def rate_limit_auth_register(request: Request) -> None: count = await db.rate_limit_increment(key, _AUTH_REGISTER_RATE_WINDOW) if count > _AUTH_REGISTER_RATE_LIMIT: raise HTTPException(status_code=429, detail="Too Many Requests") - - -_AUTH_LOGIN_RATE_LIMIT = 5 -_AUTH_LOGIN_RATE_WINDOW = 900 # 15 minutes - - -def _b64url_encode(data: bytes) -> str: - return base64.urlsafe_b64encode(data).rstrip(b"=").decode() - - -def _b64url_decode(s: str) -> bytes: - padding = 4 - len(s) % 4 - if padding != 4: - s += "=" * padding - return base64.urlsafe_b64decode(s) - - -def create_auth_token(reg_id: int, login: str) -> str: - """Create a signed HS256 JWT for an approved registration.""" - now = int(time.time()) - payload = { - "sub": str(reg_id), - "login": login, - "iat": now, - "exp": now + config.JWT_TOKEN_EXPIRE_SECONDS, - } - payload_b64 = _b64url_encode(json.dumps(payload, separators=(",", ":")).encode()) - signing_input = f"{_JWT_HEADER_B64}.{payload_b64}" - sig = hmac.new( - _JWT_SECRET.encode(), signing_input.encode(), hashlib.sha256 - ).digest() - return f"{signing_input}.{_b64url_encode(sig)}" - - -def _verify_jwt_token(token: str) -> dict: - """Verify token signature and expiry. Returns payload dict on success.""" - parts = token.split(".") - if len(parts) != 3: - raise ValueError("Invalid token format") - header_b64, payload_b64, sig_b64 = parts - signing_input = f"{header_b64}.{payload_b64}" - expected_sig = hmac.new( - _JWT_SECRET.encode(), signing_input.encode(), hashlib.sha256 - ).digest() - actual_sig = _b64url_decode(sig_b64) - if not hmac.compare_digest(expected_sig, actual_sig): - raise ValueError("Invalid signature") - payload = json.loads(_b64url_decode(payload_b64)) - if payload.get("exp", 0) < time.time(): - raise ValueError("Token expired") - return payload - - -async def rate_limit_auth_login(request: Request) -> None: - key = f"login:{_get_client_ip(request)}" - count = await db.rate_limit_increment(key, _AUTH_LOGIN_RATE_WINDOW) - if count > _AUTH_LOGIN_RATE_LIMIT: - raise HTTPException(status_code=429, detail="Too Many Requests") - - -async def verify_auth_token( - credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer), -) -> dict: - """Dependency for protected endpoints — verifies Bearer JWT, returns payload.""" - if credentials is None: - raise HTTPException(status_code=401, detail="Unauthorized") - try: - payload = _verify_jwt_token(credentials.credentials) - except Exception: - raise HTTPException(status_code=401, detail="Unauthorized") - return payload diff --git a/backend/models.py b/backend/models.py index b3d847a..065d0c8 100644 --- a/backend/models.py +++ b/backend/models.py @@ -66,13 +66,3 @@ class AuthRegisterRequest(BaseModel): class AuthRegisterResponse(BaseModel): status: str message: str - - -class AuthLoginRequest(BaseModel): - login_or_email: str = Field(..., min_length=1, max_length=255) - password: str = Field(..., min_length=1, max_length=128) - - -class AuthLoginResponse(BaseModel): - token: str - login: str diff --git a/backend/telegram.py b/backend/telegram.py index db604f5..4b37a7e 100644 --- a/backend/telegram.py +++ b/backend/telegram.py @@ -11,6 +11,11 @@ from backend import config, db logger = logging.getLogger(__name__) +# Suppress httpx/httpcore transport-level logging to prevent BOT_TOKEN URL leakage. +# httpx logs request URLs (which embed the token) at DEBUG/INFO level depending on version. +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("httpcore").setLevel(logging.WARNING) + _TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}" @@ -143,3 +148,76 @@ async def set_webhook(url: str, secret: str) -> None: raise RuntimeError(f"setWebhook failed: {resp.text}") logger.info("Webhook registered: %s", url) + +# v2.0 feature +class SignalAggregator: + def __init__(self, interval: int = 10) -> None: + self._interval = interval + self._buffer: list[dict] = [] + self._lock = asyncio.Lock() + self._stopped = False + + async def add_signal( + self, + user_uuid: str, + user_name: Optional[str], + timestamp: int, + geo: Optional[dict], + signal_id: int, + ) -> None: + async with self._lock: + self._buffer.append( + { + "user_uuid": user_uuid, + "user_name": user_name, + "timestamp": timestamp, + "geo": geo, + "signal_id": signal_id, + } + ) + + async def flush(self) -> None: + async with self._lock: + if not self._buffer: + return + items = self._buffer[:] + self._buffer.clear() + + signal_ids = [item["signal_id"] for item in items] + timestamps = [item["timestamp"] for item in items] + ts_start = datetime.fromtimestamp(min(timestamps) / 1000, tz=timezone.utc) + ts_end = datetime.fromtimestamp(max(timestamps) / 1000, tz=timezone.utc) + t_fmt = "%H:%M:%S" + + names = [] + for item in items: + name = item["user_name"] + label = name if name else item["user_uuid"][:8] + names.append(label) + + geo_count = sum(1 for item in items if item["geo"]) + n = len(items) + + text = ( + f"\U0001f6a8 Получено {n} сигнал{'ов' if n != 1 else ''} " + f"[{ts_start.strftime(t_fmt)}—{ts_end.strftime(t_fmt)}]\n" + f"Пользователи: {', '.join(names)}\n" + f"\U0001f4cd С геолокацией: {geo_count} из {n}" + ) + + try: + await send_message(text) + await db.save_telegram_batch(text, n, signal_ids) + # rate-limit: 1 msg/sec max (#1014) + await asyncio.sleep(1) + except Exception: + logger.exception("Failed to flush aggregator batch") + + async def run(self) -> None: + while not self._stopped: + await asyncio.sleep(self._interval) + if self._buffer: + await self.flush() + + def stop(self) -> None: + self._stopped = True diff --git a/frontend/app.js b/frontend/app.js index de54b6f..e457ee7 100644 --- a/frontend/app.js +++ b/frontend/app.js @@ -92,21 +92,6 @@ function _setStatus(msg, cls) { el.hidden = !msg; } -function _setRegStatus(msg, cls) { - const el = document.getElementById('reg-status'); - if (!el) return; - el.textContent = msg; - el.className = 'reg-status' + (cls ? ' reg-status--' + cls : ''); - el.hidden = !msg; -} - -function _showView(id) { - ['view-login', 'view-register'].forEach((vid) => { - const el = document.getElementById(vid); - if (el) el.hidden = vid !== id; - }); -} - function _updateNetworkIndicator() { const el = document.getElementById('indicator-network'); if (!el) return; @@ -225,7 +210,6 @@ async function _handleSignal() { function _showOnboarding() { _showScreen('screen-onboarding'); - _showView('view-login'); const input = document.getElementById('name-input'); const btn = document.getElementById('btn-confirm'); @@ -237,24 +221,6 @@ function _showOnboarding() { if (e.key === 'Enter' && !btn.disabled) _handleRegister(); }); btn.addEventListener('click', _handleRegister); - - const btnToRegister = document.getElementById('btn-switch-to-register'); - if (btnToRegister) { - btnToRegister.addEventListener('click', () => { - _setRegStatus('', ''); - _showView('view-register'); - }); - } - - const btnToLogin = document.getElementById('btn-switch-to-login'); - if (btnToLogin) { - btnToLogin.addEventListener('click', () => _showView('view-login')); - } - - const btnRegister = document.getElementById('btn-register'); - if (btnRegister) { - btnRegister.addEventListener('click', _handleSignUp); - } } function _showMain() { @@ -277,127 +243,6 @@ function _registerSW() { }); } -// ========== VAPID / Push subscription ========== - -async function _fetchVapidPublicKey() { - try { - const res = await fetch('/api/push/public-key'); - if (!res.ok) { - console.warn('[baton] /api/push/public-key returned', res.status); - return null; - } - const data = await res.json(); - return data.vapid_public_key || null; - } catch (err) { - console.warn('[baton] Failed to fetch VAPID public key:', err); - return null; - } -} - -function _urlBase64ToUint8Array(base64String) { - const padding = '='.repeat((4 - (base64String.length % 4)) % 4); - const base64 = (base64String + padding).replace(/-/g, '+').replace(/_/g, '/'); - const raw = atob(base64); - const output = new Uint8Array(raw.length); - for (let i = 0; i < raw.length; i++) { - output[i] = raw.charCodeAt(i); - } - return output; -} - -async function _initPushSubscription(vapidPublicKey) { - if (!vapidPublicKey) { - console.warn('[baton] VAPID public key not available — push subscription skipped'); - return; - } - if (!('serviceWorker' in navigator) || !('PushManager' in window)) { - return; - } - try { - const registration = await navigator.serviceWorker.ready; - const existing = await registration.pushManager.getSubscription(); - if (existing) return; - const applicationServerKey = _urlBase64ToUint8Array(vapidPublicKey); - const subscription = await registration.pushManager.subscribe({ - userVisibleOnly: true, - applicationServerKey, - }); - _storage.setItem('baton_push_subscription', JSON.stringify(subscription)); - console.info('[baton] Push subscription created'); - } catch (err) { - console.warn('[baton] Push subscription failed:', err); - } -} - -// ========== Registration (account sign-up) ========== - -async function _getPushSubscriptionForReg() { - if (!('serviceWorker' in navigator) || !('PushManager' in window)) return null; - try { - const vapidKey = await _fetchVapidPublicKey(); - if (!vapidKey) return null; - const registration = await navigator.serviceWorker.ready; - const existing = await registration.pushManager.getSubscription(); - if (existing) return existing.toJSON(); - const applicationServerKey = _urlBase64ToUint8Array(vapidKey); - const subscription = await registration.pushManager.subscribe({ - userVisibleOnly: true, - applicationServerKey, - }); - return subscription.toJSON(); - } catch (err) { - console.warn('[baton] Push subscription for registration failed:', err); - return null; - } -} - -async function _handleSignUp() { - const emailInput = document.getElementById('reg-email'); - const loginInput = document.getElementById('reg-login'); - const passwordInput = document.getElementById('reg-password'); - const btn = document.getElementById('btn-register'); - if (!emailInput || !loginInput || !passwordInput || !btn) return; - - const email = emailInput.value.trim(); - const login = loginInput.value.trim(); - const password = passwordInput.value; - - if (!email || !login || !password) { - _setRegStatus('Заполните все поля.', 'error'); - return; - } - if (!/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email)) { - _setRegStatus('Введите корректный email.', 'error'); - return; - } - - btn.disabled = true; - const originalText = btn.textContent.trim(); - btn.textContent = '...'; - _setRegStatus('', ''); - - try { - const push_subscription = await _getPushSubscriptionForReg().catch(() => null); - await _apiPost('/api/auth/register', { email, login, password, push_subscription }); - passwordInput.value = ''; - _setRegStatus('Заявка отправлена. Ожидайте подтверждения администратора.', 'success'); - } catch (err) { - let msg = 'Ошибка. Попробуйте ещё раз.'; - if (err && err.message) { - const colonIdx = err.message.indexOf(': '); - if (colonIdx !== -1) { - try { - const parsed = JSON.parse(err.message.slice(colonIdx + 2)); - if (parsed.detail) msg = parsed.detail; - } catch (_) {} - } - } - _setRegStatus(msg, 'error'); - btn.disabled = false; - btn.textContent = originalText; - } -} - // ========== Init ========== function _init() { @@ -424,11 +269,6 @@ function _init() { } else { _showOnboarding(); } - - // Fire-and-forget: fetch VAPID key from API and subscribe to push (non-blocking) - _fetchVapidPublicKey().then(_initPushSubscription).catch((err) => { - console.warn('[baton] Push init error:', err); - }); } document.addEventListener('DOMContentLoaded', () => { diff --git a/frontend/index.html b/frontend/index.html index f04d171..e5fe30e 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -36,9 +36,7 @@
- - -
+
Confirm -
- - -
- - - - - -
-
-
diff --git a/frontend/style.css b/frontend/style.css index 36f7685..487a443 100644 --- a/frontend/style.css +++ b/frontend/style.css @@ -198,35 +198,3 @@ body { .status[hidden] { display: none; } .status--error { color: #f87171; } .status--success { color: #4ade80; } - -/* ===== Registration form ===== */ - -/* Override display:flex so [hidden] works on screen-content divs */ -.screen-content[hidden] { display: none; } - -.btn-link { - background: none; - border: none; - color: var(--muted); - font-size: 14px; - cursor: pointer; - padding: 4px 0; - text-decoration: underline; - text-underline-offset: 2px; - -webkit-tap-highlight-color: transparent; -} - -.btn-link:active { color: var(--text); } - -.reg-status { - width: 100%; - max-width: 320px; - font-size: 14px; - text-align: center; - line-height: 1.5; - padding: 4px 0; -} - -.reg-status[hidden] { display: none; } -.reg-status--error { color: #f87171; } -.reg-status--success { color: #4ade80; } diff --git a/tests/test_arch_002.py b/tests/test_arch_002.py index dd737b9..c979b1d 100644 --- a/tests/test_arch_002.py +++ b/tests/test_arch_002.py @@ -168,17 +168,25 @@ async def test_signal_message_contains_utc_marker(): # --------------------------------------------------------------------------- -# Criterion 3 — SignalAggregator removed (BATON-BIZ-004: dead code cleanup) +# Criterion 3 — SignalAggregator preserved with '# v2.0 feature' marker (static) # --------------------------------------------------------------------------- -def test_signal_aggregator_class_removed_from_telegram(): - """SignalAggregator must be removed from telegram.py (BATON-BIZ-004).""" +def test_signal_aggregator_class_preserved_in_telegram(): + """SignalAggregator class must still exist in telegram.py (ADR-004, reserved for v2).""" source = (_BACKEND_DIR / "telegram.py").read_text() - assert "class SignalAggregator" not in source + assert "class SignalAggregator" in source -def test_signal_aggregator_not_referenced_in_telegram(): - """telegram.py must not reference SignalAggregator at all (BATON-BIZ-004).""" - source = (_BACKEND_DIR / "telegram.py").read_text() - assert "SignalAggregator" not in source +def test_signal_aggregator_has_v2_feature_comment(): + """The line immediately before 'class SignalAggregator' must contain '# v2.0 feature'.""" + lines = (_BACKEND_DIR / "telegram.py").read_text().splitlines() + class_line_idx = next( + (i for i, line in enumerate(lines) if "class SignalAggregator" in line), None + ) + assert class_line_idx is not None, "class SignalAggregator not found in telegram.py" + assert class_line_idx > 0, "SignalAggregator is on the first line — no preceding comment line" + preceding_line = lines[class_line_idx - 1] + assert "# v2.0 feature" in preceding_line, ( + f"Expected '# v2.0 feature' on line before class SignalAggregator, got: {preceding_line!r}" + ) diff --git a/tests/test_baton_008.py b/tests/test_baton_008.py index b1e0c03..f572cd8 100644 --- a/tests/test_baton_008.py +++ b/tests/test_baton_008.py @@ -583,306 +583,3 @@ async def test_password_hash_stored_in_pbkdf2_format(): assert len(dk_hex) == 64, f"Expected 64-char dk hex (SHA-256), got {len(dk_hex)}" int(salt_hex, 16) # raises ValueError if not valid hex int(dk_hex, 16) - - -# --------------------------------------------------------------------------- -# 15. State machine — повторное нажатие approve на уже approved -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_webhook_callback_double_approve_does_not_send_push(): - """Second approve on already-approved registration must NOT fire push.""" - push_sub = { - "endpoint": "https://fcm.googleapis.com/fcm/send/test2", - "keys": {"p256dh": "BQDEF", "auth": "abc"}, - } - async with make_app_client() as client: - with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock): - reg_resp = await client.post( - "/api/auth/register", - json={**_VALID_PAYLOAD, "email": "double@example.com", "login": "doubleuser", "push_subscription": push_sub}, - ) - assert reg_resp.status_code == 201 - - from backend import config as _cfg - import aiosqlite - async with aiosqlite.connect(_cfg.DB_PATH) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute("SELECT id FROM registrations LIMIT 1") as cur: - row = await cur.fetchone() - reg_id = row["id"] if row else None - assert reg_id is not None - - cb_payload = { - "callback_query": { - "id": "cq_d1", - "data": f"approve:{reg_id}", - "message": {"message_id": 60, "chat": {"id": 5694335584}}, - } - } - - # First approve — should succeed - await client.post("/api/webhook/telegram", json=cb_payload, headers=_WEBHOOK_HEADERS) - await asyncio.sleep(0) - - # Second approve — push must NOT be fired - push_calls: list = [] - - async def _capture_push(sub_json, title, body): - push_calls.append(sub_json) - - cb_payload2 = {**cb_payload, "callback_query": {**cb_payload["callback_query"], "id": "cq_d2"}} - with patch("backend.push.send_push", side_effect=_capture_push): - await client.post("/api/webhook/telegram", json=cb_payload2, headers=_WEBHOOK_HEADERS) - await asyncio.sleep(0) - - assert len(push_calls) == 0, f"Second approve must not fire push, got {len(push_calls)} calls" - - # Also verify status is still 'approved' - from backend import db as _db - # Can't check here as client context is closed; DB assertion was covered by state machine logic - - -@pytest.mark.asyncio -async def test_webhook_callback_double_approve_status_stays_approved(): - """Status remains 'approved' after a second approve callback.""" - from backend import db as _db - - async with make_app_client() as client: - with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock): - reg_resp = await client.post( - "/api/auth/register", - json={**_VALID_PAYLOAD, "email": "stay@example.com", "login": "stayuser"}, - ) - assert reg_resp.status_code == 201 - - from backend import config as _cfg - import aiosqlite - async with aiosqlite.connect(_cfg.DB_PATH) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute("SELECT id FROM registrations LIMIT 1") as cur: - row = await cur.fetchone() - reg_id = row["id"] if row else None - assert reg_id is not None - - cb = { - "callback_query": { - "id": "cq_s1", - "data": f"approve:{reg_id}", - "message": {"message_id": 70, "chat": {"id": 5694335584}}, - } - } - await client.post("/api/webhook/telegram", json=cb, headers=_WEBHOOK_HEADERS) - await asyncio.sleep(0) - - cb2 = {**cb, "callback_query": {**cb["callback_query"], "id": "cq_s2"}} - await client.post("/api/webhook/telegram", json=cb2, headers=_WEBHOOK_HEADERS) - await asyncio.sleep(0) - - reg = await _db.get_registration(reg_id) - assert reg["status"] == "approved", f"Expected 'approved', got {reg['status']!r}" - - -# --------------------------------------------------------------------------- -# 16. State machine — approve после reject -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_webhook_callback_approve_after_reject_status_stays_rejected(): - """Approve after reject must NOT change status — remains 'rejected'.""" - from backend import db as _db - - async with make_app_client() as client: - with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock): - reg_resp = await client.post( - "/api/auth/register", - json={**_VALID_PAYLOAD, "email": "artest@example.com", "login": "artestuser"}, - ) - assert reg_resp.status_code == 201 - - from backend import config as _cfg - import aiosqlite - async with aiosqlite.connect(_cfg.DB_PATH) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute("SELECT id FROM registrations LIMIT 1") as cur: - row = await cur.fetchone() - reg_id = row["id"] if row else None - assert reg_id is not None - - # First: reject - rej_cb = { - "callback_query": { - "id": "cq_ar1", - "data": f"reject:{reg_id}", - "message": {"message_id": 80, "chat": {"id": 5694335584}}, - } - } - await client.post("/api/webhook/telegram", json=rej_cb, headers=_WEBHOOK_HEADERS) - await asyncio.sleep(0) - - # Then: approve — must be ignored - push_calls: list = [] - - async def _capture_push(sub_json, title, body): - push_calls.append(sub_json) - - app_cb = { - "callback_query": { - "id": "cq_ar2", - "data": f"approve:{reg_id}", - "message": {"message_id": 81, "chat": {"id": 5694335584}}, - } - } - with patch("backend.push.send_push", side_effect=_capture_push): - await client.post("/api/webhook/telegram", json=app_cb, headers=_WEBHOOK_HEADERS) - await asyncio.sleep(0) - - reg = await _db.get_registration(reg_id) - assert reg["status"] == "rejected", f"Expected 'rejected', got {reg['status']!r}" - - assert len(push_calls) == 0, f"Approve after reject must not fire push, got {len(push_calls)}" - - -# --------------------------------------------------------------------------- -# 17. Rate limiting — 4th request returns 429 -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_auth_register_rate_limit_fourth_request_returns_429(): - """4th registration request from same IP within the window returns 429.""" - async with make_app_client() as client: - with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock): - for i in range(3): - r = await client.post( - "/api/auth/register", - json={ - "email": f"ratetest{i}@example.com", - "login": f"ratetest{i}", - "password": "strongpassword123", - }, - ) - assert r.status_code == 201, f"Request {i+1} should succeed, got {r.status_code}" - - # 4th request — must be rate-limited - r4 = await client.post( - "/api/auth/register", - json={ - "email": "ratetest4@example.com", - "login": "ratetest4", - "password": "strongpassword123", - }, - ) - - assert r4.status_code == 429, f"Expected 429 on 4th request, got {r4.status_code}" - - -# --------------------------------------------------------------------------- -# 18. VAPID public key endpoint /api/push/public-key -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_vapid_public_key_new_endpoint_returns_200(): - """GET /api/push/public-key returns 200 with vapid_public_key field.""" - async with make_app_client() as client: - resp = await client.get("/api/push/public-key") - - assert resp.status_code == 200, f"Expected 200, got {resp.status_code}: {resp.text}" - body = resp.json() - assert "vapid_public_key" in body, f"Expected 'vapid_public_key' in response, got {body}" - - -# --------------------------------------------------------------------------- -# 19. Password max length — 129 chars → 422 -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_auth_register_422_password_too_long(): - """Password of 129 characters returns 422.""" - async with make_app_client() as client: - resp = await client.post( - "/api/auth/register", - json={**_VALID_PAYLOAD, "password": "a" * 129}, - ) - assert resp.status_code == 422, f"Expected 422 on 129-char password, got {resp.status_code}" - - -# --------------------------------------------------------------------------- -# 20. Login max length — 31 chars → 422 -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_auth_register_422_login_too_long(): - """Login of 31 characters returns 422.""" - async with make_app_client() as client: - resp = await client.post( - "/api/auth/register", - json={**_VALID_PAYLOAD, "login": "a" * 31}, - ) - assert resp.status_code == 422, f"Expected 422 on 31-char login, got {resp.status_code}" - - -# --------------------------------------------------------------------------- -# 21. Empty body — POST /api/auth/register with {} → 422 -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_auth_register_422_empty_body(): - """Empty JSON body returns 422.""" - async with make_app_client() as client: - resp = await client.post("/api/auth/register", json={}) - assert resp.status_code == 422, f"Expected 422 on empty body, got {resp.status_code}" - - -# --------------------------------------------------------------------------- -# 22. Malformed callback_data — no colon → ok:True without crash -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_webhook_callback_malformed_data_no_colon_returns_ok(): - """callback_query with data='garbage' (no colon) returns ok:True gracefully.""" - async with make_app_client() as client: - cb_payload = { - "callback_query": { - "id": "cq_mal1", - "data": "garbage", - "message": {"message_id": 90, "chat": {"id": 5694335584}}, - } - } - resp = await client.post( - "/api/webhook/telegram", json=cb_payload, headers=_WEBHOOK_HEADERS - ) - - assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" - assert resp.json() == {"ok": True}, f"Expected ok:True, got {resp.json()}" - - -# --------------------------------------------------------------------------- -# 23. Non-numeric reg_id — data='approve:abc' → ok:True without crash -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_webhook_callback_non_numeric_reg_id_returns_ok(): - """callback_query with data='approve:abc' (non-numeric reg_id) returns ok:True.""" - async with make_app_client() as client: - cb_payload = { - "callback_query": { - "id": "cq_nan1", - "data": "approve:abc", - "message": {"message_id": 91, "chat": {"id": 5694335584}}, - } - } - resp = await client.post( - "/api/webhook/telegram", json=cb_payload, headers=_WEBHOOK_HEADERS - ) - - assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" - assert resp.json() == {"ok": True}, f"Expected ok:True, got {resp.json()}" diff --git a/tests/test_baton_008_frontend.py b/tests/test_baton_008_frontend.py deleted file mode 100644 index c19bd96..0000000 --- a/tests/test_baton_008_frontend.py +++ /dev/null @@ -1,439 +0,0 @@ -""" -Tests for BATON-008: Frontend registration module. - -Acceptance criteria: -1. index.html — форма регистрации с полями email, login, password присутствует -2. index.html — НЕТ захардкоженных VAPID-ключей в HTML-атрибутах (decision #1333) -3. app.js — вызов /api/push/public-key (не старый /api/vapid-public-key) (decision #1331) -4. app.js — guard для PushManager (decision #1332) -5. app.js — обработчик для кнопки регистрации (#btn-register → _handleSignUp) -6. app.js — переключение между view-login и view-register -7. app.js — показ ошибок пользователю (_setRegStatus) -8. GET /api/push/public-key → 200 с vapid_public_key (API контракт) -9. POST /api/auth/register с валидными данными → 201 (API контракт) -10. POST /api/auth/register с дублирующим email → 409 -11. POST /api/auth/register с дублирующим login → 409 -12. POST /api/auth/register с невалидным email → 422 -""" -from __future__ import annotations - -import re -from pathlib import Path -from unittest.mock import AsyncMock, patch - -import pytest - -PROJECT_ROOT = Path(__file__).parent.parent -INDEX_HTML = PROJECT_ROOT / "frontend" / "index.html" -APP_JS = PROJECT_ROOT / "frontend" / "app.js" - -from tests.conftest import make_app_client - -_VALID_PAYLOAD = { - "email": "frontend_test@example.com", - "login": "frontenduser", - "password": "strongpassword123", -} - - -# --------------------------------------------------------------------------- -# HTML static analysis — Criterion 1: поля формы регистрации -# --------------------------------------------------------------------------- - - -def test_index_html_has_email_field() -> None: - """index.html должен содержать поле email для регистрации (id=reg-email).""" - content = INDEX_HTML.read_text(encoding="utf-8") - assert 'id="reg-email"' in content, ( - "index.html не содержит поле с id='reg-email'" - ) - - -def test_index_html_has_login_field() -> None: - """index.html должен содержать поле логина для регистрации (id=reg-login).""" - content = INDEX_HTML.read_text(encoding="utf-8") - assert 'id="reg-login"' in content, ( - "index.html не содержит поле с id='reg-login'" - ) - - -def test_index_html_has_password_field() -> None: - """index.html должен содержать поле пароля для регистрации (id=reg-password).""" - content = INDEX_HTML.read_text(encoding="utf-8") - assert 'id="reg-password"' in content, ( - "index.html не содержит поле с id='reg-password'" - ) - - -def test_index_html_email_field_has_correct_type() -> None: - """Поле email регистрации должно иметь type='email'.""" - content = INDEX_HTML.read_text(encoding="utf-8") - # Ищем input с id=reg-email и type=email в любом порядке атрибутов - email_input_block = re.search( - r']*id="reg-email"[^>]*>', content, re.DOTALL - ) - assert email_input_block is not None, "Не найден input с id='reg-email'" - assert 'type="email"' in email_input_block.group(0), ( - "Поле reg-email не имеет type='email'" - ) - - -def test_index_html_password_field_has_correct_type() -> None: - """Поле пароля регистрации должно иметь type='password'.""" - content = INDEX_HTML.read_text(encoding="utf-8") - password_input_block = re.search( - r']*id="reg-password"[^>]*>', content, re.DOTALL - ) - assert password_input_block is not None, "Не найден input с id='reg-password'" - assert 'type="password"' in password_input_block.group(0), ( - "Поле reg-password не имеет type='password'" - ) - - -def test_index_html_has_register_button() -> None: - """index.html должен содержать кнопку регистрации (id=btn-register).""" - content = INDEX_HTML.read_text(encoding="utf-8") - assert 'id="btn-register"' in content, ( - "index.html не содержит кнопку с id='btn-register'" - ) - - -def test_index_html_has_switch_to_register_button() -> None: - """index.html должен содержать кнопку переключения на форму регистрации (id=btn-switch-to-register).""" - content = INDEX_HTML.read_text(encoding="utf-8") - assert 'id="btn-switch-to-register"' in content, ( - "index.html не содержит кнопку с id='btn-switch-to-register'" - ) - - -def test_index_html_has_view_register_div() -> None: - """index.html должен содержать блок view-register для формы регистрации.""" - content = INDEX_HTML.read_text(encoding="utf-8") - assert 'id="view-register"' in content, ( - "index.html не содержит блок с id='view-register'" - ) - - -def test_index_html_has_view_login_div() -> None: - """index.html должен содержать блок view-login для онбординга.""" - content = INDEX_HTML.read_text(encoding="utf-8") - assert 'id="view-login"' in content, ( - "index.html не содержит блок с id='view-login'" - ) - - -def test_index_html_has_reg_status_element() -> None: - """index.html должен содержать элемент статуса регистрации (id=reg-status).""" - content = INDEX_HTML.read_text(encoding="utf-8") - assert 'id="reg-status"' in content, ( - "index.html не содержит элемент с id='reg-status'" - ) - - -# --------------------------------------------------------------------------- -# HTML static analysis — Criterion 2: НЕТ захардкоженного VAPID в HTML (decision #1333) -# --------------------------------------------------------------------------- - - -def test_index_html_no_hardcoded_vapid_key_in_meta() -> None: - """index.html НЕ должен содержать VAPID-ключ захардкоженным в meta-теге (decision #1333).""" - content = INDEX_HTML.read_text(encoding="utf-8") - # VAPID public key — это URL-safe base64 строка длиной 87 символов (без padding) - # Ищем характерный паттерн в meta-атрибутах - vapid_in_meta = re.search( - r']+content\s*=\s*["\'][A-Za-z0-9_\-]{60,}["\'][^>]*>', - content, - ) - assert vapid_in_meta is None, ( - f"Найден meta-тег с длинной строкой (возможный VAPID-ключ): " - f"{vapid_in_meta.group(0) if vapid_in_meta else ''}" - ) - - -def test_index_html_no_vapid_key_attribute_pattern() -> None: - """index.html НЕ должен содержать data-vapid-key или аналогичные атрибуты.""" - content = INDEX_HTML.read_text(encoding="utf-8") - assert "vapid" not in content.lower(), ( - "index.html содержит упоминание 'vapid' — VAPID ключ должен читаться через API, " - "а не быть захардкожен в HTML (decision #1333)" - ) - - -# --------------------------------------------------------------------------- -# app.js static analysis — Criterion 3: /api/push/public-key endpoint (decision #1331) -# --------------------------------------------------------------------------- - - -def test_app_js_uses_new_vapid_endpoint() -> None: - """app.js должен обращаться к /api/push/public-key (decision #1331).""" - content = APP_JS.read_text(encoding="utf-8") - assert "/api/push/public-key" in content, ( - "app.js не содержит endpoint '/api/push/public-key'" - ) - - -def test_app_js_does_not_use_old_vapid_endpoint() -> None: - """app.js НЕ должен использовать устаревший /api/vapid-public-key (decision #1331).""" - content = APP_JS.read_text(encoding="utf-8") - assert "/api/vapid-public-key" not in content, ( - "app.js содержит устаревший endpoint '/api/vapid-public-key' — " - "нарушение decision #1331, должен использоваться '/api/push/public-key'" - ) - - -# --------------------------------------------------------------------------- -# app.js static analysis — Criterion 4: PushManager guard (decision #1332) -# --------------------------------------------------------------------------- - - -def test_app_js_has_push_manager_guard_in_registration_flow() -> None: - """app.js должен содержать guard 'PushManager' in window (decision #1332).""" - content = APP_JS.read_text(encoding="utf-8") - assert "'PushManager' in window" in content, ( - "app.js не содержит guard \"'PushManager' in window\" — " - "нарушение decision #1332" - ) - - -def test_app_js_push_manager_guard_combined_with_service_worker_check() -> None: - """Guard PushManager должен сочетаться с проверкой serviceWorker.""" - content = APP_JS.read_text(encoding="utf-8") - # Ищем паттерн совместной проверки serviceWorker + PushManager - assert re.search( - r"serviceWorker.*PushManager|PushManager.*serviceWorker", - content, - re.DOTALL, - ), ( - "app.js не содержит совместной проверки 'serviceWorker' и 'PushManager' — " - "guard неполный (decision #1332)" - ) - - -# --------------------------------------------------------------------------- -# app.js static analysis — Criterion 5: обработчик кнопки регистрации -# --------------------------------------------------------------------------- - - -def test_app_js_has_handle_sign_up_function() -> None: - """app.js должен содержать функцию _handleSignUp.""" - content = APP_JS.read_text(encoding="utf-8") - assert "_handleSignUp" in content, ( - "app.js не содержит функцию '_handleSignUp'" - ) - - -def test_app_js_registers_click_handler_for_btn_register() -> None: - """app.js должен добавлять click-обработчик на btn-register → _handleSignUp.""" - content = APP_JS.read_text(encoding="utf-8") - # Ищем addEventListener на элементе btn-register с вызовом _handleSignUp - assert re.search( - r'btn-register.*addEventListener|addEventListener.*btn-register', - content, - re.DOTALL, - ), ( - "app.js не содержит addEventListener для кнопки 'btn-register'" - ) - # Проверяем что именно _handleSignUp привязан к кнопке - assert re.search( - r'btn[Rr]egister.*_handleSignUp|_handleSignUp.*btn[Rr]egister', - content, - re.DOTALL, - ), ( - "app.js не связывает кнопку 'btn-register' с функцией '_handleSignUp'" - ) - - -# --------------------------------------------------------------------------- -# app.js static analysis — Criterion 6: переключение view-login / view-register -# --------------------------------------------------------------------------- - - -def test_app_js_has_show_view_function() -> None: - """app.js должен содержать функцию _showView для переключения видов.""" - content = APP_JS.read_text(encoding="utf-8") - assert "_showView" in content, ( - "app.js не содержит функцию '_showView'" - ) - - -def test_app_js_show_view_handles_view_login() -> None: - """_showView в app.js должна обрабатывать view-login.""" - content = APP_JS.read_text(encoding="utf-8") - assert "view-login" in content, ( - "app.js не содержит id 'view-login' — нет переключения на вид логина" - ) - - -def test_app_js_show_view_handles_view_register() -> None: - """_showView в app.js должна обрабатывать view-register.""" - content = APP_JS.read_text(encoding="utf-8") - assert "view-register" in content, ( - "app.js не содержит id 'view-register' — нет переключения на вид регистрации" - ) - - -def test_app_js_has_btn_switch_to_register_handler() -> None: - """app.js должен содержать обработчик для btn-switch-to-register.""" - content = APP_JS.read_text(encoding="utf-8") - assert "btn-switch-to-register" in content, ( - "app.js не содержит ссылку на 'btn-switch-to-register'" - ) - - -def test_app_js_has_btn_switch_to_login_handler() -> None: - """app.js должен содержать обработчик для btn-switch-to-login (назад).""" - content = APP_JS.read_text(encoding="utf-8") - assert "btn-switch-to-login" in content, ( - "app.js не содержит ссылку на 'btn-switch-to-login'" - ) - - -# --------------------------------------------------------------------------- -# app.js static analysis — Criterion 7: обработка ошибок / показ сообщения пользователю -# --------------------------------------------------------------------------- - - -def test_app_js_has_set_reg_status_function() -> None: - """app.js должен содержать _setRegStatus для показа статуса в форме регистрации.""" - content = APP_JS.read_text(encoding="utf-8") - assert "_setRegStatus" in content, ( - "app.js не содержит функцию '_setRegStatus'" - ) - - -def test_app_js_handle_sign_up_shows_error_on_empty_fields() -> None: - """_handleSignUp должна вызывать _setRegStatus с ошибкой при пустых полях.""" - content = APP_JS.read_text(encoding="utf-8") - # Проверяем наличие валидации пустых полей внутри _handleSignUp-подобного блока - assert re.search( - r"_setRegStatus\s*\([^)]*error", - content, - ), ( - "app.js не содержит вызов _setRegStatus с классом 'error' " - "— ошибки не отображаются пользователю" - ) - - -def test_app_js_handle_sign_up_shows_success_on_ok() -> None: - """_handleSignUp должна вызывать _setRegStatus с success при успешной регистрации.""" - content = APP_JS.read_text(encoding="utf-8") - assert re.search( - r"_setRegStatus\s*\([^)]*success", - content, - ), ( - "app.js не содержит вызов _setRegStatus с классом 'success' " - "— пользователь не уведомляется об успехе регистрации" - ) - - -def test_app_js_clears_password_after_successful_signup() -> None: - """_handleSignUp должна очищать поле пароля после успешной отправки.""" - content = APP_JS.read_text(encoding="utf-8") - # Ищем сброс значения пароля - assert re.search( - r"passwordInput\.value\s*=\s*['\"][\s]*['\"]", - content, - ), ( - "app.js не очищает поле пароля после успешной регистрации — " - "пароль остаётся в DOM (security concern)" - ) - - -def test_app_js_uses_api_auth_register_endpoint() -> None: - """app.js должен отправлять форму на /api/auth/register.""" - content = APP_JS.read_text(encoding="utf-8") - assert "/api/auth/register" in content, ( - "app.js не содержит endpoint '/api/auth/register'" - ) - - -# --------------------------------------------------------------------------- -# Integration tests — API контракты (Criteria 8–12) -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_vapid_public_key_endpoint_returns_200_with_key(): - """GET /api/push/public-key → 200 с полем vapid_public_key.""" - async with make_app_client() as client: - resp = await client.get("/api/push/public-key") - - assert resp.status_code == 200, ( - f"GET /api/push/public-key вернул {resp.status_code}, ожидался 200" - ) - body = resp.json() - assert "vapid_public_key" in body, ( - f"Ответ /api/push/public-key не содержит 'vapid_public_key': {body}" - ) - assert isinstance(body["vapid_public_key"], str), ( - "vapid_public_key должен быть строкой" - ) - - -@pytest.mark.asyncio -async def test_register_valid_payload_returns_201_pending(): - """POST /api/auth/register с валидными данными → 201 status=pending.""" - async with make_app_client() as client: - with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock): - resp = await client.post("/api/auth/register", json=_VALID_PAYLOAD) - - assert resp.status_code == 201, ( - f"POST /api/auth/register вернул {resp.status_code}: {resp.text}" - ) - body = resp.json() - assert body.get("status") == "pending", ( - f"Ожидался status='pending', получено: {body}" - ) - assert "message" in body, ( - f"Ответ не содержит поле 'message': {body}" - ) - - -@pytest.mark.asyncio -async def test_register_duplicate_email_returns_409(): - """POST /api/auth/register с дублирующим email → 409.""" - async with make_app_client() as client: - with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock): - r1 = await client.post("/api/auth/register", json=_VALID_PAYLOAD) - assert r1.status_code == 201, f"Первая регистрация не прошла: {r1.text}" - - r2 = await client.post( - "/api/auth/register", - json={**_VALID_PAYLOAD, "login": "anotherlogin"}, - ) - - assert r2.status_code == 409, ( - f"Дублирующий email должен вернуть 409, получено {r2.status_code}" - ) - - -@pytest.mark.asyncio -async def test_register_duplicate_login_returns_409(): - """POST /api/auth/register с дублирующим login → 409.""" - async with make_app_client() as client: - with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock): - r1 = await client.post("/api/auth/register", json=_VALID_PAYLOAD) - assert r1.status_code == 201, f"Первая регистрация не прошла: {r1.text}" - - r2 = await client.post( - "/api/auth/register", - json={**_VALID_PAYLOAD, "email": "another@example.com"}, - ) - - assert r2.status_code == 409, ( - f"Дублирующий login должен вернуть 409, получено {r2.status_code}" - ) - - -@pytest.mark.asyncio -async def test_register_invalid_email_returns_422(): - """POST /api/auth/register с невалидным email → 422.""" - async with make_app_client() as client: - resp = await client.post( - "/api/auth/register", - json={**_VALID_PAYLOAD, "email": "not-an-email"}, - ) - - assert resp.status_code == 422, ( - f"Невалидный email должен вернуть 422, получено {resp.status_code}" - ) diff --git a/tests/test_biz_001.py b/tests/test_biz_001.py deleted file mode 100644 index 21f822e..0000000 --- a/tests/test_biz_001.py +++ /dev/null @@ -1,338 +0,0 @@ -""" -Tests for BATON-BIZ-001: Login mechanism for approved users (dual-layer: AST + httpx functional). - -Acceptance criteria: -1. Успешный login по login-полю → 200 + token -2. Успешный login по email-полю → 200 + token -3. Неверный пароль → 401 (без раскрытия причины) -4. Статус pending → 403 с читаемым сообщением -5. Статус rejected → 403 с читаемым сообщением -6. Rate limit — 6-й запрос подряд → 429 -7. Guard middleware возвращает 401 без токена -8. Guard middleware пропускает валидный токен - -Additional: error message uniformity, PBKDF2 verification. -""" -from __future__ import annotations - -import os - -os.environ.setdefault("BOT_TOKEN", "test-bot-token") -os.environ.setdefault("CHAT_ID", "-1001234567890") -os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") -os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") -os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") -os.environ.setdefault("ADMIN_TOKEN", "test-admin-token") -os.environ.setdefault("ADMIN_CHAT_ID", "5694335584") - -import pytest -from fastapi import HTTPException -from fastapi.security import HTTPAuthorizationCredentials - -from backend import db -from backend.middleware import create_auth_token, verify_auth_token -from tests.conftest import make_app_client - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -async def _register_auth(client, email: str, login: str, password: str) -> int: - """Register via /api/auth/register, return registration id.""" - resp = await client.post( - "/api/auth/register", - json={"email": email, "login": login, "password": password}, - ) - assert resp.status_code == 201, f"auth/register failed: {resp.text}" - reg = await db.get_registration_by_login_or_email(login) - assert reg is not None - return reg["id"] - - -async def _approve(reg_id: int) -> None: - await db.update_registration_status(reg_id, "approved") - - -async def _reject(reg_id: int) -> None: - await db.update_registration_status(reg_id, "rejected") - - -# --------------------------------------------------------------------------- -# Criterion 1 — Успешный login по login-полю -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_login_by_login_field_returns_200_with_token(): - """Approved user can login using their login field → 200 + token.""" - async with make_app_client() as client: - reg_id = await _register_auth(client, "alice@example.com", "alice", "password123") - await _approve(reg_id) - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "alice", "password": "password123"}, - ) - assert resp.status_code == 200 - data = resp.json() - assert "token" in data - assert data["login"] == "alice" - - -@pytest.mark.asyncio -async def test_login_by_login_field_token_is_non_empty_string(): - """Token returned for approved login user is a non-empty string.""" - async with make_app_client() as client: - reg_id = await _register_auth(client, "alice2@example.com", "alice2", "password123") - await _approve(reg_id) - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "alice2", "password": "password123"}, - ) - assert isinstance(resp.json()["token"], str) - assert len(resp.json()["token"]) > 0 - - -# --------------------------------------------------------------------------- -# Criterion 2 — Успешный login по email-полю -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_login_by_email_field_returns_200_with_token(): - """Approved user can login using their email field → 200 + token.""" - async with make_app_client() as client: - reg_id = await _register_auth(client, "bob@example.com", "bobuser", "securepass1") - await _approve(reg_id) - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "bob@example.com", "password": "securepass1"}, - ) - assert resp.status_code == 200 - data = resp.json() - assert "token" in data - assert data["login"] == "bobuser" - - -@pytest.mark.asyncio -async def test_login_by_email_field_token_login_matches_registration(): - """Token response login field matches the login set during registration.""" - async with make_app_client() as client: - reg_id = await _register_auth(client, "bob2@example.com", "bob2user", "securepass1") - await _approve(reg_id) - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "bob2@example.com", "password": "securepass1"}, - ) - assert resp.json()["login"] == "bob2user" - - -# --------------------------------------------------------------------------- -# Criterion 3 — Неверный пароль → 401 без раскрытия причины -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_wrong_password_returns_401(): - """Wrong password returns 401 with generic message (no detail about which field failed).""" - async with make_app_client() as client: - reg_id = await _register_auth(client, "carol@example.com", "carol", "correctpass1") - await _approve(reg_id) - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "carol", "password": "wrongpassword"}, - ) - assert resp.status_code == 401 - assert "Неверный логин или пароль" in resp.json()["detail"] - - -@pytest.mark.asyncio -async def test_nonexistent_user_returns_401_same_message_as_wrong_password(): - """Non-existent login returns same 401 message as wrong password (prevents user enumeration).""" - async with make_app_client() as client: - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "doesnotexist", "password": "anypassword"}, - ) - assert resp.status_code == 401 - assert "Неверный логин или пароль" in resp.json()["detail"] - - -# --------------------------------------------------------------------------- -# Criterion 4 — Статус pending → 403 с читаемым сообщением -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_pending_user_login_returns_403(): - """User with pending status gets 403.""" - async with make_app_client() as client: - await _register_auth(client, "dave@example.com", "dave", "password123") - # Status is 'pending' by default — no approval step - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "dave", "password": "password123"}, - ) - assert resp.status_code == 403 - - -@pytest.mark.asyncio -async def test_pending_user_login_403_message_is_human_readable(): - """403 message for pending user contains readable Russian text about the waiting status.""" - async with make_app_client() as client: - await _register_auth(client, "dave2@example.com", "dave2", "password123") - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "dave2", "password": "password123"}, - ) - assert "ожидает" in resp.json()["detail"] - - -# --------------------------------------------------------------------------- -# Criterion 5 — Статус rejected → 403 с читаемым сообщением -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_rejected_user_login_returns_403(): - """User with rejected status gets 403.""" - async with make_app_client() as client: - reg_id = await _register_auth(client, "eve@example.com", "evegirl", "password123") - await _reject(reg_id) - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "evegirl", "password": "password123"}, - ) - assert resp.status_code == 403 - - -@pytest.mark.asyncio -async def test_rejected_user_login_403_message_is_human_readable(): - """403 message for rejected user contains readable Russian text about rejection.""" - async with make_app_client() as client: - reg_id = await _register_auth(client, "eve2@example.com", "eve2girl", "password123") - await _reject(reg_id) - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "eve2girl", "password": "password123"}, - ) - assert "отклонена" in resp.json()["detail"] - - -# --------------------------------------------------------------------------- -# Criterion 6 — Rate limit: 6-й запрос подряд → 429 -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_rate_limit_triggers_on_sixth_login_attempt(): - """Login rate limit (5 per window) triggers 429 exactly on the 6th request.""" - async with make_app_client() as client: - statuses = [] - for _ in range(6): - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "nouser_rl", "password": "nopass"}, - headers={"X-Real-IP": "10.99.99.1"}, - ) - statuses.append(resp.status_code) - # First 5 attempts pass rate limit (user not found → 401) - assert all(s == 401 for s in statuses[:5]), ( - f"Первые 5 попыток должны быть 401, получили: {statuses[:5]}" - ) - # 6th attempt hits rate limit - assert statuses[5] == 429, ( - f"6-я попытка должна быть 429, получили: {statuses[5]}" - ) - - -@pytest.mark.asyncio -async def test_rate_limit_fifth_attempt_still_passes(): - """5th login attempt is still allowed (rate limit triggers only on 6th).""" - async with make_app_client() as client: - for i in range(4): - await client.post( - "/api/auth/login", - json={"login_or_email": "nouser_rl2", "password": "nopass"}, - headers={"X-Real-IP": "10.99.99.2"}, - ) - resp = await client.post( - "/api/auth/login", - json={"login_or_email": "nouser_rl2", "password": "nopass"}, - headers={"X-Real-IP": "10.99.99.2"}, - ) - assert resp.status_code == 401, ( - f"5-я попытка должна пройти rate limit и вернуть 401, получили: {resp.status_code}" - ) - - -# --------------------------------------------------------------------------- -# Criterion 7 — Guard middleware: 401 без токена -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_verify_auth_token_raises_401_when_credentials_is_none(): - """verify_auth_token raises HTTPException 401 when no credentials provided.""" - with pytest.raises(HTTPException) as exc_info: - await verify_auth_token(credentials=None) - assert exc_info.value.status_code == 401 - - -@pytest.mark.asyncio -async def test_verify_auth_token_raises_401_for_malformed_token(): - """verify_auth_token raises HTTPException 401 for a malformed/invalid token.""" - creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials="not.a.valid.jwt") - with pytest.raises(HTTPException) as exc_info: - await verify_auth_token(credentials=creds) - assert exc_info.value.status_code == 401 - - -# --------------------------------------------------------------------------- -# Criterion 8 — Guard middleware: валидный токен пропускается -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_verify_auth_token_returns_payload_for_valid_token(): - """verify_auth_token returns decoded JWT payload for a valid signed token.""" - token = create_auth_token(reg_id=42, login="testuser") - creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token) - payload = await verify_auth_token(credentials=creds) - assert payload["sub"] == "42" - assert payload["login"] == "testuser" - - -@pytest.mark.asyncio -async def test_verify_auth_token_payload_contains_expected_fields(): - """Payload returned by verify_auth_token contains sub, login, iat, exp fields.""" - token = create_auth_token(reg_id=7, login="inspector") - creds = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token) - payload = await verify_auth_token(credentials=creds) - for field in ("sub", "login", "iat", "exp"): - assert field in payload, f"Поле '{field}' отсутствует в payload" - - -# --------------------------------------------------------------------------- -# Additional: PBKDF2 correctness — verify_password timing-safe -# --------------------------------------------------------------------------- - - -def test_hash_and_verify_password_returns_true_for_correct_password(): - """_hash_password + _verify_password: correct password returns True.""" - from backend.main import _hash_password, _verify_password - stored = _hash_password("mysecretpass") - assert _verify_password("mysecretpass", stored) is True - - -def test_hash_and_verify_password_returns_false_for_wrong_password(): - """_hash_password + _verify_password: wrong password returns False.""" - from backend.main import _hash_password, _verify_password - stored = _hash_password("mysecretpass") - assert _verify_password("wrongpassword", stored) is False - - -def test_verify_password_returns_false_for_malformed_hash(): - """_verify_password returns False (not exception) for a malformed hash string.""" - from backend.main import _verify_password - assert _verify_password("anypassword", "not-a-valid-hash") is False diff --git a/tests/test_biz_002.py b/tests/test_biz_002.py deleted file mode 100644 index 0136df7..0000000 --- a/tests/test_biz_002.py +++ /dev/null @@ -1,203 +0,0 @@ -""" -Tests for BATON-BIZ-002: Убрать hardcoded VAPID key из meta-тега, читать с /api/push/public-key - -Acceptance criteria: -1. Meta-тег vapid-public-key полностью отсутствует в frontend/index.html (decision #1333). -2. app.js использует canonical URL /api/push/public-key для получения VAPID ключа. -3. Graceful fallback: endpoint недоступен → функция возвращает null, не бросает исключение. -4. Graceful fallback: ключ пустой → _initPushSubscription не выполняется (guard на null). -5. GET /api/push/public-key возвращает HTTP 200 с полем vapid_public_key. -6. GET /api/push/public-key возвращает правильное значение из конфига. -""" -from __future__ import annotations - -import os -import re -from pathlib import Path -from unittest.mock import patch - -os.environ.setdefault("BOT_TOKEN", "test-bot-token") -os.environ.setdefault("CHAT_ID", "-1001234567890") -os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") -os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") -os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") -os.environ.setdefault("ADMIN_TOKEN", "test-admin-token") -os.environ.setdefault("ADMIN_CHAT_ID", "5694335584") - -import pytest - -from tests.conftest import make_app_client - -PROJECT_ROOT = Path(__file__).parent.parent -INDEX_HTML = PROJECT_ROOT / "frontend" / "index.html" -APP_JS = PROJECT_ROOT / "frontend" / "app.js" - -_TEST_VAPID_KEY = "BFakeVapidPublicKeyForBiz002TestingBase64UrlEncoded" - - -# --------------------------------------------------------------------------- -# Criterion 1 — AST: meta-тег vapid-public-key полностью отсутствует -# --------------------------------------------------------------------------- - - -def test_index_html_has_no_meta_tag_named_vapid_public_key() -> None: - """index.html не должен содержать вообще (decision #1333).""" - content = INDEX_HTML.read_text(encoding="utf-8") - match = re.search( - r']+name\s*=\s*["\']vapid-public-key["\']', - content, - re.IGNORECASE, - ) - assert match is None, ( - f"index.html содержит удалённый тег : {match.group(0)!r}" - ) - - -def test_index_html_has_no_vapid_meta_tag_with_empty_or_any_content() -> None: - """index.html не должен содержать ни пустой, ни непустой VAPID ключ в meta content.""" - content = INDEX_HTML.read_text(encoding="utf-8") - match = re.search( - r']*(?:vapid|application-server-key)[^>]*content\s*=', - content, - re.IGNORECASE, - ) - assert match is None, ( - f"index.html содержит -тег с VAPID-связанным атрибутом content: {match.group(0)!r}" - ) - - -# --------------------------------------------------------------------------- -# Criterion 2 — AST: app.js использует canonical /api/push/public-key -# --------------------------------------------------------------------------- - - -def test_app_js_fetch_vapid_uses_canonical_push_public_key_url() -> None: - """_fetchVapidPublicKey в app.js должна использовать /api/push/public-key (canonical URL).""" - content = APP_JS.read_text(encoding="utf-8") - assert "/api/push/public-key" in content, ( - "app.js не содержит canonical URL '/api/push/public-key' — " - "ключ не читается через правильный endpoint" - ) - - -def test_app_js_fetch_vapid_returns_vapid_public_key_field() -> None: - """_fetchVapidPublicKey должна читать поле vapid_public_key из JSON-ответа.""" - content = APP_JS.read_text(encoding="utf-8") - assert re.search(r"data\.vapid_public_key", content), ( - "app.js не читает поле 'data.vapid_public_key' из ответа API" - ) - - -# --------------------------------------------------------------------------- -# Criterion 3 — AST: graceful fallback когда endpoint недоступен -# --------------------------------------------------------------------------- - - -def test_app_js_fetch_vapid_returns_null_on_http_error() -> None: - """_fetchVapidPublicKey должна возвращать null при res.ok === false (HTTP-ошибка).""" - content = APP_JS.read_text(encoding="utf-8") - assert re.search(r"if\s*\(\s*!\s*res\.ok\s*\)", content), ( - "app.js не содержит проверку 'if (!res.ok)' — " - "HTTP-ошибки не обрабатываются gracefully в _fetchVapidPublicKey" - ) - - -def test_app_js_fetch_vapid_catches_network_errors() -> None: - """_fetchVapidPublicKey должна оборачивать fetch в try/catch и возвращать null при сетевой ошибке.""" - content = APP_JS.read_text(encoding="utf-8") - # Проверяем паттерн try { fetch ... } catch (err) { return null; } внутри функции - func_match = re.search( - r"async function _fetchVapidPublicKey\(\).*?(?=^(?:async )?function |\Z)", - content, - re.DOTALL | re.MULTILINE, - ) - assert func_match, "Функция _fetchVapidPublicKey не найдена в app.js" - func_body = func_match.group(0) - assert "catch" in func_body, ( - "app.js: _fetchVapidPublicKey не содержит блок catch — " - "сетевые ошибки при fetch не обрабатываются" - ) - assert re.search(r"return\s+null", func_body), ( - "app.js: _fetchVapidPublicKey не возвращает null при ошибке — " - "upstream код получит исключение вместо null" - ) - - -# --------------------------------------------------------------------------- -# Criterion 4 — AST: graceful fallback когда ключ пустой (decision #1332) -# --------------------------------------------------------------------------- - - -def test_app_js_fetch_vapid_returns_null_on_empty_key() -> None: - """_fetchVapidPublicKey должна возвращать null когда vapid_public_key пустой.""" - content = APP_JS.read_text(encoding="utf-8") - assert re.search(r"data\.vapid_public_key\s*\|\|\s*null", content), ( - "app.js не содержит 'data.vapid_public_key || null' — " - "пустой ключ не преобразуется в null" - ) - - -def test_app_js_init_push_subscription_guard_skips_on_null_key() -> None: - """_initPushSubscription должна ранним возвратом пропускать подписку при null ключе (decision #1332).""" - content = APP_JS.read_text(encoding="utf-8") - assert re.search(r"if\s*\(\s*!\s*vapidPublicKey\s*\)", content), ( - "app.js: _initPushSubscription не содержит guard 'if (!vapidPublicKey)' — " - "подписка может быть создана без ключа" - ) - - -# --------------------------------------------------------------------------- -# Criterion 5 — HTTP: GET /api/push/public-key → 200 + vapid_public_key -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_push_public_key_endpoint_returns_200() -> None: - """GET /api/push/public-key должен вернуть HTTP 200.""" - async with make_app_client() as client: - response = await client.get("/api/push/public-key") - assert response.status_code == 200, ( - f"GET /api/push/public-key вернул {response.status_code}, ожидался 200" - ) - - -@pytest.mark.asyncio -async def test_push_public_key_endpoint_returns_json_with_vapid_field() -> None: - """GET /api/push/public-key должен вернуть JSON с полем vapid_public_key.""" - async with make_app_client() as client: - response = await client.get("/api/push/public-key") - data = response.json() - assert "vapid_public_key" in data, ( - f"Ответ /api/push/public-key не содержит поле 'vapid_public_key': {data!r}" - ) - - -# --------------------------------------------------------------------------- -# Criterion 6 — HTTP: возвращает правильное значение из конфига -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_push_public_key_endpoint_returns_configured_value() -> None: - """GET /api/push/public-key возвращает значение из VAPID_PUBLIC_KEY конфига.""" - with patch("backend.config.VAPID_PUBLIC_KEY", _TEST_VAPID_KEY): - async with make_app_client() as client: - response = await client.get("/api/push/public-key") - data = response.json() - assert data.get("vapid_public_key") == _TEST_VAPID_KEY, ( - f"vapid_public_key должен быть '{_TEST_VAPID_KEY}', " - f"получили: {data.get('vapid_public_key')!r}" - ) - - -@pytest.mark.asyncio -async def test_push_public_key_endpoint_returns_empty_string_when_not_configured() -> None: - """GET /api/push/public-key возвращает пустую строку (не ошибку) если ключ не настроен.""" - with patch("backend.config.VAPID_PUBLIC_KEY", ""): - async with make_app_client() as client: - response = await client.get("/api/push/public-key") - assert response.status_code == 200, ( - f"Endpoint вернул {response.status_code} при пустом ключе, ожидался 200" - ) - data = response.json() - assert "vapid_public_key" in data, "Поле vapid_public_key отсутствует при пустом конфиге" diff --git a/tests/test_biz_004.py b/tests/test_biz_004.py deleted file mode 100644 index 228f2ac..0000000 --- a/tests/test_biz_004.py +++ /dev/null @@ -1,96 +0,0 @@ -""" -BATON-BIZ-004: Verify removal of dead code from backend/telegram.py. - -Acceptance criteria: -1. telegram.py does NOT contain duplicate logging setLevel calls for httpx/httpcore. -2. telegram.py does NOT contain the SignalAggregator class. -3. httpx/httpcore logging suppression is still configured in main.py (globally). -4. SignalAggregator is NOT importable from backend.telegram. -""" -from __future__ import annotations - -import ast -import importlib -import inspect -import logging -import os -from pathlib import Path - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -_BACKEND_DIR = Path(__file__).parent.parent / "backend" -_TELEGRAM_SRC = (_BACKEND_DIR / "telegram.py").read_text(encoding="utf-8") -_MAIN_SRC = (_BACKEND_DIR / "main.py").read_text(encoding="utf-8") - - -# --------------------------------------------------------------------------- -# Criteria 1 — no setLevel for httpx/httpcore in telegram.py -# --------------------------------------------------------------------------- - -def test_telegram_has_no_httpx_setlevel(): - """telegram.py must not set log level for 'httpx'.""" - assert 'getLogger("httpx").setLevel' not in _TELEGRAM_SRC - assert "getLogger('httpx').setLevel" not in _TELEGRAM_SRC - - -def test_telegram_has_no_httpcore_setlevel(): - """telegram.py must not set log level for 'httpcore'.""" - assert 'getLogger("httpcore").setLevel' not in _TELEGRAM_SRC - assert "getLogger('httpcore').setLevel" not in _TELEGRAM_SRC - - -# --------------------------------------------------------------------------- -# Criteria 2 — SignalAggregator absent from telegram.py source -# --------------------------------------------------------------------------- - -def test_telegram_source_has_no_signal_aggregator_class(): - """telegram.py source text must not contain the class definition.""" - assert "class SignalAggregator" not in _TELEGRAM_SRC - - -def test_telegram_source_has_no_signal_aggregator_reference(): - """telegram.py source text must not reference SignalAggregator at all.""" - assert "SignalAggregator" not in _TELEGRAM_SRC - - -# --------------------------------------------------------------------------- -# Criteria 3 — httpx/httpcore suppression still lives in main.py -# --------------------------------------------------------------------------- - -def test_main_suppresses_httpx_logging(): - """main.py must call getLogger('httpx').setLevel to suppress noise.""" - assert ( - 'getLogger("httpx").setLevel' in _MAIN_SRC - or "getLogger('httpx').setLevel" in _MAIN_SRC - ) - - -def test_main_suppresses_httpcore_logging(): - """main.py must call getLogger('httpcore').setLevel to suppress noise.""" - assert ( - 'getLogger("httpcore").setLevel' in _MAIN_SRC - or "getLogger('httpcore').setLevel" in _MAIN_SRC - ) - - -# --------------------------------------------------------------------------- -# Criteria 4 — SignalAggregator not importable from backend.telegram -# --------------------------------------------------------------------------- - -def test_signal_aggregator_not_importable_from_telegram(): - """Importing SignalAggregator from backend.telegram must raise ImportError.""" - import importlib - import sys - - # Force a fresh import so changes to the module are reflected - mod_name = "backend.telegram" - if mod_name in sys.modules: - del sys.modules[mod_name] - - import backend.telegram as tg_mod # noqa: F401 - assert not hasattr(tg_mod, "SignalAggregator"), ( - "SignalAggregator should not be an attribute of backend.telegram" - ) diff --git a/tests/test_fix_016.py b/tests/test_fix_016.py deleted file mode 100644 index e4748ba..0000000 --- a/tests/test_fix_016.py +++ /dev/null @@ -1,163 +0,0 @@ -""" -Tests for BATON-FIX-016: VAPID public key — убедиться, что ключ не вшит -как пустая строка в frontend-коде и читается через API. - -Acceptance criteria: -1. В frontend-коде нет хардкода пустой строки в качестве VAPID key в -теге. -2. frontend читает ключ через API /api/vapid-public-key (_fetchVapidPublicKey). -3. GET /api/vapid-public-key возвращает HTTP 200. -4. GET /api/vapid-public-key возвращает JSON с полем vapid_public_key. -5. При наличии конфигурации VAPID_PUBLIC_KEY — ответ содержит непустое значение. -""" -from __future__ import annotations - -import os -import re -from pathlib import Path -from unittest.mock import patch - -os.environ.setdefault("BOT_TOKEN", "test-bot-token") -os.environ.setdefault("CHAT_ID", "-1001234567890") -os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret") -os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram") -os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000") -os.environ.setdefault("ADMIN_TOKEN", "test-admin-token") -os.environ.setdefault("ADMIN_CHAT_ID", "5694335584") - -import pytest - -from tests.conftest import make_app_client - -PROJECT_ROOT = Path(__file__).parent.parent -FRONTEND_DIR = PROJECT_ROOT / "frontend" -INDEX_HTML = FRONTEND_DIR / "index.html" -APP_JS = FRONTEND_DIR / "app.js" - -_TEST_VAPID_PUBLIC_KEY = "BFakeVapidPublicKeyForTestingPurposesOnlyBase64UrlEncoded" - - -# --------------------------------------------------------------------------- -# Criterion 1 — AST: no hardcoded empty VAPID key in tag (index.html) -# --------------------------------------------------------------------------- - - -def test_index_html_has_no_vapid_meta_tag_with_empty_content() -> None: - """index.html не должен содержать -тег с application-server-key и пустым content.""" - content = INDEX_HTML.read_text(encoding="utf-8") - match = re.search( - r']*(?:application-server-key|vapid)[^>]*content\s*=\s*["\']["\']', - content, - re.IGNORECASE, - ) - assert match is None, ( - f"index.html содержит -тег с пустым VAPID ключом: {match.group(0)!r}" - ) - - -def test_index_html_has_no_hardcoded_application_server_key_attribute() -> None: - """index.html не должен содержать атрибут application-server-key вообще.""" - content = INDEX_HTML.read_text(encoding="utf-8") - assert "application-server-key" not in content.lower(), ( - "index.html содержит атрибут 'application-server-key' — " - "VAPID ключ не должен быть вшит в HTML" - ) - - -# --------------------------------------------------------------------------- -# Criterion 2 — AST: frontend reads key through API (app.js) -# --------------------------------------------------------------------------- - - -def test_app_js_contains_fetch_vapid_public_key_function() -> None: - """app.js должен содержать функцию _fetchVapidPublicKey.""" - content = APP_JS.read_text(encoding="utf-8") - assert "_fetchVapidPublicKey" in content, ( - "app.js не содержит функцию _fetchVapidPublicKey — " - "чтение VAPID ключа через API не реализовано" - ) - - -def test_app_js_fetch_vapid_calls_api_endpoint() -> None: - """_fetchVapidPublicKey в app.js должна обращаться к /api/push/public-key (canonical URL).""" - content = APP_JS.read_text(encoding="utf-8") - assert "/api/push/public-key" in content, ( - "app.js не содержит URL '/api/push/public-key' — VAPID ключ не читается через API" - ) - - -def test_app_js_init_push_subscription_has_null_guard() -> None: - """_initPushSubscription в app.js должна содержать guard против null ключа.""" - content = APP_JS.read_text(encoding="utf-8") - assert re.search(r"if\s*\(\s*!\s*vapidPublicKey\s*\)", content), ( - "app.js: _initPushSubscription не содержит guard 'if (!vapidPublicKey)' — " - "подписка может быть создана без ключа" - ) - - -def test_app_js_init_chains_fetch_vapid_then_init_subscription() -> None: - """_init() в app.js должна вызывать _fetchVapidPublicKey().then(_initPushSubscription).""" - content = APP_JS.read_text(encoding="utf-8") - assert re.search( - r"_fetchVapidPublicKey\(\)\s*\.\s*then\s*\(\s*_initPushSubscription\s*\)", - content, - ), ( - "app.js: _init() не содержит цепочку _fetchVapidPublicKey().then(_initPushSubscription)" - ) - - -def test_app_js_has_no_empty_string_hardcoded_as_application_server_key() -> None: - """app.js не должен содержать хардкода пустой строки для applicationServerKey.""" - content = APP_JS.read_text(encoding="utf-8") - match = re.search(r"applicationServerKey\s*[=:]\s*[\"']{2}", content) - assert match is None, ( - f"app.js содержит хардкод пустой строки для applicationServerKey: {match.group(0)!r}" - ) - - -# --------------------------------------------------------------------------- -# Criterion 3 — HTTP: GET /api/vapid-public-key returns 200 -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_vapid_public_key_endpoint_returns_200() -> None: - """GET /api/vapid-public-key должен вернуть HTTP 200.""" - async with make_app_client() as client: - response = await client.get("/api/vapid-public-key") - assert response.status_code == 200, ( - f"GET /api/vapid-public-key вернул {response.status_code}, ожидался 200" - ) - - -# --------------------------------------------------------------------------- -# Criterion 4 — HTTP: response JSON contains vapid_public_key field -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_vapid_public_key_endpoint_returns_json_with_field() -> None: - """GET /api/vapid-public-key должен вернуть JSON с полем vapid_public_key.""" - async with make_app_client() as client: - response = await client.get("/api/vapid-public-key") - data = response.json() - assert "vapid_public_key" in data, ( - f"Ответ /api/vapid-public-key не содержит поле 'vapid_public_key': {data!r}" - ) - - -# --------------------------------------------------------------------------- -# Criterion 5 — HTTP: non-empty vapid_public_key when env var is configured -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_vapid_public_key_endpoint_returns_configured_value() -> None: - """GET /api/vapid-public-key возвращает непустой ключ, когда VAPID_PUBLIC_KEY задан.""" - with patch("backend.config.VAPID_PUBLIC_KEY", _TEST_VAPID_PUBLIC_KEY): - async with make_app_client() as client: - response = await client.get("/api/vapid-public-key") - data = response.json() - assert data.get("vapid_public_key") == _TEST_VAPID_PUBLIC_KEY, ( - f"vapid_public_key должен быть '{_TEST_VAPID_PUBLIC_KEY}', " - f"получили: {data.get('vapid_public_key')!r}" - ) diff --git a/tests/test_telegram.py b/tests/test_telegram.py index bd46f51..e1467a0 100644 --- a/tests/test_telegram.py +++ b/tests/test_telegram.py @@ -1,5 +1,5 @@ """ -Tests for backend/telegram.py: send_message, set_webhook, validate_bot_token. +Tests for backend/telegram.py: send_message, set_webhook, SignalAggregator. NOTE: respx routes must be registered INSIDE the 'with mock:' block to be intercepted properly. Registering them before entering the context does not @@ -25,6 +25,8 @@ def _safe_aiosqlite_await(self): aiosqlite.core.Connection.__await__ = _safe_aiosqlite_await # type: ignore[method-assign] import json +import os as _os +import tempfile from unittest.mock import AsyncMock, patch import httpx @@ -32,7 +34,7 @@ import pytest import respx from backend import config -from backend.telegram import send_message, set_webhook, validate_bot_token +from backend.telegram import SignalAggregator, send_message, set_webhook, validate_bot_token SEND_URL = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage" @@ -184,6 +186,127 @@ async def test_set_webhook_raises_on_non_200(): await set_webhook(url="https://example.com/webhook", secret="s") +# --------------------------------------------------------------------------- +# SignalAggregator helpers +# --------------------------------------------------------------------------- + +async def _init_db_with_tmp() -> str: + """Init a temp-file DB and return its path.""" + from backend import config as _cfg, db as _db + path = tempfile.mktemp(suffix=".db") + _cfg.DB_PATH = path + await _db.init_db() + return path + + +def _cleanup(path: str) -> None: + for ext in ("", "-wal", "-shm"): + try: + _os.unlink(path + ext) + except FileNotFoundError: + pass + + +# --------------------------------------------------------------------------- +# SignalAggregator tests +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_aggregator_single_signal_calls_send_message(): + """Flushing an aggregator with one signal calls send_message once.""" + path = await _init_db_with_tmp() + try: + agg = SignalAggregator(interval=9999) + await agg.add_signal( + user_uuid="a9900001-0000-4000-8000-000000000001", + user_name="Alice", + timestamp=1742478000000, + geo={"lat": 55.0, "lon": 37.0, "accuracy": 10.0}, + signal_id=1, + ) + + with respx.mock(assert_all_called=False) as mock: + send_route = mock.post(SEND_URL).mock( + return_value=httpx.Response(200, json={"ok": True}) + ) + with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): + with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): + await agg.flush() + + assert send_route.call_count == 1 + finally: + _cleanup(path) + + +@pytest.mark.asyncio +async def test_aggregator_multiple_signals_one_message(): + """5 signals flushed at once produce exactly one send_message call.""" + path = await _init_db_with_tmp() + try: + agg = SignalAggregator(interval=9999) + for i in range(5): + await agg.add_signal( + user_uuid=f"a990000{i}-0000-4000-8000-00000000000{i}", + user_name=f"User{i}", + timestamp=1742478000000 + i * 1000, + geo=None, + signal_id=i + 1, + ) + + with respx.mock(assert_all_called=False) as mock: + send_route = mock.post(SEND_URL).mock( + return_value=httpx.Response(200, json={"ok": True}) + ) + with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): + with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): + await agg.flush() + + assert send_route.call_count == 1 + finally: + _cleanup(path) + + +@pytest.mark.asyncio +async def test_aggregator_empty_buffer_no_send(): + """Flushing an empty aggregator must NOT call send_message.""" + agg = SignalAggregator(interval=9999) + + # No routes registered — if a POST is made it will raise AllMockedAssertionError + with respx.mock(assert_all_called=False) as mock: + send_route = mock.post(SEND_URL).mock( + return_value=httpx.Response(200, json={"ok": True}) + ) + await agg.flush() + + assert send_route.call_count == 0 + + +@pytest.mark.asyncio +async def test_aggregator_buffer_cleared_after_flush(): + """After flush, the aggregator buffer is empty.""" + path = await _init_db_with_tmp() + try: + agg = SignalAggregator(interval=9999) + await agg.add_signal( + user_uuid="a9900099-0000-4000-8000-000000000099", + user_name="Test", + timestamp=1742478000000, + geo=None, + signal_id=99, + ) + assert len(agg._buffer) == 1 + + with respx.mock(assert_all_called=False) as mock: + mock.post(SEND_URL).mock(return_value=httpx.Response(200, json={"ok": True})) + with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): + with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): + await agg.flush() + + assert len(agg._buffer) == 0 + finally: + _cleanup(path) + + # --------------------------------------------------------------------------- # BATON-007: 400 "chat not found" handling # --------------------------------------------------------------------------- @@ -248,3 +371,33 @@ async def test_send_message_all_5xx_retries_exhausted_does_not_raise(): # Must not raise — message is dropped, service stays alive await send_message("test all retries exhausted") + +@pytest.mark.asyncio +async def test_aggregator_unknown_user_shows_uuid_prefix(): + """If user_name is None, the message shows first 8 chars of uuid.""" + path = await _init_db_with_tmp() + try: + agg = SignalAggregator(interval=9999) + test_uuid = "abcdef1234567890" + await agg.add_signal( + user_uuid=test_uuid, + user_name=None, + timestamp=1742478000000, + geo=None, + signal_id=1, + ) + + sent_texts: list[str] = [] + + async def _fake_send(text: str) -> None: + sent_texts.append(text) + + with patch("backend.telegram.send_message", side_effect=_fake_send): + with patch("backend.telegram.db.save_telegram_batch", new_callable=AsyncMock): + with patch("backend.telegram.asyncio.sleep", new_callable=AsyncMock): + await agg.flush() + + assert len(sent_texts) == 1 + assert test_uuid[:8] in sent_texts[0] + finally: + _cleanup(path)