Add core/models.py — data access functions for all 9 tables
20 functions covering: projects, tasks, decisions, modules, agent_logs, pipelines, support tickets, and dashboard stats. Parameterized queries, JSON encode/decode, no ORM. 21 tests, all passing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
d7491705d9
commit
3db73332ad
3 changed files with 685 additions and 0 deletions
445
core/models.py
Normal file
445
core/models.py
Normal file
|
|
@ -0,0 +1,445 @@
|
|||
"""
|
||||
Kin — data access functions for all tables.
|
||||
Pure functions: (conn, params) → dict | list[dict]. No ORM, no classes.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
|
||||
def _row_to_dict(row: sqlite3.Row | None) -> dict | None:
|
||||
"""Convert sqlite3.Row to dict with JSON fields decoded."""
|
||||
if row is None:
|
||||
return None
|
||||
d = dict(row)
|
||||
for key, val in d.items():
|
||||
if isinstance(val, str) and val.startswith(("[", "{")):
|
||||
try:
|
||||
d[key] = json.loads(val)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
return d
|
||||
|
||||
|
||||
def _rows_to_list(rows: list[sqlite3.Row]) -> list[dict]:
|
||||
"""Convert list of sqlite3.Row to list of dicts."""
|
||||
return [_row_to_dict(r) for r in rows]
|
||||
|
||||
|
||||
def _json_encode(val: Any) -> Any:
|
||||
"""Encode lists/dicts to JSON strings for storage."""
|
||||
if isinstance(val, (list, dict)):
|
||||
return json.dumps(val, ensure_ascii=False)
|
||||
return val
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Projects
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def create_project(
|
||||
conn: sqlite3.Connection,
|
||||
id: str,
|
||||
name: str,
|
||||
path: str,
|
||||
tech_stack: list | None = None,
|
||||
status: str = "active",
|
||||
priority: int = 5,
|
||||
pm_prompt: str | None = None,
|
||||
claude_md_path: str | None = None,
|
||||
forgejo_repo: str | None = None,
|
||||
) -> dict:
|
||||
"""Create a new project and return it as dict."""
|
||||
conn.execute(
|
||||
"""INSERT INTO projects (id, name, path, tech_stack, status, priority,
|
||||
pm_prompt, claude_md_path, forgejo_repo)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(id, name, path, _json_encode(tech_stack), status, priority,
|
||||
pm_prompt, claude_md_path, forgejo_repo),
|
||||
)
|
||||
conn.commit()
|
||||
return get_project(conn, id)
|
||||
|
||||
|
||||
def get_project(conn: sqlite3.Connection, id: str) -> dict | None:
|
||||
"""Get project by id."""
|
||||
row = conn.execute("SELECT * FROM projects WHERE id = ?", (id,)).fetchone()
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
def list_projects(conn: sqlite3.Connection, status: str | None = None) -> list[dict]:
|
||||
"""List projects, optionally filtered by status."""
|
||||
if status:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM projects WHERE status = ? ORDER BY priority, name",
|
||||
(status,),
|
||||
).fetchall()
|
||||
else:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM projects ORDER BY priority, name"
|
||||
).fetchall()
|
||||
return _rows_to_list(rows)
|
||||
|
||||
|
||||
def update_project(conn: sqlite3.Connection, id: str, **fields) -> dict:
|
||||
"""Update project fields. Returns updated project."""
|
||||
if not fields:
|
||||
return get_project(conn, id)
|
||||
for key in ("tech_stack",):
|
||||
if key in fields:
|
||||
fields[key] = _json_encode(fields[key])
|
||||
sets = ", ".join(f"{k} = ?" for k in fields)
|
||||
vals = list(fields.values()) + [id]
|
||||
conn.execute(f"UPDATE projects SET {sets} WHERE id = ?", vals)
|
||||
conn.commit()
|
||||
return get_project(conn, id)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tasks
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def create_task(
|
||||
conn: sqlite3.Connection,
|
||||
id: str,
|
||||
project_id: str,
|
||||
title: str,
|
||||
status: str = "pending",
|
||||
priority: int = 5,
|
||||
assigned_role: str | None = None,
|
||||
parent_task_id: str | None = None,
|
||||
brief: dict | None = None,
|
||||
spec: dict | None = None,
|
||||
forgejo_issue_id: int | None = None,
|
||||
) -> dict:
|
||||
"""Create a task linked to a project."""
|
||||
conn.execute(
|
||||
"""INSERT INTO tasks (id, project_id, title, status, priority,
|
||||
assigned_role, parent_task_id, brief, spec, forgejo_issue_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(id, project_id, title, status, priority, assigned_role,
|
||||
parent_task_id, _json_encode(brief), _json_encode(spec),
|
||||
forgejo_issue_id),
|
||||
)
|
||||
conn.commit()
|
||||
return get_task(conn, id)
|
||||
|
||||
|
||||
def get_task(conn: sqlite3.Connection, id: str) -> dict | None:
|
||||
"""Get task by id."""
|
||||
row = conn.execute("SELECT * FROM tasks WHERE id = ?", (id,)).fetchone()
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
def list_tasks(
|
||||
conn: sqlite3.Connection,
|
||||
project_id: str | None = None,
|
||||
status: str | None = None,
|
||||
) -> list[dict]:
|
||||
"""List tasks with optional project/status filters."""
|
||||
query = "SELECT * FROM tasks WHERE 1=1"
|
||||
params: list = []
|
||||
if project_id:
|
||||
query += " AND project_id = ?"
|
||||
params.append(project_id)
|
||||
if status:
|
||||
query += " AND status = ?"
|
||||
params.append(status)
|
||||
query += " ORDER BY priority, created_at"
|
||||
return _rows_to_list(conn.execute(query, params).fetchall())
|
||||
|
||||
|
||||
def update_task(conn: sqlite3.Connection, id: str, **fields) -> dict:
|
||||
"""Update task fields. Auto-sets updated_at."""
|
||||
if not fields:
|
||||
return get_task(conn, id)
|
||||
json_cols = ("brief", "spec", "review", "test_result", "security_result")
|
||||
for key in json_cols:
|
||||
if key in fields:
|
||||
fields[key] = _json_encode(fields[key])
|
||||
fields["updated_at"] = datetime.now().isoformat()
|
||||
sets = ", ".join(f"{k} = ?" for k in fields)
|
||||
vals = list(fields.values()) + [id]
|
||||
conn.execute(f"UPDATE tasks SET {sets} WHERE id = ?", vals)
|
||||
conn.commit()
|
||||
return get_task(conn, id)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Decisions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def add_decision(
|
||||
conn: sqlite3.Connection,
|
||||
project_id: str,
|
||||
type: str,
|
||||
title: str,
|
||||
description: str,
|
||||
category: str | None = None,
|
||||
tags: list | None = None,
|
||||
task_id: str | None = None,
|
||||
) -> dict:
|
||||
"""Record a decision, gotcha, or convention for a project."""
|
||||
cur = conn.execute(
|
||||
"""INSERT INTO decisions (project_id, task_id, type, category,
|
||||
title, description, tags)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
(project_id, task_id, type, category, title, description,
|
||||
_json_encode(tags)),
|
||||
)
|
||||
conn.commit()
|
||||
row = conn.execute(
|
||||
"SELECT * FROM decisions WHERE id = ?", (cur.lastrowid,)
|
||||
).fetchone()
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
def get_decisions(
|
||||
conn: sqlite3.Connection,
|
||||
project_id: str,
|
||||
category: str | None = None,
|
||||
tags: list | None = None,
|
||||
types: list | None = None,
|
||||
limit: int | None = None,
|
||||
) -> list[dict]:
|
||||
"""Query decisions for a project with optional filters.
|
||||
|
||||
tags: matches if ANY tag is present (OR logic via json_each).
|
||||
types: filter by decision type (decision, gotcha, workaround, etc).
|
||||
"""
|
||||
query = "SELECT DISTINCT d.* FROM decisions d WHERE d.project_id = ?"
|
||||
params: list = [project_id]
|
||||
if category:
|
||||
query += " AND d.category = ?"
|
||||
params.append(category)
|
||||
if types:
|
||||
placeholders = ", ".join("?" for _ in types)
|
||||
query += f" AND d.type IN ({placeholders})"
|
||||
params.extend(types)
|
||||
if tags:
|
||||
query += """ AND d.id IN (
|
||||
SELECT d2.id FROM decisions d2, json_each(d2.tags) AS t
|
||||
WHERE t.value IN ({})
|
||||
)""".format(", ".join("?" for _ in tags))
|
||||
params.extend(tags)
|
||||
query += " ORDER BY d.created_at DESC"
|
||||
if limit:
|
||||
query += " LIMIT ?"
|
||||
params.append(limit)
|
||||
return _rows_to_list(conn.execute(query, params).fetchall())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Modules
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def add_module(
|
||||
conn: sqlite3.Connection,
|
||||
project_id: str,
|
||||
name: str,
|
||||
type: str,
|
||||
path: str,
|
||||
description: str | None = None,
|
||||
owner_role: str | None = None,
|
||||
dependencies: list | None = None,
|
||||
) -> dict:
|
||||
"""Register a project module."""
|
||||
cur = conn.execute(
|
||||
"""INSERT INTO modules (project_id, name, type, path, description,
|
||||
owner_role, dependencies)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
(project_id, name, type, path, description, owner_role,
|
||||
_json_encode(dependencies)),
|
||||
)
|
||||
conn.commit()
|
||||
row = conn.execute(
|
||||
"SELECT * FROM modules WHERE id = ?", (cur.lastrowid,)
|
||||
).fetchone()
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
def get_modules(conn: sqlite3.Connection, project_id: str) -> list[dict]:
|
||||
"""Get all modules for a project."""
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM modules WHERE project_id = ? ORDER BY type, name",
|
||||
(project_id,),
|
||||
).fetchall()
|
||||
return _rows_to_list(rows)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent Logs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def log_agent_run(
|
||||
conn: sqlite3.Connection,
|
||||
project_id: str,
|
||||
agent_role: str,
|
||||
action: str,
|
||||
task_id: str | None = None,
|
||||
session_id: str | None = None,
|
||||
input_summary: str | None = None,
|
||||
output_summary: str | None = None,
|
||||
tokens_used: int | None = None,
|
||||
model: str | None = None,
|
||||
cost_usd: float | None = None,
|
||||
success: bool = True,
|
||||
error_message: str | None = None,
|
||||
duration_seconds: int | None = None,
|
||||
) -> dict:
|
||||
"""Log an agent execution run."""
|
||||
cur = conn.execute(
|
||||
"""INSERT INTO agent_logs (project_id, task_id, agent_role, session_id,
|
||||
action, input_summary, output_summary, tokens_used, model,
|
||||
cost_usd, success, error_message, duration_seconds)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(project_id, task_id, agent_role, session_id, action, input_summary,
|
||||
output_summary, tokens_used, model, cost_usd, success,
|
||||
error_message, duration_seconds),
|
||||
)
|
||||
conn.commit()
|
||||
row = conn.execute(
|
||||
"SELECT * FROM agent_logs WHERE id = ?", (cur.lastrowid,)
|
||||
).fetchone()
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pipelines
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def create_pipeline(
|
||||
conn: sqlite3.Connection,
|
||||
task_id: str,
|
||||
project_id: str,
|
||||
route_type: str,
|
||||
steps: list | dict,
|
||||
) -> dict:
|
||||
"""Create a new pipeline run."""
|
||||
cur = conn.execute(
|
||||
"""INSERT INTO pipelines (task_id, project_id, route_type, steps)
|
||||
VALUES (?, ?, ?, ?)""",
|
||||
(task_id, project_id, route_type, _json_encode(steps)),
|
||||
)
|
||||
conn.commit()
|
||||
row = conn.execute(
|
||||
"SELECT * FROM pipelines WHERE id = ?", (cur.lastrowid,)
|
||||
).fetchone()
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
def update_pipeline(
|
||||
conn: sqlite3.Connection,
|
||||
id: int,
|
||||
status: str | None = None,
|
||||
total_cost_usd: float | None = None,
|
||||
total_tokens: int | None = None,
|
||||
total_duration_seconds: int | None = None,
|
||||
) -> dict:
|
||||
"""Update pipeline status and stats."""
|
||||
fields: dict[str, Any] = {}
|
||||
if status is not None:
|
||||
fields["status"] = status
|
||||
if status in ("completed", "failed", "cancelled"):
|
||||
fields["completed_at"] = datetime.now().isoformat()
|
||||
if total_cost_usd is not None:
|
||||
fields["total_cost_usd"] = total_cost_usd
|
||||
if total_tokens is not None:
|
||||
fields["total_tokens"] = total_tokens
|
||||
if total_duration_seconds is not None:
|
||||
fields["total_duration_seconds"] = total_duration_seconds
|
||||
if fields:
|
||||
sets = ", ".join(f"{k} = ?" for k in fields)
|
||||
vals = list(fields.values()) + [id]
|
||||
conn.execute(f"UPDATE pipelines SET {sets} WHERE id = ?", vals)
|
||||
conn.commit()
|
||||
row = conn.execute(
|
||||
"SELECT * FROM pipelines WHERE id = ?", (id,)
|
||||
).fetchone()
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Support
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def create_ticket(
|
||||
conn: sqlite3.Connection,
|
||||
project_id: str,
|
||||
source: str,
|
||||
client_message: str,
|
||||
client_id: str | None = None,
|
||||
classification: str | None = None,
|
||||
) -> dict:
|
||||
"""Create a support ticket."""
|
||||
cur = conn.execute(
|
||||
"""INSERT INTO support_tickets (project_id, source, client_id,
|
||||
client_message, classification)
|
||||
VALUES (?, ?, ?, ?, ?)""",
|
||||
(project_id, source, client_id, client_message, classification),
|
||||
)
|
||||
conn.commit()
|
||||
row = conn.execute(
|
||||
"SELECT * FROM support_tickets WHERE id = ?", (cur.lastrowid,)
|
||||
).fetchone()
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
def list_tickets(
|
||||
conn: sqlite3.Connection,
|
||||
project_id: str | None = None,
|
||||
status: str | None = None,
|
||||
) -> list[dict]:
|
||||
"""List support tickets with optional filters."""
|
||||
query = "SELECT * FROM support_tickets WHERE 1=1"
|
||||
params: list = []
|
||||
if project_id:
|
||||
query += " AND project_id = ?"
|
||||
params.append(project_id)
|
||||
if status:
|
||||
query += " AND status = ?"
|
||||
params.append(status)
|
||||
query += " ORDER BY created_at DESC"
|
||||
return _rows_to_list(conn.execute(query, params).fetchall())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Statistics / Dashboard
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_project_summary(conn: sqlite3.Connection) -> list[dict]:
|
||||
"""Get all projects with task counts by status."""
|
||||
rows = conn.execute("""
|
||||
SELECT p.*,
|
||||
COUNT(t.id) AS total_tasks,
|
||||
SUM(CASE WHEN t.status = 'done' THEN 1 ELSE 0 END) AS done_tasks,
|
||||
SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) AS active_tasks,
|
||||
SUM(CASE WHEN t.status = 'blocked' THEN 1 ELSE 0 END) AS blocked_tasks
|
||||
FROM projects p
|
||||
LEFT JOIN tasks t ON t.project_id = p.id
|
||||
GROUP BY p.id
|
||||
ORDER BY p.priority, p.name
|
||||
""").fetchall()
|
||||
return _rows_to_list(rows)
|
||||
|
||||
|
||||
def get_cost_summary(conn: sqlite3.Connection, days: int = 7) -> list[dict]:
|
||||
"""Get cost summary by project for the last N days."""
|
||||
rows = conn.execute("""
|
||||
SELECT
|
||||
p.id AS project_id,
|
||||
p.name AS project_name,
|
||||
COUNT(a.id) AS runs,
|
||||
COALESCE(SUM(a.tokens_used), 0) AS total_tokens,
|
||||
COALESCE(SUM(a.cost_usd), 0) AS total_cost_usd,
|
||||
COALESCE(SUM(a.duration_seconds), 0) AS total_duration_seconds
|
||||
FROM projects p
|
||||
LEFT JOIN agent_logs a ON a.project_id = p.id
|
||||
AND a.created_at >= datetime('now', ?)
|
||||
GROUP BY p.id
|
||||
HAVING runs > 0
|
||||
ORDER BY total_cost_usd DESC
|
||||
""", (f"-{days} days",)).fetchall()
|
||||
return _rows_to_list(rows)
|
||||
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
240
tests/test_models.py
Normal file
240
tests/test_models.py
Normal file
|
|
@ -0,0 +1,240 @@
|
|||
"""Tests for core/models.py — all functions, in-memory SQLite."""
|
||||
|
||||
import pytest
|
||||
from core.db import init_db
|
||||
from core import models
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def conn():
|
||||
"""Fresh in-memory DB for each test."""
|
||||
c = init_db(db_path=":memory:")
|
||||
yield c
|
||||
c.close()
|
||||
|
||||
|
||||
# -- Projects --
|
||||
|
||||
def test_create_and_get_project(conn):
|
||||
p = models.create_project(conn, "vdol", "В долю поперёк", "~/projects/vdolipoperek",
|
||||
tech_stack=["vue3", "nuxt"])
|
||||
assert p["id"] == "vdol"
|
||||
assert p["tech_stack"] == ["vue3", "nuxt"]
|
||||
assert p["status"] == "active"
|
||||
|
||||
fetched = models.get_project(conn, "vdol")
|
||||
assert fetched["name"] == "В долю поперёк"
|
||||
|
||||
|
||||
def test_get_project_not_found(conn):
|
||||
assert models.get_project(conn, "nope") is None
|
||||
|
||||
|
||||
def test_list_projects_filter(conn):
|
||||
models.create_project(conn, "a", "A", "/a", status="active")
|
||||
models.create_project(conn, "b", "B", "/b", status="paused")
|
||||
models.create_project(conn, "c", "C", "/c", status="active")
|
||||
|
||||
assert len(models.list_projects(conn)) == 3
|
||||
assert len(models.list_projects(conn, status="active")) == 2
|
||||
assert len(models.list_projects(conn, status="paused")) == 1
|
||||
|
||||
|
||||
def test_update_project(conn):
|
||||
models.create_project(conn, "x", "X", "/x", priority=5)
|
||||
updated = models.update_project(conn, "x", priority=1, status="maintenance")
|
||||
assert updated["priority"] == 1
|
||||
assert updated["status"] == "maintenance"
|
||||
|
||||
|
||||
def test_update_project_tech_stack_json(conn):
|
||||
models.create_project(conn, "x", "X", "/x", tech_stack=["python"])
|
||||
updated = models.update_project(conn, "x", tech_stack=["python", "fastapi"])
|
||||
assert updated["tech_stack"] == ["python", "fastapi"]
|
||||
|
||||
|
||||
# -- Tasks --
|
||||
|
||||
def test_create_and_get_task(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
t = models.create_task(conn, "P1-001", "p1", "Fix bug",
|
||||
brief={"summary": "broken login"})
|
||||
assert t["id"] == "P1-001"
|
||||
assert t["brief"] == {"summary": "broken login"}
|
||||
assert t["status"] == "pending"
|
||||
|
||||
|
||||
def test_list_tasks_filters(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
models.create_project(conn, "p2", "P2", "/p2")
|
||||
models.create_task(conn, "P1-001", "p1", "Task A", status="pending")
|
||||
models.create_task(conn, "P1-002", "p1", "Task B", status="done")
|
||||
models.create_task(conn, "P2-001", "p2", "Task C", status="pending")
|
||||
|
||||
assert len(models.list_tasks(conn)) == 3
|
||||
assert len(models.list_tasks(conn, project_id="p1")) == 2
|
||||
assert len(models.list_tasks(conn, status="pending")) == 2
|
||||
assert len(models.list_tasks(conn, project_id="p1", status="done")) == 1
|
||||
|
||||
|
||||
def test_update_task(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
models.create_task(conn, "P1-001", "p1", "Task")
|
||||
updated = models.update_task(conn, "P1-001", status="in_progress",
|
||||
spec={"steps": [1, 2, 3]})
|
||||
assert updated["status"] == "in_progress"
|
||||
assert updated["spec"] == {"steps": [1, 2, 3]}
|
||||
assert updated["updated_at"] is not None
|
||||
|
||||
|
||||
def test_subtask(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
models.create_task(conn, "P1-001", "p1", "Parent")
|
||||
child = models.create_task(conn, "P1-001a", "p1", "Child",
|
||||
parent_task_id="P1-001")
|
||||
assert child["parent_task_id"] == "P1-001"
|
||||
|
||||
|
||||
# -- Decisions --
|
||||
|
||||
def test_add_and_get_decisions(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
d = models.add_decision(conn, "p1", "gotcha", "iOS Safari bottom sheet",
|
||||
"position:fixed breaks on iOS Safari",
|
||||
category="ui", tags=["ios-safari", "css"])
|
||||
assert d["type"] == "gotcha"
|
||||
assert d["tags"] == ["ios-safari", "css"]
|
||||
|
||||
results = models.get_decisions(conn, "p1")
|
||||
assert len(results) == 1
|
||||
|
||||
|
||||
def test_decisions_filter_by_category(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
models.add_decision(conn, "p1", "decision", "Use WAL", "perf",
|
||||
category="architecture")
|
||||
models.add_decision(conn, "p1", "gotcha", "Safari bug", "css",
|
||||
category="ui")
|
||||
assert len(models.get_decisions(conn, "p1", category="ui")) == 1
|
||||
|
||||
|
||||
def test_decisions_filter_by_tags(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
models.add_decision(conn, "p1", "gotcha", "Bug A", "desc",
|
||||
tags=["safari", "css"])
|
||||
models.add_decision(conn, "p1", "gotcha", "Bug B", "desc",
|
||||
tags=["chrome", "js"])
|
||||
models.add_decision(conn, "p1", "gotcha", "Bug C", "desc",
|
||||
tags=["safari", "js"])
|
||||
|
||||
assert len(models.get_decisions(conn, "p1", tags=["safari"])) == 2
|
||||
assert len(models.get_decisions(conn, "p1", tags=["js"])) == 2
|
||||
assert len(models.get_decisions(conn, "p1", tags=["css"])) == 1
|
||||
|
||||
|
||||
def test_decisions_filter_by_types(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
models.add_decision(conn, "p1", "decision", "A", "a")
|
||||
models.add_decision(conn, "p1", "gotcha", "B", "b")
|
||||
models.add_decision(conn, "p1", "workaround", "C", "c")
|
||||
|
||||
assert len(models.get_decisions(conn, "p1", types=["gotcha", "workaround"])) == 2
|
||||
|
||||
|
||||
def test_decisions_limit(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
for i in range(10):
|
||||
models.add_decision(conn, "p1", "decision", f"D{i}", f"desc{i}")
|
||||
assert len(models.get_decisions(conn, "p1", limit=3)) == 3
|
||||
|
||||
|
||||
# -- Modules --
|
||||
|
||||
def test_add_and_get_modules(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
m = models.add_module(conn, "p1", "search", "frontend", "src/search/",
|
||||
description="Search UI", dependencies=["auth"])
|
||||
assert m["name"] == "search"
|
||||
assert m["dependencies"] == ["auth"]
|
||||
|
||||
mods = models.get_modules(conn, "p1")
|
||||
assert len(mods) == 1
|
||||
|
||||
|
||||
# -- Agent Logs --
|
||||
|
||||
def test_log_agent_run(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
log = models.log_agent_run(conn, "p1", "developer", "implement",
|
||||
tokens_used=5000, model="sonnet",
|
||||
cost_usd=0.015, duration_seconds=45)
|
||||
assert log["agent_role"] == "developer"
|
||||
assert log["cost_usd"] == 0.015
|
||||
assert log["success"] == 1 # SQLite boolean
|
||||
|
||||
|
||||
# -- Pipelines --
|
||||
|
||||
def test_create_and_update_pipeline(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
models.create_task(conn, "P1-001", "p1", "Task")
|
||||
pipe = models.create_pipeline(conn, "P1-001", "p1", "feature",
|
||||
[{"step": "architect"}, {"step": "dev"}])
|
||||
assert pipe["status"] == "running"
|
||||
assert pipe["steps"] == [{"step": "architect"}, {"step": "dev"}]
|
||||
|
||||
updated = models.update_pipeline(conn, pipe["id"], status="completed",
|
||||
total_cost_usd=0.05, total_tokens=10000)
|
||||
assert updated["status"] == "completed"
|
||||
assert updated["completed_at"] is not None
|
||||
|
||||
|
||||
# -- Support --
|
||||
|
||||
def test_create_and_list_tickets(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
t = models.create_ticket(conn, "p1", "telegram_bot", "Не работает поиск",
|
||||
client_id="tg:12345", classification="bug")
|
||||
assert t["source"] == "telegram_bot"
|
||||
assert t["status"] == "new"
|
||||
|
||||
tickets = models.list_tickets(conn, project_id="p1")
|
||||
assert len(tickets) == 1
|
||||
|
||||
assert len(models.list_tickets(conn, status="resolved")) == 0
|
||||
|
||||
|
||||
# -- Statistics --
|
||||
|
||||
def test_project_summary(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
models.create_task(conn, "P1-001", "p1", "A", status="done")
|
||||
models.create_task(conn, "P1-002", "p1", "B", status="in_progress")
|
||||
models.create_task(conn, "P1-003", "p1", "C", status="blocked")
|
||||
|
||||
summary = models.get_project_summary(conn)
|
||||
assert len(summary) == 1
|
||||
s = summary[0]
|
||||
assert s["total_tasks"] == 3
|
||||
assert s["done_tasks"] == 1
|
||||
assert s["active_tasks"] == 1
|
||||
assert s["blocked_tasks"] == 1
|
||||
|
||||
|
||||
def test_cost_summary(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
models.log_agent_run(conn, "p1", "dev", "implement",
|
||||
cost_usd=0.10, tokens_used=5000)
|
||||
models.log_agent_run(conn, "p1", "reviewer", "review",
|
||||
cost_usd=0.05, tokens_used=2000)
|
||||
|
||||
costs = models.get_cost_summary(conn, days=1)
|
||||
assert len(costs) == 1
|
||||
assert costs[0]["total_cost_usd"] == pytest.approx(0.15)
|
||||
assert costs[0]["total_tokens"] == 7000
|
||||
assert costs[0]["runs"] == 2
|
||||
|
||||
|
||||
def test_cost_summary_empty(conn):
|
||||
models.create_project(conn, "p1", "P1", "/p1")
|
||||
assert models.get_cost_summary(conn, days=7) == []
|
||||
Loading…
Add table
Add a link
Reference in a new issue