"""Async SQLite persistence helpers for users, signals and Telegram batches."""

from __future__ import annotations

from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional

import aiosqlite

from backend import config

# SQLite limits the number of host parameters per statement
# (SQLITE_MAX_VARIABLE_NUMBER; 999 by default before SQLite 3.32.0).
# Staying below that leaves room for the extra batch-id parameter.
_MAX_IDS_PER_UPDATE = 900


@asynccontextmanager
async def _get_conn() -> AsyncGenerator[aiosqlite.Connection, None]:
    """Yield a configured connection to ``config.DB_PATH`` and close it on exit.

    Each connection is switched to WAL journaling, given a 5000 ms busy
    timeout and ``synchronous=NORMAL``, and returns rows as ``aiosqlite.Row``
    so columns can be accessed by name.

    NOTE(review): ``PRAGMA foreign_keys`` is not set here, so the
    ``REFERENCES`` clause in the schema is presumably not enforced
    (SQLite's default) -- confirm whether that is intended.
    """
    conn = await aiosqlite.connect(config.DB_PATH)
    await conn.execute("PRAGMA journal_mode=WAL")
    await conn.execute("PRAGMA busy_timeout=5000")
    await conn.execute("PRAGMA synchronous=NORMAL")
    conn.row_factory = aiosqlite.Row
    try:
        yield conn
    finally:
        await conn.close()


async def init_db() -> None:
    """Create the tables and indexes if they do not exist yet."""
    async with _get_conn() as conn:
        await conn.executescript("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                uuid TEXT UNIQUE NOT NULL,
                name TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS signals (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_uuid TEXT NOT NULL REFERENCES users(uuid),
                timestamp INTEGER NOT NULL,
                lat REAL DEFAULT NULL,
                lon REAL DEFAULT NULL,
                accuracy REAL DEFAULT NULL,
                created_at TEXT DEFAULT (datetime('now')),
                telegram_batch_id INTEGER DEFAULT NULL
            );
            CREATE TABLE IF NOT EXISTS telegram_batches (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                message_text TEXT DEFAULT NULL,
                sent_at TEXT DEFAULT NULL,
                signals_count INTEGER DEFAULT 0,
                status TEXT DEFAULT 'pending'
            );
            CREATE UNIQUE INDEX IF NOT EXISTS idx_users_uuid ON users(uuid);
            CREATE INDEX IF NOT EXISTS idx_signals_user_uuid ON signals(user_uuid);
            CREATE INDEX IF NOT EXISTS idx_signals_created_at ON signals(created_at);
            CREATE INDEX IF NOT EXISTS idx_batches_status ON telegram_batches(status);
        """)
        await conn.commit()


async def register_user(uuid: str, name: str) -> dict:
    """Insert a user unless one with this ``uuid`` exists, then return its ids.

    The insert uses ``INSERT OR IGNORE``, so an already-registered user keeps
    its stored name; ``name`` is only used when the row is first created.

    Returns:
        ``{"user_id": <row id>, "uuid": <uuid>}`` for the stored user.
    """
    async with _get_conn() as conn:
        await conn.execute(
            "INSERT OR IGNORE INTO users (uuid, name) VALUES (?, ?)",
            (uuid, name),
        )
        await conn.commit()
        async with conn.execute(
            "SELECT id, uuid FROM users WHERE uuid = ?", (uuid,)
        ) as cur:
            row = await cur.fetchone()
        return {"user_id": row["id"], "uuid": row["uuid"]}


async def save_signal(
    user_uuid: str,
    timestamp: int,
    lat: Optional[float],
    lon: Optional[float],
    accuracy: Optional[float],
) -> int:
    """Store one signal row and return its row id.

    ``lat``, ``lon`` and ``accuracy`` may be ``None`` and are then stored
    as NULL.
    """
    async with _get_conn() as conn:
        async with conn.execute(
            """
            INSERT INTO signals (user_uuid, timestamp, lat, lon, accuracy)
            VALUES (?, ?, ?, ?, ?)
            """,
            (user_uuid, timestamp, lat, lon, accuracy),
        ) as cur:
            signal_id = cur.lastrowid
        await conn.commit()
        return signal_id


async def get_user_name(uuid: str) -> Optional[str]:
    """Return the stored name for ``uuid``, or ``None`` if no such user."""
    async with _get_conn() as conn:
        async with conn.execute(
            "SELECT name FROM users WHERE uuid = ?", (uuid,)
        ) as cur:
            row = await cur.fetchone()
        return row["name"] if row else None


async def save_telegram_batch(
    message_text: str,
    signals_count: int,
    signal_ids: list[int],
) -> int:
    """Record a sent Telegram batch and link the given signals to it.

    A ``telegram_batches`` row is inserted with status ``'sent'`` and the
    current time as ``sent_at``; every signal whose id is in ``signal_ids``
    then gets its ``telegram_batch_id`` set to the new batch id. Everything
    happens on one connection with a single commit at the end, so a failure
    part-way leaves nothing committed.

    Args:
        message_text: Text of the message that was sent.
        signals_count: Number of signals the batch reports.
        signal_ids: Ids of the signals to attach; may be empty.

    Returns:
        The row id of the new batch.
    """
    async with _get_conn() as conn:
        async with conn.execute(
            """
            INSERT INTO telegram_batches (message_text, sent_at, signals_count, status)
            VALUES (?, datetime('now'), ?, 'sent')
            """,
            (message_text, signals_count),
        ) as cur:
            batch_id = cur.lastrowid

        # Update in bounded chunks: one giant IN (...) list would exceed
        # SQLite's host-parameter limit for large batches and fail with
        # "too many SQL variables".
        ids = list(signal_ids) if signal_ids else []
        for start in range(0, len(ids), _MAX_IDS_PER_UPDATE):
            chunk = ids[start:start + _MAX_IDS_PER_UPDATE]
            placeholders = ",".join("?" * len(chunk))
            await conn.execute(
                f"UPDATE signals SET telegram_batch_id = ? WHERE id IN ({placeholders})",
                [batch_id, *chunk],
            )

        await conn.commit()
        return batch_id