""" Kin — data access functions for all tables. Pure functions: (conn, params) → dict | list[dict]. No ORM, no classes. """ import json import sqlite3 from datetime import datetime from typing import Any VALID_TASK_STATUSES = [ "pending", "in_progress", "review", "done", "blocked", "decomposed", "cancelled", ] VALID_COMPLETION_MODES = {"auto_complete", "review"} TASK_CATEGORIES = [ "SEC", "UI", "API", "INFRA", "BIZ", "DB", "ARCH", "TEST", "PERF", "DOCS", "FIX", "OBS", ] def validate_completion_mode(value: str) -> str: """Validate completion mode from LLM output. Falls back to 'review' if invalid.""" if value in VALID_COMPLETION_MODES: return value return "review" def _row_to_dict(row: sqlite3.Row | None) -> dict | None: """Convert sqlite3.Row to dict with JSON fields decoded.""" if row is None: return None d = dict(row) for key, val in d.items(): if isinstance(val, str) and val.startswith(("[", "{")): try: d[key] = json.loads(val) except (json.JSONDecodeError, ValueError): pass return d def _rows_to_list(rows: list[sqlite3.Row]) -> list[dict]: """Convert list of sqlite3.Row to list of dicts.""" return [_row_to_dict(r) for r in rows] def _json_encode(val: Any) -> Any: """Encode lists/dicts to JSON strings for storage.""" if isinstance(val, (list, dict)): return json.dumps(val, ensure_ascii=False) return val # --------------------------------------------------------------------------- # Projects # --------------------------------------------------------------------------- def create_project( conn: sqlite3.Connection, id: str, name: str, path: str | None = None, tech_stack: list | None = None, status: str = "active", priority: int = 5, pm_prompt: str | None = None, claude_md_path: str | None = None, forgejo_repo: str | None = None, language: str = "ru", execution_mode: str = "review", project_type: str = "development", ssh_host: str | None = None, ssh_user: str | None = None, ssh_key_path: str | None = None, ssh_proxy_jump: str | None = None, description: str | None = None, ) -> dict: """Create a new project and return it as dict.""" conn.execute( """INSERT INTO projects (id, name, path, tech_stack, status, priority, pm_prompt, claude_md_path, forgejo_repo, language, execution_mode, project_type, ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump, description) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (id, name, path, _json_encode(tech_stack), status, priority, pm_prompt, claude_md_path, forgejo_repo, language, execution_mode, project_type, ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump, description), ) conn.commit() return get_project(conn, id) def get_project(conn: sqlite3.Connection, id: str) -> dict | None: """Get project by id.""" row = conn.execute("SELECT * FROM projects WHERE id = ?", (id,)).fetchone() return _row_to_dict(row) def delete_project(conn: sqlite3.Connection, id: str) -> None: """Delete a project and all its related data (modules, decisions, tasks).""" # Delete tables that have FK references to tasks BEFORE deleting tasks for table in ("modules", "agent_logs", "decisions", "pipelines", "tasks"): conn.execute(f"DELETE FROM {table} WHERE project_id = ?", (id,)) conn.execute("DELETE FROM projects WHERE id = ?", (id,)) conn.commit() def get_effective_mode(conn: sqlite3.Connection, project_id: str, task_id: str) -> str: """Return effective execution mode: 'auto' or 'review'. Priority: task.execution_mode > project.execution_mode > 'review' """ task = get_task(conn, task_id) if task and task.get("execution_mode"): return task["execution_mode"] project = get_project(conn, project_id) if project: return project.get("execution_mode") or "review" return "review" def list_projects(conn: sqlite3.Connection, status: str | None = None) -> list[dict]: """List projects, optionally filtered by status.""" if status: rows = conn.execute( "SELECT * FROM projects WHERE status = ? ORDER BY priority, name", (status,), ).fetchall() else: rows = conn.execute( "SELECT * FROM projects ORDER BY priority, name" ).fetchall() return _rows_to_list(rows) def update_project(conn: sqlite3.Connection, id: str, **fields) -> dict: """Update project fields. Returns updated project.""" if not fields: return get_project(conn, id) for key in ("tech_stack",): if key in fields: fields[key] = _json_encode(fields[key]) sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [id] conn.execute(f"UPDATE projects SET {sets} WHERE id = ?", vals) conn.commit() return get_project(conn, id) # --------------------------------------------------------------------------- # Tasks # --------------------------------------------------------------------------- def next_task_id( conn: sqlite3.Connection, project_id: str, category: str | None = None, ) -> str: """Generate next task ID. Without category: PROJ-001 (backward-compatible old format) With category: PROJ-CAT-001 (new format, per-category counter) """ prefix = project_id.upper() existing = list_tasks(conn, project_id=project_id) if category: cat_prefix = f"{prefix}-{category}-" max_num = 0 for t in existing: tid = t["id"] if tid.startswith(cat_prefix): try: max_num = max(max_num, int(tid[len(cat_prefix):])) except ValueError: pass return f"{prefix}-{category}-{max_num + 1:03d}" else: # Old format: global max across project (integers only, skip CAT-NNN) max_num = 0 for t in existing: tid = t["id"] if tid.startswith(prefix + "-"): suffix = tid[len(prefix) + 1:] try: max_num = max(max_num, int(suffix)) except ValueError: pass return f"{prefix}-{max_num + 1:03d}" def create_task( conn: sqlite3.Connection, id: str, project_id: str, title: str, status: str = "pending", priority: int = 5, assigned_role: str | None = None, parent_task_id: str | None = None, brief: dict | None = None, spec: dict | None = None, forgejo_issue_id: int | None = None, execution_mode: str | None = None, category: str | None = None, acceptance_criteria: str | None = None, ) -> dict: """Create a task linked to a project.""" conn.execute( """INSERT INTO tasks (id, project_id, title, status, priority, assigned_role, parent_task_id, brief, spec, forgejo_issue_id, execution_mode, category, acceptance_criteria) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (id, project_id, title, status, priority, assigned_role, parent_task_id, _json_encode(brief), _json_encode(spec), forgejo_issue_id, execution_mode, category, acceptance_criteria), ) conn.commit() return get_task(conn, id) def get_task(conn: sqlite3.Connection, id: str) -> dict | None: """Get task by id.""" row = conn.execute("SELECT * FROM tasks WHERE id = ?", (id,)).fetchone() return _row_to_dict(row) def list_tasks( conn: sqlite3.Connection, project_id: str | None = None, status: str | None = None, ) -> list[dict]: """List tasks with optional project/status filters.""" query = "SELECT * FROM tasks WHERE 1=1" params: list = [] if project_id: query += " AND project_id = ?" params.append(project_id) if status: query += " AND status = ?" params.append(status) query += " ORDER BY priority, created_at" return _rows_to_list(conn.execute(query, params).fetchall()) def update_task(conn: sqlite3.Connection, id: str, **fields) -> dict: """Update task fields. Auto-sets updated_at.""" if not fields: return get_task(conn, id) json_cols = ("brief", "spec", "review", "test_result", "security_result") for key in json_cols: if key in fields: fields[key] = _json_encode(fields[key]) fields["updated_at"] = datetime.now().isoformat() sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [id] conn.execute(f"UPDATE tasks SET {sets} WHERE id = ?", vals) conn.commit() return get_task(conn, id) def mark_telegram_sent(conn: sqlite3.Connection, task_id: str) -> None: """Mark that a Telegram escalation was sent for this task.""" conn.execute( "UPDATE tasks SET telegram_sent = 1 WHERE id = ?", (task_id,), ) conn.commit() # --------------------------------------------------------------------------- # Decisions # --------------------------------------------------------------------------- def add_decision( conn: sqlite3.Connection, project_id: str, type: str, title: str, description: str, category: str | None = None, tags: list | None = None, task_id: str | None = None, ) -> dict: """Record a decision, gotcha, or convention for a project.""" cur = conn.execute( """INSERT INTO decisions (project_id, task_id, type, category, title, description, tags) VALUES (?, ?, ?, ?, ?, ?, ?)""", (project_id, task_id, type, category, title, description, _json_encode(tags)), ) conn.commit() row = conn.execute( "SELECT * FROM decisions WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def add_decision_if_new( conn: sqlite3.Connection, project_id: str, type: str, title: str, description: str, category: str | None = None, tags: list | None = None, task_id: str | None = None, ) -> dict | None: """Add a decision only if no existing one matches (project_id, type, normalized title). Returns the new decision dict, or None if skipped as duplicate. """ existing = conn.execute( """SELECT id FROM decisions WHERE project_id = ? AND type = ? AND lower(trim(title)) = lower(trim(?))""", (project_id, type, title), ).fetchone() if existing: return None return add_decision(conn, project_id, type, title, description, category=category, tags=tags, task_id=task_id) def get_decisions( conn: sqlite3.Connection, project_id: str, category: str | None = None, tags: list | None = None, types: list | None = None, limit: int | None = None, ) -> list[dict]: """Query decisions for a project with optional filters. tags: matches if ANY tag is present (OR logic via json_each). types: filter by decision type (decision, gotcha, workaround, etc). """ query = "SELECT DISTINCT d.* FROM decisions d WHERE d.project_id = ?" params: list = [project_id] if category: query += " AND d.category = ?" params.append(category) if types: placeholders = ", ".join("?" for _ in types) query += f" AND d.type IN ({placeholders})" params.extend(types) if tags: query += """ AND d.id IN ( SELECT d2.id FROM decisions d2, json_each(d2.tags) AS t WHERE t.value IN ({}) )""".format(", ".join("?" for _ in tags)) params.extend(tags) query += " ORDER BY d.created_at DESC" if limit: query += " LIMIT ?" params.append(limit) return _rows_to_list(conn.execute(query, params).fetchall()) def get_decision(conn: sqlite3.Connection, decision_id: int) -> dict | None: """Get a single decision by id.""" row = conn.execute("SELECT * FROM decisions WHERE id = ?", (decision_id,)).fetchone() return _row_to_dict(row) if row else None def delete_decision(conn: sqlite3.Connection, decision_id: int) -> bool: """Delete a decision by id. Returns True if deleted, False if not found.""" cur = conn.execute("DELETE FROM decisions WHERE id = ?", (decision_id,)) conn.commit() return cur.rowcount > 0 # --------------------------------------------------------------------------- # Modules # --------------------------------------------------------------------------- def add_module( conn: sqlite3.Connection, project_id: str, name: str, type: str, path: str, description: str | None = None, owner_role: str | None = None, dependencies: list | None = None, ) -> dict: """Register a project module.""" cur = conn.execute( """INSERT OR IGNORE INTO modules (project_id, name, type, path, description, owner_role, dependencies) VALUES (?, ?, ?, ?, ?, ?, ?)""", (project_id, name, type, path, description, owner_role, _json_encode(dependencies)), ) created = cur.rowcount > 0 conn.commit() if cur.lastrowid: row = conn.execute( "SELECT * FROM modules WHERE id = ?", (cur.lastrowid,) ).fetchone() else: row = conn.execute( "SELECT * FROM modules WHERE project_id = ? AND name = ?", (project_id, name), ).fetchone() result = _row_to_dict(row) result["_created"] = created return result def get_modules(conn: sqlite3.Connection, project_id: str) -> list[dict]: """Get all modules for a project.""" rows = conn.execute( "SELECT * FROM modules WHERE project_id = ? ORDER BY type, name", (project_id,), ).fetchall() return _rows_to_list(rows) # --------------------------------------------------------------------------- # Agent Logs # --------------------------------------------------------------------------- def log_agent_run( conn: sqlite3.Connection, project_id: str, agent_role: str, action: str, task_id: str | None = None, session_id: str | None = None, input_summary: str | None = None, output_summary: str | None = None, tokens_used: int | None = None, model: str | None = None, cost_usd: float | None = None, success: bool = True, error_message: str | None = None, duration_seconds: int | None = None, ) -> dict: """Log an agent execution run.""" cur = conn.execute( """INSERT INTO agent_logs (project_id, task_id, agent_role, session_id, action, input_summary, output_summary, tokens_used, model, cost_usd, success, error_message, duration_seconds) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (project_id, task_id, agent_role, session_id, action, input_summary, output_summary, tokens_used, model, cost_usd, success, error_message, duration_seconds), ) conn.commit() row = conn.execute( "SELECT * FROM agent_logs WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) # --------------------------------------------------------------------------- # Pipelines # --------------------------------------------------------------------------- def create_pipeline( conn: sqlite3.Connection, task_id: str, project_id: str, route_type: str, steps: list | dict, ) -> dict: """Create a new pipeline run.""" cur = conn.execute( """INSERT INTO pipelines (task_id, project_id, route_type, steps) VALUES (?, ?, ?, ?)""", (task_id, project_id, route_type, _json_encode(steps)), ) conn.commit() row = conn.execute( "SELECT * FROM pipelines WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def update_pipeline( conn: sqlite3.Connection, id: int, status: str | None = None, total_cost_usd: float | None = None, total_tokens: int | None = None, total_duration_seconds: int | None = None, ) -> dict: """Update pipeline status and stats.""" fields: dict[str, Any] = {} if status is not None: fields["status"] = status if status in ("completed", "failed", "cancelled"): fields["completed_at"] = datetime.now().isoformat() if total_cost_usd is not None: fields["total_cost_usd"] = total_cost_usd if total_tokens is not None: fields["total_tokens"] = total_tokens if total_duration_seconds is not None: fields["total_duration_seconds"] = total_duration_seconds if fields: sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [id] conn.execute(f"UPDATE pipelines SET {sets} WHERE id = ?", vals) conn.commit() row = conn.execute( "SELECT * FROM pipelines WHERE id = ?", (id,) ).fetchone() return _row_to_dict(row) # --------------------------------------------------------------------------- # Support # --------------------------------------------------------------------------- def create_ticket( conn: sqlite3.Connection, project_id: str, source: str, client_message: str, client_id: str | None = None, classification: str | None = None, ) -> dict: """Create a support ticket.""" cur = conn.execute( """INSERT INTO support_tickets (project_id, source, client_id, client_message, classification) VALUES (?, ?, ?, ?, ?)""", (project_id, source, client_id, client_message, classification), ) conn.commit() row = conn.execute( "SELECT * FROM support_tickets WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def list_tickets( conn: sqlite3.Connection, project_id: str | None = None, status: str | None = None, ) -> list[dict]: """List support tickets with optional filters.""" query = "SELECT * FROM support_tickets WHERE 1=1" params: list = [] if project_id: query += " AND project_id = ?" params.append(project_id) if status: query += " AND status = ?" params.append(status) query += " ORDER BY created_at DESC" return _rows_to_list(conn.execute(query, params).fetchall()) # --------------------------------------------------------------------------- # Audit Log # --------------------------------------------------------------------------- def log_audit_event( conn: sqlite3.Connection, event_type: str, task_id: str | None = None, step_id: str | None = None, reason: str | None = None, project_id: str | None = None, ) -> dict: """Log a security-sensitive event to audit_log. event_type='dangerous_skip' is used when --dangerously-skip-permissions is invoked. """ cur = conn.execute( """INSERT INTO audit_log (event_type, task_id, step_id, reason, project_id) VALUES (?, ?, ?, ?, ?)""", (event_type, task_id, step_id, reason, project_id), ) conn.commit() row = conn.execute( "SELECT * FROM audit_log WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def get_audit_log( conn: sqlite3.Connection, task_id: str | None = None, project_id: str | None = None, event_type: str | None = None, limit: int = 100, ) -> list[dict]: """Query audit log entries with optional filters.""" query = "SELECT * FROM audit_log WHERE 1=1" params: list = [] if task_id: query += " AND task_id = ?" params.append(task_id) if project_id: query += " AND project_id = ?" params.append(project_id) if event_type: query += " AND event_type = ?" params.append(event_type) query += " ORDER BY timestamp DESC LIMIT ?" params.append(limit) return _rows_to_list(conn.execute(query, params).fetchall()) # --------------------------------------------------------------------------- # Statistics / Dashboard # --------------------------------------------------------------------------- def get_project_summary(conn: sqlite3.Connection) -> list[dict]: """Get all projects with task counts by status.""" rows = conn.execute(""" SELECT p.*, COUNT(t.id) AS total_tasks, SUM(CASE WHEN t.status = 'done' THEN 1 ELSE 0 END) AS done_tasks, SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) AS active_tasks, SUM(CASE WHEN t.status = 'blocked' THEN 1 ELSE 0 END) AS blocked_tasks, SUM(CASE WHEN t.status = 'review' THEN 1 ELSE 0 END) AS review_tasks FROM projects p LEFT JOIN tasks t ON t.project_id = p.id GROUP BY p.id ORDER BY p.priority, p.name """).fetchall() return _rows_to_list(rows) def get_cost_summary(conn: sqlite3.Connection, days: int = 7) -> list[dict]: """Get cost summary by project for the last N days.""" rows = conn.execute(""" SELECT p.id AS project_id, p.name AS project_name, COUNT(a.id) AS runs, COALESCE(SUM(a.tokens_used), 0) AS total_tokens, COALESCE(SUM(a.cost_usd), 0) AS total_cost_usd, COALESCE(SUM(a.duration_seconds), 0) AS total_duration_seconds FROM projects p LEFT JOIN agent_logs a ON a.project_id = p.id AND a.created_at >= datetime('now', ?) GROUP BY p.id HAVING runs > 0 ORDER BY total_cost_usd DESC """, (f"-{days} days",)).fetchall() return _rows_to_list(rows) # --------------------------------------------------------------------------- # Project Phases (KIN-059) # --------------------------------------------------------------------------- def create_phase( conn: sqlite3.Connection, project_id: str, role: str, phase_order: int, ) -> dict: """Create a research phase for a project.""" cur = conn.execute( """INSERT INTO project_phases (project_id, role, phase_order, status) VALUES (?, ?, ?, 'pending')""", (project_id, role, phase_order), ) conn.commit() row = conn.execute( "SELECT * FROM project_phases WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def get_phase(conn: sqlite3.Connection, phase_id: int) -> dict | None: """Get a project phase by id.""" row = conn.execute( "SELECT * FROM project_phases WHERE id = ?", (phase_id,) ).fetchone() return _row_to_dict(row) def list_phases(conn: sqlite3.Connection, project_id: str) -> list[dict]: """List all phases for a project ordered by phase_order.""" rows = conn.execute( "SELECT * FROM project_phases WHERE project_id = ? ORDER BY phase_order", (project_id,), ).fetchall() return _rows_to_list(rows) def update_phase(conn: sqlite3.Connection, phase_id: int, **fields) -> dict: """Update phase fields. Auto-sets updated_at.""" if not fields: return get_phase(conn, phase_id) fields["updated_at"] = datetime.now().isoformat() sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [phase_id] conn.execute(f"UPDATE project_phases SET {sets} WHERE id = ?", vals) conn.commit() return get_phase(conn, phase_id)