baton/backend/db.py
2026-03-20 20:44:00 +02:00

123 lines
4.1 KiB
Python

from __future__ import annotations
from typing import Optional
import aiosqlite
from backend import config
async def _get_conn() -> aiosqlite.Connection:
    """Open a configured connection to the SQLite database at ``config.DB_PATH``.

    Every connection is set up the same way before it is handed out:

    * ``PRAGMA journal_mode=WAL``
    * ``PRAGMA busy_timeout=5000``
    * ``PRAGMA synchronous=NORMAL``
    * ``row_factory = aiosqlite.Row`` so callers can read columns by name.

    Returns:
        An open connection. The caller owns it and is responsible for
        closing it.

    Raises:
        Whatever ``aiosqlite`` raises while connecting or applying a PRAGMA.
        If a PRAGMA fails, the half-configured connection is closed before
        the error propagates.
    """
    conn = await aiosqlite.connect(config.DB_PATH)
    try:
        for pragma in (
            "PRAGMA journal_mode=WAL",
            "PRAGMA busy_timeout=5000",
            "PRAGMA synchronous=NORMAL",
        ):
            await conn.execute(pragma)
    except BaseException:
        # No caller holds a reference yet, so release the connection here
        # (also on cancellation) instead of leaking it, then re-raise.
        await conn.close()
        raise
    conn.row_factory = aiosqlite.Row
    return conn
async def init_db() -> None:
    """Create the application's tables and indexes if they do not exist yet.

    Tables created: ``users``, ``signals`` (references ``users(uuid)`` and
    carries an optional ``telegram_batch_id``) and ``telegram_batches``.
    Every statement uses ``IF NOT EXISTS``, so calling this on an already
    initialised database changes nothing.
    """
    conn = await _get_conn()
    # The connection from _get_conn() is already started, so close it
    # explicitly; re-entering it with ``async with`` would await it again
    # and try to start its worker thread a second time.
    try:
        await conn.executescript("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                uuid TEXT UNIQUE NOT NULL,
                name TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS signals (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_uuid TEXT NOT NULL REFERENCES users(uuid),
                timestamp INTEGER NOT NULL,
                lat REAL DEFAULT NULL,
                lon REAL DEFAULT NULL,
                accuracy REAL DEFAULT NULL,
                created_at TEXT DEFAULT (datetime('now')),
                telegram_batch_id INTEGER DEFAULT NULL
            );
            CREATE TABLE IF NOT EXISTS telegram_batches (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                message_text TEXT DEFAULT NULL,
                sent_at TEXT DEFAULT NULL,
                signals_count INTEGER DEFAULT 0,
                status TEXT DEFAULT 'pending'
            );
            CREATE UNIQUE INDEX IF NOT EXISTS idx_users_uuid
                ON users(uuid);
            CREATE INDEX IF NOT EXISTS idx_signals_user_uuid
                ON signals(user_uuid);
            CREATE INDEX IF NOT EXISTS idx_signals_created_at
                ON signals(created_at);
            CREATE INDEX IF NOT EXISTS idx_batches_status
                ON telegram_batches(status);
        """)
        await conn.commit()
    finally:
        await conn.close()
async def register_user(uuid: str, name: str) -> dict:
    """Register a user if the UUID is new and return the stored identifiers.

    The insert uses ``INSERT OR IGNORE``: when ``uuid`` is already present
    the existing row is left untouched (its name is NOT updated) and that
    row's identifiers are returned.

    Args:
        uuid: Unique identifier of the user.
        name: Name stored when a new user row is created.

    Returns:
        ``{"user_id": <users.id>, "uuid": <users.uuid>}``.

    Raises:
        ValueError: If no row exists for ``uuid`` after the insert. This
            happens when the insert was ignored for a reason other than a
            duplicate UUID, e.g. a ``None`` uuid or name violating NOT NULL.
    """
    conn = await _get_conn()
    # The connection from _get_conn() is already started, so close it
    # explicitly; re-entering it with ``async with`` would await it again
    # and try to start its worker thread a second time.
    try:
        await conn.execute(
            "INSERT OR IGNORE INTO users (uuid, name) VALUES (?, ?)",
            (uuid, name),
        )
        await conn.commit()
        async with conn.execute(
            "SELECT id, uuid FROM users WHERE uuid = ?", (uuid,)
        ) as cur:
            row = await cur.fetchone()
        if row is None:
            # OR IGNORE also swallows NOT NULL violations, so the row may
            # legitimately be missing; fail loudly instead of with an
            # opaque "'NoneType' object is not subscriptable".
            raise ValueError(f"user {uuid!r} could not be registered")
        return {"user_id": row["id"], "uuid": row["uuid"]}
    finally:
        await conn.close()
async def save_signal(
    user_uuid: str,
    timestamp: int,
    lat: Optional[float],
    lon: Optional[float],
    accuracy: Optional[float],
) -> int:
    """Insert one row into ``signals`` and return its id.

    Args:
        user_uuid: UUID of the user the signal belongs to.
        timestamp: Integer timestamp stored as-is in ``signals.timestamp``.
        lat: Latitude, or ``None`` when no position is available.
        lon: Longitude, or ``None`` when no position is available.
        accuracy: Position accuracy, or ``None`` when not available.

    Returns:
        The ``id`` of the newly inserted signal row.

    NOTE(review): ``signals.user_uuid`` is declared ``REFERENCES users(uuid)``,
    but this file never issues ``PRAGMA foreign_keys=ON``, so SQLite
    presumably does not enforce it and unknown UUIDs are accepted -- confirm
    whether that is intended.
    """
    conn = await _get_conn()
    # The connection from _get_conn() is already started, so close it
    # explicitly; re-entering it with ``async with`` would await it again
    # and try to start its worker thread a second time.
    try:
        async with conn.execute(
            """
            INSERT INTO signals (user_uuid, timestamp, lat, lon, accuracy)
            VALUES (?, ?, ?, ?, ?)
            """,
            (user_uuid, timestamp, lat, lon, accuracy),
        ) as cur:
            signal_id = cur.lastrowid
        await conn.commit()
        return signal_id
    finally:
        await conn.close()
async def get_user_name(uuid: str) -> Optional[str]:
    """Look up the stored name of the user with the given UUID.

    Args:
        uuid: UUID of the user to look up.

    Returns:
        The user's ``name``, or ``None`` when no user has this UUID.
    """
    conn = await _get_conn()
    # The connection from _get_conn() is already started, so close it
    # explicitly; re-entering it with ``async with`` would await it again
    # and try to start its worker thread a second time.
    try:
        async with conn.execute(
            "SELECT name FROM users WHERE uuid = ?", (uuid,)
        ) as cur:
            row = await cur.fetchone()
        return row["name"] if row else None
    finally:
        await conn.close()
async def save_telegram_batch(
    message_text: str,
    signals_count: int,
    signal_ids: list[int],
) -> int:
    """Record a sent Telegram batch and link its signals to it.

    Inserts a ``telegram_batches`` row with status ``'sent'`` and ``sent_at``
    set by SQLite's ``datetime('now')``, then sets ``telegram_batch_id`` on
    every signal listed in ``signal_ids``. Both steps are committed together.

    Args:
        message_text: Text of the message that was sent.
        signals_count: Number of signals, stored as given (it is not
            derived from ``signal_ids``).
        signal_ids: Ids of the ``signals`` rows that belong to this batch;
            may be empty, in which case no signal is updated.

    Returns:
        The ``id`` of the new ``telegram_batches`` row.
    """
    conn = await _get_conn()
    # The connection from _get_conn() is already started, so close it
    # explicitly; re-entering it with ``async with`` would await it again
    # and try to start its worker thread a second time.
    try:
        async with conn.execute(
            """
            INSERT INTO telegram_batches (message_text, sent_at, signals_count, status)
            VALUES (?, datetime('now'), ?, 'sent')
            """,
            (message_text, signals_count),
        ) as cur:
            batch_id = cur.lastrowid
        if signal_ids:
            # One fixed statement run per id: unlike a generated
            # ``IN (?, ?, ...)`` list, this cannot exceed SQLite's limit on
            # bound parameters per statement, however large the batch is.
            await conn.executemany(
                "UPDATE signals SET telegram_batch_id = ? WHERE id = ?",
                [(batch_id, signal_id) for signal_id in signal_ids],
            )
        await conn.commit()
        return batch_id
    finally:
        await conn.close()