"""
Kin — data access functions for all tables.
Pure functions: (conn, params) → dict | list[dict]. No ORM, no classes.
"""

import base64
import json
import os
import sqlite3
from datetime import datetime
from typing import Any

VALID_TASK_STATUSES = frozenset({
    "pending", "in_progress", "review", "done",
    "blocked", "decomposed", "cancelled", "revising",
})

VALID_COMPLETION_MODES = {"auto_complete", "review"}

TASK_CATEGORIES = [
    "SEC", "UI", "API", "INFRA", "BIZ", "DB",
    "ARCH", "TEST", "PERF", "DOCS", "FIX", "OBS",
]

# Valid categories for task returns (KIN-135)
RETURN_CATEGORIES = [
    "requirements_unclear",
    "scope_too_large",
    "technical_blocker",
    "missing_context",
    "recurring_quality_fail",
    "conflicting_requirements",
]


def validate_completion_mode(value: str) -> str:
    """Validate a completion mode taken from LLM output.

    Args:
        value: Candidate mode. Annotated as ``str``, but LLM output is not
            trusted to respect that, so any object is tolerated.

    Returns:
        ``value`` unchanged when it is one of VALID_COMPLETION_MODES,
        otherwise the safe fallback ``"review"``.
    """
    # The isinstance guard keeps unhashable values (e.g. a list or dict parsed
    # from JSON) from raising TypeError in the set-membership test below.
    if isinstance(value, str) and value in VALID_COMPLETION_MODES:
        return value
    return "review"


# Columns that are stored as JSON strings and must be decoded on read.
# Text fields (title, description, name, etc.) are NOT in this set.
_JSON_COLUMNS: frozenset[str] = frozenset({ "tech_stack", "brief", "spec", "review", "test_result", "security_result", "labels", "smoke_test_result", "pending_steps", "tags", "dependencies", "steps", "artifacts", "decisions_made", "blockers", "context_packet", "extra_json", "pending_actions", }) def _row_to_dict(row: sqlite3.Row | None) -> dict | None: """Convert sqlite3.Row to dict with JSON fields decoded.""" if row is None: return None d = dict(row) for key, val in d.items(): if key in _JSON_COLUMNS and isinstance(val, str) and val.startswith(("[", "{")): try: d[key] = json.loads(val) except (json.JSONDecodeError, ValueError): pass return d def _rows_to_list(rows: list[sqlite3.Row]) -> list[dict]: """Convert list of sqlite3.Row to list of dicts.""" return [_row_to_dict(r) for r in rows] def _json_encode(val: Any) -> Any: """Encode lists/dicts to JSON strings for storage.""" if isinstance(val, (list, dict)): return json.dumps(val, ensure_ascii=False) return val # --------------------------------------------------------------------------- # Projects # --------------------------------------------------------------------------- def create_project( conn: sqlite3.Connection, id: str, name: str, path: str | None = None, tech_stack: list | None = None, status: str = "active", priority: int = 5, pm_prompt: str | None = None, claude_md_path: str | None = None, forgejo_repo: str | None = None, language: str = "ru", execution_mode: str = "review", project_type: str = "development", ssh_host: str | None = None, ssh_user: str | None = None, ssh_key_path: str | None = None, ssh_proxy_jump: str | None = None, description: str | None = None, ) -> dict: """Create a new project and return it as dict.""" conn.execute( """INSERT INTO projects (id, name, path, tech_stack, status, priority, pm_prompt, claude_md_path, forgejo_repo, language, execution_mode, project_type, ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump, description) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?)""", (id, name, path, _json_encode(tech_stack), status, priority, pm_prompt, claude_md_path, forgejo_repo, language, execution_mode, project_type, ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump, description), ) conn.commit() return get_project(conn, id) def get_project(conn: sqlite3.Connection, id: str) -> dict | None: """Get project by id.""" row = conn.execute("SELECT * FROM projects WHERE id = ?", (id,)).fetchone() return _row_to_dict(row) def delete_project(conn: sqlite3.Connection, id: str) -> None: """Delete a project and all its related data (modules, decisions, tasks, phases).""" # Delete child tables that reference pipelines/tasks without ON DELETE CASCADE, # before their parents are deleted (FK constraint enforcement, foreign_keys=ON). conn.execute( "DELETE FROM department_handoffs WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?)", (id,), ) conn.execute( "DELETE FROM department_handoffs WHERE pipeline_id IN (SELECT id FROM pipelines WHERE project_id = ?)", (id,), ) conn.execute( "DELETE FROM pipeline_log WHERE pipeline_id IN (SELECT id FROM pipelines WHERE project_id = ?)", (id,), ) conn.execute("DELETE FROM audit_log WHERE project_id = ?", (id,)) conn.execute("DELETE FROM support_tickets WHERE project_id = ?", (id,)) conn.execute("DELETE FROM hook_logs WHERE project_id = ?", (id,)) conn.execute("DELETE FROM hooks WHERE project_id = ?", (id,)) conn.execute("DELETE FROM project_links WHERE from_project = ? 
OR to_project = ?", (id, id)) # Delete tables with direct project_id FK (order: FK children before parents) # project_environments must come before tasks (FK on project_id) for table in ("modules", "agent_logs", "decisions", "pipelines", "project_phases", "project_environments", "chat_messages", "tasks"): conn.execute(f"DELETE FROM {table} WHERE project_id = ?", (id,)) conn.execute("DELETE FROM projects WHERE id = ?", (id,)) conn.commit() def get_effective_mode(conn: sqlite3.Connection, project_id: str, task_id: str) -> str: """Return effective execution mode: 'auto' or 'review'. Priority: task.execution_mode > project.execution_mode > 'review' """ task = get_task(conn, task_id) if task and task.get("execution_mode"): return task["execution_mode"] project = get_project(conn, project_id) if project: return project.get("execution_mode") or "review" return "review" def list_projects(conn: sqlite3.Connection, status: str | None = None) -> list[dict]: """List projects, optionally filtered by status.""" if status: rows = conn.execute( "SELECT * FROM projects WHERE status = ? ORDER BY priority, name", (status,), ).fetchall() else: rows = conn.execute( "SELECT * FROM projects ORDER BY priority, name" ).fetchall() return _rows_to_list(rows) def update_project(conn: sqlite3.Connection, id: str, **fields) -> dict: """Update project fields. Returns updated project.""" if not fields: return get_project(conn, id) for key in ("tech_stack",): if key in fields: fields[key] = _json_encode(fields[key]) sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [id] conn.execute(f"UPDATE projects SET {sets} WHERE id = ?", vals) conn.commit() return get_project(conn, id) # --------------------------------------------------------------------------- # Tasks # --------------------------------------------------------------------------- def next_task_id( conn: sqlite3.Connection, project_id: str, category: str | None = None, ) -> str: """Generate next task ID. 
Without category: PROJ-001 (backward-compatible old format) With category: PROJ-CAT-001 (new format, per-category counter) """ prefix = project_id.upper() existing = list_tasks(conn, project_id=project_id) if category: cat_prefix = f"{prefix}-{category}-" max_num = 0 for t in existing: tid = t["id"] if tid.startswith(cat_prefix): try: max_num = max(max_num, int(tid[len(cat_prefix):])) except ValueError: pass return f"{prefix}-{category}-{max_num + 1:03d}" else: # Old format: global max across project (integers only, skip CAT-NNN) max_num = 0 for t in existing: tid = t["id"] if tid.startswith(prefix + "-"): suffix = tid[len(prefix) + 1:] try: max_num = max(max_num, int(suffix)) except ValueError: pass return f"{prefix}-{max_num + 1:03d}" def create_task( conn: sqlite3.Connection, id: str, project_id: str, title: str, status: str = "pending", priority: int = 5, assigned_role: str | None = None, parent_task_id: str | None = None, brief: dict | None = None, spec: dict | None = None, forgejo_issue_id: int | None = None, execution_mode: str | None = None, category: str | None = None, acceptance_criteria: str | None = None, labels: list | None = None, ) -> dict: """Create a task linked to a project.""" conn.execute( """INSERT INTO tasks (id, project_id, title, status, priority, assigned_role, parent_task_id, brief, spec, forgejo_issue_id, execution_mode, category, acceptance_criteria, labels) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (id, project_id, title, status, priority, assigned_role, parent_task_id, _json_encode(brief), _json_encode(spec), forgejo_issue_id, execution_mode, category, acceptance_criteria, _json_encode(labels)), ) conn.commit() return get_task(conn, id) def get_task(conn: sqlite3.Connection, id: str) -> dict | None: """Get task by id.""" row = conn.execute("SELECT * FROM tasks WHERE id = ?", (id,)).fetchone() return _row_to_dict(row) def get_children(conn: sqlite3.Connection, task_id: str) -> list[dict]: """Return direct child tasks of 
given task_id.""" rows = conn.execute( "SELECT * FROM tasks WHERE parent_task_id = ?", (task_id,) ).fetchall() return _rows_to_list(rows) def has_open_children(conn: sqlite3.Connection, task_id: str, visited: set[str] | None = None) -> bool: """Recursively check if task has any open (not done/cancelled) descendants.""" if visited is None: visited = set() if task_id in visited: return False visited = visited | {task_id} children = get_children(conn, task_id) for child in children: if child["status"] not in ("done", "cancelled"): return True if has_open_children(conn, child["id"], visited): return True return False def _do_cascade(conn: sqlite3.Connection, task_id: str, visited: set[str]) -> None: """Recursive upward cascade without transaction management — no commits.""" if task_id in visited: return visited.add(task_id) task = get_task(conn, task_id) if not task: return parent_id = task.get("parent_task_id") if not parent_id: return parent = get_task(conn, parent_id) if not parent: return if parent["status"] != "revising": return if not has_open_children(conn, parent_id): now = datetime.now().isoformat() conn.execute( "UPDATE tasks SET status = 'done', completed_at = ?, updated_at = ? 
WHERE id = ?", (now, now, parent_id), ) _do_cascade(conn, parent_id, visited) def _check_parent_completion(conn: sqlite3.Connection, task_id: str, visited: set[str] | None = None) -> None: """Cascade-check upward: promote all ready parents to 'done' in one atomic transaction.""" if visited is None: visited = set() try: _do_cascade(conn, task_id, visited) conn.commit() except Exception: conn.rollback() raise VALID_TASK_SORT_FIELDS = frozenset({ "updated_at", "created_at", "priority", "status", "title", "id", }) def list_tasks( conn: sqlite3.Connection, project_id: str | None = None, status: str | None = None, parent_task_id: str | None = None, limit: int | None = None, sort: str = "updated_at", sort_dir: str = "desc", ) -> list[dict]: """List tasks with optional project/status/parent filters, limit, and sort. sort: column name (validated against VALID_TASK_SORT_FIELDS, default 'updated_at') sort_dir: 'asc' or 'desc' (default 'desc') parent_task_id: filter by parent task id (pass '__none__' to get root tasks) """ # Validate sort field to prevent SQL injection sort_col = sort if sort in VALID_TASK_SORT_FIELDS else "updated_at" sort_direction = "DESC" if sort_dir.lower() != "asc" else "ASC" query = "SELECT * FROM tasks WHERE 1=1" params: list = [] if project_id: query += " AND project_id = ?" params.append(project_id) if status: query += " AND status = ?" params.append(status) if parent_task_id == "__none__": query += " AND parent_task_id IS NULL" elif parent_task_id: query += " AND parent_task_id = ?" params.append(parent_task_id) query += f" ORDER BY {sort_col} {sort_direction}" if limit is not None: query += " LIMIT ?" params.append(limit) return _rows_to_list(conn.execute(query, params).fetchall()) def update_task(conn: sqlite3.Connection, id: str, **fields) -> dict: """Update task fields. Auto-sets updated_at. Sets completed_at when status transitions to 'done'. If transitioning to 'done' but task has open children, sets 'revising' instead. 
After a child closes (done/cancelled), cascades upward to promote revising parents. """ if not fields: return get_task(conn, id) json_cols = ("brief", "spec", "review", "test_result", "security_result", "labels", "smoke_test_result", "pending_steps") for key in json_cols: if key in fields: fields[key] = _json_encode(fields[key]) if "status" in fields and fields["status"] == "done": if has_open_children(conn, id): fields["status"] = "revising" # Do NOT set completed_at for revising (Decision #737) else: fields["completed_at"] = datetime.now().isoformat() fields["updated_at"] = datetime.now().isoformat() sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [id] conn.execute(f"UPDATE tasks SET {sets} WHERE id = ?", vals) conn.commit() # Cascade upward: if this child just closed, maybe its parent can finish new_status = fields.get("status") if new_status in ("done", "cancelled"): _check_parent_completion(conn, id) return get_task(conn, id) def mark_telegram_sent(conn: sqlite3.Connection, task_id: str) -> None: """Mark that a Telegram escalation was sent for this task.""" conn.execute( "UPDATE tasks SET telegram_sent = 1 WHERE id = ?", (task_id,), ) conn.commit() # --------------------------------------------------------------------------- # Decisions # --------------------------------------------------------------------------- def add_decision( conn: sqlite3.Connection, project_id: str, type: str, title: str, description: str, category: str | None = None, tags: list | None = None, task_id: str | None = None, ) -> dict: """Record a decision, gotcha, or convention for a project.""" cur = conn.execute( """INSERT INTO decisions (project_id, task_id, type, category, title, description, tags) VALUES (?, ?, ?, ?, ?, ?, ?)""", (project_id, task_id, type, category, title, description, _json_encode(tags)), ) conn.commit() row = conn.execute( "SELECT * FROM decisions WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def 
add_decision_if_new( conn: sqlite3.Connection, project_id: str, type: str, title: str, description: str, category: str | None = None, tags: list | None = None, task_id: str | None = None, ) -> dict | None: """Add a decision only if no existing one matches (project_id, type, normalized title). Returns the new decision dict, or None if skipped as duplicate. """ existing = conn.execute( """SELECT id FROM decisions WHERE project_id = ? AND type = ? AND lower(trim(title)) = lower(trim(?))""", (project_id, type, title), ).fetchone() if existing: return None return add_decision(conn, project_id, type, title, description, category=category, tags=tags, task_id=task_id) def get_decisions( conn: sqlite3.Connection, project_id: str, category: str | None = None, tags: list | None = None, types: list | None = None, limit: int | None = None, ) -> list[dict]: """Query decisions for a project with optional filters. tags: matches if ANY tag is present (OR logic via json_each). types: filter by decision type (decision, gotcha, workaround, etc). """ query = "SELECT DISTINCT d.* FROM decisions d WHERE d.project_id = ?" params: list = [project_id] if category: query += " AND d.category = ?" params.append(category) if types is not None: if not types: return [] placeholders = ", ".join("?" for _ in types) query += f" AND d.type IN ({placeholders})" params.extend(types) if tags is not None: if not tags: return [] query += """ AND d.id IN ( SELECT d2.id FROM decisions d2, json_each(d2.tags) AS t WHERE t.value IN ({}) )""".format(", ".join("?" for _ in tags)) params.extend(tags) query += " ORDER BY d.created_at DESC" if limit: query += " LIMIT ?" 
params.append(limit) return _rows_to_list(conn.execute(query, params).fetchall()) def get_decision(conn: sqlite3.Connection, decision_id: int) -> dict | None: """Get a single decision by id.""" row = conn.execute("SELECT * FROM decisions WHERE id = ?", (decision_id,)).fetchone() return _row_to_dict(row) if row else None def delete_decision(conn: sqlite3.Connection, decision_id: int) -> bool: """Delete a decision by id. Returns True if deleted, False if not found.""" cur = conn.execute("DELETE FROM decisions WHERE id = ?", (decision_id,)) conn.commit() return cur.rowcount > 0 # --------------------------------------------------------------------------- # Modules # --------------------------------------------------------------------------- def add_module( conn: sqlite3.Connection, project_id: str, name: str, type: str, path: str, description: str | None = None, owner_role: str | None = None, dependencies: list | None = None, ) -> dict: """Register a project module.""" cur = conn.execute( """INSERT OR IGNORE INTO modules (project_id, name, type, path, description, owner_role, dependencies) VALUES (?, ?, ?, ?, ?, ?, ?)""", (project_id, name, type, path, description, owner_role, _json_encode(dependencies)), ) created = cur.rowcount > 0 conn.commit() if cur.lastrowid: row = conn.execute( "SELECT * FROM modules WHERE id = ?", (cur.lastrowid,) ).fetchone() else: row = conn.execute( "SELECT * FROM modules WHERE project_id = ? AND name = ?", (project_id, name), ).fetchone() result = _row_to_dict(row) result["_created"] = created return result def get_modules(conn: sqlite3.Connection, project_id: str) -> list[dict]: """Get all modules for a project.""" rows = conn.execute( "SELECT * FROM modules WHERE project_id = ? 
ORDER BY type, name", (project_id,), ).fetchall() return _rows_to_list(rows) # --------------------------------------------------------------------------- # Agent Logs # --------------------------------------------------------------------------- def log_agent_run( conn: sqlite3.Connection, project_id: str, agent_role: str, action: str, task_id: str | None = None, session_id: str | None = None, input_summary: str | None = None, output_summary: str | None = None, tokens_used: int | None = None, model: str | None = None, cost_usd: float | None = None, success: bool = True, error_message: str | None = None, duration_seconds: int | None = None, ) -> dict: """Log an agent execution run.""" cur = conn.execute( """INSERT INTO agent_logs (project_id, task_id, agent_role, session_id, action, input_summary, output_summary, tokens_used, model, cost_usd, success, error_message, duration_seconds) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (project_id, task_id, agent_role, session_id, action, input_summary, output_summary, tokens_used, model, cost_usd, success, error_message, duration_seconds), ) conn.commit() row = conn.execute( "SELECT * FROM agent_logs WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) # --------------------------------------------------------------------------- # Pipelines # --------------------------------------------------------------------------- def create_pipeline( conn: sqlite3.Connection, task_id: str, project_id: str, route_type: str, steps: list | dict, parent_pipeline_id: int | None = None, department: str | None = None, ) -> dict: """Create a new pipeline run.""" cur = conn.execute( """INSERT INTO pipelines (task_id, project_id, route_type, steps, parent_pipeline_id, department) VALUES (?, ?, ?, ?, ?, ?)""", (task_id, project_id, route_type, _json_encode(steps), parent_pipeline_id, department), ) conn.commit() row = conn.execute( "SELECT * FROM pipelines WHERE id = ?", (cur.lastrowid,) ).fetchone() return 
_row_to_dict(row) def update_pipeline( conn: sqlite3.Connection, id: int, status: str | None = None, total_cost_usd: float | None = None, total_tokens: int | None = None, total_duration_seconds: int | None = None, pid: int | None = None, ) -> dict: """Update pipeline status and stats.""" fields: dict[str, Any] = {} if status is not None: fields["status"] = status if status in ("completed", "failed", "cancelled"): fields["completed_at"] = datetime.now().isoformat() if total_cost_usd is not None: fields["total_cost_usd"] = total_cost_usd if total_tokens is not None: fields["total_tokens"] = total_tokens if total_duration_seconds is not None: fields["total_duration_seconds"] = total_duration_seconds if pid is not None: fields["pid"] = pid if fields: sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [id] conn.execute(f"UPDATE pipelines SET {sets} WHERE id = ?", vals) conn.commit() row = conn.execute( "SELECT * FROM pipelines WHERE id = ?", (id,) ).fetchone() return _row_to_dict(row) def get_running_pipelines_with_pid(conn: sqlite3.Connection) -> list[dict]: """Return all running pipelines that have a known PID (used by watchdog).""" rows = conn.execute( "SELECT id, task_id, pid FROM pipelines WHERE status = 'running' AND pid IS NOT NULL" ).fetchall() return _rows_to_list(rows) def get_pipeline_for_watch(conn: sqlite3.Connection, task_id: str) -> dict | None: """Return the most recent top-level pipeline for a task (for kin watch).""" row = conn.execute( """SELECT id, task_id, project_id, status, pid, steps, created_at, completed_at FROM pipelines WHERE task_id = ? AND parent_pipeline_id IS NULL ORDER BY created_at DESC, id DESC LIMIT 1""", (task_id,), ).fetchone() return _row_to_dict(row) def count_agent_logs_since(conn: sqlite3.Connection, task_id: str, since_iso: str) -> int: """Return number of agent_logs for a task created at or after since_iso.""" row = conn.execute( "SELECT COUNT(*) FROM agent_logs WHERE task_id = ? 
AND created_at >= ?", (task_id, since_iso), ).fetchone() return row[0] def get_current_agent_log( conn: sqlite3.Connection, task_id: str, since_iso: str ) -> dict | None: """Return the most recent agent log for a task since a given datetime (for kin watch).""" row = conn.execute( """SELECT agent_role, output_summary, duration_seconds, success, created_at FROM agent_logs WHERE task_id = ? AND created_at >= ? ORDER BY id DESC LIMIT 1""", (task_id, since_iso), ).fetchone() return _row_to_dict(row) def write_log( conn: sqlite3.Connection, pipeline_id: int, message: str, level: str = "INFO", extra: dict | list | None = None, ts: str | None = None, ) -> dict: """Insert a pipeline log entry. Returns inserted row as dict. ts: optional ISO-8601 UTC timestamp override (e.g. for retrospective PM entries). If None, the DB DEFAULT (datetime('now')) is used. """ extra_json = json.dumps(extra, ensure_ascii=False) if extra is not None else None if ts is not None: cur = conn.execute( """INSERT INTO pipeline_log (pipeline_id, ts, message, level, extra_json) VALUES (?, ?, ?, ?, ?)""", (pipeline_id, ts, message, level, extra_json), ) else: cur = conn.execute( """INSERT INTO pipeline_log (pipeline_id, message, level, extra_json) VALUES (?, ?, ?, ?)""", (pipeline_id, message, level, extra_json), ) conn.commit() row = conn.execute( "SELECT * FROM pipeline_log WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def get_pipeline_logs( conn: sqlite3.Connection, pipeline_id: int, since_id: int = 0, ) -> list[dict]: """Get pipeline log entries after since_id in chronological order.""" rows = conn.execute( """SELECT * FROM pipeline_log WHERE pipeline_id = ? AND id > ? 
ORDER BY id ASC""", (pipeline_id, since_id), ).fetchall() return _rows_to_list(rows) def get_all_running_pipelines(conn: sqlite3.Connection) -> list[dict]: """Return all running pipelines with task/project info and current agent (for kin ps).""" rows = conn.execute( """SELECT p.id, p.task_id, p.status, p.pid, p.created_at, p.parent_pipeline_id, t.title, proj.name AS project_name, (SELECT agent_role FROM agent_logs WHERE task_id = p.task_id AND created_at >= p.created_at ORDER BY id DESC LIMIT 1) AS current_agent FROM pipelines p JOIN tasks t ON p.task_id = t.id JOIN projects proj ON p.project_id = proj.id WHERE p.status = 'running' ORDER BY p.created_at DESC""" ).fetchall() return _rows_to_list(rows) # --------------------------------------------------------------------------- # Support # --------------------------------------------------------------------------- def create_ticket( conn: sqlite3.Connection, project_id: str, source: str, client_message: str, client_id: str | None = None, classification: str | None = None, ) -> dict: """Create a support ticket.""" cur = conn.execute( """INSERT INTO support_tickets (project_id, source, client_id, client_message, classification) VALUES (?, ?, ?, ?, ?)""", (project_id, source, client_id, client_message, classification), ) conn.commit() row = conn.execute( "SELECT * FROM support_tickets WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def list_tickets( conn: sqlite3.Connection, project_id: str | None = None, status: str | None = None, ) -> list[dict]: """List support tickets with optional filters.""" query = "SELECT * FROM support_tickets WHERE 1=1" params: list = [] if project_id: query += " AND project_id = ?" params.append(project_id) if status: query += " AND status = ?" 
params.append(status) query += " ORDER BY created_at DESC" return _rows_to_list(conn.execute(query, params).fetchall()) # --------------------------------------------------------------------------- # Audit Log # --------------------------------------------------------------------------- def log_audit_event( conn: sqlite3.Connection, event_type: str, task_id: str | None = None, step_id: str | None = None, reason: str | None = None, project_id: str | None = None, ) -> dict: """Log a security-sensitive event to audit_log. event_type='dangerous_skip' is used when --dangerously-skip-permissions is invoked. """ cur = conn.execute( """INSERT INTO audit_log (event_type, task_id, step_id, reason, project_id) VALUES (?, ?, ?, ?, ?)""", (event_type, task_id, step_id, reason, project_id), ) conn.commit() row = conn.execute( "SELECT * FROM audit_log WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def get_audit_log( conn: sqlite3.Connection, task_id: str | None = None, project_id: str | None = None, event_type: str | None = None, limit: int = 100, ) -> list[dict]: """Query audit log entries with optional filters.""" query = "SELECT * FROM audit_log WHERE 1=1" params: list = [] if task_id: query += " AND task_id = ?" params.append(task_id) if project_id: query += " AND project_id = ?" params.append(project_id) if event_type: query += " AND event_type = ?" params.append(event_type) query += " ORDER BY timestamp DESC LIMIT ?" 
params.append(limit) return _rows_to_list(conn.execute(query, params).fetchall()) # --------------------------------------------------------------------------- # Statistics / Dashboard # --------------------------------------------------------------------------- def get_project_summary(conn: sqlite3.Connection) -> list[dict]: """Get all projects with task counts by status.""" rows = conn.execute(""" SELECT p.*, COUNT(t.id) AS total_tasks, SUM(CASE WHEN t.status = 'done' THEN 1 ELSE 0 END) AS done_tasks, SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) AS active_tasks, SUM(CASE WHEN t.status = 'blocked' THEN 1 ELSE 0 END) AS blocked_tasks, SUM(CASE WHEN t.status = 'review' THEN 1 ELSE 0 END) AS review_tasks FROM projects p LEFT JOIN tasks t ON t.project_id = p.id GROUP BY p.id ORDER BY p.priority, p.name """).fetchall() return _rows_to_list(rows) def get_cost_summary(conn: sqlite3.Connection, days: int = 7) -> list[dict]: """Get cost summary by project for the last N days.""" rows = conn.execute(""" SELECT p.id AS project_id, p.name AS project_name, COUNT(a.id) AS runs, COALESCE(SUM(a.tokens_used), 0) AS total_tokens, COALESCE(SUM(a.cost_usd), 0) AS total_cost_usd, COALESCE(SUM(a.duration_seconds), 0) AS total_duration_seconds FROM projects p LEFT JOIN agent_logs a ON a.project_id = p.id AND a.created_at >= datetime('now', ?) 
GROUP BY p.id HAVING runs > 0 ORDER BY total_cost_usd DESC """, (f"-{days} days",)).fetchall() return _rows_to_list(rows) # --------------------------------------------------------------------------- # Project Phases (KIN-059) # --------------------------------------------------------------------------- def create_phase( conn: sqlite3.Connection, project_id: str, role: str, phase_order: int, ) -> dict: """Create a research phase for a project.""" cur = conn.execute( """INSERT INTO project_phases (project_id, role, phase_order, status) VALUES (?, ?, ?, 'pending')""", (project_id, role, phase_order), ) conn.commit() row = conn.execute( "SELECT * FROM project_phases WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def get_phase(conn: sqlite3.Connection, phase_id: int) -> dict | None: """Get a project phase by id.""" row = conn.execute( "SELECT * FROM project_phases WHERE id = ?", (phase_id,) ).fetchone() return _row_to_dict(row) def list_phases(conn: sqlite3.Connection, project_id: str) -> list[dict]: """List all phases for a project ordered by phase_order.""" rows = conn.execute( "SELECT * FROM project_phases WHERE project_id = ? ORDER BY phase_order", (project_id,), ).fetchall() return _rows_to_list(rows) def update_phase(conn: sqlite3.Connection, phase_id: int, **fields) -> dict: """Update phase fields. Auto-sets updated_at.""" if not fields: return get_phase(conn, phase_id) fields["updated_at"] = datetime.now().isoformat() sets = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [phase_id] conn.execute(f"UPDATE project_phases SET {sets} WHERE id = ?", vals) conn.commit() return get_phase(conn, phase_id) # --------------------------------------------------------------------------- # Project Environments (KIN-087) # --------------------------------------------------------------------------- def _get_fernet(): """Get Fernet instance using KIN_SECRET_KEY env var. Raises RuntimeError if KIN_SECRET_KEY is not set. 
""" key = os.environ.get("KIN_SECRET_KEY") if not key: raise RuntimeError( "KIN_SECRET_KEY environment variable is not set. " "Generate with: python -c \"from cryptography.fernet import Fernet; " "print(Fernet.generate_key().decode())\"" ) from cryptography.fernet import Fernet return Fernet(key.encode()) def _encrypt_auth(value: str) -> str: """Encrypt auth_value using Fernet (AES-128-CBC + HMAC-SHA256).""" return _get_fernet().encrypt(value.encode()).decode() def _decrypt_auth( stored: str, conn: sqlite3.Connection | None = None, env_id: int | None = None, ) -> str: """Decrypt auth_value. Handles migration from legacy base64 obfuscation. If stored value uses the old b64: prefix, decodes it and re-encrypts in the DB (re-encrypt on read) if conn and env_id are provided. """ if not stored: return stored from cryptography.fernet import InvalidToken try: return _get_fernet().decrypt(stored.encode()).decode() except (InvalidToken, Exception): # Legacy b64: format — migrate on read if stored.startswith("b64:"): plaintext = base64.b64decode(stored[4:]).decode() if conn is not None and env_id is not None: new_encrypted = _encrypt_auth(plaintext) conn.execute( "UPDATE project_environments SET auth_value = ? WHERE id = ?", (new_encrypted, env_id), ) conn.commit() return plaintext return stored def create_environment( conn: sqlite3.Connection, project_id: str, name: str, host: str, username: str, port: int = 22, auth_type: str = "password", auth_value: str | None = None, is_installed: bool = False, ) -> dict: """Create a project environment. 
auth_value stored Fernet-encrypted; returned as None.""" obfuscated = _encrypt_auth(auth_value) if auth_value else None cur = conn.execute( """INSERT INTO project_environments (project_id, name, host, port, username, auth_type, auth_value, is_installed) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", (project_id, name, host, port, username, auth_type, obfuscated, int(is_installed)), ) conn.commit() row = conn.execute( "SELECT * FROM project_environments WHERE id = ?", (cur.lastrowid,) ).fetchone() result = _row_to_dict(row) result["auth_value"] = None # never expose in API responses return result def get_environment(conn: sqlite3.Connection, env_id: int) -> dict | None: """Get environment by id. auth_value is returned decrypted (for internal use).""" row = conn.execute( "SELECT * FROM project_environments WHERE id = ?", (env_id,) ).fetchone() result = _row_to_dict(row) if result and result.get("auth_value"): result["auth_value"] = _decrypt_auth(result["auth_value"], conn=conn, env_id=env_id) return result def list_environments(conn: sqlite3.Connection, project_id: str) -> list[dict]: """List all environments for a project. auth_value is always None in response.""" rows = conn.execute( "SELECT * FROM project_environments WHERE project_id = ? ORDER BY created_at", (project_id,), ).fetchall() result = _rows_to_list(rows) for env in result: env["auth_value"] = None return result def update_environment(conn: sqlite3.Connection, env_id: int, **fields) -> dict: """Update environment fields. Auto-sets updated_at. Returns record with auth_value=None.""" if not fields: result = get_environment(conn, env_id) if result: result["auth_value"] = None return result if "auth_value" in fields and fields["auth_value"]: fields["auth_value"] = _encrypt_auth(fields["auth_value"]) elif "auth_value" in fields: del fields["auth_value"] # empty/None = don't update auth_value fields["updated_at"] = datetime.now().isoformat() sets = ", ".join(f"{k} = ?" 
for k in fields) vals = list(fields.values()) + [env_id] conn.execute(f"UPDATE project_environments SET {sets} WHERE id = ?", vals) conn.commit() result = get_environment(conn, env_id) if result: result["auth_value"] = None return result def delete_environment(conn: sqlite3.Connection, env_id: int) -> bool: """Delete environment by id. Returns True if deleted, False if not found.""" cur = conn.execute( "DELETE FROM project_environments WHERE id = ?", (env_id,) ) conn.commit() return cur.rowcount > 0 # --------------------------------------------------------------------------- # Chat Messages (KIN-OBS-012) # --------------------------------------------------------------------------- def add_chat_message( conn: sqlite3.Connection, project_id: str, role: str, content: str, message_type: str = "text", task_id: str | None = None, ) -> dict: """Add a chat message and return it as dict. role: 'user' | 'assistant' | 'system' message_type: 'text' | 'task_created' | 'error' task_id: set for message_type='task_created' to link to the created task. """ cur = conn.execute( """INSERT INTO chat_messages (project_id, role, content, message_type, task_id) VALUES (?, ?, ?, ?, ?)""", (project_id, role, content, message_type, task_id), ) conn.commit() row = conn.execute( "SELECT * FROM chat_messages WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) # --------------------------------------------------------------------------- # Task Attachments (KIN-090) # --------------------------------------------------------------------------- def create_attachment( conn: sqlite3.Connection, task_id: str, filename: str, path: str, mime_type: str, size: int, ) -> dict: """Create a task attachment record. 
path must be absolute.""" cur = conn.execute( """INSERT INTO task_attachments (task_id, filename, path, mime_type, size) VALUES (?, ?, ?, ?, ?)""", (task_id, filename, path, mime_type, size), ) conn.commit() row = conn.execute( "SELECT * FROM task_attachments WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def list_attachments(conn: sqlite3.Connection, task_id: str) -> list[dict]: """List all attachments for a task ordered by creation time.""" rows = conn.execute( "SELECT * FROM task_attachments WHERE task_id = ? ORDER BY created_at", (task_id,), ).fetchall() return _rows_to_list(rows) def get_attachment(conn: sqlite3.Connection, attachment_id: int) -> dict | None: """Get a single attachment by id.""" row = conn.execute( "SELECT * FROM task_attachments WHERE id = ?", (attachment_id,) ).fetchone() return _row_to_dict(row) def delete_attachment(conn: sqlite3.Connection, attachment_id: int) -> bool: """Delete attachment record. Returns True if deleted, False if not found.""" cur = conn.execute("DELETE FROM task_attachments WHERE id = ?", (attachment_id,)) conn.commit() return cur.rowcount > 0 # --------------------------------------------------------------------------- # Department Handoffs (KIN-098) # --------------------------------------------------------------------------- def create_handoff( conn: sqlite3.Connection, pipeline_id: int, task_id: str, from_department: str, to_department: str | None = None, artifacts: dict | None = None, decisions_made: list | None = None, blockers: list | None = None, status: str = "pending", context_packet: dict | list | None = None, ) -> dict: """Record a department handoff with artifacts for inter-department context.""" cur = conn.execute( """INSERT INTO department_handoffs (pipeline_id, task_id, from_department, to_department, artifacts, decisions_made, blockers, context_packet, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", (pipeline_id, task_id, from_department, to_department, _json_encode(artifacts), 
_json_encode(decisions_made), _json_encode(blockers), _json_encode(context_packet), status), ) conn.commit() row = conn.execute( "SELECT * FROM department_handoffs WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def get_handoffs_for_task(conn: sqlite3.Connection, task_id: str) -> list[dict]: """Get all handoffs for a task ordered by creation time.""" rows = conn.execute( "SELECT * FROM department_handoffs WHERE task_id = ? ORDER BY created_at, id ASC", (task_id,), ).fetchall() return _rows_to_list(rows) def get_last_handoff( conn: sqlite3.Connection, task_id: str, to_department: str | None = None, ) -> dict | None: """Get the most recent handoff for a task, optionally filtered by destination department.""" if to_department: row = conn.execute( """SELECT * FROM department_handoffs WHERE task_id = ? AND to_department = ? ORDER BY created_at DESC, id DESC LIMIT 1""", (task_id, to_department), ).fetchone() else: row = conn.execute( """SELECT * FROM department_handoffs WHERE task_id = ? ORDER BY created_at DESC, id DESC LIMIT 1""", (task_id,), ).fetchone() return _row_to_dict(row) # --------------------------------------------------------------------------- # Project Links (KIN-079) # --------------------------------------------------------------------------- def create_project_link( conn: sqlite3.Connection, from_project: str, to_project: str, type: str, description: str | None = None, ) -> dict: """Create a project dependency link. 
Returns the created link as dict.""" cur = conn.execute( """INSERT INTO project_links (from_project, to_project, type, description) VALUES (?, ?, ?, ?)""", (from_project, to_project, type, description), ) conn.commit() row = conn.execute( "SELECT * FROM project_links WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) def get_project_links(conn: sqlite3.Connection, project_id: str) -> list[dict]: """Get all project links where project_id is from_project or to_project.""" rows = conn.execute( "SELECT * FROM project_links WHERE from_project = ? OR to_project = ?" " ORDER BY created_at", (project_id, project_id), ).fetchall() return _rows_to_list(rows) def delete_project_link(conn: sqlite3.Connection, link_id: int) -> bool: """Delete a project link by id. Returns True if deleted, False if not found.""" cur = conn.execute("DELETE FROM project_links WHERE id = ?", (link_id,)) conn.commit() return cur.rowcount > 0 def get_chat_messages( conn: sqlite3.Connection, project_id: str, limit: int = 50, before_id: int | None = None, ) -> list[dict]: """Get chat messages for a project in chronological order (oldest first). before_id: pagination cursor — return messages with id < before_id. """ query = "SELECT * FROM chat_messages WHERE project_id = ?" params: list = [project_id] if before_id is not None: query += " AND id < ?" params.append(before_id) query += " ORDER BY created_at ASC, id ASC LIMIT ?" params.append(limit) return _rows_to_list(conn.execute(query, params).fetchall()) # --------------------------------------------------------------------------- # Task returns — escalation tracking (KIN-135) # --------------------------------------------------------------------------- def record_task_return( conn: sqlite3.Connection, task_id: str, reason_category: str, reason_text: str | None = None, returned_by: str = "system", pipeline_id: int | None = None, ) -> dict: """Record a task return to PM and increment return_count. 
reason_category must be one of RETURN_CATEGORIES; defaults to 'missing_context' if an invalid value is supplied (fail-open). Returns the inserted task_returns row as dict. """ if reason_category not in RETURN_CATEGORIES: reason_category = "missing_context" # Determine the next return_number for this task row = conn.execute( "SELECT COALESCE(MAX(return_number), 0) FROM task_returns WHERE task_id = ?", (task_id,), ).fetchone() next_number = (row[0] if row else 0) + 1 conn.execute( """INSERT INTO task_returns (task_id, return_number, reason_category, reason_text, returned_by, pipeline_id) VALUES (?, ?, ?, ?, ?, ?)""", (task_id, next_number, reason_category, reason_text, returned_by, pipeline_id), ) conn.execute( "UPDATE tasks SET return_count = return_count + 1 WHERE id = ?", (task_id,), ) conn.commit() inserted = conn.execute( "SELECT * FROM task_returns WHERE task_id = ? ORDER BY id DESC LIMIT 1", (task_id,), ).fetchone() return dict(inserted) if inserted else {} def get_task_returns( conn: sqlite3.Connection, task_id: str, limit: int = 20, ) -> list[dict]: """Return task return history ordered by return_number ASC.""" rows = conn.execute( "SELECT * FROM task_returns WHERE task_id = ? ORDER BY return_number ASC LIMIT ?", (task_id, limit), ).fetchall() return [dict(r) for r in rows] def check_return_pattern( conn: sqlite3.Connection, task_id: str, ) -> dict: """Analyse return history for a recurring pattern. 
Returns: { "pattern_detected": bool, # True if dominant_category appears >= 2 times "dominant_category": str | None, "occurrences": int, } """ from collections import Counter rows = conn.execute( "SELECT reason_category FROM task_returns WHERE task_id = ?", (task_id,), ).fetchall() if not rows: return {"pattern_detected": False, "dominant_category": None, "occurrences": 0} counts = Counter(r[0] for r in rows) dominant_category, occurrences = counts.most_common(1)[0] pattern_detected = occurrences >= 2 return { "pattern_detected": pattern_detected, "dominant_category": dominant_category, "occurrences": occurrences, }