1214 lines
41 KiB
Python
1214 lines
41 KiB
Python
"""
|
|
Kin — data access functions for all tables.
|
|
Pure functions: (conn, params) → dict | list[dict]. No ORM, no classes.
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
|
|
VALID_TASK_STATUSES = [
|
|
"pending", "in_progress", "review", "done",
|
|
"blocked", "decomposed", "cancelled",
|
|
]
|
|
|
|
VALID_COMPLETION_MODES = {"auto_complete", "review"}
|
|
|
|
TASK_CATEGORIES = [
|
|
"SEC", "UI", "API", "INFRA", "BIZ", "DB",
|
|
"ARCH", "TEST", "PERF", "DOCS", "FIX", "OBS",
|
|
]
|
|
|
|
|
|
def validate_completion_mode(value: str) -> str:
|
|
"""Validate completion mode from LLM output. Falls back to 'review' if invalid."""
|
|
if value in VALID_COMPLETION_MODES:
|
|
return value
|
|
return "review"
|
|
|
|
|
|
# Columns that are stored as JSON strings and must be decoded on read.
|
|
# Text fields (title, description, name, etc.) are NOT in this set.
|
|
_JSON_COLUMNS: frozenset[str] = frozenset({
|
|
"tech_stack",
|
|
"brief", "spec", "review", "test_result", "security_result", "labels",
|
|
"tags",
|
|
"dependencies",
|
|
"steps",
|
|
"artifacts", "decisions_made", "blockers",
|
|
"extra_json",
|
|
"pending_actions",
|
|
})
|
|
|
|
|
|
def _row_to_dict(row: sqlite3.Row | None) -> dict | None:
|
|
"""Convert sqlite3.Row to dict with JSON fields decoded."""
|
|
if row is None:
|
|
return None
|
|
d = dict(row)
|
|
for key, val in d.items():
|
|
if key in _JSON_COLUMNS and isinstance(val, str) and val.startswith(("[", "{")):
|
|
try:
|
|
d[key] = json.loads(val)
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
return d
|
|
|
|
|
|
def _rows_to_list(rows: list[sqlite3.Row]) -> list[dict]:
|
|
"""Convert list of sqlite3.Row to list of dicts."""
|
|
return [_row_to_dict(r) for r in rows]
|
|
|
|
|
|
def _json_encode(val: Any) -> Any:
|
|
"""Encode lists/dicts to JSON strings for storage."""
|
|
if isinstance(val, (list, dict)):
|
|
return json.dumps(val, ensure_ascii=False)
|
|
return val
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Projects
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_project(
|
|
conn: sqlite3.Connection,
|
|
id: str,
|
|
name: str,
|
|
path: str | None = None,
|
|
tech_stack: list | None = None,
|
|
status: str = "active",
|
|
priority: int = 5,
|
|
pm_prompt: str | None = None,
|
|
claude_md_path: str | None = None,
|
|
forgejo_repo: str | None = None,
|
|
language: str = "ru",
|
|
execution_mode: str = "review",
|
|
project_type: str = "development",
|
|
ssh_host: str | None = None,
|
|
ssh_user: str | None = None,
|
|
ssh_key_path: str | None = None,
|
|
ssh_proxy_jump: str | None = None,
|
|
description: str | None = None,
|
|
) -> dict:
|
|
"""Create a new project and return it as dict."""
|
|
conn.execute(
|
|
"""INSERT INTO projects (id, name, path, tech_stack, status, priority,
|
|
pm_prompt, claude_md_path, forgejo_repo, language, execution_mode,
|
|
project_type, ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump, description)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(id, name, path, _json_encode(tech_stack), status, priority,
|
|
pm_prompt, claude_md_path, forgejo_repo, language, execution_mode,
|
|
project_type, ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump, description),
|
|
)
|
|
conn.commit()
|
|
return get_project(conn, id)
|
|
|
|
|
|
def get_project(conn: sqlite3.Connection, id: str) -> dict | None:
|
|
"""Get project by id."""
|
|
row = conn.execute("SELECT * FROM projects WHERE id = ?", (id,)).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def delete_project(conn: sqlite3.Connection, id: str) -> None:
|
|
"""Delete a project and all its related data (modules, decisions, tasks, phases)."""
|
|
# Delete child tables that reference pipelines/tasks without ON DELETE CASCADE,
|
|
# before their parents are deleted (FK constraint enforcement, foreign_keys=ON).
|
|
conn.execute(
|
|
"DELETE FROM department_handoffs WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?)",
|
|
(id,),
|
|
)
|
|
conn.execute(
|
|
"DELETE FROM department_handoffs WHERE pipeline_id IN (SELECT id FROM pipelines WHERE project_id = ?)",
|
|
(id,),
|
|
)
|
|
conn.execute(
|
|
"DELETE FROM pipeline_log WHERE pipeline_id IN (SELECT id FROM pipelines WHERE project_id = ?)",
|
|
(id,),
|
|
)
|
|
conn.execute("DELETE FROM audit_log WHERE project_id = ?", (id,))
|
|
conn.execute("DELETE FROM support_tickets WHERE project_id = ?", (id,))
|
|
conn.execute("DELETE FROM hook_logs WHERE project_id = ?", (id,))
|
|
conn.execute("DELETE FROM hooks WHERE project_id = ?", (id,))
|
|
conn.execute("DELETE FROM project_links WHERE from_project = ? OR to_project = ?", (id, id))
|
|
# Delete tables with direct project_id FK (order: FK children before parents)
|
|
# project_environments must come before tasks (FK on project_id)
|
|
for table in ("modules", "agent_logs", "decisions", "pipelines", "project_phases", "project_environments", "chat_messages", "tasks"):
|
|
conn.execute(f"DELETE FROM {table} WHERE project_id = ?", (id,))
|
|
conn.execute("DELETE FROM projects WHERE id = ?", (id,))
|
|
conn.commit()
|
|
|
|
|
|
def get_effective_mode(conn: sqlite3.Connection, project_id: str, task_id: str) -> str:
|
|
"""Return effective execution mode: 'auto' or 'review'.
|
|
|
|
Priority: task.execution_mode > project.execution_mode > 'review'
|
|
"""
|
|
task = get_task(conn, task_id)
|
|
if task and task.get("execution_mode"):
|
|
return task["execution_mode"]
|
|
project = get_project(conn, project_id)
|
|
if project:
|
|
return project.get("execution_mode") or "review"
|
|
return "review"
|
|
|
|
|
|
def list_projects(conn: sqlite3.Connection, status: str | None = None) -> list[dict]:
|
|
"""List projects, optionally filtered by status."""
|
|
if status:
|
|
rows = conn.execute(
|
|
"SELECT * FROM projects WHERE status = ? ORDER BY priority, name",
|
|
(status,),
|
|
).fetchall()
|
|
else:
|
|
rows = conn.execute(
|
|
"SELECT * FROM projects ORDER BY priority, name"
|
|
).fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
def update_project(conn: sqlite3.Connection, id: str, **fields) -> dict:
|
|
"""Update project fields. Returns updated project."""
|
|
if not fields:
|
|
return get_project(conn, id)
|
|
for key in ("tech_stack",):
|
|
if key in fields:
|
|
fields[key] = _json_encode(fields[key])
|
|
sets = ", ".join(f"{k} = ?" for k in fields)
|
|
vals = list(fields.values()) + [id]
|
|
conn.execute(f"UPDATE projects SET {sets} WHERE id = ?", vals)
|
|
conn.commit()
|
|
return get_project(conn, id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tasks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def next_task_id(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
category: str | None = None,
|
|
) -> str:
|
|
"""Generate next task ID.
|
|
|
|
Without category: PROJ-001 (backward-compatible old format)
|
|
With category: PROJ-CAT-001 (new format, per-category counter)
|
|
"""
|
|
prefix = project_id.upper()
|
|
existing = list_tasks(conn, project_id=project_id)
|
|
|
|
if category:
|
|
cat_prefix = f"{prefix}-{category}-"
|
|
max_num = 0
|
|
for t in existing:
|
|
tid = t["id"]
|
|
if tid.startswith(cat_prefix):
|
|
try:
|
|
max_num = max(max_num, int(tid[len(cat_prefix):]))
|
|
except ValueError:
|
|
pass
|
|
return f"{prefix}-{category}-{max_num + 1:03d}"
|
|
else:
|
|
# Old format: global max across project (integers only, skip CAT-NNN)
|
|
max_num = 0
|
|
for t in existing:
|
|
tid = t["id"]
|
|
if tid.startswith(prefix + "-"):
|
|
suffix = tid[len(prefix) + 1:]
|
|
try:
|
|
max_num = max(max_num, int(suffix))
|
|
except ValueError:
|
|
pass
|
|
return f"{prefix}-{max_num + 1:03d}"
|
|
|
|
|
|
def create_task(
|
|
conn: sqlite3.Connection,
|
|
id: str,
|
|
project_id: str,
|
|
title: str,
|
|
status: str = "pending",
|
|
priority: int = 5,
|
|
assigned_role: str | None = None,
|
|
parent_task_id: str | None = None,
|
|
brief: dict | None = None,
|
|
spec: dict | None = None,
|
|
forgejo_issue_id: int | None = None,
|
|
execution_mode: str | None = None,
|
|
category: str | None = None,
|
|
acceptance_criteria: str | None = None,
|
|
labels: list | None = None,
|
|
) -> dict:
|
|
"""Create a task linked to a project."""
|
|
conn.execute(
|
|
"""INSERT INTO tasks (id, project_id, title, status, priority,
|
|
assigned_role, parent_task_id, brief, spec, forgejo_issue_id,
|
|
execution_mode, category, acceptance_criteria, labels)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(id, project_id, title, status, priority, assigned_role,
|
|
parent_task_id, _json_encode(brief), _json_encode(spec),
|
|
forgejo_issue_id, execution_mode, category, acceptance_criteria,
|
|
_json_encode(labels)),
|
|
)
|
|
conn.commit()
|
|
return get_task(conn, id)
|
|
|
|
|
|
def get_task(conn: sqlite3.Connection, id: str) -> dict | None:
|
|
"""Get task by id."""
|
|
row = conn.execute("SELECT * FROM tasks WHERE id = ?", (id,)).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
VALID_TASK_SORT_FIELDS = frozenset({
|
|
"updated_at", "created_at", "priority", "status", "title", "id",
|
|
})
|
|
|
|
|
|
def list_tasks(
|
|
conn: sqlite3.Connection,
|
|
project_id: str | None = None,
|
|
status: str | None = None,
|
|
limit: int | None = None,
|
|
sort: str = "updated_at",
|
|
sort_dir: str = "desc",
|
|
) -> list[dict]:
|
|
"""List tasks with optional project/status filters, limit, and sort.
|
|
|
|
sort: column name (validated against VALID_TASK_SORT_FIELDS, default 'updated_at')
|
|
sort_dir: 'asc' or 'desc' (default 'desc')
|
|
"""
|
|
# Validate sort field to prevent SQL injection
|
|
sort_col = sort if sort in VALID_TASK_SORT_FIELDS else "updated_at"
|
|
sort_direction = "DESC" if sort_dir.lower() != "asc" else "ASC"
|
|
|
|
query = "SELECT * FROM tasks WHERE 1=1"
|
|
params: list = []
|
|
if project_id:
|
|
query += " AND project_id = ?"
|
|
params.append(project_id)
|
|
if status:
|
|
query += " AND status = ?"
|
|
params.append(status)
|
|
query += f" ORDER BY {sort_col} {sort_direction}"
|
|
if limit is not None:
|
|
query += " LIMIT ?"
|
|
params.append(limit)
|
|
return _rows_to_list(conn.execute(query, params).fetchall())
|
|
|
|
|
|
def update_task(conn: sqlite3.Connection, id: str, **fields) -> dict:
|
|
"""Update task fields. Auto-sets updated_at. Sets completed_at when status transitions to 'done'."""
|
|
if not fields:
|
|
return get_task(conn, id)
|
|
json_cols = ("brief", "spec", "review", "test_result", "security_result", "labels")
|
|
for key in json_cols:
|
|
if key in fields:
|
|
fields[key] = _json_encode(fields[key])
|
|
if "status" in fields and fields["status"] == "done":
|
|
fields["completed_at"] = datetime.now().isoformat()
|
|
fields["updated_at"] = datetime.now().isoformat()
|
|
sets = ", ".join(f"{k} = ?" for k in fields)
|
|
vals = list(fields.values()) + [id]
|
|
conn.execute(f"UPDATE tasks SET {sets} WHERE id = ?", vals)
|
|
conn.commit()
|
|
return get_task(conn, id)
|
|
|
|
|
|
def mark_telegram_sent(conn: sqlite3.Connection, task_id: str) -> None:
|
|
"""Mark that a Telegram escalation was sent for this task."""
|
|
conn.execute(
|
|
"UPDATE tasks SET telegram_sent = 1 WHERE id = ?",
|
|
(task_id,),
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Decisions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def add_decision(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
type: str,
|
|
title: str,
|
|
description: str,
|
|
category: str | None = None,
|
|
tags: list | None = None,
|
|
task_id: str | None = None,
|
|
) -> dict:
|
|
"""Record a decision, gotcha, or convention for a project."""
|
|
cur = conn.execute(
|
|
"""INSERT INTO decisions (project_id, task_id, type, category,
|
|
title, description, tags)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
|
(project_id, task_id, type, category, title, description,
|
|
_json_encode(tags)),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM decisions WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def add_decision_if_new(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
type: str,
|
|
title: str,
|
|
description: str,
|
|
category: str | None = None,
|
|
tags: list | None = None,
|
|
task_id: str | None = None,
|
|
) -> dict | None:
|
|
"""Add a decision only if no existing one matches (project_id, type, normalized title).
|
|
|
|
Returns the new decision dict, or None if skipped as duplicate.
|
|
"""
|
|
existing = conn.execute(
|
|
"""SELECT id FROM decisions
|
|
WHERE project_id = ? AND type = ?
|
|
AND lower(trim(title)) = lower(trim(?))""",
|
|
(project_id, type, title),
|
|
).fetchone()
|
|
if existing:
|
|
return None
|
|
return add_decision(conn, project_id, type, title, description,
|
|
category=category, tags=tags, task_id=task_id)
|
|
|
|
|
|
def get_decisions(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
category: str | None = None,
|
|
tags: list | None = None,
|
|
types: list | None = None,
|
|
limit: int | None = None,
|
|
) -> list[dict]:
|
|
"""Query decisions for a project with optional filters.
|
|
|
|
tags: matches if ANY tag is present (OR logic via json_each).
|
|
types: filter by decision type (decision, gotcha, workaround, etc).
|
|
"""
|
|
query = "SELECT DISTINCT d.* FROM decisions d WHERE d.project_id = ?"
|
|
params: list = [project_id]
|
|
if category:
|
|
query += " AND d.category = ?"
|
|
params.append(category)
|
|
if types:
|
|
placeholders = ", ".join("?" for _ in types)
|
|
query += f" AND d.type IN ({placeholders})"
|
|
params.extend(types)
|
|
if tags:
|
|
query += """ AND d.id IN (
|
|
SELECT d2.id FROM decisions d2, json_each(d2.tags) AS t
|
|
WHERE t.value IN ({})
|
|
)""".format(", ".join("?" for _ in tags))
|
|
params.extend(tags)
|
|
query += " ORDER BY d.created_at DESC"
|
|
if limit:
|
|
query += " LIMIT ?"
|
|
params.append(limit)
|
|
return _rows_to_list(conn.execute(query, params).fetchall())
|
|
|
|
|
|
def get_decision(conn: sqlite3.Connection, decision_id: int) -> dict | None:
|
|
"""Get a single decision by id."""
|
|
row = conn.execute("SELECT * FROM decisions WHERE id = ?", (decision_id,)).fetchone()
|
|
return _row_to_dict(row) if row else None
|
|
|
|
|
|
def delete_decision(conn: sqlite3.Connection, decision_id: int) -> bool:
|
|
"""Delete a decision by id. Returns True if deleted, False if not found."""
|
|
cur = conn.execute("DELETE FROM decisions WHERE id = ?", (decision_id,))
|
|
conn.commit()
|
|
return cur.rowcount > 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Modules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def add_module(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
name: str,
|
|
type: str,
|
|
path: str,
|
|
description: str | None = None,
|
|
owner_role: str | None = None,
|
|
dependencies: list | None = None,
|
|
) -> dict:
|
|
"""Register a project module."""
|
|
cur = conn.execute(
|
|
"""INSERT OR IGNORE INTO modules (project_id, name, type, path, description,
|
|
owner_role, dependencies)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
|
(project_id, name, type, path, description, owner_role,
|
|
_json_encode(dependencies)),
|
|
)
|
|
created = cur.rowcount > 0
|
|
conn.commit()
|
|
if cur.lastrowid:
|
|
row = conn.execute(
|
|
"SELECT * FROM modules WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
else:
|
|
row = conn.execute(
|
|
"SELECT * FROM modules WHERE project_id = ? AND name = ?",
|
|
(project_id, name),
|
|
).fetchone()
|
|
result = _row_to_dict(row)
|
|
result["_created"] = created
|
|
return result
|
|
|
|
|
|
def get_modules(conn: sqlite3.Connection, project_id: str) -> list[dict]:
|
|
"""Get all modules for a project."""
|
|
rows = conn.execute(
|
|
"SELECT * FROM modules WHERE project_id = ? ORDER BY type, name",
|
|
(project_id,),
|
|
).fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Agent Logs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def log_agent_run(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
agent_role: str,
|
|
action: str,
|
|
task_id: str | None = None,
|
|
session_id: str | None = None,
|
|
input_summary: str | None = None,
|
|
output_summary: str | None = None,
|
|
tokens_used: int | None = None,
|
|
model: str | None = None,
|
|
cost_usd: float | None = None,
|
|
success: bool = True,
|
|
error_message: str | None = None,
|
|
duration_seconds: int | None = None,
|
|
) -> dict:
|
|
"""Log an agent execution run."""
|
|
cur = conn.execute(
|
|
"""INSERT INTO agent_logs (project_id, task_id, agent_role, session_id,
|
|
action, input_summary, output_summary, tokens_used, model,
|
|
cost_usd, success, error_message, duration_seconds)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(project_id, task_id, agent_role, session_id, action, input_summary,
|
|
output_summary, tokens_used, model, cost_usd, success,
|
|
error_message, duration_seconds),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM agent_logs WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pipelines
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_pipeline(
|
|
conn: sqlite3.Connection,
|
|
task_id: str,
|
|
project_id: str,
|
|
route_type: str,
|
|
steps: list | dict,
|
|
parent_pipeline_id: int | None = None,
|
|
department: str | None = None,
|
|
) -> dict:
|
|
"""Create a new pipeline run."""
|
|
cur = conn.execute(
|
|
"""INSERT INTO pipelines (task_id, project_id, route_type, steps, parent_pipeline_id, department)
|
|
VALUES (?, ?, ?, ?, ?, ?)""",
|
|
(task_id, project_id, route_type, _json_encode(steps), parent_pipeline_id, department),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM pipelines WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def update_pipeline(
|
|
conn: sqlite3.Connection,
|
|
id: int,
|
|
status: str | None = None,
|
|
total_cost_usd: float | None = None,
|
|
total_tokens: int | None = None,
|
|
total_duration_seconds: int | None = None,
|
|
pid: int | None = None,
|
|
) -> dict:
|
|
"""Update pipeline status and stats."""
|
|
fields: dict[str, Any] = {}
|
|
if status is not None:
|
|
fields["status"] = status
|
|
if status in ("completed", "failed", "cancelled"):
|
|
fields["completed_at"] = datetime.now().isoformat()
|
|
if total_cost_usd is not None:
|
|
fields["total_cost_usd"] = total_cost_usd
|
|
if total_tokens is not None:
|
|
fields["total_tokens"] = total_tokens
|
|
if total_duration_seconds is not None:
|
|
fields["total_duration_seconds"] = total_duration_seconds
|
|
if pid is not None:
|
|
fields["pid"] = pid
|
|
if fields:
|
|
sets = ", ".join(f"{k} = ?" for k in fields)
|
|
vals = list(fields.values()) + [id]
|
|
conn.execute(f"UPDATE pipelines SET {sets} WHERE id = ?", vals)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM pipelines WHERE id = ?", (id,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def get_running_pipelines_with_pid(conn: sqlite3.Connection) -> list[dict]:
|
|
"""Return all running pipelines that have a known PID (used by watchdog)."""
|
|
rows = conn.execute(
|
|
"SELECT id, task_id, pid FROM pipelines WHERE status = 'running' AND pid IS NOT NULL"
|
|
).fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
def get_pipeline_for_watch(conn: sqlite3.Connection, task_id: str) -> dict | None:
|
|
"""Return the most recent top-level pipeline for a task (for kin watch)."""
|
|
row = conn.execute(
|
|
"""SELECT id, task_id, project_id, status, pid, steps, created_at, completed_at
|
|
FROM pipelines
|
|
WHERE task_id = ? AND parent_pipeline_id IS NULL
|
|
ORDER BY created_at DESC, id DESC LIMIT 1""",
|
|
(task_id,),
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def count_agent_logs_since(conn: sqlite3.Connection, task_id: str, since_iso: str) -> int:
|
|
"""Return number of agent_logs for a task created at or after since_iso."""
|
|
row = conn.execute(
|
|
"SELECT COUNT(*) FROM agent_logs WHERE task_id = ? AND created_at >= ?",
|
|
(task_id, since_iso),
|
|
).fetchone()
|
|
return row[0]
|
|
|
|
|
|
def get_current_agent_log(
|
|
conn: sqlite3.Connection, task_id: str, since_iso: str
|
|
) -> dict | None:
|
|
"""Return the most recent agent log for a task since a given datetime (for kin watch)."""
|
|
row = conn.execute(
|
|
"""SELECT agent_role, output_summary, duration_seconds, success, created_at
|
|
FROM agent_logs
|
|
WHERE task_id = ? AND created_at >= ?
|
|
ORDER BY id DESC LIMIT 1""",
|
|
(task_id, since_iso),
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def write_log(
|
|
conn: sqlite3.Connection,
|
|
pipeline_id: int,
|
|
message: str,
|
|
level: str = "INFO",
|
|
extra: dict | list | None = None,
|
|
ts: str | None = None,
|
|
) -> dict:
|
|
"""Insert a pipeline log entry. Returns inserted row as dict.
|
|
|
|
ts: optional ISO-8601 UTC timestamp override (e.g. for retrospective PM entries).
|
|
If None, the DB DEFAULT (datetime('now')) is used.
|
|
"""
|
|
extra_json = json.dumps(extra, ensure_ascii=False) if extra is not None else None
|
|
if ts is not None:
|
|
cur = conn.execute(
|
|
"""INSERT INTO pipeline_log (pipeline_id, ts, message, level, extra_json)
|
|
VALUES (?, ?, ?, ?, ?)""",
|
|
(pipeline_id, ts, message, level, extra_json),
|
|
)
|
|
else:
|
|
cur = conn.execute(
|
|
"""INSERT INTO pipeline_log (pipeline_id, message, level, extra_json)
|
|
VALUES (?, ?, ?, ?)""",
|
|
(pipeline_id, message, level, extra_json),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM pipeline_log WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def get_pipeline_logs(
|
|
conn: sqlite3.Connection,
|
|
pipeline_id: int,
|
|
since_id: int = 0,
|
|
) -> list[dict]:
|
|
"""Get pipeline log entries after since_id in chronological order."""
|
|
rows = conn.execute(
|
|
"""SELECT * FROM pipeline_log
|
|
WHERE pipeline_id = ? AND id > ?
|
|
ORDER BY id ASC""",
|
|
(pipeline_id, since_id),
|
|
).fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
def get_all_running_pipelines(conn: sqlite3.Connection) -> list[dict]:
|
|
"""Return all running pipelines with task/project info and current agent (for kin ps)."""
|
|
rows = conn.execute(
|
|
"""SELECT p.id, p.task_id, p.status, p.pid, p.created_at,
|
|
p.parent_pipeline_id,
|
|
t.title, proj.name AS project_name,
|
|
(SELECT agent_role FROM agent_logs
|
|
WHERE task_id = p.task_id AND created_at >= p.created_at
|
|
ORDER BY id DESC LIMIT 1) AS current_agent
|
|
FROM pipelines p
|
|
JOIN tasks t ON p.task_id = t.id
|
|
JOIN projects proj ON p.project_id = proj.id
|
|
WHERE p.status = 'running'
|
|
ORDER BY p.created_at DESC"""
|
|
).fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Support
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_ticket(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
source: str,
|
|
client_message: str,
|
|
client_id: str | None = None,
|
|
classification: str | None = None,
|
|
) -> dict:
|
|
"""Create a support ticket."""
|
|
cur = conn.execute(
|
|
"""INSERT INTO support_tickets (project_id, source, client_id,
|
|
client_message, classification)
|
|
VALUES (?, ?, ?, ?, ?)""",
|
|
(project_id, source, client_id, client_message, classification),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM support_tickets WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def list_tickets(
|
|
conn: sqlite3.Connection,
|
|
project_id: str | None = None,
|
|
status: str | None = None,
|
|
) -> list[dict]:
|
|
"""List support tickets with optional filters."""
|
|
query = "SELECT * FROM support_tickets WHERE 1=1"
|
|
params: list = []
|
|
if project_id:
|
|
query += " AND project_id = ?"
|
|
params.append(project_id)
|
|
if status:
|
|
query += " AND status = ?"
|
|
params.append(status)
|
|
query += " ORDER BY created_at DESC"
|
|
return _rows_to_list(conn.execute(query, params).fetchall())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Audit Log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def log_audit_event(
|
|
conn: sqlite3.Connection,
|
|
event_type: str,
|
|
task_id: str | None = None,
|
|
step_id: str | None = None,
|
|
reason: str | None = None,
|
|
project_id: str | None = None,
|
|
) -> dict:
|
|
"""Log a security-sensitive event to audit_log.
|
|
|
|
event_type='dangerous_skip' is used when --dangerously-skip-permissions is invoked.
|
|
"""
|
|
cur = conn.execute(
|
|
"""INSERT INTO audit_log (event_type, task_id, step_id, reason, project_id)
|
|
VALUES (?, ?, ?, ?, ?)""",
|
|
(event_type, task_id, step_id, reason, project_id),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM audit_log WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def get_audit_log(
|
|
conn: sqlite3.Connection,
|
|
task_id: str | None = None,
|
|
project_id: str | None = None,
|
|
event_type: str | None = None,
|
|
limit: int = 100,
|
|
) -> list[dict]:
|
|
"""Query audit log entries with optional filters."""
|
|
query = "SELECT * FROM audit_log WHERE 1=1"
|
|
params: list = []
|
|
if task_id:
|
|
query += " AND task_id = ?"
|
|
params.append(task_id)
|
|
if project_id:
|
|
query += " AND project_id = ?"
|
|
params.append(project_id)
|
|
if event_type:
|
|
query += " AND event_type = ?"
|
|
params.append(event_type)
|
|
query += " ORDER BY timestamp DESC LIMIT ?"
|
|
params.append(limit)
|
|
return _rows_to_list(conn.execute(query, params).fetchall())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Statistics / Dashboard
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_project_summary(conn: sqlite3.Connection) -> list[dict]:
|
|
"""Get all projects with task counts by status."""
|
|
rows = conn.execute("""
|
|
SELECT p.*,
|
|
COUNT(t.id) AS total_tasks,
|
|
SUM(CASE WHEN t.status = 'done' THEN 1 ELSE 0 END) AS done_tasks,
|
|
SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) AS active_tasks,
|
|
SUM(CASE WHEN t.status = 'blocked' THEN 1 ELSE 0 END) AS blocked_tasks,
|
|
SUM(CASE WHEN t.status = 'review' THEN 1 ELSE 0 END) AS review_tasks
|
|
FROM projects p
|
|
LEFT JOIN tasks t ON t.project_id = p.id
|
|
GROUP BY p.id
|
|
ORDER BY p.priority, p.name
|
|
""").fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
def get_cost_summary(conn: sqlite3.Connection, days: int = 7) -> list[dict]:
|
|
"""Get cost summary by project for the last N days."""
|
|
rows = conn.execute("""
|
|
SELECT
|
|
p.id AS project_id,
|
|
p.name AS project_name,
|
|
COUNT(a.id) AS runs,
|
|
COALESCE(SUM(a.tokens_used), 0) AS total_tokens,
|
|
COALESCE(SUM(a.cost_usd), 0) AS total_cost_usd,
|
|
COALESCE(SUM(a.duration_seconds), 0) AS total_duration_seconds
|
|
FROM projects p
|
|
LEFT JOIN agent_logs a ON a.project_id = p.id
|
|
AND a.created_at >= datetime('now', ?)
|
|
GROUP BY p.id
|
|
HAVING runs > 0
|
|
ORDER BY total_cost_usd DESC
|
|
""", (f"-{days} days",)).fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Project Phases (KIN-059)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_phase(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
role: str,
|
|
phase_order: int,
|
|
) -> dict:
|
|
"""Create a research phase for a project."""
|
|
cur = conn.execute(
|
|
"""INSERT INTO project_phases (project_id, role, phase_order, status)
|
|
VALUES (?, ?, ?, 'pending')""",
|
|
(project_id, role, phase_order),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM project_phases WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def get_phase(conn: sqlite3.Connection, phase_id: int) -> dict | None:
|
|
"""Get a project phase by id."""
|
|
row = conn.execute(
|
|
"SELECT * FROM project_phases WHERE id = ?", (phase_id,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def list_phases(conn: sqlite3.Connection, project_id: str) -> list[dict]:
|
|
"""List all phases for a project ordered by phase_order."""
|
|
rows = conn.execute(
|
|
"SELECT * FROM project_phases WHERE project_id = ? ORDER BY phase_order",
|
|
(project_id,),
|
|
).fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
def update_phase(conn: sqlite3.Connection, phase_id: int, **fields) -> dict:
|
|
"""Update phase fields. Auto-sets updated_at."""
|
|
if not fields:
|
|
return get_phase(conn, phase_id)
|
|
fields["updated_at"] = datetime.now().isoformat()
|
|
sets = ", ".join(f"{k} = ?" for k in fields)
|
|
vals = list(fields.values()) + [phase_id]
|
|
conn.execute(f"UPDATE project_phases SET {sets} WHERE id = ?", vals)
|
|
conn.commit()
|
|
return get_phase(conn, phase_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Project Environments (KIN-087)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_fernet():
|
|
"""Get Fernet instance using KIN_SECRET_KEY env var.
|
|
|
|
Raises RuntimeError if KIN_SECRET_KEY is not set.
|
|
"""
|
|
key = os.environ.get("KIN_SECRET_KEY")
|
|
if not key:
|
|
raise RuntimeError(
|
|
"KIN_SECRET_KEY environment variable is not set. "
|
|
"Generate with: python -c \"from cryptography.fernet import Fernet; "
|
|
"print(Fernet.generate_key().decode())\""
|
|
)
|
|
from cryptography.fernet import Fernet
|
|
return Fernet(key.encode())
|
|
|
|
|
|
def _encrypt_auth(value: str) -> str:
|
|
"""Encrypt auth_value using Fernet (AES-128-CBC + HMAC-SHA256)."""
|
|
return _get_fernet().encrypt(value.encode()).decode()
|
|
|
|
|
|
def _decrypt_auth(
|
|
stored: str,
|
|
conn: sqlite3.Connection | None = None,
|
|
env_id: int | None = None,
|
|
) -> str:
|
|
"""Decrypt auth_value. Handles migration from legacy base64 obfuscation.
|
|
|
|
If stored value uses the old b64: prefix, decodes it and re-encrypts
|
|
in the DB (re-encrypt on read) if conn and env_id are provided.
|
|
"""
|
|
if not stored:
|
|
return stored
|
|
from cryptography.fernet import InvalidToken
|
|
try:
|
|
return _get_fernet().decrypt(stored.encode()).decode()
|
|
except (InvalidToken, Exception):
|
|
# Legacy b64: format — migrate on read
|
|
if stored.startswith("b64:"):
|
|
plaintext = base64.b64decode(stored[4:]).decode()
|
|
if conn is not None and env_id is not None:
|
|
new_encrypted = _encrypt_auth(plaintext)
|
|
conn.execute(
|
|
"UPDATE project_environments SET auth_value = ? WHERE id = ?",
|
|
(new_encrypted, env_id),
|
|
)
|
|
conn.commit()
|
|
return plaintext
|
|
return stored
|
|
|
|
|
|
def create_environment(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
name: str,
|
|
host: str,
|
|
username: str,
|
|
port: int = 22,
|
|
auth_type: str = "password",
|
|
auth_value: str | None = None,
|
|
is_installed: bool = False,
|
|
) -> dict:
|
|
"""Create a project environment. auth_value stored Fernet-encrypted; returned as None."""
|
|
obfuscated = _encrypt_auth(auth_value) if auth_value else None
|
|
cur = conn.execute(
|
|
"""INSERT INTO project_environments
|
|
(project_id, name, host, port, username, auth_type, auth_value, is_installed)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(project_id, name, host, port, username, auth_type, obfuscated, int(is_installed)),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM project_environments WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
result = _row_to_dict(row)
|
|
result["auth_value"] = None # never expose in API responses
|
|
return result
|
|
|
|
|
|
def get_environment(conn: sqlite3.Connection, env_id: int) -> dict | None:
|
|
"""Get environment by id. auth_value is returned decrypted (for internal use)."""
|
|
row = conn.execute(
|
|
"SELECT * FROM project_environments WHERE id = ?", (env_id,)
|
|
).fetchone()
|
|
result = _row_to_dict(row)
|
|
if result and result.get("auth_value"):
|
|
result["auth_value"] = _decrypt_auth(result["auth_value"], conn=conn, env_id=env_id)
|
|
return result
|
|
|
|
|
|
def list_environments(conn: sqlite3.Connection, project_id: str) -> list[dict]:
|
|
"""List all environments for a project. auth_value is always None in response."""
|
|
rows = conn.execute(
|
|
"SELECT * FROM project_environments WHERE project_id = ? ORDER BY created_at",
|
|
(project_id,),
|
|
).fetchall()
|
|
result = _rows_to_list(rows)
|
|
for env in result:
|
|
env["auth_value"] = None
|
|
return result
|
|
|
|
|
|
def update_environment(conn: sqlite3.Connection, env_id: int, **fields) -> dict:
|
|
"""Update environment fields. Auto-sets updated_at. Returns record with auth_value=None."""
|
|
if not fields:
|
|
result = get_environment(conn, env_id)
|
|
if result:
|
|
result["auth_value"] = None
|
|
return result
|
|
if "auth_value" in fields and fields["auth_value"]:
|
|
fields["auth_value"] = _encrypt_auth(fields["auth_value"])
|
|
elif "auth_value" in fields:
|
|
del fields["auth_value"] # empty/None = don't update auth_value
|
|
fields["updated_at"] = datetime.now().isoformat()
|
|
sets = ", ".join(f"{k} = ?" for k in fields)
|
|
vals = list(fields.values()) + [env_id]
|
|
conn.execute(f"UPDATE project_environments SET {sets} WHERE id = ?", vals)
|
|
conn.commit()
|
|
result = get_environment(conn, env_id)
|
|
if result:
|
|
result["auth_value"] = None
|
|
return result
|
|
|
|
|
|
def delete_environment(conn: sqlite3.Connection, env_id: int) -> bool:
|
|
"""Delete environment by id. Returns True if deleted, False if not found."""
|
|
cur = conn.execute(
|
|
"DELETE FROM project_environments WHERE id = ?", (env_id,)
|
|
)
|
|
conn.commit()
|
|
return cur.rowcount > 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Chat Messages (KIN-OBS-012)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def add_chat_message(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
role: str,
|
|
content: str,
|
|
message_type: str = "text",
|
|
task_id: str | None = None,
|
|
) -> dict:
|
|
"""Add a chat message and return it as dict.
|
|
|
|
role: 'user' | 'assistant' | 'system'
|
|
message_type: 'text' | 'task_created' | 'error'
|
|
task_id: set for message_type='task_created' to link to the created task.
|
|
"""
|
|
cur = conn.execute(
|
|
"""INSERT INTO chat_messages (project_id, role, content, message_type, task_id)
|
|
VALUES (?, ?, ?, ?, ?)""",
|
|
(project_id, role, content, message_type, task_id),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM chat_messages WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Task Attachments (KIN-090)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_attachment(
|
|
conn: sqlite3.Connection,
|
|
task_id: str,
|
|
filename: str,
|
|
path: str,
|
|
mime_type: str,
|
|
size: int,
|
|
) -> dict:
|
|
"""Create a task attachment record. path must be absolute."""
|
|
cur = conn.execute(
|
|
"""INSERT INTO task_attachments (task_id, filename, path, mime_type, size)
|
|
VALUES (?, ?, ?, ?, ?)""",
|
|
(task_id, filename, path, mime_type, size),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM task_attachments WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def list_attachments(conn: sqlite3.Connection, task_id: str) -> list[dict]:
|
|
"""List all attachments for a task ordered by creation time."""
|
|
rows = conn.execute(
|
|
"SELECT * FROM task_attachments WHERE task_id = ? ORDER BY created_at",
|
|
(task_id,),
|
|
).fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
def get_attachment(conn: sqlite3.Connection, attachment_id: int) -> dict | None:
|
|
"""Get a single attachment by id."""
|
|
row = conn.execute(
|
|
"SELECT * FROM task_attachments WHERE id = ?", (attachment_id,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def delete_attachment(conn: sqlite3.Connection, attachment_id: int) -> bool:
|
|
"""Delete attachment record. Returns True if deleted, False if not found."""
|
|
cur = conn.execute("DELETE FROM task_attachments WHERE id = ?", (attachment_id,))
|
|
conn.commit()
|
|
return cur.rowcount > 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Department Handoffs (KIN-098)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_handoff(
|
|
conn: sqlite3.Connection,
|
|
pipeline_id: int,
|
|
task_id: str,
|
|
from_department: str,
|
|
to_department: str | None = None,
|
|
artifacts: dict | None = None,
|
|
decisions_made: list | None = None,
|
|
blockers: list | None = None,
|
|
status: str = "pending",
|
|
) -> dict:
|
|
"""Record a department handoff with artifacts for inter-department context."""
|
|
cur = conn.execute(
|
|
"""INSERT INTO department_handoffs
|
|
(pipeline_id, task_id, from_department, to_department, artifacts, decisions_made, blockers, status)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(pipeline_id, task_id, from_department, to_department,
|
|
_json_encode(artifacts), _json_encode(decisions_made), _json_encode(blockers), status),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM department_handoffs WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def get_handoffs_for_task(conn: sqlite3.Connection, task_id: str) -> list[dict]:
|
|
"""Get all handoffs for a task ordered by creation time."""
|
|
rows = conn.execute(
|
|
"SELECT * FROM department_handoffs WHERE task_id = ? ORDER BY created_at, id ASC",
|
|
(task_id,),
|
|
).fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
def get_last_handoff(
|
|
conn: sqlite3.Connection,
|
|
task_id: str,
|
|
to_department: str | None = None,
|
|
) -> dict | None:
|
|
"""Get the most recent handoff for a task, optionally filtered by destination department."""
|
|
if to_department:
|
|
row = conn.execute(
|
|
"""SELECT * FROM department_handoffs
|
|
WHERE task_id = ? AND to_department = ?
|
|
ORDER BY created_at DESC, id DESC LIMIT 1""",
|
|
(task_id, to_department),
|
|
).fetchone()
|
|
else:
|
|
row = conn.execute(
|
|
"""SELECT * FROM department_handoffs
|
|
WHERE task_id = ?
|
|
ORDER BY created_at DESC, id DESC LIMIT 1""",
|
|
(task_id,),
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Project Links (KIN-079)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_project_link(
|
|
conn: sqlite3.Connection,
|
|
from_project: str,
|
|
to_project: str,
|
|
type: str,
|
|
description: str | None = None,
|
|
) -> dict:
|
|
"""Create a project dependency link. Returns the created link as dict."""
|
|
cur = conn.execute(
|
|
"""INSERT INTO project_links (from_project, to_project, type, description)
|
|
VALUES (?, ?, ?, ?)""",
|
|
(from_project, to_project, type, description),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM project_links WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return _row_to_dict(row)
|
|
|
|
|
|
def get_project_links(conn: sqlite3.Connection, project_id: str) -> list[dict]:
|
|
"""Get all project links where project_id is from_project or to_project."""
|
|
rows = conn.execute(
|
|
"SELECT * FROM project_links WHERE from_project = ? OR to_project = ?"
|
|
" ORDER BY created_at",
|
|
(project_id, project_id),
|
|
).fetchall()
|
|
return _rows_to_list(rows)
|
|
|
|
|
|
def delete_project_link(conn: sqlite3.Connection, link_id: int) -> bool:
|
|
"""Delete a project link by id. Returns True if deleted, False if not found."""
|
|
cur = conn.execute("DELETE FROM project_links WHERE id = ?", (link_id,))
|
|
conn.commit()
|
|
return cur.rowcount > 0
|
|
|
|
|
|
def get_chat_messages(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
limit: int = 50,
|
|
before_id: int | None = None,
|
|
) -> list[dict]:
|
|
"""Get chat messages for a project in chronological order (oldest first).
|
|
|
|
before_id: pagination cursor — return messages with id < before_id.
|
|
"""
|
|
query = "SELECT * FROM chat_messages WHERE project_id = ?"
|
|
params: list = [project_id]
|
|
if before_id is not None:
|
|
query += " AND id < ?"
|
|
params.append(before_id)
|
|
query += " ORDER BY created_at ASC, id ASC LIMIT ?"
|
|
params.append(limit)
|
|
return _rows_to_list(conn.execute(query, params).fetchall())
|