""" Kin — data access functions for all tables. Pure functions: (conn, params) → dict | list[dict]. No ORM, no classes. """ import json import sqlite3 from datetime import datetime from typing import Any def _row_to_dict(row: sqlite3.Row | None) -> dict | None: """Convert sqlite3.Row to dict with JSON fields decoded.""" if row is None: return None d = dict(row) for key, val in d.items(): if isinstance(val, str) and val.startswith(("[", "{")): try: d[key] = json.loads(val) except (json.JSONDecodeError, ValueError): pass return d def _rows_to_list(rows: list[sqlite3.Row]) -> list[dict]: """Convert list of sqlite3.Row to list of dicts.""" return [_row_to_dict(r) for r in rows] def _json_encode(val: Any) -> Any: """Encode lists/dicts to JSON strings for storage.""" if isinstance(val, (list, dict)): return json.dumps(val, ensure_ascii=False) return val # --------------------------------------------------------------------------- # Projects # --------------------------------------------------------------------------- def create_project( conn: sqlite3.Connection, id: str, name: str, path: str, tech_stack: list | None = None, status: str = "active", priority: int = 5, pm_prompt: str | None = None, claude_md_path: str | None = None, forgejo_repo: str | None = None, ) -> dict: """Create a new project and return it as dict.""" conn.execute( """INSERT INTO projects (id, name, path, tech_stack, status, priority, pm_prompt, claude_md_path, forgejo_repo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", (id, name, path, _json_encode(tech_stack), status, priority, pm_prompt, claude_md_path, forgejo_repo), ) conn.commit() return get_project(conn, id) def get_project(conn: sqlite3.Connection, id: str) -> dict | None: """Get project by id.""" row = conn.execute("SELECT * FROM projects WHERE id = ?", (id,)).fetchone() return _row_to_dict(row) def list_projects(conn: sqlite3.Connection, status: str | None = None) -> list[dict]: """List projects, optionally filtered by status.""" if status: rows = conn.execute( "SELECT * FROM projects WHERE status = ? ORDER BY priority, name", (status,), ).fetchall() else: rows = conn.execute( "SELECT * FROM projects ORDER BY priority, name" ).fetchall() return _rows_to_list(rows) def update_project(conn: sqlite3.Connection, id: str, **fields) -> dict: """Update project fields. Returns updated project.""" if not fields: return get_project(conn, id) for key in ("tech_stack",): if key in fields: fields[key] = _json_encode(fields[key]) sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [id] conn.execute(f"UPDATE projects SET {sets} WHERE id = ?", vals) conn.commit() return get_project(conn, id) # --------------------------------------------------------------------------- # Tasks # --------------------------------------------------------------------------- def create_task( conn: sqlite3.Connection, id: str, project_id: str, title: str, status: str = "pending", priority: int = 5, assigned_role: str | None = None, parent_task_id: str | None = None, brief: dict | None = None, spec: dict | None = None, forgejo_issue_id: int | None = None, ) -> dict: """Create a task linked to a project.""" conn.execute( """INSERT INTO tasks (id, project_id, title, status, priority, assigned_role, parent_task_id, brief, spec, forgejo_issue_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (id, project_id, title, status, priority, assigned_role, parent_task_id, _json_encode(brief), _json_encode(spec), forgejo_issue_id), ) conn.commit() return get_task(conn, id) def get_task(conn: sqlite3.Connection, id: str) -> dict | None: """Get task by id.""" row = conn.execute("SELECT * FROM tasks WHERE id = ?", (id,)).fetchone() return _row_to_dict(row) def list_tasks( conn: sqlite3.Connection, project_id: str | None = None, status: str | None = None, ) -> list[dict]: """List tasks with optional project/status filters.""" query = "SELECT * FROM tasks WHERE 1=1" params: list = [] if project_id: query += " AND project_id = ?" params.append(project_id) if status: query += " AND status = ?" params.append(status) query += " ORDER BY priority, created_at" return _rows_to_list(conn.execute(query, params).fetchall()) def update_task(conn: sqlite3.Connection, id: str, **fields) -> dict: """Update task fields. Auto-sets updated_at.""" if not fields: return get_task(conn, id) json_cols = ("brief", "spec", "review", "test_result", "security_result") for key in json_cols: if key in fields: fields[key] = _json_encode(fields[key]) fields["updated_at"] = datetime.now().isoformat() sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [id] conn.execute(f"UPDATE tasks SET {sets} WHERE id = ?", vals) conn.commit() return get_task(conn, id) # --------------------------------------------------------------------------- # Decisions # --------------------------------------------------------------------------- def add_decision( conn: sqlite3.Connection, project_id: str, type: str, title: str, description: str, category: str | None = None, tags: list | None = None, task_id: str | None = None, ) -> dict: """Record a decision, gotcha, or convention for a project.""" cur = conn.execute( """INSERT INTO decisions (project_id, task_id, type, category, title, description, tags) VALUES (?, ?, ?, ?, ?, ?, ?)""", (project_id, task_id, type, category, title, description, _json_encode(tags)), ) conn.commit() row = conn.execute( "SELECT * FROM decisions WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def get_decisions( conn: sqlite3.Connection, project_id: str, category: str | None = None, tags: list | None = None, types: list | None = None, limit: int | None = None, ) -> list[dict]: """Query decisions for a project with optional filters. tags: matches if ANY tag is present (OR logic via json_each). types: filter by decision type (decision, gotcha, workaround, etc). """ query = "SELECT DISTINCT d.* FROM decisions d WHERE d.project_id = ?" params: list = [project_id] if category: query += " AND d.category = ?" params.append(category) if types: placeholders = ", ".join("?" for _ in types) query += f" AND d.type IN ({placeholders})" params.extend(types) if tags: query += """ AND d.id IN ( SELECT d2.id FROM decisions d2, json_each(d2.tags) AS t WHERE t.value IN ({}) )""".format(", ".join("?" for _ in tags)) params.extend(tags) query += " ORDER BY d.created_at DESC" if limit: query += " LIMIT ?" params.append(limit) return _rows_to_list(conn.execute(query, params).fetchall()) # --------------------------------------------------------------------------- # Modules # --------------------------------------------------------------------------- def add_module( conn: sqlite3.Connection, project_id: str, name: str, type: str, path: str, description: str | None = None, owner_role: str | None = None, dependencies: list | None = None, ) -> dict: """Register a project module.""" cur = conn.execute( """INSERT INTO modules (project_id, name, type, path, description, owner_role, dependencies) VALUES (?, ?, ?, ?, ?, ?, ?)""", (project_id, name, type, path, description, owner_role, _json_encode(dependencies)), ) conn.commit() row = conn.execute( "SELECT * FROM modules WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def get_modules(conn: sqlite3.Connection, project_id: str) -> list[dict]: """Get all modules for a project.""" rows = conn.execute( "SELECT * FROM modules WHERE project_id = ? ORDER BY type, name", (project_id,), ).fetchall() return _rows_to_list(rows) # --------------------------------------------------------------------------- # Agent Logs # --------------------------------------------------------------------------- def log_agent_run( conn: sqlite3.Connection, project_id: str, agent_role: str, action: str, task_id: str | None = None, session_id: str | None = None, input_summary: str | None = None, output_summary: str | None = None, tokens_used: int | None = None, model: str | None = None, cost_usd: float | None = None, success: bool = True, error_message: str | None = None, duration_seconds: int | None = None, ) -> dict: """Log an agent execution run.""" cur = conn.execute( """INSERT INTO agent_logs (project_id, task_id, agent_role, session_id, action, input_summary, output_summary, tokens_used, model, cost_usd, success, error_message, duration_seconds) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (project_id, task_id, agent_role, session_id, action, input_summary, output_summary, tokens_used, model, cost_usd, success, error_message, duration_seconds), ) conn.commit() row = conn.execute( "SELECT * FROM agent_logs WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) # --------------------------------------------------------------------------- # Pipelines # --------------------------------------------------------------------------- def create_pipeline( conn: sqlite3.Connection, task_id: str, project_id: str, route_type: str, steps: list | dict, ) -> dict: """Create a new pipeline run.""" cur = conn.execute( """INSERT INTO pipelines (task_id, project_id, route_type, steps) VALUES (?, ?, ?, ?)""", (task_id, project_id, route_type, _json_encode(steps)), ) conn.commit() row = conn.execute( "SELECT * FROM pipelines WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def update_pipeline( conn: sqlite3.Connection, id: int, status: str | None = None, total_cost_usd: float | None = None, total_tokens: int | None = None, total_duration_seconds: int | None = None, ) -> dict: """Update pipeline status and stats.""" fields: dict[str, Any] = {} if status is not None: fields["status"] = status if status in ("completed", "failed", "cancelled"): fields["completed_at"] = datetime.now().isoformat() if total_cost_usd is not None: fields["total_cost_usd"] = total_cost_usd if total_tokens is not None: fields["total_tokens"] = total_tokens if total_duration_seconds is not None: fields["total_duration_seconds"] = total_duration_seconds if fields: sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [id] conn.execute(f"UPDATE pipelines SET {sets} WHERE id = ?", vals) conn.commit() row = conn.execute( "SELECT * FROM pipelines WHERE id = ?", (id,) ).fetchone() return _row_to_dict(row) # --------------------------------------------------------------------------- # Support # --------------------------------------------------------------------------- def create_ticket( conn: sqlite3.Connection, project_id: str, source: str, client_message: str, client_id: str | None = None, classification: str | None = None, ) -> dict: """Create a support ticket.""" cur = conn.execute( """INSERT INTO support_tickets (project_id, source, client_id, client_message, classification) VALUES (?, ?, ?, ?, ?)""", (project_id, source, client_id, client_message, classification), ) conn.commit() row = conn.execute( "SELECT * FROM support_tickets WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def list_tickets( conn: sqlite3.Connection, project_id: str | None = None, status: str | None = None, ) -> list[dict]: """List support tickets with optional filters.""" query = "SELECT * FROM support_tickets WHERE 1=1" params: list = [] if project_id: query += " AND project_id = ?" params.append(project_id) if status: query += " AND status = ?" params.append(status) query += " ORDER BY created_at DESC" return _rows_to_list(conn.execute(query, params).fetchall()) # --------------------------------------------------------------------------- # Statistics / Dashboard # --------------------------------------------------------------------------- def get_project_summary(conn: sqlite3.Connection) -> list[dict]: """Get all projects with task counts by status.""" rows = conn.execute(""" SELECT p.*, COUNT(t.id) AS total_tasks, SUM(CASE WHEN t.status = 'done' THEN 1 ELSE 0 END) AS done_tasks, SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) AS active_tasks, SUM(CASE WHEN t.status = 'blocked' THEN 1 ELSE 0 END) AS blocked_tasks, SUM(CASE WHEN t.status = 'review' THEN 1 ELSE 0 END) AS review_tasks FROM projects p LEFT JOIN tasks t ON t.project_id = p.id GROUP BY p.id ORDER BY p.priority, p.name """).fetchall() return _rows_to_list(rows) def get_cost_summary(conn: sqlite3.Connection, days: int = 7) -> list[dict]: """Get cost summary by project for the last N days.""" rows = conn.execute(""" SELECT p.id AS project_id, p.name AS project_name, COUNT(a.id) AS runs, COALESCE(SUM(a.tokens_used), 0) AS total_tokens, COALESCE(SUM(a.cost_usd), 0) AS total_cost_usd, COALESCE(SUM(a.duration_seconds), 0) AS total_duration_seconds FROM projects p LEFT JOIN agent_logs a ON a.project_id = p.id AND a.created_at >= datetime('now', ?) GROUP BY p.id HAVING runs > 0 ORDER BY total_cost_usd DESC """, (f"-{days} days",)).fetchall() return _rows_to_list(rows)