# kin/core/models.py

"""
Kin — data access functions for all tables.
Plain functions, no ORM, no classes: (conn, params) → typically dict | list[dict].
Functions that write commit on the given connection before returning.
"""
import json
import sqlite3
from datetime import datetime
from typing import Any
# Task status names.
# NOTE(review): not referenced by the functions visible in this part of the
# file; presumably enforced by callers — confirm.
VALID_TASK_STATUSES = [
    "pending",
    "in_progress",
    "review",
    "done",
    "blocked",
    "decomposed",
    "cancelled",
]

# Accepted completion modes; validate_completion_mode() tests membership here.
VALID_COMPLETION_MODES = {"auto_complete", "review"}

# Task category codes.
# NOTE(review): presumably the CAT segment of PROJ-CAT-001 task ids — confirm.
TASK_CATEGORIES = [
    "SEC",
    "UI",
    "API",
    "INFRA",
    "BIZ",
    "DB",
    "ARCH",
    "TEST",
    "PERF",
    "DOCS",
    "FIX",
    "OBS",
]
def validate_completion_mode(value: str) -> str:
    """Validate a completion mode taken from LLM output.

    Args:
        value: Candidate mode. Annotated as ``str``, but because it comes
            from model output any object is tolerated.

    Returns:
        ``value`` unchanged when it is a string listed in
        ``VALID_COMPLETION_MODES``; otherwise the fallback ``"review"``.
    """
    # A non-string can never be a valid mode, and an unhashable one (list,
    # dict) would make the set membership test raise TypeError instead of
    # falling back, so reject it before touching the set.
    if not isinstance(value, str):
        return "review"
    if value in VALID_COMPLETION_MODES:
        return value
    return "review"
def _row_to_dict(row: sqlite3.Row | None) -> dict | None:
"""Convert sqlite3.Row to dict with JSON fields decoded."""
if row is None:
return None
d = dict(row)
for key, val in d.items():
if isinstance(val, str) and val.startswith(("[", "{")):
try:
d[key] = json.loads(val)
except (json.JSONDecodeError, ValueError):
pass
return d
def _rows_to_list(rows: list[sqlite3.Row]) -> list[dict]:
    """Apply ``_row_to_dict`` to every row, preserving order."""
    return list(map(_row_to_dict, rows))
def _json_encode(val: Any) -> Any:
"""Encode lists/dicts to JSON strings for storage."""
if isinstance(val, (list, dict)):
return json.dumps(val, ensure_ascii=False)
return val
# ---------------------------------------------------------------------------
# Projects
# ---------------------------------------------------------------------------
def create_project(
    conn: sqlite3.Connection,
    id: str,
    name: str,
    path: str | None = None,
    tech_stack: list | None = None,
    status: str = "active",
    priority: int = 5,
    pm_prompt: str | None = None,
    claude_md_path: str | None = None,
    forgejo_repo: str | None = None,
    language: str = "ru",
    execution_mode: str = "review",
    project_type: str = "development",
    ssh_host: str | None = None,
    ssh_user: str | None = None,
    ssh_key_path: str | None = None,
    ssh_proxy_jump: str | None = None,
    description: str | None = None,
) -> dict:
    """Insert a project row, commit, and return the stored project.

    ``tech_stack`` is JSON-encoded before it is written. The return value
    is whatever ``get_project`` reads back for ``id``.
    """
    insert_sql = """INSERT INTO projects (id, name, path, tech_stack, status, priority,
        pm_prompt, claude_md_path, forgejo_repo, language, execution_mode,
        project_type, ssh_host, ssh_user, ssh_key_path, ssh_proxy_jump, description)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
    # Order must match the column list in insert_sql.
    row_values = (
        id,
        name,
        path,
        _json_encode(tech_stack),
        status,
        priority,
        pm_prompt,
        claude_md_path,
        forgejo_repo,
        language,
        execution_mode,
        project_type,
        ssh_host,
        ssh_user,
        ssh_key_path,
        ssh_proxy_jump,
        description,
    )
    conn.execute(insert_sql, row_values)
    conn.commit()
    return get_project(conn, id)
def get_project(conn: sqlite3.Connection, id: str) -> dict | None:
    """Fetch one project by primary key; ``None`` when no row matches."""
    cursor = conn.execute("SELECT * FROM projects WHERE id = ?", (id,))
    return _row_to_dict(cursor.fetchone())
def get_effective_mode(conn: sqlite3.Connection, project_id: str, task_id: str) -> str:
    """Resolve the execution mode that applies to a task.

    Precedence: the task's own ``execution_mode`` if set, then the
    project's ``execution_mode``, then the default ``'review'``.
    The project is only looked up when the task does not decide.
    """
    task_row = get_task(conn, task_id)
    task_mode = task_row.get("execution_mode") if task_row else None
    if task_mode:
        return task_mode

    project_row = get_project(conn, project_id)
    if not project_row:
        return "review"
    return project_row.get("execution_mode") or "review"
def list_projects(conn: sqlite3.Connection, status: str | None = None) -> list[dict]:
    """Return projects ordered by priority then name.

    When ``status`` is truthy only projects with that status are returned;
    otherwise all projects are.
    """
    if not status:
        cursor = conn.execute("SELECT * FROM projects ORDER BY priority, name")
    else:
        cursor = conn.execute(
            "SELECT * FROM projects WHERE status = ? ORDER BY priority, name",
            (status,),
        )
    return _rows_to_list(cursor.fetchall())
def update_project(conn: sqlite3.Connection, id: str, **fields) -> dict:
    """Update columns of a project and return the refreshed row.

    Args:
        conn: Open SQLite connection; the change is committed on it.
        id: Primary key of the project to update.
        **fields: Column name -> new value. ``tech_stack`` is JSON-encoded
            before storage. With no fields nothing is written.

    Returns:
        The result of ``get_project`` for ``id`` after the update.

    Raises:
        ValueError: If a field name is not a plain identifier.
    """
    if not fields:
        return get_project(conn, id)
    # Column names are spliced into the SQL text (only values can be bound
    # as parameters), so refuse anything that is not a bare identifier
    # instead of letting it reshape the statement.
    bad_names = [k for k in fields if not k.isidentifier()]
    if bad_names:
        raise ValueError(f"invalid column name(s) for projects update: {bad_names!r}")
    for key in ("tech_stack",):
        if key in fields:
            fields[key] = _json_encode(fields[key])
    sets = ", ".join(f"{k} = ?" for k in fields)
    vals = list(fields.values()) + [id]
    conn.execute(f"UPDATE projects SET {sets} WHERE id = ?", vals)
    conn.commit()
    return get_project(conn, id)
# ---------------------------------------------------------------------------
# Tasks
# ---------------------------------------------------------------------------
def next_task_id(
conn: sqlite3.Connection,
project_id: str,
category: str | None = None,
) -> str:
"""Generate next task ID.
Without category: PROJ-001 (backward-compatible old format)
With category: PROJ-CAT-001 (new format, per-category counter)
"""
prefix = project_id.upper()
existing = list_tasks(conn, project_id=project_id)
if category:
cat_prefix = f"{prefix}-{category}-"
max_num = 0
for t in existing:
tid = t["id"]
if tid.startswith(cat_prefix):
try:
max_num = max(max_num, int(tid[len(cat_prefix):]))
except ValueError:
pass
return f"{prefix}-{category}-{max_num + 1:03d}"
else:
# Old format: global max across project (integers only, skip CAT-NNN)
max_num = 0
for t in existing:
tid = t["id"]
if tid.startswith(prefix + "-"):
suffix = tid[len(prefix) + 1:]
try:
max_num = max(max_num, int(suffix))
except ValueError:
pass
return f"{prefix}-{max_num + 1:03d}"
def create_task(
    conn: sqlite3.Connection,
    id: str,
    project_id: str,
    title: str,
    status: str = "pending",
    priority: int = 5,
    assigned_role: str | None = None,
    parent_task_id: str | None = None,
    brief: dict | None = None,
    spec: dict | None = None,
    forgejo_issue_id: int | None = None,
    execution_mode: str | None = None,
    category: str | None = None,
) -> dict:
    """Insert a task for a project, commit, and return the stored task.

    ``brief`` and ``spec`` are JSON-encoded before they are written. The
    return value is whatever ``get_task`` reads back for ``id``.
    """
    insert_sql = """INSERT INTO tasks (id, project_id, title, status, priority,
        assigned_role, parent_task_id, brief, spec, forgejo_issue_id,
        execution_mode, category)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
    # Order must match the column list in insert_sql.
    row_values = (
        id,
        project_id,
        title,
        status,
        priority,
        assigned_role,
        parent_task_id,
        _json_encode(brief),
        _json_encode(spec),
        forgejo_issue_id,
        execution_mode,
        category,
    )
    conn.execute(insert_sql, row_values)
    conn.commit()
    return get_task(conn, id)
def get_task(conn: sqlite3.Connection, id: str) -> dict | None:
    """Fetch one task by primary key; ``None`` when no row matches."""
    cursor = conn.execute("SELECT * FROM tasks WHERE id = ?", (id,))
    return _row_to_dict(cursor.fetchone())
def list_tasks(
    conn: sqlite3.Connection,
    project_id: str | None = None,
    status: str | None = None,
) -> list[dict]:
    """Return tasks ordered by priority then created_at.

    Each of ``project_id`` and ``status`` narrows the result only when it
    is truthy.
    """
    candidates = (
        (" AND project_id = ?", project_id),
        (" AND status = ?", status),
    )
    active = [(clause, value) for clause, value in candidates if value]
    sql = (
        "SELECT * FROM tasks WHERE 1=1"
        + "".join(clause for clause, _ in active)
        + " ORDER BY priority, created_at"
    )
    args = [value for _, value in active]
    return _rows_to_list(conn.execute(sql, args).fetchall())
def update_task(conn: sqlite3.Connection, id: str, **fields) -> dict:
    """Update columns of a task and return the refreshed row.

    Args:
        conn: Open SQLite connection; the change is committed on it.
        id: Primary key of the task to update.
        **fields: Column name -> new value. ``brief``, ``spec``, ``review``,
            ``test_result`` and ``security_result`` are JSON-encoded before
            storage. With no fields nothing is written.

    Returns:
        The result of ``get_task`` for ``id`` after the update.

    Raises:
        ValueError: If a field name is not a plain identifier.

    ``updated_at`` is always overwritten with ``datetime.now().isoformat()``
    (naive local time), even if the caller passed its own value.
    """
    if not fields:
        return get_task(conn, id)
    # Column names are spliced into the SQL text (only values can be bound
    # as parameters), so refuse anything that is not a bare identifier
    # instead of letting it reshape the statement.
    bad_names = [k for k in fields if not k.isidentifier()]
    if bad_names:
        raise ValueError(f"invalid column name(s) for tasks update: {bad_names!r}")
    json_cols = ("brief", "spec", "review", "test_result", "security_result")
    for key in json_cols:
        if key in fields:
            fields[key] = _json_encode(fields[key])
    fields["updated_at"] = datetime.now().isoformat()
    sets = ", ".join(f"{k} = ?" for k in fields)
    vals = list(fields.values()) + [id]
    conn.execute(f"UPDATE tasks SET {sets} WHERE id = ?", vals)
    conn.commit()
    return get_task(conn, id)
def mark_telegram_sent(conn: sqlite3.Connection, task_id: str) -> None:
    """Set ``telegram_sent = 1`` on the given task and commit."""
    flag_sql = "UPDATE tasks SET telegram_sent = 1 WHERE id = ?"
    conn.execute(flag_sql, (task_id,))
    conn.commit()
# ---------------------------------------------------------------------------
# Decisions
# ---------------------------------------------------------------------------
def add_decision(
    conn: sqlite3.Connection,
    project_id: str,
    type: str,
    title: str,
    description: str,
    category: str | None = None,
    tags: list | None = None,
    task_id: str | None = None,
) -> dict:
    """Insert a decision row for a project, commit, and return it.

    ``tags`` is JSON-encoded before storage. The returned dict is the row
    re-read by its generated id.
    """
    insert_sql = """INSERT INTO decisions (project_id, task_id, type, category,
        title, description, tags)
        VALUES (?, ?, ?, ?, ?, ?, ?)"""
    row_values = (
        project_id,
        task_id,
        type,
        category,
        title,
        description,
        _json_encode(tags),
    )
    new_id = conn.execute(insert_sql, row_values).lastrowid
    conn.commit()
    stored = conn.execute(
        "SELECT * FROM decisions WHERE id = ?", (new_id,)
    ).fetchone()
    return _row_to_dict(stored)
def add_decision_if_new(
conn: sqlite3.Connection,
project_id: str,
type: str,
title: str,
description: str,
category: str | None = None,
tags: list | None = None,
task_id: str | None = None,
) -> dict | None:
"""Add a decision only if no existing one matches (project_id, type, normalized title).
Returns the new decision dict, or None if skipped as duplicate.
"""
existing = conn.execute(
"""SELECT id FROM decisions
WHERE project_id = ? AND type = ?
AND lower(trim(title)) = lower(trim(?))""",
(project_id, type, title),
).fetchone()
if existing:
return None
return add_decision(conn, project_id, type, title, description,
category=category, tags=tags, task_id=task_id)
def get_decisions(
    conn: sqlite3.Connection,
    project_id: str,
    category: str | None = None,
    tags: list | None = None,
    types: list | None = None,
    limit: int | None = None,
) -> list[dict]:
    """Return a project's decisions, newest first, with optional filters.

    category: exact match on the category column.
    types: keep decisions whose type is one of the given values.
    tags: keep decisions having ANY of the given tags (json_each over the
        stored tags array).
    limit: maximum number of rows; a falsy value means no limit.
    """
    sql_parts = ["SELECT DISTINCT d.* FROM decisions d WHERE d.project_id = ?"]
    args: list = [project_id]

    if category:
        sql_parts.append(" AND d.category = ?")
        args.append(category)

    if types:
        type_marks = ", ".join(["?"] * len(types))
        sql_parts.append(f" AND d.type IN ({type_marks})")
        args.extend(types)

    if tags:
        tag_marks = ", ".join(["?"] * len(tags))
        sql_parts.append(
            f""" AND d.id IN (
            SELECT d2.id FROM decisions d2, json_each(d2.tags) AS t
            WHERE t.value IN ({tag_marks})
            )"""
        )
        args.extend(tags)

    sql_parts.append(" ORDER BY d.created_at DESC")

    if limit:
        sql_parts.append(" LIMIT ?")
        args.append(limit)

    cursor = conn.execute("".join(sql_parts), args)
    return _rows_to_list(cursor.fetchall())
def get_decision(conn: sqlite3.Connection, decision_id: int) -> dict | None:
    """Fetch one decision by id; ``None`` when no row matches."""
    found = conn.execute(
        "SELECT * FROM decisions WHERE id = ?", (decision_id,)
    ).fetchone()
    if not found:
        return None
    return _row_to_dict(found)
def delete_decision(conn: sqlite3.Connection, decision_id: int) -> bool:
    """Delete a decision by id and commit.

    Returns True when a row was removed, False when none matched.
    """
    cursor = conn.execute("DELETE FROM decisions WHERE id = ?", (decision_id,))
    removed = cursor.rowcount
    conn.commit()
    return removed > 0
# ---------------------------------------------------------------------------
# Modules
# ---------------------------------------------------------------------------
def add_module(
    conn: sqlite3.Connection,
    project_id: str,
    name: str,
    type: str,
    path: str,
    description: str | None = None,
    owner_role: str | None = None,
    dependencies: list | None = None,
) -> dict:
    """Insert a module row for a project, commit, and return it.

    ``dependencies`` is JSON-encoded before storage. The returned dict is
    the row re-read by its generated id.
    """
    insert_sql = """INSERT INTO modules (project_id, name, type, path, description,
        owner_role, dependencies)
        VALUES (?, ?, ?, ?, ?, ?, ?)"""
    row_values = (
        project_id,
        name,
        type,
        path,
        description,
        owner_role,
        _json_encode(dependencies),
    )
    new_id = conn.execute(insert_sql, row_values).lastrowid
    conn.commit()
    stored = conn.execute(
        "SELECT * FROM modules WHERE id = ?", (new_id,)
    ).fetchone()
    return _row_to_dict(stored)
def get_modules(conn: sqlite3.Connection, project_id: str) -> list[dict]:
    """Return every module of a project, ordered by type then name."""
    select_sql = "SELECT * FROM modules WHERE project_id = ? ORDER BY type, name"
    cursor = conn.execute(select_sql, (project_id,))
    return _rows_to_list(cursor.fetchall())
# ---------------------------------------------------------------------------
# Agent Logs
# ---------------------------------------------------------------------------
def log_agent_run(
    conn: sqlite3.Connection,
    project_id: str,
    agent_role: str,
    action: str,
    task_id: str | None = None,
    session_id: str | None = None,
    input_summary: str | None = None,
    output_summary: str | None = None,
    tokens_used: int | None = None,
    model: str | None = None,
    cost_usd: float | None = None,
    success: bool = True,
    error_message: str | None = None,
    duration_seconds: int | None = None,
) -> dict:
    """Insert one agent_logs row, commit, and return it.

    The returned dict is the row re-read by its generated id.
    """
    insert_sql = """INSERT INTO agent_logs (project_id, task_id, agent_role, session_id,
        action, input_summary, output_summary, tokens_used, model,
        cost_usd, success, error_message, duration_seconds)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
    # Order must match the column list in insert_sql.
    row_values = (
        project_id,
        task_id,
        agent_role,
        session_id,
        action,
        input_summary,
        output_summary,
        tokens_used,
        model,
        cost_usd,
        success,
        error_message,
        duration_seconds,
    )
    new_id = conn.execute(insert_sql, row_values).lastrowid
    conn.commit()
    stored = conn.execute(
        "SELECT * FROM agent_logs WHERE id = ?", (new_id,)
    ).fetchone()
    return _row_to_dict(stored)
# ---------------------------------------------------------------------------
# Pipelines
# ---------------------------------------------------------------------------
def create_pipeline(
    conn: sqlite3.Connection,
    task_id: str,
    project_id: str,
    route_type: str,
    steps: list | dict,
) -> dict:
    """Insert a pipeline row, commit, and return it.

    ``steps`` is JSON-encoded before storage. The returned dict is the row
    re-read by its generated id.
    """
    insert_sql = """INSERT INTO pipelines (task_id, project_id, route_type, steps)
        VALUES (?, ?, ?, ?)"""
    row_values = (task_id, project_id, route_type, _json_encode(steps))
    new_id = conn.execute(insert_sql, row_values).lastrowid
    conn.commit()
    stored = conn.execute(
        "SELECT * FROM pipelines WHERE id = ?", (new_id,)
    ).fetchone()
    return _row_to_dict(stored)
def update_pipeline(
    conn: sqlite3.Connection,
    id: int,
    status: str | None = None,
    total_cost_usd: float | None = None,
    total_tokens: int | None = None,
    total_duration_seconds: int | None = None,
) -> dict:
    """Update a pipeline's status and/or totals and return the row.

    Only arguments that are not ``None`` are written. Setting ``status`` to
    ``completed``, ``failed`` or ``cancelled`` also stamps ``completed_at``
    with ``datetime.now().isoformat()``. When nothing is supplied no UPDATE
    is issued and the current row is returned.
    """
    changes: dict[str, Any] = {}
    if status is not None:
        changes["status"] = status
        if status in ("completed", "failed", "cancelled"):
            changes["completed_at"] = datetime.now().isoformat()

    totals = {
        "total_cost_usd": total_cost_usd,
        "total_tokens": total_tokens,
        "total_duration_seconds": total_duration_seconds,
    }
    changes.update(
        (column, value) for column, value in totals.items() if value is not None
    )

    if changes:
        assignments = ", ".join(f"{column} = ?" for column in changes)
        args = [*changes.values(), id]
        conn.execute(f"UPDATE pipelines SET {assignments} WHERE id = ?", args)
        conn.commit()

    stored = conn.execute(
        "SELECT * FROM pipelines WHERE id = ?", (id,)
    ).fetchone()
    return _row_to_dict(stored)
# ---------------------------------------------------------------------------
# Support
# ---------------------------------------------------------------------------
def create_ticket(
    conn: sqlite3.Connection,
    project_id: str,
    source: str,
    client_message: str,
    client_id: str | None = None,
    classification: str | None = None,
) -> dict:
    """Insert a support ticket, commit, and return it.

    The returned dict is the row re-read by its generated id.
    """
    insert_sql = """INSERT INTO support_tickets (project_id, source, client_id,
        client_message, classification)
        VALUES (?, ?, ?, ?, ?)"""
    row_values = (project_id, source, client_id, client_message, classification)
    new_id = conn.execute(insert_sql, row_values).lastrowid
    conn.commit()
    stored = conn.execute(
        "SELECT * FROM support_tickets WHERE id = ?", (new_id,)
    ).fetchone()
    return _row_to_dict(stored)
def list_tickets(
    conn: sqlite3.Connection,
    project_id: str | None = None,
    status: str | None = None,
) -> list[dict]:
    """Return support tickets, newest first.

    Each of ``project_id`` and ``status`` narrows the result only when it
    is truthy.
    """
    candidates = (
        (" AND project_id = ?", project_id),
        (" AND status = ?", status),
    )
    active = [(clause, value) for clause, value in candidates if value]
    sql = (
        "SELECT * FROM support_tickets WHERE 1=1"
        + "".join(clause for clause, _ in active)
        + " ORDER BY created_at DESC"
    )
    args = [value for _, value in active]
    return _rows_to_list(conn.execute(sql, args).fetchall())
# ---------------------------------------------------------------------------
# Audit Log
# ---------------------------------------------------------------------------
def log_audit_event(
    conn: sqlite3.Connection,
    event_type: str,
    task_id: str | None = None,
    step_id: str | None = None,
    reason: str | None = None,
    project_id: str | None = None,
) -> dict:
    """Record a security-sensitive event in audit_log and return the row.

    event_type='dangerous_skip' is used when --dangerously-skip-permissions
    is invoked. The insert is committed and the row is re-read by its
    generated id.
    """
    insert_sql = """INSERT INTO audit_log (event_type, task_id, step_id, reason, project_id)
        VALUES (?, ?, ?, ?, ?)"""
    row_values = (event_type, task_id, step_id, reason, project_id)
    new_id = conn.execute(insert_sql, row_values).lastrowid
    conn.commit()
    stored = conn.execute(
        "SELECT * FROM audit_log WHERE id = ?", (new_id,)
    ).fetchone()
    return _row_to_dict(stored)
def get_audit_log(
    conn: sqlite3.Connection,
    task_id: str | None = None,
    project_id: str | None = None,
    event_type: str | None = None,
    limit: int = 100,
) -> list[dict]:
    """Return audit_log entries, newest first, capped at ``limit`` rows.

    Each of ``task_id``, ``project_id`` and ``event_type`` narrows the
    result only when it is truthy.
    """
    candidates = (
        (" AND task_id = ?", task_id),
        (" AND project_id = ?", project_id),
        (" AND event_type = ?", event_type),
    )
    active = [(clause, value) for clause, value in candidates if value]
    sql = (
        "SELECT * FROM audit_log WHERE 1=1"
        + "".join(clause for clause, _ in active)
        + " ORDER BY timestamp DESC LIMIT ?"
    )
    args = [value for _, value in active]
    args.append(limit)
    return _rows_to_list(conn.execute(sql, args).fetchall())
# ---------------------------------------------------------------------------
# Statistics / Dashboard
# ---------------------------------------------------------------------------
def get_project_summary(conn: sqlite3.Connection) -> list[dict]:
    """Return every project with per-status task counts.

    Adds ``total_tasks``, ``done_tasks``, ``active_tasks`` (status
    ``in_progress``), ``blocked_tasks`` and ``review_tasks`` to each project
    row. Projects without tasks are included via the LEFT JOIN. Ordered by
    priority then name.
    """
    summary_sql = """
        SELECT p.*,
        COUNT(t.id) AS total_tasks,
        SUM(CASE WHEN t.status = 'done' THEN 1 ELSE 0 END) AS done_tasks,
        SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) AS active_tasks,
        SUM(CASE WHEN t.status = 'blocked' THEN 1 ELSE 0 END) AS blocked_tasks,
        SUM(CASE WHEN t.status = 'review' THEN 1 ELSE 0 END) AS review_tasks
        FROM projects p
        LEFT JOIN tasks t ON t.project_id = p.id
        GROUP BY p.id
        ORDER BY p.priority, p.name
        """
    cursor = conn.execute(summary_sql)
    return _rows_to_list(cursor.fetchall())
def get_cost_summary(conn: sqlite3.Connection, days: int = 7) -> list[dict]:
    """Return per-project agent-run totals for the last ``days`` days.

    Each row has ``project_id``, ``project_name``, ``runs``,
    ``total_tokens``, ``total_cost_usd`` and ``total_duration_seconds``.
    Projects with no runs in the window are dropped (HAVING runs > 0);
    rows are ordered by total cost, highest first.
    """
    summary_sql = """
        SELECT
        p.id AS project_id,
        p.name AS project_name,
        COUNT(a.id) AS runs,
        COALESCE(SUM(a.tokens_used), 0) AS total_tokens,
        COALESCE(SUM(a.cost_usd), 0) AS total_cost_usd,
        COALESCE(SUM(a.duration_seconds), 0) AS total_duration_seconds
        FROM projects p
        LEFT JOIN agent_logs a ON a.project_id = p.id
        AND a.created_at >= datetime('now', ?)
        GROUP BY p.id
        HAVING runs > 0
        ORDER BY total_cost_usd DESC
        """
    # SQLite date modifier, e.g. "-7 days", bound as a parameter.
    window = "-{} days".format(days)
    cursor = conn.execute(summary_sql, (window,))
    return _rows_to_list(cursor.fetchall())
# ---------------------------------------------------------------------------
# Project Phases (KIN-059)
# ---------------------------------------------------------------------------
def create_phase(
    conn: sqlite3.Connection,
    project_id: str,
    role: str,
    phase_order: int,
) -> dict:
    """Insert a project phase with status 'pending', commit, and return it.

    The returned dict is the row re-read by its generated id.
    """
    insert_sql = """INSERT INTO project_phases (project_id, role, phase_order, status)
        VALUES (?, ?, ?, 'pending')"""
    new_id = conn.execute(insert_sql, (project_id, role, phase_order)).lastrowid
    conn.commit()
    stored = conn.execute(
        "SELECT * FROM project_phases WHERE id = ?", (new_id,)
    ).fetchone()
    return _row_to_dict(stored)
def get_phase(conn: sqlite3.Connection, phase_id: int) -> dict | None:
    """Fetch one project phase by id; ``None`` when no row matches."""
    cursor = conn.execute("SELECT * FROM project_phases WHERE id = ?", (phase_id,))
    return _row_to_dict(cursor.fetchone())
def list_phases(conn: sqlite3.Connection, project_id: str) -> list[dict]:
    """Return all phases of a project, ordered by phase_order."""
    select_sql = "SELECT * FROM project_phases WHERE project_id = ? ORDER BY phase_order"
    cursor = conn.execute(select_sql, (project_id,))
    return _rows_to_list(cursor.fetchall())
def update_phase(conn: sqlite3.Connection, phase_id: int, **fields) -> dict:
    """Update columns of a project phase and return the refreshed row.

    Args:
        conn: Open SQLite connection; the change is committed on it.
        phase_id: Primary key of the phase to update.
        **fields: Column name -> new value. With no fields nothing is
            written.

    Returns:
        The result of ``get_phase`` for ``phase_id`` after the update.

    Raises:
        ValueError: If a field name is not a plain identifier.

    ``updated_at`` is always overwritten with ``datetime.now().isoformat()``
    (naive local time), even if the caller passed its own value.
    """
    if not fields:
        return get_phase(conn, phase_id)
    # Column names are spliced into the SQL text (only values can be bound
    # as parameters), so refuse anything that is not a bare identifier
    # instead of letting it reshape the statement.
    bad_names = [k for k in fields if not k.isidentifier()]
    if bad_names:
        raise ValueError(
            f"invalid column name(s) for project_phases update: {bad_names!r}"
        )
    fields["updated_at"] = datetime.now().isoformat()
    sets = ", ".join(f"{k} = ?" for k in fields)
    vals = list(fields.values()) + [phase_id]
    conn.execute(f"UPDATE project_phases SET {sets} WHERE id = ?", vals)
    conn.commit()
    return get_phase(conn, phase_id)