kin/cli/main.py

1007 lines
37 KiB
Python
Raw Normal View History

"""
Kin CLI command-line interface for the multi-agent orchestrator.
Uses core.models for all data access, never raw SQL.
"""
import json
import os
import sys
from datetime import datetime
from pathlib import Path
import click
# Ensure project root is on sys.path
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.db import init_db
from core import models
from core import hooks as hooks_module
from agents.bootstrap import (
detect_tech_stack, detect_modules, extract_decisions_from_claude_md,
find_vault_root, scan_obsidian, format_preview, save_to_db,
)
DEFAULT_DB = Path.home() / ".kin" / "kin.db"
def get_conn(db_path: Path = DEFAULT_DB):
db_path.parent.mkdir(parents=True, exist_ok=True)
return init_db(db_path)
def _parse_json(ctx, param, value):
"""Click callback: parse a JSON string or return None."""
if value is None:
return None
try:
return json.loads(value)
except json.JSONDecodeError:
raise click.BadParameter(f"Invalid JSON: {value}")
def _table(headers: list[str], rows: list[list[str]], min_width: int = 6):
"""Render a simple aligned text table."""
widths = [max(min_width, len(h)) for h in headers]
for row in rows:
for i, cell in enumerate(row):
if i < len(widths):
widths[i] = max(widths[i], len(str(cell)))
fmt = " ".join(f"{{:<{w}}}" for w in widths)
lines = [fmt.format(*headers), fmt.format(*("-" * w for w in widths))]
for row in rows:
lines.append(fmt.format(*[str(c) for c in row]))
return "\n".join(lines)
# ===========================================================================
# Root group
# ===========================================================================
@click.group()
@click.option("--db", type=click.Path(), default=None, envvar="KIN_DB",
help="Path to kin.db (default: ~/.kin/kin.db, or $KIN_DB)")
@click.pass_context
def cli(ctx, db):
"""Kin — multi-agent project orchestrator."""
ctx.ensure_object(dict)
db_path = Path(db) if db else DEFAULT_DB
ctx.obj["conn"] = get_conn(db_path)
# ===========================================================================
# project
# ===========================================================================
@cli.group()
def project():
"""Manage projects."""
@project.command("add")
@click.argument("id")
@click.argument("name")
@click.argument("path")
@click.option("--tech-stack", callback=_parse_json, default=None, help='JSON array, e.g. \'["vue3","nuxt"]\'')
@click.option("--status", default="active")
@click.option("--priority", type=int, default=5)
@click.option("--language", default="ru", help="Response language for agents (ru, en, etc.)")
@click.pass_context
def project_add(ctx, id, name, path, tech_stack, status, priority, language):
"""Add a new project."""
conn = ctx.obj["conn"]
p = models.create_project(conn, id, name, path,
tech_stack=tech_stack, status=status, priority=priority,
language=language)
click.echo(f"Created project: {p['id']} ({p['name']})")
@cli.command("new-project")
@click.argument("description")
@click.option("--id", "project_id", required=True, help="Project ID")
@click.option("--name", required=True, help="Project name")
@click.option("--path", required=True, help="Project path")
@click.option("--roles", default="business,market,tech", show_default=True,
help="Comma-separated roles: business,market,legal,tech,ux,marketer")
@click.option("--tech-stack", default=None, help="Comma-separated tech stack")
@click.option("--priority", type=int, default=5, show_default=True)
@click.option("--language", default="ru", show_default=True)
@click.pass_context
def new_project(ctx, description, project_id, name, path, roles, tech_stack, priority, language):
"""Create a new project with a sequential research phase pipeline.
DESCRIPTION free-text project description for the agents.
Role aliases: business=business_analyst, market=market_researcher,
legal=legal_researcher, tech=tech_researcher, ux=ux_designer, marketer=marketer.
Architect is added automatically as the last phase.
"""
from core.phases import create_project_with_phases, validate_roles, ROLE_LABELS
_ALIASES = {
"business": "business_analyst",
"market": "market_researcher",
"legal": "legal_researcher",
"tech": "tech_researcher",
"ux": "ux_designer",
}
raw_roles = [r.strip().lower() for r in roles.split(",") if r.strip()]
expanded = [_ALIASES.get(r, r) for r in raw_roles]
clean_roles = validate_roles(expanded)
if not clean_roles:
click.echo("Error: no valid research roles specified.", err=True)
raise SystemExit(1)
ts = [s.strip() for s in tech_stack.split(",") if s.strip()] if tech_stack else None
conn = ctx.obj["conn"]
if models.get_project(conn, project_id):
click.echo(f"Error: project '{project_id}' already exists.", err=True)
raise SystemExit(1)
try:
result = create_project_with_phases(
conn, project_id, name, path,
description=description,
selected_roles=clean_roles,
tech_stack=ts,
priority=priority,
language=language,
)
except ValueError as e:
click.echo(f"Error: {e}", err=True)
raise SystemExit(1)
click.echo(f"Created project: {result['project']['id']} ({result['project']['name']})")
click.echo(f"Description: {description}")
click.echo("")
phases = result["phases"]
rows = [
[str(p["id"]), str(p["phase_order"] + 1), p["role"], p["status"], p.get("task_id") or ""]
for p in phases
]
click.echo(_table(["ID", "#", "Role", "Status", "Task"], rows))
@project.command("list")
@click.option("--status", default=None)
@click.pass_context
def project_list(ctx, status):
"""List projects."""
conn = ctx.obj["conn"]
projects = models.list_projects(conn, status=status)
if not projects:
click.echo("No projects found.")
return
rows = [[p["id"], p["name"], p["status"], str(p["priority"]), p["path"]]
for p in projects]
click.echo(_table(["ID", "Name", "Status", "Pri", "Path"], rows))
@project.command("show")
@click.argument("id")
@click.pass_context
def project_show(ctx, id):
"""Show project details."""
conn = ctx.obj["conn"]
p = models.get_project(conn, id)
if not p:
click.echo(f"Project '{id}' not found.", err=True)
raise SystemExit(1)
click.echo(f"Project: {p['id']}")
click.echo(f" Name: {p['name']}")
click.echo(f" Path: {p['path']}")
click.echo(f" Status: {p['status']}")
click.echo(f" Priority: {p['priority']}")
click.echo(f" Mode: {p.get('execution_mode') or 'review'}")
if p.get("tech_stack"):
click.echo(f" Tech stack: {', '.join(p['tech_stack'])}")
if p.get("forgejo_repo"):
click.echo(f" Forgejo: {p['forgejo_repo']}")
click.echo(f" Created: {p['created_at']}")
@project.command("set-mode")
@click.option("--project", "project_id", required=True, help="Project ID")
@click.argument("mode", type=click.Choice(["auto", "review"]))
@click.pass_context
def project_set_mode(ctx, project_id, mode):
"""Set execution mode for a project (auto|review)."""
conn = ctx.obj["conn"]
p = models.get_project(conn, project_id)
if not p:
click.echo(f"Project '{project_id}' not found.", err=True)
raise SystemExit(1)
models.update_project(conn, project_id, execution_mode=mode)
click.echo(f"Project '{project_id}' execution_mode set to '{mode}'.")
# ===========================================================================
# task
# ===========================================================================
@cli.group()
def task():
"""Manage tasks."""
@task.command("add")
@click.argument("project_id")
@click.argument("title")
@click.option("--type", "route_type", type=click.Choice(["debug", "feature", "refactor", "hotfix"]), default=None)
@click.option("--priority", type=int, default=5)
@click.option("--category", "-c", default=None,
help=f"Task category: {', '.join(models.TASK_CATEGORIES)}")
@click.pass_context
def task_add(ctx, project_id, title, route_type, priority, category):
"""Add a task to a project. ID is auto-generated (PROJ-001 or PROJ-CAT-001)."""
conn = ctx.obj["conn"]
p = models.get_project(conn, project_id)
if not p:
click.echo(f"Project '{project_id}' not found.", err=True)
raise SystemExit(1)
if category:
category = category.upper()
if category not in models.TASK_CATEGORIES:
click.echo(
f"Invalid category '{category}'. Must be one of: {', '.join(models.TASK_CATEGORIES)}",
err=True,
)
raise SystemExit(1)
task_id = models.next_task_id(conn, project_id, category=category)
brief = {"route_type": route_type} if route_type else None
t = models.create_task(conn, task_id, project_id, title,
priority=priority, brief=brief, category=category)
click.echo(f"Created task: {t['id']}{t['title']}")
@task.command("list")
@click.option("--project", "project_id", default=None)
@click.option("--status", default=None)
@click.pass_context
def task_list(ctx, project_id, status):
"""List tasks."""
conn = ctx.obj["conn"]
tasks = models.list_tasks(conn, project_id=project_id, status=status)
if not tasks:
click.echo("No tasks found.")
return
rows = [[t["id"], t["project_id"], t["title"][:40], t["status"],
str(t["priority"]), t.get("assigned_role") or "-"]
for t in tasks]
click.echo(_table(["ID", "Project", "Title", "Status", "Pri", "Role"], rows))
@task.command("show")
@click.argument("id")
@click.pass_context
def task_show(ctx, id):
"""Show task details."""
conn = ctx.obj["conn"]
t = models.get_task(conn, id)
if not t:
click.echo(f"Task '{id}' not found.", err=True)
raise SystemExit(1)
effective_mode = models.get_effective_mode(conn, t["project_id"], t["id"])
task_mode = t.get("execution_mode")
mode_label = f"{effective_mode} (overridden)" if task_mode else f"{effective_mode} (inherited)"
click.echo(f"Task: {t['id']}")
click.echo(f" Project: {t['project_id']}")
click.echo(f" Title: {t['title']}")
click.echo(f" Status: {t['status']}")
click.echo(f" Priority: {t['priority']}")
click.echo(f" Mode: {mode_label}")
if t.get("assigned_role"):
click.echo(f" Role: {t['assigned_role']}")
if t.get("parent_task_id"):
click.echo(f" Parent: {t['parent_task_id']}")
if t.get("brief"):
click.echo(f" Brief: {json.dumps(t['brief'], ensure_ascii=False)}")
if t.get("spec"):
click.echo(f" Spec: {json.dumps(t['spec'], ensure_ascii=False)}")
click.echo(f" Created: {t['created_at']}")
click.echo(f" Updated: {t['updated_at']}")
@task.command("update")
@click.argument("task_id")
@click.option("--status", type=click.Choice(models.VALID_TASK_STATUSES),
default=None, help="New status")
@click.option("--priority", type=int, default=None, help="New priority (1-10)")
@click.option("--mode", "mode", type=click.Choice(["auto", "review"]),
default=None, help="Override execution mode for this task")
@click.pass_context
def task_update(ctx, task_id, status, priority, mode):
"""Update a task's status, priority, or execution mode."""
conn = ctx.obj["conn"]
t = models.get_task(conn, task_id)
if not t:
click.echo(f"Task '{task_id}' not found.", err=True)
raise SystemExit(1)
fields = {}
if status is not None:
fields["status"] = status
if priority is not None:
fields["priority"] = priority
if mode is not None:
fields["execution_mode"] = mode
if not fields:
click.echo("Nothing to update. Use --status, --priority, or --mode.", err=True)
raise SystemExit(1)
updated = models.update_task(conn, task_id, **fields)
click.echo(f"Updated {updated['id']}: status={updated['status']}, priority={updated['priority']}, mode={updated.get('execution_mode') or '(inherited)'}")
# ===========================================================================
# decision
# ===========================================================================
@cli.group()
def decision():
"""Manage decisions and gotchas."""
@decision.command("add")
@click.argument("project_id")
@click.argument("type", type=click.Choice(["decision", "gotcha", "workaround", "rejected_approach", "convention"]))
@click.argument("title")
@click.argument("description")
@click.option("--category", default=None)
@click.option("--tags", callback=_parse_json, default=None, help='JSON array, e.g. \'["ios","css"]\'')
@click.option("--task-id", default=None)
@click.pass_context
def decision_add(ctx, project_id, type, title, description, category, tags, task_id):
"""Record a decision, gotcha, or convention."""
conn = ctx.obj["conn"]
p = models.get_project(conn, project_id)
if not p:
click.echo(f"Project '{project_id}' not found.", err=True)
raise SystemExit(1)
d = models.add_decision(conn, project_id, type, title, description,
category=category, tags=tags, task_id=task_id)
click.echo(f"Added {d['type']}: #{d['id']}{d['title']}")
@decision.command("list")
@click.argument("project_id")
@click.option("--category", default=None)
@click.option("--tag", multiple=True, help="Filter by tag (can repeat)")
@click.option("--type", "types", multiple=True,
type=click.Choice(["decision", "gotcha", "workaround", "rejected_approach", "convention"]),
help="Filter by type (can repeat)")
@click.pass_context
def decision_list(ctx, project_id, category, tag, types):
"""List decisions for a project."""
conn = ctx.obj["conn"]
tags_list = list(tag) if tag else None
types_list = list(types) if types else None
decisions = models.get_decisions(conn, project_id, category=category,
tags=tags_list, types=types_list)
if not decisions:
click.echo("No decisions found.")
return
rows = [[str(d["id"]), d["type"], d["category"] or "-",
d["title"][:50], d["created_at"][:10]]
for d in decisions]
click.echo(_table(["#", "Type", "Category", "Title", "Date"], rows))
# ===========================================================================
# module
# ===========================================================================
@cli.group()
def module():
"""Manage project modules."""
@module.command("add")
@click.argument("project_id")
@click.argument("name")
@click.argument("type", type=click.Choice(["frontend", "backend", "shared", "infra"]))
@click.argument("path")
@click.option("--description", default=None)
@click.option("--owner-role", default=None)
@click.pass_context
def module_add(ctx, project_id, name, type, path, description, owner_role):
"""Register a project module."""
conn = ctx.obj["conn"]
p = models.get_project(conn, project_id)
if not p:
click.echo(f"Project '{project_id}' not found.", err=True)
raise SystemExit(1)
m = models.add_module(conn, project_id, name, type, path,
description=description, owner_role=owner_role)
click.echo(f"Added module: {m['name']} ({m['type']}) at {m['path']}")
@module.command("list")
@click.argument("project_id")
@click.pass_context
def module_list(ctx, project_id):
"""List modules for a project."""
conn = ctx.obj["conn"]
mods = models.get_modules(conn, project_id)
if not mods:
click.echo("No modules found.")
return
rows = [[m["name"], m["type"], m["path"], m.get("owner_role") or "-",
m.get("description") or ""]
for m in mods]
click.echo(_table(["Name", "Type", "Path", "Owner", "Description"], rows))
# ===========================================================================
# status
# ===========================================================================
@cli.command("status")
@click.argument("project_id", required=False)
@click.pass_context
def status(ctx, project_id):
"""Project status overview. Without args — all projects. With id — detailed."""
conn = ctx.obj["conn"]
if project_id:
p = models.get_project(conn, project_id)
if not p:
click.echo(f"Project '{project_id}' not found.", err=True)
raise SystemExit(1)
tasks = models.list_tasks(conn, project_id=project_id)
counts = {}
for t in tasks:
counts[t["status"]] = counts.get(t["status"], 0) + 1
click.echo(f"Project: {p['id']}{p['name']} [{p['status']}]")
click.echo(f" Path: {p['path']}")
if p.get("tech_stack"):
click.echo(f" Stack: {', '.join(p['tech_stack'])}")
click.echo(f" Tasks: {len(tasks)} total")
for s in ["pending", "in_progress", "review", "done", "blocked"]:
if counts.get(s, 0) > 0:
click.echo(f" {s}: {counts[s]}")
if tasks:
click.echo("")
rows = [[t["id"], t["title"][:40], t["status"],
t.get("assigned_role") or "-"]
for t in tasks]
click.echo(_table(["ID", "Title", "Status", "Role"], rows))
else:
summary = models.get_project_summary(conn)
if not summary:
click.echo("No projects.")
return
rows = [[s["id"], s["name"][:25], s["status"], str(s["priority"]),
str(s["total_tasks"]), str(s["done_tasks"]),
str(s["active_tasks"]), str(s["blocked_tasks"])]
for s in summary]
click.echo(_table(
["ID", "Name", "Status", "Pri", "Total", "Done", "Active", "Blocked"],
rows,
))
# ===========================================================================
# cost
# ===========================================================================
@cli.command("cost")
@click.option("--last", "period", default="7d", help="Period: 7d, 30d, etc.")
@click.pass_context
def cost(ctx, period):
"""Show cost summary by project."""
# Parse period like "7d", "30d"
period = period.strip().lower()
if period.endswith("d"):
try:
days = int(period[:-1])
except ValueError:
click.echo(f"Invalid period: {period}. Use e.g. 7d, 30d.", err=True)
raise SystemExit(1)
else:
try:
days = int(period)
except ValueError:
click.echo(f"Invalid period: {period}. Use e.g. 7d, 30d.", err=True)
raise SystemExit(1)
conn = ctx.obj["conn"]
costs = models.get_cost_summary(conn, days=days)
if not costs:
click.echo(f"No agent runs in the last {days} days.")
return
rows = [[c["project_id"], c["project_name"][:25], str(c["runs"]),
f"{c['total_tokens']:,}", f"${c['total_cost_usd']:.4f}",
f"{c['total_duration_seconds']}s"]
for c in costs]
click.echo(f"Cost summary (last {days} days):\n")
click.echo(_table(
["Project", "Name", "Runs", "Tokens", "Cost", "Time"],
rows,
))
total = sum(c["total_cost_usd"] for c in costs)
click.echo(f"\nTotal: ${total:.4f}")
# ===========================================================================
# approve
# ===========================================================================
@cli.command("approve")
@click.argument("task_id")
@click.option("--followup", is_flag=True, help="Generate follow-up tasks from pipeline results")
@click.option("--decision", "decision_text", default=None, help="Record a decision with this text")
@click.pass_context
def approve_task(ctx, task_id, followup, decision_text):
"""Approve a task (set status=done). Optionally generate follow-ups."""
from core.followup import generate_followups, resolve_pending_action
conn = ctx.obj["conn"]
task = models.get_task(conn, task_id)
if not task:
click.echo(f"Task '{task_id}' not found.", err=True)
raise SystemExit(1)
models.update_task(conn, task_id, status="done")
click.echo(f"Approved: {task_id} → done")
if decision_text:
models.add_decision(
conn, task["project_id"], "decision", decision_text, decision_text,
task_id=task_id,
)
click.echo(f"Decision recorded.")
if followup:
click.echo("Generating follow-up tasks...")
result = generate_followups(conn, task_id)
created = result["created"]
pending = result["pending_actions"]
if created:
click.echo(f"Created {len(created)} follow-up tasks:")
for t in created:
click.echo(f" {t['id']}: {t['title']} (pri {t['priority']})")
for action in pending:
click.echo(f"\nPermission issue: {action['description']}")
click.echo(" 1. Rerun with --dangerously-skip-permissions")
click.echo(" 2. Create task for manual fix")
click.echo(" 3. Skip")
choice_input = click.prompt("Choice", type=click.Choice(["1", "2", "3"]), default="2")
choice_map = {"1": "rerun", "2": "manual_task", "3": "skip"}
choice = choice_map[choice_input]
result = resolve_pending_action(conn, task_id, action, choice)
if choice == "rerun" and result:
rr = result.get("rerun_result", {})
if rr.get("success"):
click.echo(" Re-run completed successfully.")
else:
click.echo(f" Re-run failed: {rr.get('error', 'unknown')}")
elif choice == "manual_task" and result:
click.echo(f" Created: {result['id']}: {result['title']}")
elif choice == "skip":
click.echo(" Skipped.")
if not created and not pending:
click.echo("No follow-up tasks generated.")
# ===========================================================================
# run
# ===========================================================================
@cli.command("run")
@click.argument("task_id")
@click.option("--dry-run", is_flag=True, help="Show pipeline plan without executing")
@click.option("--allow-write", is_flag=True, help="Allow agents to write files (skip permissions)")
@click.pass_context
def run_task(ctx, task_id, dry_run, allow_write):
"""Run a task through the agent pipeline.
PM decomposes the task into specialist steps, then the pipeline executes.
With --dry-run, shows the plan without running agents.
"""
from agents.runner import run_agent, run_pipeline
conn = ctx.obj["conn"]
task = models.get_task(conn, task_id)
if not task:
click.echo(f"Task '{task_id}' not found.", err=True)
raise SystemExit(1)
project_id = task["project_id"]
is_noninteractive = os.environ.get("KIN_NONINTERACTIVE") == "1"
click.echo(f"Task: {task['id']}{task['title']}")
# Step 1: PM decomposes
click.echo("Running PM to decompose task...")
pm_result = run_agent(
conn, "pm", task_id, project_id,
model="sonnet", dry_run=dry_run,
allow_write=allow_write, noninteractive=is_noninteractive,
)
if dry_run:
click.echo("\n--- PM Prompt (dry-run) ---")
click.echo(pm_result.get("prompt", "")[:2000])
click.echo("\n(Dry-run: PM would produce a pipeline JSON)")
return
if not pm_result["success"]:
click.echo(f"PM failed: {pm_result.get('output', 'unknown error')}", err=True)
raise SystemExit(1)
# Parse PM output for pipeline
output = pm_result.get("output")
if isinstance(output, str):
try:
output = json.loads(output)
except json.JSONDecodeError:
click.echo(f"PM returned non-JSON output:\n{output[:500]}", err=True)
raise SystemExit(1)
if not isinstance(output, dict) or "pipeline" not in output:
click.echo(f"PM output missing 'pipeline' key:\n{json.dumps(output, indent=2)[:500]}", err=True)
raise SystemExit(1)
pipeline_steps = output["pipeline"]
analysis = output.get("analysis", "")
2026-03-16 23:34:22 +02:00
# Save completion_mode from PM output to task (only if neither task nor project has explicit mode)
task_current = models.get_task(conn, task_id)
update_fields = {}
2026-03-16 23:34:22 +02:00
project = models.get_project(conn, project_id)
project_mode = project.get("execution_mode") if project else None
if not task_current.get("execution_mode") and not project_mode:
pm_completion_mode = models.validate_completion_mode(
output.get("completion_mode", "review")
)
update_fields["execution_mode"] = pm_completion_mode
import logging
logging.getLogger("kin").info(
"PM set completion_mode=%s for task %s", pm_completion_mode, task_id
)
# Save category from PM output (only if task has no category yet)
if not task_current.get("category"):
pm_category = output.get("category")
if pm_category and isinstance(pm_category, str):
pm_category = pm_category.upper()
if pm_category in models.TASK_CATEGORIES:
update_fields["category"] = pm_category
if update_fields:
models.update_task(conn, task_id, **update_fields)
click.echo(f"\nAnalysis: {analysis}")
click.echo(f"Pipeline ({len(pipeline_steps)} steps):")
for i, step in enumerate(pipeline_steps, 1):
click.echo(f" {i}. {step['role']} ({step.get('model', 'sonnet')}): {step.get('brief', '')}")
if is_noninteractive:
click.echo("\n[non-interactive] Auto-executing pipeline...")
elif not click.confirm("\nExecute pipeline?"):
click.echo("Aborted.")
return
# Step 2: Execute pipeline
click.echo("\nExecuting pipeline...")
result = run_pipeline(conn, task_id, pipeline_steps,
allow_write=allow_write,
noninteractive=is_noninteractive)
if result["success"]:
click.echo(f"\nPipeline completed: {result['steps_completed']} steps")
else:
click.echo(f"\nPipeline failed at step: {result.get('error', 'unknown')}", err=True)
if result.get("total_cost_usd"):
click.echo(f"Cost: ${result['total_cost_usd']:.4f}")
if result.get("total_duration_seconds"):
click.echo(f"Duration: {result['total_duration_seconds']}s")
2026-03-17 17:26:31 +02:00
# ===========================================================================
# watch / ps
# ===========================================================================
@cli.command("watch")
@click.argument("task_id")
@click.pass_context
def watch(ctx, task_id):
"""Monitor a running task in real time (updates every 5s)."""
from cli.watch import cmd_watch
cmd_watch(ctx.obj["conn"], task_id)
@cli.command("ps")
@click.pass_context
def ps(ctx):
"""List all running pipelines with PID and current step."""
from cli.watch import cmd_ps
cmd_ps(ctx.obj["conn"])
# ===========================================================================
# audit
# ===========================================================================
@cli.command("audit")
@click.argument("project_id")
@click.pass_context
def audit_backlog(ctx, project_id):
"""Audit pending tasks — check which are already implemented in the code."""
from agents.runner import run_audit
conn = ctx.obj["conn"]
p = models.get_project(conn, project_id)
if not p:
click.echo(f"Project '{project_id}' not found.", err=True)
raise SystemExit(1)
pending = models.list_tasks(conn, project_id=project_id, status="pending")
if not pending:
click.echo("No pending tasks to audit.")
return
click.echo(f"Auditing {len(pending)} pending tasks for {project_id}...")
# First pass: get results only (no auto_apply yet)
result = run_audit(conn, project_id)
if not result["success"]:
click.echo(f"Audit failed: {result.get('error', 'unknown')}", err=True)
raise SystemExit(1)
done = result.get("already_done", [])
still = result.get("still_pending", [])
unclear = result.get("unclear", [])
if done:
click.echo(f"\nAlready done ({len(done)}):")
for item in done:
click.echo(f" {item['id']}: {item.get('reason', '')}")
if still:
click.echo(f"\nStill pending ({len(still)}):")
for item in still:
click.echo(f" {item['id']}: {item.get('reason', '')}")
if unclear:
click.echo(f"\nUnclear ({len(unclear)}):")
for item in unclear:
click.echo(f" {item['id']}: {item.get('reason', '')}")
if result.get("cost_usd"):
click.echo(f"\nCost: ${result['cost_usd']:.4f}")
if result.get("duration_seconds"):
click.echo(f"Duration: {result['duration_seconds']}s")
# Apply: mark tasks as done after user confirmation
if done and click.confirm(f"\nMark {len(done)} tasks as done?"):
for item in done:
tid = item.get("id")
if tid:
t = models.get_task(conn, tid)
if t and t["project_id"] == project_id and t["status"] == "pending":
models.update_task(conn, tid, status="done")
click.echo(f"Marked {len(done)} tasks as done.")
# ===========================================================================
# bootstrap
# ===========================================================================
@cli.command("bootstrap")
@click.argument("path", type=click.Path(exists=True))
@click.option("--id", "project_id", required=True, help="Short project ID (e.g. vdol)")
@click.option("--name", required=True, help="Project display name")
@click.option("--vault", "vault_path", type=click.Path(), default=None,
help="Obsidian vault path (auto-detected if omitted)")
@click.option("-y", "--yes", is_flag=True, help="Skip confirmation")
@click.pass_context
def bootstrap(ctx, path, project_id, name, vault_path, yes):
"""Auto-detect project stack, modules, decisions and import into Kin."""
conn = ctx.obj["conn"]
project_path = Path(path).expanduser().resolve()
# Check if project already exists
existing = models.get_project(conn, project_id)
if existing:
click.echo(f"Project '{project_id}' already exists. Use 'kin project show {project_id}'.", err=True)
raise SystemExit(1)
# Detect everything
click.echo(f"Scanning {project_path} ...")
tech_stack = detect_tech_stack(project_path)
modules = detect_modules(project_path)
decisions = extract_decisions_from_claude_md(project_path, project_id, name)
# Obsidian
obsidian = None
vault_root = find_vault_root(Path(vault_path) if vault_path else None)
if vault_root:
dir_name = project_path.name
obsidian = scan_obsidian(vault_root, project_id, name, dir_name)
if not obsidian["tasks"] and not obsidian["decisions"]:
obsidian = None # Nothing found, don't clutter output
# Preview
click.echo("")
click.echo(format_preview(
project_id, name, str(project_path), tech_stack,
modules, decisions, obsidian,
))
click.echo("")
if not yes:
if not click.confirm("Save to kin.db?"):
click.echo("Aborted.")
return
save_to_db(conn, project_id, name, str(project_path),
tech_stack, modules, decisions, obsidian)
# Summary
task_count = 0
dec_count = len(decisions)
if obsidian:
task_count += len(obsidian.get("tasks", []))
dec_count += len(obsidian.get("decisions", []))
click.echo(f"Saved: 1 project, {len(modules)} modules, "
f"{dec_count} decisions, {task_count} tasks.")
# ===========================================================================
# hook
# ===========================================================================
@cli.group()
def hook():
"""Manage post-pipeline hooks."""
@hook.command("add")
@click.option("--project", "project_id", required=True, help="Project ID")
@click.option("--name", required=True, help="Hook name")
@click.option("--event", required=True, help="Event: pipeline_completed, step_completed")
@click.option("--command", required=True, help="Shell command to run")
@click.option("--module-path", default=None, help="Trigger only when module path matches (fnmatch)")
@click.option("--working-dir", default=None, help="Working directory for the command")
@click.pass_context
def hook_add(ctx, project_id, name, event, command, module_path, working_dir):
"""Add a post-pipeline hook to a project."""
conn = ctx.obj["conn"]
p = models.get_project(conn, project_id)
if not p:
click.echo(f"Project '{project_id}' not found.", err=True)
raise SystemExit(1)
h = hooks_module.create_hook(
conn, project_id, name, event, command,
trigger_module_path=module_path,
working_dir=working_dir,
)
click.echo(f"Created hook: #{h['id']} {h['name']} [{h['event']}] → {h['command']}")
@hook.command("list")
@click.option("--project", "project_id", required=True, help="Project ID")
@click.pass_context
def hook_list(ctx, project_id):
"""List hooks for a project."""
conn = ctx.obj["conn"]
hs = hooks_module.get_hooks(conn, project_id, enabled_only=False)
if not hs:
click.echo("No hooks found.")
return
rows = [
[str(h["id"]), h["name"], h["event"],
h["command"][:40], h.get("trigger_module_path") or "-",
"yes" if h["enabled"] else "no"]
for h in hs
]
click.echo(_table(["ID", "Name", "Event", "Command", "Module", "Enabled"], rows))
@hook.command("remove")
@click.argument("hook_id", type=int)
@click.pass_context
def hook_remove(ctx, hook_id):
"""Remove a hook by ID."""
conn = ctx.obj["conn"]
row = conn.execute("SELECT * FROM hooks WHERE id = ?", (hook_id,)).fetchone()
if not row:
click.echo(f"Hook #{hook_id} not found.", err=True)
raise SystemExit(1)
hooks_module.delete_hook(conn, hook_id)
click.echo(f"Removed hook #{hook_id}.")
@hook.command("logs")
@click.option("--project", "project_id", required=True, help="Project ID")
@click.option("--limit", default=20, help="Number of log entries (default: 20)")
@click.pass_context
def hook_logs(ctx, project_id, limit):
"""Show recent hook execution logs for a project."""
conn = ctx.obj["conn"]
logs = hooks_module.get_hook_logs(conn, project_id=project_id, limit=limit)
if not logs:
click.echo("No hook logs found.")
return
rows = [
[str(l["hook_id"]), l.get("task_id") or "-",
"ok" if l["success"] else "fail",
str(l["exit_code"]),
f"{l['duration_seconds']:.1f}s",
l["created_at"][:19]]
for l in logs
]
click.echo(_table(["Hook", "Task", "Result", "Exit", "Duration", "Time"], rows))
@hook.command("setup")
@click.option("--project", "project_id", required=True, help="Project ID")
@click.option("--scripts-dir", default=None,
help="Directory with hook scripts (default: <kin_root>/scripts)")
@click.pass_context
def hook_setup(ctx, project_id, scripts_dir):
"""Register standard hooks for a project.
Registers: rebuild-frontend (fires on web/frontend/* changes),
auto-commit (fires on task_done git add -A && git commit).
Idempotent skips hooks that already exist.
"""
conn = ctx.obj["conn"]
p = models.get_project(conn, project_id)
if not p:
click.echo(f"Project '{project_id}' not found.", err=True)
raise SystemExit(1)
if scripts_dir is None:
scripts_dir = str(Path(__file__).parent.parent / "scripts")
existing_names = {h["name"] for h in hooks_module.get_hooks(conn, project_id, enabled_only=False)}
created = []
if "rebuild-frontend" not in existing_names:
rebuild_cmd = str(Path(scripts_dir) / "rebuild-frontend.sh")
hooks_module.create_hook(
conn, project_id,
name="rebuild-frontend",
event="pipeline_completed",
command=rebuild_cmd,
working_dir=p.get("path"),
timeout_seconds=300,
)
created.append("rebuild-frontend")
else:
click.echo("Hook 'rebuild-frontend' already exists, skipping.")
if "auto-commit" not in existing_names:
project_path = str(Path(p.get("path", ".")).expanduser())
hooks_module.create_hook(
conn, project_id,
name="auto-commit",
event="task_done",
command='git add -A && git commit -m "kin: {task_id} {title}"',
working_dir=project_path,
timeout_seconds=30,
)
created.append("auto-commit")
else:
click.echo("Hook 'auto-commit' already exists, skipping.")
if created:
click.echo(f"Registered hooks: {', '.join(created)}")
# ===========================================================================
# Entry point
# ===========================================================================
if __name__ == "__main__":
cli()