Compare commits

..

45 commits

Author SHA1 Message Date
Gros Frumos
36087c3d9e kin: BATON-FIX-012 Починить 25 тестов регрессии от BATON-SEC-005 2026-03-21 09:29:27 +02:00
Gros Frumos
c838a775f7 kin: BATON-FIX-005 Ротировать Telegram bot token — утечка в journalctl логах 2026-03-21 09:27:37 +02:00
Gros Frumos
33844a02ac Merge branch 'BATON-FIX-012-debugger' 2026-03-21 09:26:57 +02:00
Gros Frumos
2f6a84f08b kin: BATON-FIX-012-debugger 2026-03-21 09:26:57 +02:00
Gros Frumos
370a2157b9 kin: BATON-FIX-008 [TECH DEBT] Серверный код (backend/main.py, middleware.py) расходится с worktree — у сервера нет rate_limit_signal в middleware, серверный main.py пропатчен вручную через sed 2026-03-21 09:25:08 +02:00
Gros Frumos
177a0d80dd Merge branch 'BATON-FIX-005-backend_dev' 2026-03-21 09:24:31 +02:00
Gros Frumos
85d156e9be fix(BATON-FIX-005): mask BOT_TOKEN in logs — suppress httpx URL logging
- Add logging.getLogger("httpx/httpcore").setLevel(WARNING) to prevent
  token-embedded API URLs from leaking through transport-level loggers
- Add _mask_token() helper showing only last 4 chars of token
- Fix validate_bot_token() exception handler: log exc type + masked token
  instead of raw exc which may contain the full URL in some httpx versions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-21 09:24:15 +02:00
Gros Frumos
2ab5e9ab54 kin: BATON-FIX-011 Скрыть BOT_TOKEN из httpx/journalctl логов 2026-03-21 09:21:25 +02:00
Gros Frumos
42f4251184 Merge branch 'BATON-008-backend_dev' 2026-03-21 09:19:50 +02:00
Gros Frumos
4c9fec17de kin: BATON-008-backend_dev 2026-03-21 09:19:50 +02:00
Gros Frumos
63be474cdc Merge branch 'BATON-FIX-011-backend_dev' 2026-03-21 09:19:29 +02:00
Gros Frumos
8896bc32f4 kin: BATON-FIX-011-backend_dev 2026-03-21 09:19:29 +02:00
Gros Frumos
e21bcb1eb4 kin: BATON-007 При нажатии на кнопку происходит анимация и сообщение что сигнал отправлен, но в телеграм группу ничего не приходит. 2026-03-21 09:05:43 +02:00
Gros Frumos
726bb0a82c Merge branch 'BATON-007-backend_dev' 2026-03-21 08:56:40 +02:00
Gros Frumos
a2b38ef815 fix(BATON-007): add validate_bot_token() for startup detection and fix test mocks
- Add validate_bot_token() to backend/telegram.py: calls getMe on startup,
  logs ERROR if token is invalid (never raises per #1215 contract)
- Call validate_bot_token() in lifespan() after db.init_db() for early detection
- Update conftest.py make_app_client() to mock getMe endpoint
- Add 3 tests for validate_bot_token (200, 401, network error cases)

Root cause: CHAT_ID=5190015988 (positive) was wrong — fixed to -5190015988
on server per decision #1212. Group "Big Red Button" confirmed via getChat.
Service restarted.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-21 08:54:07 +02:00
Gros Frumos
cbc15eeedc kin: BATON-007 При нажатии на кнопку происходит анимация и сообщение что сигнал отправлен, но в телеграм группу ничего не приходит. 2026-03-21 08:36:20 +02:00
Gros Frumos
6142770c0c kin: BATON-SEC-003 Добавить аутентификацию на /api/signal 2026-03-21 08:16:46 +02:00
Gros Frumos
4b37703335 Merge branch 'BATON-SEC-003-frontend_dev' 2026-03-21 08:13:14 +02:00
Gros Frumos
99638fe22b kin: BATON-SEC-003-frontend_dev 2026-03-21 08:13:14 +02:00
Gros Frumos
4916b292c5 kin: BATON-007 При нажатии на кнопку происходит анимация и сообщение что сигнал отправлен, но в телеграм группу ничего не приходит. 2026-03-21 08:12:49 +02:00
Gros Frumos
dbd1048a51 Merge branch 'BATON-SEC-003-backend_dev' 2026-03-21 08:12:01 +02:00
Gros Frumos
f17ee79edb kin: BATON-SEC-003-backend_dev 2026-03-21 08:12:01 +02:00
Gros Frumos
fd4f32c1c3 kin: BATON-FIX-001 Установить FRONTEND_ORIGIN=https://baton.itafrika.com в .env на проде 2026-03-21 07:59:50 +02:00
Gros Frumos
5c9176fcd9 nginx: добавить security-заголовки (HSTS, CSP, X-Frame-Options, X-Content-Type)
Заголовки повторены в location / из-за особенности nginx — дочерний блок
с add_header отменяет наследование родительского server-уровня.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-21 07:58:56 +02:00
Gros Frumos
1b2aa501c6 Merge branch 'BATON-SEC-006-backend_dev' 2026-03-21 07:56:44 +02:00
Gros Frumos
4b7e59d78d kin: BATON-SEC-006-backend_dev 2026-03-21 07:56:44 +02:00
Gros Frumos
097b7af949 kin: BATON-SEC-005 UUID-валидация в models.py для uuid и user_id 2026-03-21 07:43:25 +02:00
Gros Frumos
205cc8037c Merge branch 'BATON-SEC-007-backend_dev' 2026-03-21 07:39:41 +02:00
Gros Frumos
2cf141f6ed kin: BATON-SEC-007-backend_dev 2026-03-21 07:39:41 +02:00
Gros Frumos
7aae8c0f62 Merge branch 'BATON-SEC-005-backend_dev' 2026-03-21 07:36:36 +02:00
Gros Frumos
5d6695ecab kin: BATON-SEC-005-backend_dev 2026-03-21 07:36:36 +02:00
Gros Frumos
9b8a5558a3 Merge branch 'BATON-SEC-002-backend_dev' 2026-03-21 07:36:33 +02:00
Gros Frumos
4ab2f04de6 kin: BATON-SEC-002-backend_dev 2026-03-21 07:36:33 +02:00
Gros Frumos
9a450d2a84 fix: add /api/health alias endpoint
Adds GET /api/health as alias for /health — fixes frontend 404.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-21 07:18:56 +02:00
Gros Frumos
fd60863e9c kin: BATON-005 Сделать админку для заведения пользователей со сменой пароля, блокировкой и удалением пользователей. 2026-03-20 23:50:54 +02:00
Gros Frumos
989074673a Merge branch 'BATON-005-frontend_dev' 2026-03-20 23:44:58 +02:00
Gros Frumos
8607a9f981 kin: BATON-005-frontend_dev 2026-03-20 23:44:58 +02:00
Gros Frumos
e547e1ce09 Merge branch 'BATON-005-backend_dev' 2026-03-20 23:39:28 +02:00
Gros Frumos
cb95c9928f kin: BATON-005-backend_dev 2026-03-20 23:39:28 +02:00
Gros Frumos
5fcfc3a76b kin: BATON-006 не работает фронт: {'detail':'Not Found'} 2026-03-20 23:31:26 +02:00
Gros Frumos
68a1c90541 Merge branch 'BATON-006-frontend_dev' 2026-03-20 23:27:06 +02:00
Gros Frumos
3cd7db11e7 kin: BATON-006-frontend_dev 2026-03-20 23:27:06 +02:00
Gros Frumos
7db8b849e0 fix: исправить RuntimeError в aiosqlite — _get_conn как async context manager
`async with await _get_conn()` запускал тред дважды: первый раз внутри
`_get_conn` через `await aiosqlite.connect()`, второй раз в `__aenter__`
через `await self`. Преобразован в `@asynccontextmanager` с `yield` и
`finally: conn.close()`. Все вызывающие места обновлены. Тест
`test_init_db_synchronous` обновлён под новый API.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 23:16:12 +02:00
Gros Frumos
1c383191cc security: заменить реальный BOT_TOKEN на плейсхолдер в env.template
Добавить пример CHAT_ID в комментарий.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 22:40:53 +02:00
Gros Frumos
b70d5990c8 deploy: подготовить артефакты для деплоя на baton.itafrika.com
- nginx/baton.conf: заменить <YOUR_DOMAIN> на baton.itafrika.com
- deploy/baton.service: добавить systemd-юнит для uvicorn (/opt/baton, port 8000)
- deploy/baton-keepalive.service: прописать реальный URL health-эндпоинта
- deploy/env.template: шаблон .env для сервера (без секретов)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 22:32:05 +02:00
20 changed files with 2067 additions and 39 deletions

1
=2.0.0 Normal file
View file

@ -0,0 +1 @@
(eval):1: command not found: pip

View file

@ -22,3 +22,7 @@ WEBHOOK_ENABLED: bool = os.getenv("WEBHOOK_ENABLED", "true").lower() == "true"
FRONTEND_ORIGIN: str = os.getenv("FRONTEND_ORIGIN", "http://localhost:3000") FRONTEND_ORIGIN: str = os.getenv("FRONTEND_ORIGIN", "http://localhost:3000")
APP_URL: str | None = os.getenv("APP_URL") # Публичный URL приложения для keep-alive self-ping APP_URL: str | None = os.getenv("APP_URL") # Публичный URL приложения для keep-alive self-ping
ADMIN_TOKEN: str = _require("ADMIN_TOKEN") ADMIN_TOKEN: str = _require("ADMIN_TOKEN")
ADMIN_CHAT_ID: str = os.getenv("ADMIN_CHAT_ID", "5694335584")
VAPID_PRIVATE_KEY: str = os.getenv("VAPID_PRIVATE_KEY", "")
VAPID_PUBLIC_KEY: str = os.getenv("VAPID_PUBLIC_KEY", "")
VAPID_CLAIMS_EMAIL: str = os.getenv("VAPID_CLAIMS_EMAIL", "")

View file

@ -67,6 +67,23 @@ async def init_db() -> None:
count INTEGER NOT NULL DEFAULT 0, count INTEGER NOT NULL DEFAULT 0,
window_start REAL NOT NULL DEFAULT 0 window_start REAL NOT NULL DEFAULT 0
); );
CREATE TABLE IF NOT EXISTS registrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE NOT NULL,
login TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
push_subscription TEXT DEFAULT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_registrations_status
ON registrations(status);
CREATE INDEX IF NOT EXISTS idx_registrations_email
ON registrations(email);
CREATE INDEX IF NOT EXISTS idx_registrations_login
ON registrations(login);
""") """)
# Migrations for existing databases (silently ignore if columns already exist) # Migrations for existing databases (silently ignore if columns already exist)
for stmt in [ for stmt in [
@ -284,6 +301,56 @@ async def rate_limit_increment(key: str, window: float) -> int:
return row["count"] if row else 1 return row["count"] if row else 1
async def create_registration(
email: str,
login: str,
password_hash: str,
push_subscription: Optional[str] = None,
) -> int:
"""Insert a new registration. Raises aiosqlite.IntegrityError on email/login conflict."""
async with _get_conn() as conn:
async with conn.execute(
"""
INSERT INTO registrations (email, login, password_hash, push_subscription)
VALUES (?, ?, ?, ?)
""",
(email, login, password_hash, push_subscription),
) as cur:
reg_id = cur.lastrowid
await conn.commit()
return reg_id # type: ignore[return-value]
async def get_registration(reg_id: int) -> Optional[dict]:
async with _get_conn() as conn:
async with conn.execute(
"SELECT id, email, login, status, push_subscription, created_at FROM registrations WHERE id = ?",
(reg_id,),
) as cur:
row = await cur.fetchone()
if row is None:
return None
return {
"id": row["id"],
"email": row["email"],
"login": row["login"],
"status": row["status"],
"push_subscription": row["push_subscription"],
"created_at": row["created_at"],
}
async def update_registration_status(reg_id: int, status: str) -> bool:
async with _get_conn() as conn:
async with conn.execute(
"UPDATE registrations SET status = ? WHERE id = ?",
(status, reg_id),
) as cur:
changed = cur.rowcount > 0
await conn.commit()
return changed
async def save_telegram_batch( async def save_telegram_batch(
message_text: str, message_text: str,
signals_count: int, signals_count: int,

View file

@ -15,12 +15,14 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from backend import config, db, telegram from backend import config, db, push, telegram
from backend.middleware import rate_limit_register, rate_limit_signal, verify_admin_token, verify_webhook_secret from backend.middleware import rate_limit_auth_register, rate_limit_register, rate_limit_signal, verify_admin_token, verify_webhook_secret
from backend.models import ( from backend.models import (
AdminBlockRequest, AdminBlockRequest,
AdminCreateUserRequest, AdminCreateUserRequest,
AdminSetPasswordRequest, AdminSetPasswordRequest,
AuthRegisterRequest,
AuthRegisterResponse,
RegisterRequest, RegisterRequest,
RegisterResponse, RegisterResponse,
SignalRequest, SignalRequest,
@ -30,6 +32,7 @@ from backend.models import (
_api_key_bearer = HTTPBearer(auto_error=False) _api_key_bearer = HTTPBearer(auto_error=False)
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -182,6 +185,84 @@ async def signal(
return SignalResponse(status="ok", signal_id=signal_id) return SignalResponse(status="ok", signal_id=signal_id)
@app.post("/api/auth/register", response_model=AuthRegisterResponse, status_code=201)
async def auth_register(
body: AuthRegisterRequest,
_: None = Depends(rate_limit_auth_register),
) -> AuthRegisterResponse:
password_hash = _hash_password(body.password)
push_sub_json = (
body.push_subscription.model_dump_json() if body.push_subscription else None
)
try:
reg_id = await db.create_registration(
email=str(body.email),
login=body.login,
password_hash=password_hash,
push_subscription=push_sub_json,
)
except Exception as exc:
# aiosqlite.IntegrityError on email/login UNIQUE conflict
if "UNIQUE" in str(exc) or "unique" in str(exc).lower():
raise HTTPException(status_code=409, detail="Email или логин уже существует")
raise
reg = await db.get_registration(reg_id)
asyncio.create_task(
telegram.send_registration_notification(
reg_id=reg_id,
login=body.login,
email=str(body.email),
created_at=reg["created_at"] if reg else "",
)
)
return AuthRegisterResponse(status="pending", message="Заявка отправлена на рассмотрение")
async def _handle_callback_query(cb: dict) -> None:
"""Process approve/reject callback from admin Telegram inline buttons."""
data = cb.get("data", "")
callback_query_id = cb.get("id", "")
message = cb.get("message", {})
chat_id = message.get("chat", {}).get("id")
message_id = message.get("message_id")
if ":" not in data:
return
action, reg_id_str = data.split(":", 1)
try:
reg_id = int(reg_id_str)
except ValueError:
return
reg = await db.get_registration(reg_id)
if reg is None:
await telegram.answer_callback_query(callback_query_id)
return
if action == "approve":
await db.update_registration_status(reg_id, "approved")
if chat_id and message_id:
await telegram.edit_message_text(
chat_id, message_id, f"✅ Пользователь {reg['login']} одобрен"
)
if reg["push_subscription"]:
asyncio.create_task(
push.send_push(
reg["push_subscription"],
"Baton",
"Ваша регистрация одобрена!",
)
)
elif action == "reject":
await db.update_registration_status(reg_id, "rejected")
if chat_id and message_id:
await telegram.edit_message_text(
chat_id, message_id, f"❌ Пользователь {reg['login']} отклонён"
)
await telegram.answer_callback_query(callback_query_id)
@app.get("/admin/users", dependencies=[Depends(verify_admin_token)]) @app.get("/admin/users", dependencies=[Depends(verify_admin_token)])
async def admin_list_users() -> list[dict]: async def admin_list_users() -> list[dict]:
return await db.admin_list_users() return await db.admin_list_users()
@ -226,6 +307,13 @@ async def webhook_telegram(
_: None = Depends(verify_webhook_secret), _: None = Depends(verify_webhook_secret),
) -> dict[str, Any]: ) -> dict[str, Any]:
update = await request.json() update = await request.json()
# Handle inline button callback queries (approve/reject registration)
callback_query = update.get("callback_query")
if callback_query:
await _handle_callback_query(callback_query)
return {"ok": True}
message = update.get("message", {}) message = update.get("message", {})
text = message.get("text", "") text = message.get("text", "")

View file

@ -16,6 +16,9 @@ _RATE_WINDOW = 600 # 10 minutes
_SIGNAL_RATE_LIMIT = 10 _SIGNAL_RATE_LIMIT = 10
_SIGNAL_RATE_WINDOW = 60 # 1 minute _SIGNAL_RATE_WINDOW = 60 # 1 minute
_AUTH_REGISTER_RATE_LIMIT = 3
_AUTH_REGISTER_RATE_WINDOW = 600 # 10 minutes
def _get_client_ip(request: Request) -> str: def _get_client_ip(request: Request) -> str:
return ( return (
@ -55,3 +58,10 @@ async def rate_limit_signal(request: Request) -> None:
count = await db.rate_limit_increment(key, _SIGNAL_RATE_WINDOW) count = await db.rate_limit_increment(key, _SIGNAL_RATE_WINDOW)
if count > _SIGNAL_RATE_LIMIT: if count > _SIGNAL_RATE_LIMIT:
raise HTTPException(status_code=429, detail="Too Many Requests") raise HTTPException(status_code=429, detail="Too Many Requests")
async def rate_limit_auth_register(request: Request) -> None:
key = f"authreg:{_get_client_ip(request)}"
count = await db.rate_limit_increment(key, _AUTH_REGISTER_RATE_WINDOW)
if count > _AUTH_REGISTER_RATE_LIMIT:
raise HTTPException(status_code=429, detail="Too Many Requests")

View file

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
from typing import Optional from typing import Optional
from pydantic import BaseModel, Field from pydantic import BaseModel, EmailStr, Field
class RegisterRequest(BaseModel): class RegisterRequest(BaseModel):
@ -44,3 +44,25 @@ class AdminSetPasswordRequest(BaseModel):
class AdminBlockRequest(BaseModel): class AdminBlockRequest(BaseModel):
is_blocked: bool is_blocked: bool
class PushSubscriptionKeys(BaseModel):
p256dh: str
auth: str
class PushSubscription(BaseModel):
endpoint: str
keys: PushSubscriptionKeys
class AuthRegisterRequest(BaseModel):
email: EmailStr
login: str = Field(..., min_length=3, max_length=30, pattern=r'^[a-zA-Z0-9_-]+$')
password: str = Field(..., min_length=8, max_length=128)
push_subscription: Optional[PushSubscription] = None
class AuthRegisterResponse(BaseModel):
status: str
message: str

35
backend/push.py Normal file
View file

@ -0,0 +1,35 @@
from __future__ import annotations
import asyncio
import json
import logging
from backend import config
logger = logging.getLogger(__name__)
async def send_push(subscription_json: str, title: str, body: str) -> None:
"""Send a Web Push notification. Swallows all errors — never raises."""
if not config.VAPID_PRIVATE_KEY:
logger.warning("VAPID_PRIVATE_KEY not configured — push notification skipped")
return
try:
import pywebpush # type: ignore[import]
except ImportError:
logger.warning("pywebpush not installed — push notification skipped")
return
try:
subscription_info = json.loads(subscription_json)
data = json.dumps({"title": title, "body": body})
vapid_claims = {"sub": f"mailto:{config.VAPID_CLAIMS_EMAIL or 'admin@example.com'}"}
await asyncio.to_thread(
pywebpush.webpush,
subscription_info=subscription_info,
data=data,
vapid_private_key=config.VAPID_PRIVATE_KEY,
vapid_claims=vapid_claims,
)
except Exception as exc:
logger.error("Web Push failed: %s", exc)

View file

@ -11,9 +11,21 @@ from backend import config, db
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Suppress httpx/httpcore transport-level logging to prevent BOT_TOKEN URL leakage.
# httpx logs request URLs (which embed the token) at DEBUG/INFO level depending on version.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
_TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}" _TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}"
def _mask_token(token: str) -> str:
"""Return a safe representation of the bot token for logging."""
if not token or len(token) < 4:
return "***REDACTED***"
return f"***{token[-4:]}"
async def validate_bot_token() -> bool: async def validate_bot_token() -> bool:
"""Validate BOT_TOKEN by calling getMe. Logs ERROR if invalid. Never raises.""" """Validate BOT_TOKEN by calling getMe. Logs ERROR if invalid. Never raises."""
url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="getMe") url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="getMe")
@ -29,7 +41,13 @@ async def validate_bot_token() -> bool:
) )
return False return False
except Exception as exc: except Exception as exc:
logger.error("BOT_TOKEN validation failed (network): %s", exc) # Do not log `exc` directly — it may contain the API URL with the token
# embedded (httpx includes request URL in some exception types/versions).
logger.error(
"BOT_TOKEN validation failed (network error): %s — token ends with %s",
type(exc).__name__,
_mask_token(config.BOT_TOKEN),
)
return False return False
@ -55,6 +73,68 @@ async def send_message(text: str) -> None:
logger.error("Telegram send_message: all 3 attempts failed, message dropped") logger.error("Telegram send_message: all 3 attempts failed, message dropped")
async def send_registration_notification(
reg_id: int, login: str, email: str, created_at: str
) -> None:
"""Send registration request notification to admin with approve/reject inline buttons.
Swallows all errors never raises."""
url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="sendMessage")
text = (
f"📋 Новая заявка на регистрацию\n\n"
f"Login: {login}\nEmail: {email}\nДата: {created_at}"
)
reply_markup = {
"inline_keyboard": [[
{"text": "✅ Одобрить", "callback_data": f"approve:{reg_id}"},
{"text": "❌ Отклонить", "callback_data": f"reject:{reg_id}"},
]]
}
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
url,
json={
"chat_id": config.ADMIN_CHAT_ID,
"text": text,
"reply_markup": reply_markup,
},
)
if resp.status_code != 200:
logger.error(
"send_registration_notification failed %s: %s",
resp.status_code,
resp.text,
)
except Exception as exc:
logger.error("send_registration_notification error: %s", exc)
async def answer_callback_query(callback_query_id: str) -> None:
"""Answer a Telegram callback query. Swallows all errors."""
url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="answerCallbackQuery")
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json={"callback_query_id": callback_query_id})
if resp.status_code != 200:
logger.error("answerCallbackQuery failed %s: %s", resp.status_code, resp.text)
except Exception as exc:
logger.error("answerCallbackQuery error: %s", exc)
async def edit_message_text(chat_id: str | int, message_id: int, text: str) -> None:
"""Edit a Telegram message text. Swallows all errors."""
url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="editMessageText")
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
url, json={"chat_id": chat_id, "message_id": message_id, "text": text}
)
if resp.status_code != 200:
logger.error("editMessageText failed %s: %s", resp.status_code, resp.text)
except Exception as exc:
logger.error("editMessageText error: %s", exc)
async def set_webhook(url: str, secret: str) -> None: async def set_webhook(url: str, secret: str) -> None:
api_url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="setWebhook") api_url = _TELEGRAM_API.format(token=config.BOT_TOKEN, method="setWebhook")
async with httpx.AsyncClient(timeout=10) as client: async with httpx.AsyncClient(timeout=10) as client:

300
docs/DESIGN_BATON008.md Normal file
View file

@ -0,0 +1,300 @@
# DESIGN_BATON008 — Регистрационный flow с Telegram-апрувом
## Flow диаграмма
```
Пользователь Backend Telegram PWA / Service Worker
| | | |
|-- POST /api/auth/-------->| | |
| register | | |
| {email,login,pwd,push} | | |
| |-- validate input | |
| |-- hash password (PBKDF2) | |
| |-- INSERT registrations | |
| | (status=pending) | |
|<-- 201 {status:pending} --| | |
| |-- create_task ─────────────>| |
| | send_registration_ | |
| | notification() | |
| | | |
| | [Admin видит сообщение с кнопками] |
| | [✅ Одобрить / ❌ Отклонить] |
| | | |
| |<-- POST /api/webhook/ -----| (callback_query) |
| | telegram | |
| |-- parse callback_data | |
| |-- UPDATE registrations | |
| | SET status='approved' | |
| |-- answerCallbackQuery ─────>| |
| |-- editMessageText ─────────>| |
| |-- create_task | |
| | send_push() ─────────────────────────────────────>|
| | | [Push: "Одобрен!"] |
|<-- 200 {"ok": True} ------| | |
```
## API контракт
### POST /api/auth/register
**Request body:**
```json
{
"email": "user@example.com",
"login": "user_name",
"password": "securepass",
"push_subscription": {
"endpoint": "https://fcm.googleapis.com/fcm/send/...",
"keys": {
"p256dh": "BNcR...",
"auth": "tBHI..."
}
}
}
```
`push_subscription` — nullable. Если null, push при одобрении не отправляется.
**Validation:**
- `email`: формат email (Pydantic EmailStr или regex `[^@]+@[^@]+\.[^@]+`)
- `login`: 330 символов, `[a-zA-Z0-9_-]`
- `password`: минимум 8 символов
- `push_subscription`: nullable object
**Response 201:**
```json
{"status": "pending", "message": "Заявка отправлена на рассмотрение"}
```
**Response 409 (дубль email или login):**
```json
{"detail": "Пользователь с таким email или логином уже существует"}
```
**Response 429:** rate limit (через существующий `rate_limit_register` middleware)
**Response 422:** невалидные поля (Pydantic автоматически)
---
### POST /api/webhook/telegram (расширение)
Существующий эндпоинт. Добавляется ветка обработки `callback_query`:
**Входящий update (approve):**
```json
{
"callback_query": {
"id": "123456789",
"data": "approve:42",
"message": {
"message_id": 777,
"chat": {"id": 5694335584}
}
}
}
```
**Поведение при `approve:{id}`:**
1. `UPDATE registrations SET status='approved' WHERE id=?`
2. Fetch registration row (для получения login и push_subscription)
3. `answerCallbackQuery(callback_query_id)`
4. `editMessageText(chat_id, message_id, "✅ Пользователь {login} одобрен")`
5. Если `push_subscription IS NOT NULL``create_task(send_push(...))`
6. Вернуть `{"ok": True}`
**Поведение при `reject:{id}`:**
1. `UPDATE registrations SET status='rejected' WHERE id=?`
2. `answerCallbackQuery(callback_query_id)`
3. `editMessageText(chat_id, message_id, "❌ Пользователь {login} отклонён")`
4. Push НЕ отправляется
5. Вернуть `{"ok": True}`
---
## SQL миграция
```sql
-- В init_db(), добавить в executescript:
CREATE TABLE IF NOT EXISTS registrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE NOT NULL,
login TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
push_subscription TEXT DEFAULT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_registrations_email
ON registrations(email);
CREATE UNIQUE INDEX IF NOT EXISTS idx_registrations_login
ON registrations(login);
CREATE INDEX IF NOT EXISTS idx_registrations_status
ON registrations(status);
```
Таблица создаётся через `CREATE TABLE IF NOT EXISTS` — backward compatible, не ломает существующие БД.
---
## Список изменяемых файлов
| Файл | Тип изменения | Суть |
|------|--------------|------|
| `backend/db.py` | Modify | Добавить таблицу `registrations` в `init_db()` + 3 функции CRUD |
| `backend/config.py` | Modify | Добавить `ADMIN_CHAT_ID`, `VAPID_PRIVATE_KEY`, `VAPID_PUBLIC_KEY`, `VAPID_CLAIMS_EMAIL` |
| `backend/models.py` | Modify | Добавить `PushKeys`, `PushSubscription`, `AuthRegisterRequest`, `AuthRegisterResponse` |
| `backend/telegram.py` | Modify | Добавить `send_registration_notification()`, `answer_callback_query()`, `edit_message_text()` |
| `backend/main.py` | Modify | Добавить `POST /api/auth/register` + callback_query ветку в webhook |
| `backend/push.py` | **New** | Отправка Web Push через pywebpush |
| `requirements.txt` | Modify | Добавить `pywebpush>=2.0.0` |
| `tests/test_baton_008.py` | **New** | Тесты для нового flow |
**НЕ трогать:** `backend/middleware.py`, `/api/register`, `users` таблица.
**Замечание:** CORS `allow_headers` уже содержит `Authorization` в `main.py:122` — изменение не требуется.
---
## Интеграционные точки с существующим кодом
### 1. `_hash_password()` в `main.py`
Функция уже существует (строки 4148). Dev agent должен **переиспользовать её напрямую** в новом endpoint `POST /api/auth/register`, не дублируя логику.
### 2. `rate_limit_register` middleware
Существующий middleware из `backend/middleware.py` может быть подключён к новому endpoint как `Depends(rate_limit_register)` — тот же ключ `reg:{ip}`, та же логика.
### 3. `telegram.send_message()` — не модифицировать
Существующая функция использует `config.CHAT_ID` для SOS-сигналов. Для регистрационных уведомлений создаётся отдельная функция `send_registration_notification()`, которая использует `config.ADMIN_CHAT_ID`. Это разделяет два потока уведомлений.
### 4. Webhook handler (строки 223242 в `main.py`)
Добавляется ветка в начало функции (до `message = update.get("message", {})`):
```python
callback_query = update.get("callback_query")
if callback_query:
asyncio.create_task(_handle_callback_query(callback_query))
return {"ok": True}
```
Существующая логика `/start` остаётся нетронутой.
### 5. `lifespan` в `main.py`
Никаких изменений — VAPID-ключи не требуют startup validation (unlike BOT_TOKEN), так как их инвалидация некритична для работы сервиса в целом.
---
## Спецификация новых компонентов
### `backend/db.py` — 3 новые функции
```
create_registration(email, login, password_hash, push_subscription) -> dict | None
INSERT INTO registrations ...
ON CONFLICT → raise aiosqlite.IntegrityError (caller catches → 409)
Returns: {"id", "email", "login", "created_at"}
get_registration_by_id(reg_id: int) -> dict | None
SELECT id, email, login, status, push_subscription FROM registrations WHERE id=?
update_registration_status(reg_id: int, status: str) -> dict | None
UPDATE registrations SET status=? WHERE id=?
Returns registration dict or None if not found
```
### `backend/config.py` — новые переменные
```python
ADMIN_CHAT_ID: str = os.getenv("ADMIN_CHAT_ID", "5694335584")
VAPID_PRIVATE_KEY: str = os.getenv("VAPID_PRIVATE_KEY", "")
VAPID_PUBLIC_KEY: str = os.getenv("VAPID_PUBLIC_KEY", "")
VAPID_CLAIMS_EMAIL: str = os.getenv("VAPID_CLAIMS_EMAIL", "")
```
Все optional (не `_require`) — отсутствие VAPID только отключает Web Push, не ломает сервис.
### `backend/models.py` — новые Pydantic модели
```python
class PushKeys(BaseModel):
p256dh: str
auth: str
class PushSubscription(BaseModel):
endpoint: str
keys: PushKeys
class AuthRegisterRequest(BaseModel):
email: str = Field(..., pattern=r'^[^@\s]+@[^@\s]+\.[^@\s]+$')
login: str = Field(..., min_length=3, max_length=30, pattern=r'^[a-zA-Z0-9_-]+$')
password: str = Field(..., min_length=8)
push_subscription: Optional[PushSubscription] = None
class AuthRegisterResponse(BaseModel):
status: str
message: str
```
### `backend/telegram.py` — 3 новые функции
```
send_registration_notification(login, email, reg_id, created_at) -> None
POST sendMessage с reply_markup=InlineKeyboardMarkup
chat_id = config.ADMIN_CHAT_ID
Swallows все ошибки (decision #1215)
answer_callback_query(callback_query_id: str, text: str = "") -> None
POST answerCallbackQuery
Swallows все ошибки
edit_message_text(chat_id: str | int, message_id: int, text: str) -> None
POST editMessageText
Swallows все ошибки
```
Все три используют тот же паттерн retry (3 попытки, 429/5xx) что и `send_message()`.
### `backend/push.py` — новый файл
```
send_push(subscription_json: str, title: str, body: str) -> None
Парсит subscription_json → dict
webpush(
subscription_info=subscription_dict,
data=json.dumps({"title": title, "body": body, "icon": "/icon-192.png"}),
vapid_private_key=config.VAPID_PRIVATE_KEY,
vapid_claims={"sub": f"mailto:{config.VAPID_CLAIMS_EMAIL}"}
)
Если VAPID_PRIVATE_KEY пустой → log warning, return (push disabled)
Swallows WebPushException и все прочие ошибки
```
---
## Edge Cases и решения
| Кейс | Решение |
|------|---------|
| Email уже зарегистрирован | `IntegrityError` → HTTP 409 |
| Login уже занят | `IntegrityError` → HTTP 409 |
| Rejected пользователь пытается зарегистрироваться заново | 409 (статус не учитывается — оба поля UNIQUE) |
| push_subscription = null при approve | `if reg["push_subscription"]: send_push(...)` — skip gracefully |
| Истёкший/невалидный push endpoint | pywebpush raises → `logger.warning()` → swallow |
| Двойной клик Одобрить (admin кликает дважды) | UPDATE выполняется (idempotent), editMessageText может вернуть ошибку (уже отредактировано) → swallow |
| reg_id не существует в callback | `get_registration_by_id` returns None → log warning, answerCallbackQuery всё равно вызвать |
| VAPID ключи не настроены | Push не отправляется, log warning, сервис работает |
| Telegram недоступен при регистрации | Fire-and-forget + swallow — пользователь получает 201, уведомление теряется |
---
## Решения по open questions (из context_packet)
**VAPID ключи не сгенерированы:** Dev agent добавляет в README инструкцию по генерации:
```bash
python -c "from py_vapid import Vapid; v = Vapid(); v.generate_keys(); print(v.private_key, v.public_key)"
```
Ключи добавляются в `.env` вручную оператором перед деплоем.
**Повторный approve/reject:** Операция idempotent — UPDATE всегда выполняется без проверки текущего статуса. EditMessageText вернёт ошибку при повторном вызове — swallow.
**Service Worker:** Фронтенд вне скоупа этого тикета. Backend отправляет корректный Web Push payload — обработка на стороне клиента.
**Login после approve:** Механизм авторизации не входит в BATON-008. Регистрация — отдельный flow от аутентификации.

View file

@ -4,3 +4,5 @@ aiosqlite>=0.20.0
httpx>=0.27.0 httpx>=0.27.0
python-dotenv>=1.0.0 python-dotenv>=1.0.0
pydantic>=2.0 pydantic>=2.0
email-validator>=2.0.0
pywebpush>=2.0.0

View file

@ -73,13 +73,15 @@ def make_app_client():
""" """
Async context manager that: Async context manager that:
1. Assigns a fresh temp-file DB path 1. Assigns a fresh temp-file DB path
2. Mocks Telegram setWebhook and sendMessage 2. Mocks Telegram setWebhook, sendMessage, answerCallbackQuery, editMessageText
3. Runs the FastAPI lifespan (startup test shutdown) 3. Runs the FastAPI lifespan (startup test shutdown)
4. Yields an httpx.AsyncClient wired to the app 4. Yields an httpx.AsyncClient wired to the app
""" """
tg_set_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/setWebhook" tg_set_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/setWebhook"
send_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage" send_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage"
get_me_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/getMe" get_me_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/getMe"
answer_cb_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/answerCallbackQuery"
edit_msg_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/editMessageText"
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def _ctx(): async def _ctx():
@ -96,6 +98,12 @@ def make_app_client():
mock_router.post(send_url).mock( mock_router.post(send_url).mock(
return_value=httpx.Response(200, json={"ok": True}) return_value=httpx.Response(200, json={"ok": True})
) )
mock_router.post(answer_cb_url).mock(
return_value=httpx.Response(200, json={"ok": True})
)
mock_router.post(edit_msg_url).mock(
return_value=httpx.Response(200, json={"ok": True})
)
with mock_router: with mock_router:
async with app.router.lifespan_context(app): async with app.router.lifespan_context(app):

View file

@ -42,6 +42,19 @@ _UUID_BLOCK = "f0000001-0000-4000-8000-000000000001"
_UUID_UNBLOCK = "f0000002-0000-4000-8000-000000000002" _UUID_UNBLOCK = "f0000002-0000-4000-8000-000000000002"
_UUID_SIG_OK = "f0000003-0000-4000-8000-000000000003" _UUID_SIG_OK = "f0000003-0000-4000-8000-000000000003"
# Valid UUID v4 for admin-only tests (POST /admin/users, no /api/register call)
_UUID_ADM_UNAUTH = "e0000000-0000-4000-8000-000000000000"
_UUID_ADM_CREATE_1 = "e0000001-0000-4000-8000-000000000001"
_UUID_ADM_CREATE_2 = "e0000002-0000-4000-8000-000000000002"
_UUID_ADM_CREATE_3 = "e0000003-0000-4000-8000-000000000003"
_UUID_ADM_PASS_1 = "e0000004-0000-4000-8000-000000000004"
_UUID_ADM_PASS_2 = "e0000005-0000-4000-8000-000000000005"
_UUID_ADM_BLOCK = "e0000006-0000-4000-8000-000000000006"
_UUID_ADM_UNBLOCK = "e0000007-0000-4000-8000-000000000007"
_UUID_ADM_DELETE_1 = "e0000008-0000-4000-8000-000000000008"
_UUID_ADM_DELETE_2 = "e0000009-0000-4000-8000-000000000009"
_UUID_ADM_REGRESS = "e000000a-0000-4000-8000-000000000010"
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Criterion 6 — Unauthorised requests to /admin/* return 401 # Criterion 6 — Unauthorised requests to /admin/* return 401
@ -70,7 +83,7 @@ async def test_admin_create_user_without_token_returns_401() -> None:
async with make_app_client() as client: async with make_app_client() as client:
resp = await client.post( resp = await client.post(
"/admin/users", "/admin/users",
json={"uuid": "unauth-uuid-001", "name": "Ghost"}, json={"uuid": _UUID_ADM_UNAUTH, "name": "Ghost"},
) )
assert resp.status_code == 401 assert resp.status_code == 401
@ -116,12 +129,12 @@ async def test_admin_create_user_returns_201_with_user_data() -> None:
async with make_app_client() as client: async with make_app_client() as client:
resp = await client.post( resp = await client.post(
"/admin/users", "/admin/users",
json={"uuid": "create-uuid-001", "name": "Alice Admin"}, json={"uuid": _UUID_ADM_CREATE_1, "name": "Alice Admin"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )
assert resp.status_code == 201 assert resp.status_code == 201
data = resp.json() data = resp.json()
assert data["uuid"] == "create-uuid-001" assert data["uuid"] == _UUID_ADM_CREATE_1
assert data["name"] == "Alice Admin" assert data["name"] == "Alice Admin"
assert data["id"] > 0 assert data["id"] > 0
assert data["is_blocked"] is False assert data["is_blocked"] is False
@ -133,7 +146,7 @@ async def test_admin_create_user_appears_in_list() -> None:
async with make_app_client() as client: async with make_app_client() as client:
await client.post( await client.post(
"/admin/users", "/admin/users",
json={"uuid": "create-uuid-002", "name": "Bob Admin"}, json={"uuid": _UUID_ADM_CREATE_2, "name": "Bob Admin"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )
resp = await client.get("/admin/users", headers=ADMIN_HEADERS) resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
@ -141,7 +154,7 @@ async def test_admin_create_user_appears_in_list() -> None:
assert resp.status_code == 200 assert resp.status_code == 200
users = resp.json() users = resp.json()
uuids = [u["uuid"] for u in users] uuids = [u["uuid"] for u in users]
assert "create-uuid-002" in uuids assert _UUID_ADM_CREATE_2 in uuids
@pytest.mark.asyncio @pytest.mark.asyncio
@ -150,12 +163,12 @@ async def test_admin_create_user_duplicate_uuid_returns_409() -> None:
async with make_app_client() as client: async with make_app_client() as client:
await client.post( await client.post(
"/admin/users", "/admin/users",
json={"uuid": "create-uuid-003", "name": "Carol"}, json={"uuid": _UUID_ADM_CREATE_3, "name": "Carol"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )
resp = await client.post( resp = await client.post(
"/admin/users", "/admin/users",
json={"uuid": "create-uuid-003", "name": "Carol Duplicate"}, json={"uuid": _UUID_ADM_CREATE_3, "name": "Carol Duplicate"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )
assert resp.status_code == 409 assert resp.status_code == 409
@ -181,7 +194,7 @@ async def test_admin_set_password_returns_ok() -> None:
async with make_app_client() as client: async with make_app_client() as client:
create_resp = await client.post( create_resp = await client.post(
"/admin/users", "/admin/users",
json={"uuid": "pass-uuid-001", "name": "PassUser"}, json={"uuid": _UUID_ADM_PASS_1, "name": "PassUser"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )
user_id = create_resp.json()["id"] user_id = create_resp.json()["id"]
@ -213,7 +226,7 @@ async def test_admin_set_password_user_still_accessible_after_change() -> None:
async with make_app_client() as client: async with make_app_client() as client:
create_resp = await client.post( create_resp = await client.post(
"/admin/users", "/admin/users",
json={"uuid": "pass-uuid-002", "name": "PassUser2"}, json={"uuid": _UUID_ADM_PASS_2, "name": "PassUser2"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )
user_id = create_resp.json()["id"] user_id = create_resp.json()["id"]
@ -227,7 +240,7 @@ async def test_admin_set_password_user_still_accessible_after_change() -> None:
list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS) list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
uuids = [u["uuid"] for u in list_resp.json()] uuids = [u["uuid"] for u in list_resp.json()]
assert "pass-uuid-002" in uuids assert _UUID_ADM_PASS_2 in uuids
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -241,7 +254,7 @@ async def test_admin_block_user_returns_is_blocked_true() -> None:
async with make_app_client() as client: async with make_app_client() as client:
create_resp = await client.post( create_resp = await client.post(
"/admin/users", "/admin/users",
json={"uuid": "block-uuid-001", "name": "BlockUser"}, json={"uuid": _UUID_ADM_BLOCK, "name": "BlockUser"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )
user_id = create_resp.json()["id"] user_id = create_resp.json()["id"]
@ -312,7 +325,7 @@ async def test_admin_unblock_user_returns_is_blocked_false() -> None:
async with make_app_client() as client: async with make_app_client() as client:
create_resp = await client.post( create_resp = await client.post(
"/admin/users", "/admin/users",
json={"uuid": "unblock-uuid-001", "name": "UnblockUser"}, json={"uuid": _UUID_ADM_UNBLOCK, "name": "UnblockUser"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )
user_id = create_resp.json()["id"] user_id = create_resp.json()["id"]
@ -385,7 +398,7 @@ async def test_admin_delete_user_returns_204() -> None:
async with make_app_client() as client: async with make_app_client() as client:
create_resp = await client.post( create_resp = await client.post(
"/admin/users", "/admin/users",
json={"uuid": "delete-uuid-001", "name": "DeleteUser"}, json={"uuid": _UUID_ADM_DELETE_1, "name": "DeleteUser"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )
user_id = create_resp.json()["id"] user_id = create_resp.json()["id"]
@ -403,7 +416,7 @@ async def test_admin_delete_user_disappears_from_list() -> None:
async with make_app_client() as client: async with make_app_client() as client:
create_resp = await client.post( create_resp = await client.post(
"/admin/users", "/admin/users",
json={"uuid": "delete-uuid-002", "name": "DeleteUser2"}, json={"uuid": _UUID_ADM_DELETE_2, "name": "DeleteUser2"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )
user_id = create_resp.json()["id"] user_id = create_resp.json()["id"]
@ -416,7 +429,7 @@ async def test_admin_delete_user_disappears_from_list() -> None:
list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS) list_resp = await client.get("/admin/users", headers=ADMIN_HEADERS)
uuids = [u["uuid"] for u in list_resp.json()] uuids = [u["uuid"] for u in list_resp.json()]
assert "delete-uuid-002" not in uuids assert _UUID_ADM_DELETE_2 not in uuids
@pytest.mark.asyncio @pytest.mark.asyncio
@ -480,7 +493,7 @@ async def test_register_not_broken_after_admin_operations() -> None:
# Admin операции # Admin операции
await client.post( await client.post(
"/admin/users", "/admin/users",
json={"uuid": "regress-admin-uuid-001", "name": "AdminCreated"}, json={"uuid": _UUID_ADM_REGRESS, "name": "AdminCreated"},
headers=ADMIN_HEADERS, headers=ADMIN_HEADERS,
) )

View file

@ -15,6 +15,7 @@ Physical delivery to an actual Telegram group is outside unit test scope.
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
import os import os
os.environ.setdefault("BOT_TOKEN", "test-bot-token") os.environ.setdefault("BOT_TOKEN", "test-bot-token")
@ -30,9 +31,9 @@ from unittest.mock import AsyncMock, patch
import httpx import httpx
import pytest import pytest
import respx import respx
from httpx import AsyncClient from httpx import AsyncClient, ASGITransport
from tests.conftest import make_app_client from tests.conftest import make_app_client, temp_db
# Valid UUID v4 constants — must not collide with UUIDs in other test files # Valid UUID v4 constants — must not collide with UUIDs in other test files
_UUID_A = "d0000001-0000-4000-8000-000000000001" _UUID_A = "d0000001-0000-4000-8000-000000000001"
@ -40,6 +41,7 @@ _UUID_B = "d0000002-0000-4000-8000-000000000002"
_UUID_C = "d0000003-0000-4000-8000-000000000003" _UUID_C = "d0000003-0000-4000-8000-000000000003"
_UUID_D = "d0000004-0000-4000-8000-000000000004" _UUID_D = "d0000004-0000-4000-8000-000000000004"
_UUID_E = "d0000005-0000-4000-8000-000000000005" _UUID_E = "d0000005-0000-4000-8000-000000000005"
_UUID_F = "d0000006-0000-4000-8000-000000000006"
async def _register(client: AsyncClient, uuid: str, name: str) -> str: async def _register(client: AsyncClient, uuid: str, name: str) -> str:
@ -260,3 +262,120 @@ async def test_repeated_signals_produce_incrementing_signal_ids():
assert r2.json()["signal_id"] > r1.json()["signal_id"], ( assert r2.json()["signal_id"] > r1.json()["signal_id"], (
"Second signal must have a higher signal_id than the first" "Second signal must have a higher signal_id than the first"
) )
# ---------------------------------------------------------------------------
# Director revision: regression #1214, #1226
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_send_message_uses_negative_chat_id_from_config():
"""Regression #1226: send_message must POST to Telegram with a negative chat_id.
Root cause of BATON-007: CHAT_ID=5190015988 (positive = user ID) was set in .env
instead of -5190015988 (negative = group ID). This test inspects the actual
chat_id value in the HTTP request body not just call_count.
"""
from backend import config as _cfg
from backend.telegram import send_message
send_url = f"https://api.telegram.org/bot{_cfg.BOT_TOKEN}/sendMessage"
with respx.mock(assert_all_called=False) as mock:
route = mock.post(send_url).mock(
return_value=httpx.Response(200, json={"ok": True})
)
await send_message("regression #1226")
assert route.called
body = json.loads(route.calls[0].request.content)
chat_id = body["chat_id"]
assert chat_id == _cfg.CHAT_ID, (
f"Expected chat_id={_cfg.CHAT_ID!r}, got {chat_id!r}"
)
assert str(chat_id).startswith("-"), (
f"Regression #1226: chat_id must be negative (group ID), got: {chat_id!r}. "
"Positive chat_id is a user ID, not a Telegram group."
)
@pytest.mark.asyncio
async def test_send_message_4xx_does_not_trigger_retry_loop():
"""Regression #1214: on Telegram 4xx (wrong chat_id), retry loop must NOT run.
Only one HTTP call should be made. Retrying a 4xx is pointless it will
keep failing. send_message must break immediately on any 4xx response.
"""
from backend import config as _cfg
from backend.telegram import send_message
send_url = f"https://api.telegram.org/bot{_cfg.BOT_TOKEN}/sendMessage"
with respx.mock(assert_all_called=False) as mock:
route = mock.post(send_url).mock(
return_value=httpx.Response(
400,
json={"ok": False, "error_code": 400, "description": "Bad Request: chat not found"},
)
)
await send_message("retry test #1214")
assert route.call_count == 1, (
f"Regression #1214: expected exactly 1 HTTP call on 4xx, got {route.call_count}. "
"send_message must break immediately on client errors — no retry loop."
)
@pytest.mark.asyncio
async def test_signal_endpoint_returns_200_on_telegram_4xx(caplog):
"""Regression: /api/signal must return 200 even when Telegram Bot API returns 4xx.
When CHAT_ID is wrong (or any Telegram 4xx), the error must be logged by
send_message but the /api/signal endpoint must still return 200 the signal
was saved to DB, only the Telegram notification failed.
"""
from backend import config as _cfg
from backend.main import app
send_url = f"https://api.telegram.org/bot{_cfg.BOT_TOKEN}/sendMessage"
tg_set_url = f"https://api.telegram.org/bot{_cfg.BOT_TOKEN}/setWebhook"
get_me_url = f"https://api.telegram.org/bot{_cfg.BOT_TOKEN}/getMe"
with temp_db():
with respx.mock(assert_all_called=False) as mock_tg:
mock_tg.get(get_me_url).mock(
return_value=httpx.Response(200, json={"ok": True, "result": {"username": "testbot"}})
)
mock_tg.post(tg_set_url).mock(
return_value=httpx.Response(200, json={"ok": True, "result": True})
)
mock_tg.post(send_url).mock(
return_value=httpx.Response(
400,
json={"ok": False, "error_code": 400, "description": "Bad Request: chat not found"},
)
)
async with app.router.lifespan_context(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://testserver") as client:
reg = await client.post("/api/register", json={"uuid": _UUID_F, "name": "Tg4xxUser"})
assert reg.status_code == 200, f"Register failed: {reg.text}"
api_key = reg.json()["api_key"]
with caplog.at_level(logging.ERROR, logger="backend.telegram"):
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_F, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {api_key}"},
)
await asyncio.sleep(0)
assert resp.status_code == 200, (
f"Expected /api/signal to return 200 even when Telegram returns 4xx, got {resp.status_code}"
)
assert any("400" in r.message for r in caplog.records), (
"Expected ERROR log containing '400' when Telegram returns 4xx. "
"Error must be logged, not silently swallowed."
)

581
tests/test_baton_008.py Normal file
View file

@ -0,0 +1,581 @@
"""
Tests for BATON-008: Registration flow with Telegram admin approval.
Acceptance criteria:
1. POST /api/auth/register returns 201 with status='pending' on valid input
2. POST /api/auth/register returns 409 on email or login conflict
3. POST /api/auth/register returns 422 on invalid email/login/password
4. Telegram notification is fire-and-forget 201 is returned even if Telegram fails
5. Webhook callback_query approve db status='approved', push task fired if subscription present
6. Webhook callback_query reject db status='rejected'
7. Webhook callback_query with unknown reg_id returns {"ok": True} gracefully
"""
from __future__ import annotations
import asyncio
import os
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
from unittest.mock import AsyncMock, patch
import pytest
from tests.conftest import make_app_client
_WEBHOOK_SECRET = "test-webhook-secret"
_WEBHOOK_HEADERS = {"X-Telegram-Bot-Api-Secret-Token": _WEBHOOK_SECRET}
_VALID_PAYLOAD = {
"email": "user@example.com",
"login": "testuser",
"password": "strongpassword123",
}
# ---------------------------------------------------------------------------
# 1. Happy path — 201
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_auth_register_returns_201_pending():
"""Valid registration request returns 201 with status='pending'."""
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
resp = await client.post("/api/auth/register", json=_VALID_PAYLOAD)
assert resp.status_code == 201, f"Expected 201, got {resp.status_code}: {resp.text}"
body = resp.json()
assert body["status"] == "pending"
assert "message" in body
@pytest.mark.asyncio
async def test_auth_register_fire_and_forget_telegram_error_still_returns_201():
"""Telegram failure must not break 201 — fire-and-forget pattern."""
async with make_app_client() as client:
with patch(
"backend.telegram.send_registration_notification",
new_callable=AsyncMock,
side_effect=Exception("Telegram down"),
):
resp = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "email": "other@example.com", "login": "otheruser"},
)
await asyncio.sleep(0)
assert resp.status_code == 201, f"Telegram error must not break 201, got {resp.status_code}"
# ---------------------------------------------------------------------------
# 2. Conflict — 409
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_auth_register_409_on_duplicate_email():
"""Duplicate email returns 409."""
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
r1 = await client.post("/api/auth/register", json=_VALID_PAYLOAD)
assert r1.status_code == 201, f"First registration failed: {r1.text}"
r2 = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "login": "differentlogin"},
)
assert r2.status_code == 409, f"Expected 409 on duplicate email, got {r2.status_code}"
@pytest.mark.asyncio
async def test_auth_register_409_on_duplicate_login():
"""Duplicate login returns 409."""
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
r1 = await client.post("/api/auth/register", json=_VALID_PAYLOAD)
assert r1.status_code == 201, f"First registration failed: {r1.text}"
r2 = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "email": "different@example.com"},
)
assert r2.status_code == 409, f"Expected 409 on duplicate login, got {r2.status_code}"
# ---------------------------------------------------------------------------
# 3. Validation — 422
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_auth_register_422_invalid_email():
"""Invalid email format returns 422."""
async with make_app_client() as client:
resp = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "email": "not-an-email"},
)
assert resp.status_code == 422, f"Expected 422 on invalid email, got {resp.status_code}"
@pytest.mark.asyncio
async def test_auth_register_422_short_login():
"""Login shorter than 3 chars returns 422."""
async with make_app_client() as client:
resp = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "login": "ab"},
)
assert resp.status_code == 422, f"Expected 422 on short login, got {resp.status_code}"
@pytest.mark.asyncio
async def test_auth_register_422_login_invalid_chars():
"""Login with spaces/special chars returns 422."""
async with make_app_client() as client:
resp = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "login": "invalid login!"},
)
assert resp.status_code == 422, f"Expected 422 on login with spaces, got {resp.status_code}"
@pytest.mark.asyncio
async def test_auth_register_422_short_password():
"""Password shorter than 8 chars returns 422."""
async with make_app_client() as client:
resp = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "password": "short"},
)
assert resp.status_code == 422, f"Expected 422 on short password, got {resp.status_code}"
# ---------------------------------------------------------------------------
# 4. Telegram notification is sent to ADMIN_CHAT_ID
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_auth_register_sends_notification_to_admin():
"""Registration triggers send_registration_notification with correct data."""
calls: list[dict] = []
async def _capture(reg_id, login, email, created_at):
calls.append({"reg_id": reg_id, "login": login, "email": email})
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", side_effect=_capture):
await client.post("/api/auth/register", json=_VALID_PAYLOAD)
await asyncio.sleep(0)
assert len(calls) == 1, f"Expected 1 notification call, got {len(calls)}"
assert calls[0]["login"] == _VALID_PAYLOAD["login"]
assert calls[0]["email"] == _VALID_PAYLOAD["email"]
# ---------------------------------------------------------------------------
# 5. Webhook callback_query — approve
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_webhook_callback_approve_updates_db_status():
"""approve callback updates registration status to 'approved' in DB."""
from backend import db as _db
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
reg_resp = await client.post("/api/auth/register", json=_VALID_PAYLOAD)
assert reg_resp.status_code == 201
# We need reg_id — get it from DB directly
reg_id = None
from tests.conftest import temp_db as _temp_db # noqa: F401 — already active
from backend import config as _cfg
import aiosqlite
async with aiosqlite.connect(_cfg.DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute("SELECT id FROM registrations LIMIT 1") as cur:
row = await cur.fetchone()
reg_id = row["id"] if row else None
assert reg_id is not None, "Registration not found in DB"
cb_payload = {
"callback_query": {
"id": "cq_001",
"data": f"approve:{reg_id}",
"message": {
"message_id": 42,
"chat": {"id": 5694335584},
},
}
}
resp = await client.post(
"/api/webhook/telegram",
json=cb_payload,
headers=_WEBHOOK_HEADERS,
)
await asyncio.sleep(0)
assert resp.status_code == 200
assert resp.json() == {"ok": True}
# Verify DB status updated
reg = await _db.get_registration(reg_id)
assert reg is not None
assert reg["status"] == "approved", f"Expected status='approved', got {reg['status']!r}"
@pytest.mark.asyncio
async def test_webhook_callback_approve_fires_push_when_subscription_present():
"""approve callback triggers send_push when push_subscription is set."""
push_sub = {
"endpoint": "https://fcm.googleapis.com/fcm/send/test",
"keys": {"p256dh": "BQABC", "auth": "xyz"},
}
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
reg_resp = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "push_subscription": push_sub},
)
assert reg_resp.status_code == 201
from backend import config as _cfg
import aiosqlite
async with aiosqlite.connect(_cfg.DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute("SELECT id FROM registrations LIMIT 1") as cur:
row = await cur.fetchone()
reg_id = row["id"] if row else None
assert reg_id is not None
cb_payload = {
"callback_query": {
"id": "cq_002",
"data": f"approve:{reg_id}",
"message": {"message_id": 43, "chat": {"id": 5694335584}},
}
}
push_calls: list = []
async def _capture_push(sub_json, title, body):
push_calls.append(sub_json)
with patch("backend.push.send_push", side_effect=_capture_push):
await client.post("/api/webhook/telegram", json=cb_payload, headers=_WEBHOOK_HEADERS)
await asyncio.sleep(0)
assert len(push_calls) == 1, f"Expected 1 push call, got {len(push_calls)}"
# ---------------------------------------------------------------------------
# 6. Webhook callback_query — reject
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_webhook_callback_reject_updates_db_status():
"""reject callback updates registration status to 'rejected' in DB."""
from backend import db as _db
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
reg_resp = await client.post("/api/auth/register", json=_VALID_PAYLOAD)
assert reg_resp.status_code == 201
from backend import config as _cfg
import aiosqlite
async with aiosqlite.connect(_cfg.DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute("SELECT id FROM registrations LIMIT 1") as cur:
row = await cur.fetchone()
reg_id = row["id"] if row else None
assert reg_id is not None
cb_payload = {
"callback_query": {
"id": "cq_003",
"data": f"reject:{reg_id}",
"message": {"message_id": 44, "chat": {"id": 5694335584}},
}
}
resp = await client.post(
"/api/webhook/telegram", json=cb_payload, headers=_WEBHOOK_HEADERS
)
await asyncio.sleep(0)
assert resp.status_code == 200
reg = await _db.get_registration(reg_id)
assert reg is not None
assert reg["status"] == "rejected", f"Expected status='rejected', got {reg['status']!r}"
# ---------------------------------------------------------------------------
# 7. Unknown reg_id — graceful handling
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_webhook_callback_unknown_reg_id_returns_ok():
"""callback_query with unknown reg_id returns ok without error."""
async with make_app_client() as client:
cb_payload = {
"callback_query": {
"id": "cq_999",
"data": "approve:99999",
"message": {"message_id": 1, "chat": {"id": 5694335584}},
}
}
resp = await client.post(
"/api/webhook/telegram", json=cb_payload, headers=_WEBHOOK_HEADERS
)
await asyncio.sleep(0)
assert resp.status_code == 200
assert resp.json() == {"ok": True}
# ---------------------------------------------------------------------------
# 8. Registration without push_subscription
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_register_without_push_subscription():
"""Registration with push_subscription=null returns 201."""
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
resp = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "email": "nopush@example.com", "login": "nopushuser"},
)
assert resp.status_code == 201
assert resp.json()["status"] == "pending"
# ---------------------------------------------------------------------------
# 9. reject does NOT trigger Web Push
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_webhook_callback_reject_does_not_send_push():
"""reject callback does NOT call send_push."""
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
reg_resp = await client.post("/api/auth/register", json=_VALID_PAYLOAD)
assert reg_resp.status_code == 201
from backend import config as _cfg
import aiosqlite
async with aiosqlite.connect(_cfg.DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute("SELECT id FROM registrations LIMIT 1") as cur:
row = await cur.fetchone()
reg_id = row["id"] if row else None
assert reg_id is not None
cb_payload = {
"callback_query": {
"id": "cq_r001",
"data": f"reject:{reg_id}",
"message": {"message_id": 50, "chat": {"id": 5694335584}},
}
}
push_calls: list = []
async def _capture_push(sub_json, title, body):
push_calls.append(sub_json)
with patch("backend.push.send_push", side_effect=_capture_push):
await client.post("/api/webhook/telegram", json=cb_payload, headers=_WEBHOOK_HEADERS)
await asyncio.sleep(0)
assert len(push_calls) == 0, f"Expected 0 push calls on reject, got {len(push_calls)}"
# ---------------------------------------------------------------------------
# 10. approve calls editMessageText with ✅ text
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_webhook_callback_approve_edits_message():
"""approve callback calls editMessageText with '✅ Пользователь ... одобрен'."""
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
reg_resp = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "email": "edit@example.com", "login": "edituser"},
)
assert reg_resp.status_code == 201
from backend import config as _cfg
import aiosqlite
async with aiosqlite.connect(_cfg.DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute("SELECT id FROM registrations LIMIT 1") as cur:
row = await cur.fetchone()
reg_id = row["id"] if row else None
assert reg_id is not None
cb_payload = {
"callback_query": {
"id": "cq_e001",
"data": f"approve:{reg_id}",
"message": {"message_id": 51, "chat": {"id": 5694335584}},
}
}
edit_calls: list[str] = []
async def _capture_edit(chat_id, message_id, text):
edit_calls.append(text)
with patch("backend.telegram.edit_message_text", side_effect=_capture_edit):
await client.post("/api/webhook/telegram", json=cb_payload, headers=_WEBHOOK_HEADERS)
assert len(edit_calls) == 1, f"Expected 1 editMessageText call, got {len(edit_calls)}"
assert "" in edit_calls[0], f"Expected ✅ in edit text, got: {edit_calls[0]!r}"
assert "edituser" in edit_calls[0], f"Expected login in edit text, got: {edit_calls[0]!r}"
# ---------------------------------------------------------------------------
# 11. answerCallbackQuery is called after callback processing
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_webhook_callback_answer_sent():
"""answerCallbackQuery is called with the callback_query_id after processing."""
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
reg_resp = await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "email": "answer@example.com", "login": "answeruser"},
)
assert reg_resp.status_code == 201
from backend import config as _cfg
import aiosqlite
async with aiosqlite.connect(_cfg.DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute("SELECT id FROM registrations LIMIT 1") as cur:
row = await cur.fetchone()
reg_id = row["id"] if row else None
assert reg_id is not None
cb_payload = {
"callback_query": {
"id": "cq_a001",
"data": f"approve:{reg_id}",
"message": {"message_id": 52, "chat": {"id": 5694335584}},
}
}
answer_calls: list[str] = []
async def _capture_answer(callback_query_id):
answer_calls.append(callback_query_id)
with patch("backend.telegram.answer_callback_query", side_effect=_capture_answer):
await client.post("/api/webhook/telegram", json=cb_payload, headers=_WEBHOOK_HEADERS)
assert len(answer_calls) == 1, f"Expected 1 answerCallbackQuery call, got {len(answer_calls)}"
assert answer_calls[0] == "cq_a001"
# ---------------------------------------------------------------------------
# 12. CORS — Authorization header is allowed
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cors_authorization_header_allowed():
"""CORS preflight request allows Authorization header."""
async with make_app_client() as client:
resp = await client.options(
"/api/auth/register",
headers={
"Origin": "http://localhost:3000",
"Access-Control-Request-Method": "POST",
"Access-Control-Request-Headers": "Authorization",
},
)
assert resp.status_code in (200, 204), f"CORS preflight returned {resp.status_code}"
allow_headers = resp.headers.get("access-control-allow-headers", "")
assert "authorization" in allow_headers.lower(), (
f"Authorization not in Access-Control-Allow-Headers: {allow_headers!r}"
)
# ---------------------------------------------------------------------------
# 13. DB — registrations table exists after init_db
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_registrations_table_created():
"""init_db creates the registrations table with correct schema."""
from tests.conftest import temp_db
from backend import db as _db, config as _cfg
import aiosqlite
with temp_db():
await _db.init_db()
async with aiosqlite.connect(_cfg.DB_PATH) as conn:
async with conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='registrations'"
) as cur:
row = await cur.fetchone()
assert row is not None, "Table 'registrations' not found after init_db()"
# ---------------------------------------------------------------------------
# 14. DB — password_hash uses PBKDF2 '{salt_hex}:{dk_hex}' format
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_password_hash_stored_in_pbkdf2_format():
"""Stored password_hash uses '<salt_hex>:<dk_hex>' PBKDF2 format."""
from backend import config as _cfg
import aiosqlite
async with make_app_client() as client:
with patch("backend.telegram.send_registration_notification", new_callable=AsyncMock):
await client.post(
"/api/auth/register",
json={**_VALID_PAYLOAD, "email": "pbkdf2@example.com", "login": "pbkdf2user"},
)
async with aiosqlite.connect(_cfg.DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(
"SELECT password_hash FROM registrations WHERE login = 'pbkdf2user'"
) as cur:
row = await cur.fetchone()
assert row is not None, "Registration not found in DB"
password_hash = row["password_hash"]
assert ":" in password_hash, f"Expected 'salt:hash' format, got {password_hash!r}"
parts = password_hash.split(":")
assert len(parts) == 2, f"Expected exactly one colon separator, got {password_hash!r}"
salt_hex, dk_hex = parts
# salt = os.urandom(16) → 32 hex chars; dk = SHA-256 output (32 bytes) → 64 hex chars
assert len(salt_hex) == 32, f"Expected 32-char salt hex, got {len(salt_hex)}"
assert len(dk_hex) == 64, f"Expected 64-char dk hex (SHA-256), got {len(dk_hex)}"
int(salt_hex, 16) # raises ValueError if not valid hex
int(dk_hex, 16)

View file

@ -29,6 +29,14 @@ import pytest
from backend import config, db from backend import config, db
# Valid UUID v4 constants — db-layer tests bypass Pydantic but use canonical UUIDs
_UUID_DB_1 = "d0000001-0000-4000-8000-000000000001"
_UUID_DB_2 = "d0000002-0000-4000-8000-000000000002"
_UUID_DB_3 = "d0000003-0000-4000-8000-000000000003"
_UUID_DB_4 = "d0000004-0000-4000-8000-000000000004"
_UUID_DB_5 = "d0000005-0000-4000-8000-000000000005"
_UUID_DB_6 = "d0000006-0000-4000-8000-000000000006"
def _tmpdb(): def _tmpdb():
"""Return a fresh temp-file path and set config.DB_PATH.""" """Return a fresh temp-file path and set config.DB_PATH."""
@ -128,10 +136,10 @@ async def test_register_user_returns_id():
path = _tmpdb() path = _tmpdb()
try: try:
await db.init_db() await db.init_db()
result = await db.register_user(uuid="uuid-001", name="Alice") result = await db.register_user(uuid=_UUID_DB_1, name="Alice")
assert isinstance(result["user_id"], int) assert isinstance(result["user_id"], int)
assert result["user_id"] > 0 assert result["user_id"] > 0
assert result["uuid"] == "uuid-001" assert result["uuid"] == _UUID_DB_1
finally: finally:
_cleanup(path) _cleanup(path)
@ -142,8 +150,8 @@ async def test_register_user_idempotent():
path = _tmpdb() path = _tmpdb()
try: try:
await db.init_db() await db.init_db()
r1 = await db.register_user(uuid="uuid-002", name="Bob") r1 = await db.register_user(uuid=_UUID_DB_2, name="Bob")
r2 = await db.register_user(uuid="uuid-002", name="Bob") r2 = await db.register_user(uuid=_UUID_DB_2, name="Bob")
assert r1["user_id"] == r2["user_id"] assert r1["user_id"] == r2["user_id"]
finally: finally:
_cleanup(path) _cleanup(path)
@ -159,8 +167,8 @@ async def test_get_user_name_returns_name():
path = _tmpdb() path = _tmpdb()
try: try:
await db.init_db() await db.init_db()
await db.register_user(uuid="uuid-003", name="Charlie") await db.register_user(uuid=_UUID_DB_3, name="Charlie")
name = await db.get_user_name("uuid-003") name = await db.get_user_name(_UUID_DB_3)
assert name == "Charlie" assert name == "Charlie"
finally: finally:
_cleanup(path) _cleanup(path)
@ -188,9 +196,9 @@ async def test_save_signal_returns_id():
path = _tmpdb() path = _tmpdb()
try: try:
await db.init_db() await db.init_db()
await db.register_user(uuid="uuid-004", name="Dana") await db.register_user(uuid=_UUID_DB_4, name="Dana")
signal_id = await db.save_signal( signal_id = await db.save_signal(
user_uuid="uuid-004", user_uuid=_UUID_DB_4,
timestamp=1742478000000, timestamp=1742478000000,
lat=55.7558, lat=55.7558,
lon=37.6173, lon=37.6173,
@ -208,9 +216,9 @@ async def test_save_signal_without_geo():
path = _tmpdb() path = _tmpdb()
try: try:
await db.init_db() await db.init_db()
await db.register_user(uuid="uuid-005", name="Eve") await db.register_user(uuid=_UUID_DB_5, name="Eve")
signal_id = await db.save_signal( signal_id = await db.save_signal(
user_uuid="uuid-005", user_uuid=_UUID_DB_5,
timestamp=1742478000000, timestamp=1742478000000,
lat=None, lat=None,
lon=None, lon=None,
@ -239,9 +247,9 @@ async def test_save_signal_increments_id():
path = _tmpdb() path = _tmpdb()
try: try:
await db.init_db() await db.init_db()
await db.register_user(uuid="uuid-006", name="Frank") await db.register_user(uuid=_UUID_DB_6, name="Frank")
id1 = await db.save_signal("uuid-006", 1742478000001, None, None, None) id1 = await db.save_signal(_UUID_DB_6, 1742478000001, None, None, None)
id2 = await db.save_signal("uuid-006", 1742478000002, None, None, None) id2 = await db.save_signal(_UUID_DB_6, 1742478000002, None, None, None)
assert id2 > id1 assert id2 > id1
finally: finally:
_cleanup(path) _cleanup(path)

172
tests/test_fix_005.py Normal file
View file

@ -0,0 +1,172 @@
"""
Tests for BATON-FIX-005: BOT_TOKEN leak prevention in logs.
Acceptance criteria covered by unit tests:
AC#4 — no places in source code where token is logged in plain text:
- _mask_token() returns masked representation (***XXXX format)
- validate_bot_token() exception handler does not log raw BOT_TOKEN
- validate_bot_token() exception handler logs type(exc).__name__ + masked token
- httpcore logger level >= WARNING (prevents URL leak via transport layer)
AC#1, AC#2, AC#3 (journalctl, webhook, service health) require live production
verification and are outside unit test scope.
"""
from __future__ import annotations
import logging
import os
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
import httpx
import pytest
import respx
from backend import config
from backend.telegram import _mask_token, validate_bot_token
GET_ME_URL = f"https://api.telegram.org/bot{config.BOT_TOKEN}/getMe"
# ---------------------------------------------------------------------------
# _mask_token helper
# ---------------------------------------------------------------------------
def test_mask_token_shows_last_4_chars():
"""_mask_token returns '***XXXX' where XXXX is the last 4 chars of the token."""
token = "123456789:ABCDEFsomeLongTokenXYZW"
result = _mask_token(token)
assert result == f"***{token[-4:]}", f"Expected ***{token[-4:]}, got {result!r}"
def test_mask_token_hides_most_of_token():
"""_mask_token must NOT expose the full token — only last 4 chars."""
token = "123456789:ABCDEFsomeLongTokenXYZW"
result = _mask_token(token)
assert token[:-4] not in result, f"Masked token exposes too much: {result!r}"
def test_mask_token_short_token_returns_redacted():
"""_mask_token returns '***REDACTED***' for tokens shorter than 4 chars."""
assert _mask_token("abc") == "***REDACTED***"
def test_mask_token_empty_string_returns_redacted():
"""_mask_token on empty string returns '***REDACTED***'."""
assert _mask_token("") == "***REDACTED***"
def test_mask_token_exactly_4_chars_is_not_redacted():
"""_mask_token with exactly 4 chars returns '***XXXX' (not redacted)."""
result = _mask_token("1234")
assert result == "***1234", f"Expected ***1234, got {result!r}"
# ---------------------------------------------------------------------------
# httpcore logger suppression (new in FIX-005; httpx covered in test_fix_011)
# ---------------------------------------------------------------------------
def test_httpcore_logger_level_is_warning_or_higher():
"""logging.getLogger('httpcore').level must be WARNING or higher after app import."""
import backend.main # noqa: F401 — ensures telegram.py module-level setLevel is called
httpcore_logger = logging.getLogger("httpcore")
assert httpcore_logger.level >= logging.WARNING, (
f"httpcore logger level must be >= WARNING (30), got {httpcore_logger.level}. "
"httpcore logs transport-level requests including URLs with BOT_TOKEN."
)
def test_httpcore_logger_info_not_enabled():
"""httpcore logger must not propagate INFO-level messages (would leak BOT_TOKEN URL)."""
import backend.main # noqa: F401
httpcore_logger = logging.getLogger("httpcore")
assert not httpcore_logger.isEnabledFor(logging.INFO), (
"httpcore logger must not process INFO messages — could leak BOT_TOKEN via URL"
)
# ---------------------------------------------------------------------------
# validate_bot_token() exception handler — AC#4: no raw token in error logs
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_validate_bot_token_network_error_does_not_log_raw_token(caplog):
"""validate_bot_token() on ConnectError must NOT log the raw BOT_TOKEN.
AC#4: The exception handler logs type(exc).__name__ + _mask_token() instead
of raw exc, which embeds the Telegram API URL containing the token.
"""
with respx.mock(assert_all_called=False) as mock:
mock.get(GET_ME_URL).mock(side_effect=httpx.ConnectError("connection refused"))
with caplog.at_level(logging.ERROR, logger="backend.telegram"):
result = await validate_bot_token()
assert result is False
raw_token = config.BOT_TOKEN
for record in caplog.records:
assert raw_token not in record.message, (
f"AC#4: Raw BOT_TOKEN leaked in log message: {record.message!r}"
)
@pytest.mark.asyncio
async def test_validate_bot_token_network_error_logs_exception_type_name(caplog):
"""validate_bot_token() on ConnectError logs the exception type name, not repr(exc).
The fixed handler: logger.error('...%s...', type(exc).__name__, ...) not str(exc).
"""
with respx.mock(assert_all_called=False) as mock:
mock.get(GET_ME_URL).mock(side_effect=httpx.ConnectError("connection refused"))
with caplog.at_level(logging.ERROR, logger="backend.telegram"):
await validate_bot_token()
error_messages = [r.message for r in caplog.records if r.levelno >= logging.ERROR]
assert error_messages, "Expected at least one ERROR log on network failure"
assert any("ConnectError" in msg for msg in error_messages), (
f"Expected 'ConnectError' (type name) in error log, got: {error_messages}"
)
@pytest.mark.asyncio
async def test_validate_bot_token_network_error_logs_masked_token(caplog):
"""validate_bot_token() on network error logs masked token (***XXXX), not raw token."""
with respx.mock(assert_all_called=False) as mock:
mock.get(GET_ME_URL).mock(side_effect=httpx.ConnectError("connection refused"))
with caplog.at_level(logging.ERROR, logger="backend.telegram"):
await validate_bot_token()
token = config.BOT_TOKEN # "test-bot-token"
masked = f"***{token[-4:]}" # "***oken"
error_messages = [r.message for r in caplog.records if r.levelno >= logging.ERROR]
assert any(masked in msg for msg in error_messages), (
f"Expected masked token '{masked}' in error log. Got: {error_messages}"
)
@pytest.mark.asyncio
async def test_validate_bot_token_network_error_no_api_url_in_logs(caplog):
"""validate_bot_token() on network error must not log the Telegram API URL.
httpx embeds the request URL (including the token) into exception repr/str.
The fixed handler avoids logging exc directly to prevent this leak.
"""
with respx.mock(assert_all_called=False) as mock:
mock.get(GET_ME_URL).mock(side_effect=httpx.ConnectError("connection refused"))
with caplog.at_level(logging.ERROR, logger="backend.telegram"):
await validate_bot_token()
for record in caplog.records:
assert "api.telegram.org" not in record.message, (
f"AC#4: Telegram API URL (containing token) leaked in log: {record.message!r}"
)

229
tests/test_fix_009.py Normal file
View file

@ -0,0 +1,229 @@
"""
Tests for BATON-FIX-009: Live delivery verification automated regression guards.
Acceptance criteria mapped to unit tests:
AC#3 — BOT_TOKEN validates on startup via validate_bot_token() (getMe call)
AC#4 — CHAT_ID is negative (regression guard for decision #1212)
AC#1 — POST /api/signal returns 200 with valid auth
Physical production checks (AC#2 Telegram group message, AC#5 systemd status)
are outside unit test scope and require live production verification.
"""
from __future__ import annotations
import logging
import os
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
import json
from unittest.mock import AsyncMock, patch
import httpx
import pytest
import respx
from tests.conftest import make_app_client, temp_db
# ---------------------------------------------------------------------------
# AC#3 — validate_bot_token called at startup (decision #1211)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_validate_bot_token_called_once_during_startup():
"""AC#3: validate_bot_token() must be called exactly once during app startup.
Maps to production check: curl getMe must be executed to detect invalid token
before the service starts accepting signals (decision #1211).
"""
from backend.main import app
with temp_db():
with patch("backend.telegram.validate_bot_token", new_callable=AsyncMock) as mock_validate:
mock_validate.return_value = True
with patch("backend.telegram.set_webhook", new_callable=AsyncMock):
async with app.router.lifespan_context(app):
pass
assert mock_validate.call_count == 1, (
f"Expected validate_bot_token to be called exactly once at startup, "
f"got {mock_validate.call_count}"
)
@pytest.mark.asyncio
async def test_invalid_bot_token_logs_critical_error_on_startup(caplog):
"""AC#3: When BOT_TOKEN is invalid (validate_bot_token returns False),
a CRITICAL/ERROR is logged but lifespan continues service must not crash.
Maps to: 'Check BOT_TOKEN valid via getMe — status OK/FAIL' (decision #1211).
"""
from backend.main import app
with temp_db():
with patch("backend.telegram.validate_bot_token", new_callable=AsyncMock) as mock_validate:
mock_validate.return_value = False
with patch("backend.telegram.set_webhook", new_callable=AsyncMock):
with caplog.at_level(logging.ERROR, logger="backend.main"):
async with app.router.lifespan_context(app):
pass # lifespan must complete without raising
critical_msgs = [r.message for r in caplog.records if r.levelno >= logging.ERROR]
assert len(critical_msgs) >= 1, (
"Expected at least one ERROR/CRITICAL log when BOT_TOKEN is invalid. "
"Operator must be alerted on startup if Telegram delivery is broken."
)
assert any("BOT_TOKEN" in m for m in critical_msgs), (
f"Expected log mentioning 'BOT_TOKEN', got: {critical_msgs}"
)
@pytest.mark.asyncio
async def test_invalid_bot_token_lifespan_does_not_raise():
"""AC#3: Invalid BOT_TOKEN must not crash the service — lifespan completes normally."""
from backend.main import app
with temp_db():
with patch("backend.telegram.validate_bot_token", new_callable=AsyncMock) as mock_validate:
mock_validate.return_value = False
with patch("backend.telegram.set_webhook", new_callable=AsyncMock):
# Must not raise — service stays alive even with broken Telegram token
async with app.router.lifespan_context(app):
pass
# ---------------------------------------------------------------------------
# AC#4 — CHAT_ID is negative (decision #1212 regression guard)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_send_message_chat_id_in_request_is_negative():
"""AC#4: The chat_id sent to Telegram API must be negative (group ID).
Root cause of BATON-007: CHAT_ID=5190015988 (positive) was set in .env
instead of -5190015988 (negative). Negative ID = Telegram group/supergroup.
Decision #1212: CHAT_ID=-5190015988 отрицательный.
"""
from backend import config
from backend.telegram import send_message
send_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage"
with respx.mock(assert_all_called=False) as mock:
route = mock.post(send_url).mock(
return_value=httpx.Response(200, json={"ok": True})
)
await send_message("AC#4 regression guard")
assert route.called
body = json.loads(route.calls[0].request.content)
chat_id = body["chat_id"]
assert str(chat_id).startswith("-"), (
f"Regression #1212: chat_id must be negative (group ID), got {chat_id!r}. "
"Positive chat_id is a user ID — messages go to private DM, not the group."
)
# ---------------------------------------------------------------------------
# AC#1 — POST /api/signal returns 200 (decision #1211)
# ---------------------------------------------------------------------------
_UUID_FIX009 = "f0090001-0000-4000-8000-000000000001"
@pytest.mark.asyncio
async def test_signal_endpoint_returns_200_with_valid_auth():
"""AC#1: POST /api/signal with valid Bearer token must return HTTP 200.
Maps to production check: 'SSH на сервер, отправить POST /api/signal,
зафиксировать raw ответ API' (decision #1211).
"""
async with make_app_client() as client:
reg = await client.post(
"/api/register",
json={"uuid": _UUID_FIX009, "name": "Fix009User"},
)
assert reg.status_code == 200, f"Registration failed: {reg.text}"
api_key = reg.json()["api_key"]
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_FIX009, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {api_key}"},
)
assert resp.status_code == 200, (
f"Expected /api/signal to return 200, got {resp.status_code}: {resp.text}"
)
body = resp.json()
assert body.get("status") == "ok", f"Expected status='ok', got: {body}"
assert "signal_id" in body, f"Expected signal_id in response, got: {body}"
@pytest.mark.asyncio
async def test_signal_endpoint_returns_200_even_when_telegram_returns_400(caplog):
"""AC#1 + decision #1230: POST /api/signal must return 200 even if Telegram returns 400.
Decision #1230: 'Если Telegram возвращает 400 — зафиксировать и сообщить'.
The HTTP 400 from Telegram must be logged as ERROR (captured/reported),
but /api/signal must still return 200 signal was saved to DB.
"""
from backend import config
from backend.main import app
send_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage"
set_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/setWebhook"
get_me_url = f"https://api.telegram.org/bot{config.BOT_TOKEN}/getMe"
_UUID_400 = "f0090002-0000-4000-8000-000000000002"
with temp_db():
with respx.mock(assert_all_called=False) as mock_tg:
mock_tg.get(get_me_url).mock(
return_value=httpx.Response(200, json={"ok": True, "result": {"username": "testbot"}})
)
mock_tg.post(set_url).mock(
return_value=httpx.Response(200, json={"ok": True, "result": True})
)
mock_tg.post(send_url).mock(
return_value=httpx.Response(
400,
json={"ok": False, "error_code": 400, "description": "Bad Request: chat not found"},
)
)
async with app.router.lifespan_context(app):
import asyncio
from httpx import AsyncClient, ASGITransport
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://testserver") as client:
reg = await client.post("/api/register", json={"uuid": _UUID_400, "name": "TgErrUser"})
assert reg.status_code == 200
api_key = reg.json()["api_key"]
with caplog.at_level(logging.ERROR, logger="backend.telegram"):
resp = await client.post(
"/api/signal",
json={"user_id": _UUID_400, "timestamp": 1742478000000},
headers={"Authorization": f"Bearer {api_key}"},
)
await asyncio.sleep(0)
assert resp.status_code == 200, (
f"Decision #1230: /api/signal must return 200 even on Telegram 400, "
f"got {resp.status_code}"
)
assert any("400" in r.message for r in caplog.records), (
"Decision #1230: Telegram 400 error must be logged (captured and reported). "
"Got logs: " + str([r.message for r in caplog.records])
)

116
tests/test_fix_011.py Normal file
View file

@ -0,0 +1,116 @@
"""
BATON-FIX-011: Проверяет, что BOT_TOKEN не попадает в httpx-логи.
1. logging.getLogger('httpx').level >= logging.WARNING после импорта приложения.
2. Дочерние логгеры httpx._client и httpx._async_client также не пишут INFO.
3. При вызове send_message ни одна запись httpx-логгера с уровнем INFO
не содержит 'bot' или токен-подобный паттерн /bot[0-9]+:/.
"""
from __future__ import annotations
import logging
import re
import httpx
import pytest
import respx
from unittest.mock import patch, AsyncMock
# conftest.py уже устанавливает BOT_TOKEN=test-bot-token до этого импорта
from backend import config
from backend.telegram import send_message
SEND_URL = f"https://api.telegram.org/bot{config.BOT_TOKEN}/sendMessage"
BOT_TOKEN_PATTERN = re.compile(r"bot[0-9]+:")
# ---------------------------------------------------------------------------
# Уровень логгера httpx
# ---------------------------------------------------------------------------
def test_httpx_logger_level_is_warning_or_higher():
"""logging.getLogger('httpx').level должен быть WARNING (30) или выше после импорта приложения."""
# Импортируем main, чтобы гарантировать, что setLevel уже вызван
import backend.main # noqa: F401
httpx_logger = logging.getLogger("httpx")
assert httpx_logger.level >= logging.WARNING, (
f"Ожидался уровень >= WARNING (30), получен {httpx_logger.level}"
)
def test_httpx_logger_info_not_enabled():
"""logging.getLogger('httpx').isEnabledFor(INFO) должен возвращать False."""
import backend.main # noqa: F401
httpx_logger = logging.getLogger("httpx")
assert not httpx_logger.isEnabledFor(logging.INFO), (
"httpx-логгер не должен обрабатывать INFO-сообщения"
)
def test_httpx_client_logger_info_not_enabled():
"""Дочерний логгер httpx._client не должен обрабатывать INFO."""
import backend.main # noqa: F401
child_logger = logging.getLogger("httpx._client")
assert not child_logger.isEnabledFor(logging.INFO), (
"httpx._client не должен обрабатывать INFO-сообщения"
)
def test_httpx_async_client_logger_info_not_enabled():
"""Дочерний логгер httpx._async_client не должен обрабатывать INFO."""
import backend.main # noqa: F401
child_logger = logging.getLogger("httpx._async_client")
assert not child_logger.isEnabledFor(logging.INFO), (
"httpx._async_client не должен обрабатывать INFO-сообщения"
)
# ---------------------------------------------------------------------------
# BOT_TOKEN не появляется в httpx INFO-логах при send_message
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_send_message_no_httpx_records_at_warning_level(caplog):
"""При вызове send_message httpx не выдаёт записей уровня WARNING и ниже с токеном.
Проверяет фактическое состояние логгера в продакшне (WARNING): INFO-сообщения
с URL (включая BOT_TOKEN) не должны проходить через httpx-логгер.
"""
import backend.main # noqa: F401 — убеждаемся, что setLevel вызван
with respx.mock(assert_all_called=False) as mock:
mock.post(SEND_URL).mock(return_value=httpx.Response(200, json={"ok": True}))
# Захватываем логи при реальном уровне WARNING — INFO-сообщения не должны проходить
with caplog.at_level(logging.WARNING, logger="httpx"):
await send_message("test message for token leak check")
bot_token = config.BOT_TOKEN
httpx_records = [r for r in caplog.records if r.name.startswith("httpx")]
for record in httpx_records:
assert bot_token not in record.message, (
f"BOT_TOKEN найден в httpx-логе (уровень {record.levelname}): {record.message!r}"
)
@pytest.mark.asyncio
async def test_send_message_no_token_pattern_in_httpx_info_logs(caplog):
"""При вызове send_message httpx INFO-логи не содержат паттерн /bot[0-9]+:/."""
with respx.mock(assert_all_called=False) as mock:
mock.post(SEND_URL).mock(return_value=httpx.Response(200, json={"ok": True}))
with caplog.at_level(logging.INFO, logger="httpx"):
await send_message("check token pattern")
info_records = [
r for r in caplog.records
if r.name.startswith("httpx") and r.levelno <= logging.INFO
]
for record in info_records:
assert not BOT_TOKEN_PATTERN.search(record.message), (
f"Паттерн bot[0-9]+: найден в httpx INFO-логе: {record.message!r}"
)

173
tests/test_fix_012.py Normal file
View file

@ -0,0 +1,173 @@
"""
Tests for BATON-FIX-012: UUID v4 validation regression guard.
BATON-SEC-005 added UUID v4 pattern validation to RegisterRequest.uuid and
SignalRequest.user_id. Tests in test_db.py / test_baton_005.py / test_telegram.py
previously used placeholder strings ('uuid-001', 'create-uuid-001', 'agg-uuid-001')
that are not valid UUID v4 causing 25 regressions.
This file locks down the behaviour so the same mistake cannot recur silently:
- Old-style placeholder strings are rejected by Pydantic
- All UUID constants used across the fixed test files are valid UUID v4
- RegisterRequest and SignalRequest accept exactly-valid v4 UUIDs
- They reject strings that violate version (bit 3 of field-3 must be 4) or
variant (top bits of field-4 must be 10xx) requirements
"""
from __future__ import annotations
import os
os.environ.setdefault("BOT_TOKEN", "test-bot-token")
os.environ.setdefault("CHAT_ID", "-1001234567890")
os.environ.setdefault("WEBHOOK_SECRET", "test-webhook-secret")
os.environ.setdefault("WEBHOOK_URL", "https://example.com/api/webhook/telegram")
os.environ.setdefault("FRONTEND_ORIGIN", "http://localhost:3000")
os.environ.setdefault("ADMIN_TOKEN", "test-admin-token")
import pytest
from pydantic import ValidationError
from backend.models import RegisterRequest, SignalRequest
# ---------------------------------------------------------------------------
# UUID constants from fixed test files — all must be valid UUID v4
# ---------------------------------------------------------------------------
# test_db.py constants (_UUID_DB_1 .. _UUID_DB_6)
_DB_UUIDS = [
"d0000001-0000-4000-8000-000000000001",
"d0000002-0000-4000-8000-000000000002",
"d0000003-0000-4000-8000-000000000003",
"d0000004-0000-4000-8000-000000000004",
"d0000005-0000-4000-8000-000000000005",
"d0000006-0000-4000-8000-000000000006",
]
# test_baton_005.py constants (_UUID_ADM_*)
_ADM_UUIDS = [
"e0000000-0000-4000-8000-000000000000",
"e0000001-0000-4000-8000-000000000001",
"e0000002-0000-4000-8000-000000000002",
"e0000003-0000-4000-8000-000000000003",
"e0000004-0000-4000-8000-000000000004",
"e0000005-0000-4000-8000-000000000005",
"e0000006-0000-4000-8000-000000000006",
"e0000007-0000-4000-8000-000000000007",
"e0000008-0000-4000-8000-000000000008",
"e0000009-0000-4000-8000-000000000009",
"e000000a-0000-4000-8000-000000000010",
]
# test_telegram.py constants (aggregator UUIDs)
_AGG_UUIDS = [
"a9900001-0000-4000-8000-000000000001",
"a9900099-0000-4000-8000-000000000099",
] + [f"a990000{i}-0000-4000-8000-00000000000{i}" for i in range(5)]
# ---------------------------------------------------------------------------
# Old-style placeholder UUIDs (pre-fix) must be rejected
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("bad_uuid", [
"uuid-001",
"uuid-002",
"uuid-003",
"uuid-004",
"uuid-005",
"uuid-006",
"create-uuid-001",
"create-uuid-002",
"create-uuid-003",
"pass-uuid-001",
"pass-uuid-002",
"block-uuid-001",
"unblock-uuid-001",
"delete-uuid-001",
"delete-uuid-002",
"regress-admin-uuid-001",
"unauth-uuid-001",
"agg-uuid-001",
"agg-uuid-clr",
])
def test_register_request_rejects_old_placeholder_uuid(bad_uuid: str) -> None:
"""RegisterRequest.uuid must reject all pre-BATON-SEC-005 placeholder strings."""
with pytest.raises(ValidationError):
RegisterRequest(uuid=bad_uuid, name="Test")
@pytest.mark.parametrize("bad_uuid", [
"uuid-001",
"agg-uuid-001",
"create-uuid-001",
])
def test_signal_request_rejects_old_placeholder_uuid(bad_uuid: str) -> None:
"""SignalRequest.user_id must reject old-style placeholder strings."""
with pytest.raises(ValidationError):
SignalRequest(user_id=bad_uuid, timestamp=1700000000000)
# ---------------------------------------------------------------------------
# All UUID constants from the fixed test files are valid UUID v4
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("valid_uuid", _DB_UUIDS)
def test_register_request_accepts_db_uuid_constants(valid_uuid: str) -> None:
"""RegisterRequest accepts all _UUID_DB_* constants from test_db.py."""
req = RegisterRequest(uuid=valid_uuid, name="Test")
assert req.uuid == valid_uuid
@pytest.mark.parametrize("valid_uuid", _ADM_UUIDS)
def test_register_request_accepts_adm_uuid_constants(valid_uuid: str) -> None:
"""RegisterRequest accepts all _UUID_ADM_* constants from test_baton_005.py."""
req = RegisterRequest(uuid=valid_uuid, name="Test")
assert req.uuid == valid_uuid
@pytest.mark.parametrize("valid_uuid", _AGG_UUIDS)
def test_signal_request_accepts_agg_uuid_constants(valid_uuid: str) -> None:
"""SignalRequest accepts all aggregator UUID constants from test_telegram.py."""
req = SignalRequest(user_id=valid_uuid, timestamp=1700000000000)
assert req.user_id == valid_uuid
# ---------------------------------------------------------------------------
# UUID v4 structural requirements — version digit and variant bits
# ---------------------------------------------------------------------------
def test_register_request_rejects_uuid_v1_version_digit() -> None:
"""UUID with version digit = 1 (not 4) must be rejected by RegisterRequest."""
with pytest.raises(ValidationError):
# third group starts with '1' — version 1, not v4
RegisterRequest(uuid="550e8400-e29b-11d4-a716-446655440000", name="Test")
def test_register_request_rejects_uuid_v3_version_digit() -> None:
"""UUID with version digit = 3 must be rejected."""
with pytest.raises(ValidationError):
RegisterRequest(uuid="550e8400-e29b-31d4-a716-446655440000", name="Test")
def test_signal_request_rejects_uuid_wrong_variant_bits() -> None:
"""UUID with invalid variant bits (0xxx in fourth group) must be rejected."""
with pytest.raises(ValidationError):
# fourth group starts with '0' — not 8/9/a/b variant
SignalRequest(user_id="550e8400-e29b-41d4-0716-446655440000", timestamp=1700000000000)
def test_signal_request_rejects_uuid_wrong_variant_c() -> None:
"""UUID with variant 'c' (1100 bits) must be rejected — only 8/9/a/b allowed."""
with pytest.raises(ValidationError):
SignalRequest(user_id="550e8400-e29b-41d4-c716-446655440000", timestamp=1700000000000)
def test_register_request_accepts_all_valid_v4_variants() -> None:
"""RegisterRequest accepts UUIDs with variant nibbles 8, 9, a, b."""
for variant in ("8", "9", "a", "b"):
uuid = f"550e8400-e29b-41d4-{variant}716-446655440000"
req = RegisterRequest(uuid=uuid, name="Test")
assert req.uuid == uuid

View file

@ -218,7 +218,7 @@ async def test_aggregator_single_signal_calls_send_message():
try: try:
agg = SignalAggregator(interval=9999) agg = SignalAggregator(interval=9999)
await agg.add_signal( await agg.add_signal(
user_uuid="agg-uuid-001", user_uuid="a9900001-0000-4000-8000-000000000001",
user_name="Alice", user_name="Alice",
timestamp=1742478000000, timestamp=1742478000000,
geo={"lat": 55.0, "lon": 37.0, "accuracy": 10.0}, geo={"lat": 55.0, "lon": 37.0, "accuracy": 10.0},
@ -246,7 +246,7 @@ async def test_aggregator_multiple_signals_one_message():
agg = SignalAggregator(interval=9999) agg = SignalAggregator(interval=9999)
for i in range(5): for i in range(5):
await agg.add_signal( await agg.add_signal(
user_uuid=f"agg-uuid-{i:03d}", user_uuid=f"a990000{i}-0000-4000-8000-00000000000{i}",
user_name=f"User{i}", user_name=f"User{i}",
timestamp=1742478000000 + i * 1000, timestamp=1742478000000 + i * 1000,
geo=None, geo=None,
@ -288,7 +288,7 @@ async def test_aggregator_buffer_cleared_after_flush():
try: try:
agg = SignalAggregator(interval=9999) agg = SignalAggregator(interval=9999)
await agg.add_signal( await agg.add_signal(
user_uuid="agg-uuid-clr", user_uuid="a9900099-0000-4000-8000-000000000099",
user_name="Test", user_name="Test",
timestamp=1742478000000, timestamp=1742478000000,
geo=None, geo=None,