""" Kin CLI — command-line interface for the multi-agent orchestrator. Uses core.models for all data access, never raw SQL. """ import json import os import sys from pathlib import Path import click # Ensure project root is on sys.path sys.path.insert(0, str(Path(__file__).parent.parent)) from core.db import init_db from core import models from core import hooks as hooks_module from agents.bootstrap import ( detect_tech_stack, detect_modules, extract_decisions_from_claude_md, find_vault_root, scan_obsidian, format_preview, save_to_db, ) DEFAULT_DB = Path.home() / ".kin" / "kin.db" def get_conn(db_path: Path = DEFAULT_DB): db_path.parent.mkdir(parents=True, exist_ok=True) return init_db(db_path) def _parse_json(ctx, param, value): """Click callback: parse a JSON string or return None.""" if value is None: return None try: return json.loads(value) except json.JSONDecodeError: raise click.BadParameter(f"Invalid JSON: {value}") def _table(headers: list[str], rows: list[list[str]], min_width: int = 6): """Render a simple aligned text table.""" widths = [max(min_width, len(h)) for h in headers] for row in rows: for i, cell in enumerate(row): if i < len(widths): widths[i] = max(widths[i], len(str(cell))) fmt = " ".join(f"{{:<{w}}}" for w in widths) lines = [fmt.format(*headers), fmt.format(*("-" * w for w in widths))] for row in rows: lines.append(fmt.format(*[str(c) for c in row])) return "\n".join(lines) # =========================================================================== # Root group # =========================================================================== @click.group() @click.option("--db", type=click.Path(), default=None, envvar="KIN_DB", help="Path to kin.db (default: ~/.kin/kin.db, or $KIN_DB)") @click.pass_context def cli(ctx, db): """Kin — multi-agent project orchestrator.""" ctx.ensure_object(dict) db_path = Path(db) if db else DEFAULT_DB ctx.obj["conn"] = get_conn(db_path) # =========================================================================== # project # =========================================================================== @cli.group() def project(): """Manage projects.""" @project.command("add") @click.argument("id") @click.argument("name") @click.argument("path") @click.option("--tech-stack", callback=_parse_json, default=None, help='JSON array, e.g. \'["vue3","nuxt"]\'') @click.option("--status", default="active") @click.option("--priority", type=int, default=5) @click.option("--language", default="ru", help="Response language for agents (ru, en, etc.)") @click.pass_context def project_add(ctx, id, name, path, tech_stack, status, priority, language): """Add a new project.""" conn = ctx.obj["conn"] p = models.create_project(conn, id, name, path, tech_stack=tech_stack, status=status, priority=priority, language=language) click.echo(f"Created project: {p['id']} ({p['name']})") @cli.command("new-project") @click.argument("description") @click.option("--id", "project_id", required=True, help="Project ID") @click.option("--name", required=True, help="Project name") @click.option("--path", required=True, help="Project path") @click.option("--roles", default="business,market,tech", show_default=True, help="Comma-separated roles: business,market,legal,tech,ux,marketer") @click.option("--tech-stack", default=None, help="Comma-separated tech stack") @click.option("--priority", type=int, default=5, show_default=True) @click.option("--language", default="ru", show_default=True) @click.pass_context def new_project(ctx, description, project_id, name, path, roles, tech_stack, priority, language): """Create a new project with a sequential research phase pipeline. DESCRIPTION — free-text project description for the agents. Role aliases: business=business_analyst, market=market_researcher, legal=legal_researcher, tech=tech_researcher, ux=ux_designer, marketer=marketer. Architect is added automatically as the last phase. """ from core.phases import create_project_with_phases, validate_roles, ROLE_LABELS _ALIASES = { "business": "business_analyst", "market": "market_researcher", "legal": "legal_researcher", "tech": "tech_researcher", "ux": "ux_designer", } raw_roles = [r.strip().lower() for r in roles.split(",") if r.strip()] expanded = [_ALIASES.get(r, r) for r in raw_roles] clean_roles = validate_roles(expanded) if not clean_roles: click.echo("Error: no valid research roles specified.", err=True) raise SystemExit(1) ts = [s.strip() for s in tech_stack.split(",") if s.strip()] if tech_stack else None conn = ctx.obj["conn"] if models.get_project(conn, project_id): click.echo(f"Error: project '{project_id}' already exists.", err=True) raise SystemExit(1) try: result = create_project_with_phases( conn, project_id, name, path, description=description, selected_roles=clean_roles, tech_stack=ts, priority=priority, language=language, ) except ValueError as e: click.echo(f"Error: {e}", err=True) raise SystemExit(1) click.echo(f"Created project: {result['project']['id']} ({result['project']['name']})") click.echo(f"Description: {description}") click.echo("") phases = result["phases"] rows = [ [str(p["id"]), str(p["phase_order"] + 1), p["role"], p["status"], p.get("task_id") or "—"] for p in phases ] click.echo(_table(["ID", "#", "Role", "Status", "Task"], rows)) @project.command("list") @click.option("--status", default=None) @click.pass_context def project_list(ctx, status): """List projects.""" conn = ctx.obj["conn"] projects = models.list_projects(conn, status=status) if not projects: click.echo("No projects found.") return rows = [[p["id"], p["name"], p["status"], str(p["priority"]), p["path"]] for p in projects] click.echo(_table(["ID", "Name", "Status", "Pri", "Path"], rows)) @project.command("show") @click.argument("id") @click.pass_context def project_show(ctx, id): """Show project details.""" conn = ctx.obj["conn"] p = models.get_project(conn, id) if not p: click.echo(f"Project '{id}' not found.", err=True) raise SystemExit(1) click.echo(f"Project: {p['id']}") click.echo(f" Name: {p['name']}") click.echo(f" Path: {p['path']}") click.echo(f" Status: {p['status']}") click.echo(f" Priority: {p['priority']}") click.echo(f" Mode: {p.get('execution_mode') or 'review'}") if p.get("tech_stack"): click.echo(f" Tech stack: {', '.join(p['tech_stack'])}") if p.get("forgejo_repo"): click.echo(f" Forgejo: {p['forgejo_repo']}") click.echo(f" Created: {p['created_at']}") @project.command("set-mode") @click.option("--project", "project_id", required=True, help="Project ID") @click.argument("mode", type=click.Choice(["auto", "review"])) @click.pass_context def project_set_mode(ctx, project_id, mode): """Set execution mode for a project (auto|review).""" conn = ctx.obj["conn"] p = models.get_project(conn, project_id) if not p: click.echo(f"Project '{project_id}' not found.", err=True) raise SystemExit(1) models.update_project(conn, project_id, execution_mode=mode) click.echo(f"Project '{project_id}' execution_mode set to '{mode}'.") # =========================================================================== # task # =========================================================================== @cli.group() def task(): """Manage tasks.""" @task.command("add") @click.argument("project_id") @click.argument("title") @click.option("--type", "route_type", type=click.Choice(["debug", "feature", "refactor", "hotfix"]), default=None) @click.option("--priority", type=int, default=5) @click.option("--category", "-c", default=None, help=f"Task category: {', '.join(models.TASK_CATEGORIES)}") @click.pass_context def task_add(ctx, project_id, title, route_type, priority, category): """Add a task to a project. ID is auto-generated (PROJ-001 or PROJ-CAT-001).""" conn = ctx.obj["conn"] p = models.get_project(conn, project_id) if not p: click.echo(f"Project '{project_id}' not found.", err=True) raise SystemExit(1) if category: category = category.upper() if category not in models.TASK_CATEGORIES: click.echo( f"Invalid category '{category}'. Must be one of: {', '.join(models.TASK_CATEGORIES)}", err=True, ) raise SystemExit(1) task_id = models.next_task_id(conn, project_id, category=category) brief = {"route_type": route_type} if route_type else None t = models.create_task(conn, task_id, project_id, title, priority=priority, brief=brief, category=category) click.echo(f"Created task: {t['id']} — {t['title']}") @task.command("list") @click.option("--project", "project_id", default=None) @click.option("--status", default=None) @click.pass_context def task_list(ctx, project_id, status): """List tasks.""" conn = ctx.obj["conn"] tasks = models.list_tasks(conn, project_id=project_id, status=status) if not tasks: click.echo("No tasks found.") return rows = [[t["id"], t["project_id"], t["title"][:40], t["status"], str(t["priority"]), t.get("assigned_role") or "-"] for t in tasks] click.echo(_table(["ID", "Project", "Title", "Status", "Pri", "Role"], rows)) @task.command("show") @click.argument("id") @click.pass_context def task_show(ctx, id): """Show task details.""" conn = ctx.obj["conn"] t = models.get_task(conn, id) if not t: click.echo(f"Task '{id}' not found.", err=True) raise SystemExit(1) effective_mode = models.get_effective_mode(conn, t["project_id"], t["id"]) task_mode = t.get("execution_mode") mode_label = f"{effective_mode} (overridden)" if task_mode else f"{effective_mode} (inherited)" click.echo(f"Task: {t['id']}") click.echo(f" Project: {t['project_id']}") click.echo(f" Title: {t['title']}") click.echo(f" Status: {t['status']}") click.echo(f" Priority: {t['priority']}") click.echo(f" Mode: {mode_label}") if t.get("assigned_role"): click.echo(f" Role: {t['assigned_role']}") if t.get("parent_task_id"): click.echo(f" Parent: {t['parent_task_id']}") if t.get("brief"): click.echo(f" Brief: {json.dumps(t['brief'], ensure_ascii=False)}") if t.get("spec"): click.echo(f" Spec: {json.dumps(t['spec'], ensure_ascii=False)}") click.echo(f" Created: {t['created_at']}") click.echo(f" Updated: {t['updated_at']}") @task.command("update") @click.argument("task_id") @click.option("--status", type=click.Choice(models.VALID_TASK_STATUSES), default=None, help="New status") @click.option("--priority", type=int, default=None, help="New priority (1-10)") @click.option("--mode", "mode", type=click.Choice(["auto", "review"]), default=None, help="Override execution mode for this task") @click.pass_context def task_update(ctx, task_id, status, priority, mode): """Update a task's status, priority, or execution mode.""" conn = ctx.obj["conn"] t = models.get_task(conn, task_id) if not t: click.echo(f"Task '{task_id}' not found.", err=True) raise SystemExit(1) fields = {} if status is not None: fields["status"] = status if priority is not None: fields["priority"] = priority if mode is not None: fields["execution_mode"] = mode if not fields: click.echo("Nothing to update. Use --status, --priority, or --mode.", err=True) raise SystemExit(1) updated = models.update_task(conn, task_id, **fields) click.echo(f"Updated {updated['id']}: status={updated['status']}, priority={updated['priority']}, mode={updated.get('execution_mode') or '(inherited)'}") # =========================================================================== # decision # =========================================================================== @cli.group() def decision(): """Manage decisions and gotchas.""" @decision.command("add") @click.argument("project_id") @click.argument("type", type=click.Choice(["decision", "gotcha", "workaround", "rejected_approach", "convention"])) @click.argument("title") @click.argument("description") @click.option("--category", default=None) @click.option("--tags", callback=_parse_json, default=None, help='JSON array, e.g. \'["ios","css"]\'') @click.option("--task-id", default=None) @click.pass_context def decision_add(ctx, project_id, type, title, description, category, tags, task_id): """Record a decision, gotcha, or convention.""" conn = ctx.obj["conn"] p = models.get_project(conn, project_id) if not p: click.echo(f"Project '{project_id}' not found.", err=True) raise SystemExit(1) d = models.add_decision(conn, project_id, type, title, description, category=category, tags=tags, task_id=task_id) click.echo(f"Added {d['type']}: #{d['id']} — {d['title']}") @decision.command("list") @click.argument("project_id") @click.option("--category", default=None) @click.option("--tag", multiple=True, help="Filter by tag (can repeat)") @click.option("--type", "types", multiple=True, type=click.Choice(["decision", "gotcha", "workaround", "rejected_approach", "convention"]), help="Filter by type (can repeat)") @click.pass_context def decision_list(ctx, project_id, category, tag, types): """List decisions for a project.""" conn = ctx.obj["conn"] tags_list = list(tag) if tag else None types_list = list(types) if types else None decisions = models.get_decisions(conn, project_id, category=category, tags=tags_list, types=types_list) if not decisions: click.echo("No decisions found.") return rows = [[str(d["id"]), d["type"], d["category"] or "-", d["title"][:50], d["created_at"][:10]] for d in decisions] click.echo(_table(["#", "Type", "Category", "Title", "Date"], rows)) # =========================================================================== # module # =========================================================================== @cli.group() def module(): """Manage project modules.""" @module.command("add") @click.argument("project_id") @click.argument("name") @click.argument("type", type=click.Choice(["frontend", "backend", "shared", "infra"])) @click.argument("path") @click.option("--description", default=None) @click.option("--owner-role", default=None) @click.pass_context def module_add(ctx, project_id, name, type, path, description, owner_role): """Register a project module.""" conn = ctx.obj["conn"] p = models.get_project(conn, project_id) if not p: click.echo(f"Project '{project_id}' not found.", err=True) raise SystemExit(1) m = models.add_module(conn, project_id, name, type, path, description=description, owner_role=owner_role) click.echo(f"Added module: {m['name']} ({m['type']}) at {m['path']}") @module.command("list") @click.argument("project_id") @click.pass_context def module_list(ctx, project_id): """List modules for a project.""" conn = ctx.obj["conn"] mods = models.get_modules(conn, project_id) if not mods: click.echo("No modules found.") return rows = [[m["name"], m["type"], m["path"], m.get("owner_role") or "-", m.get("description") or ""] for m in mods] click.echo(_table(["Name", "Type", "Path", "Owner", "Description"], rows)) # =========================================================================== # status # =========================================================================== @cli.command("status") @click.argument("project_id", required=False) @click.pass_context def status(ctx, project_id): """Project status overview. Without args — all projects. With id — detailed.""" conn = ctx.obj["conn"] if project_id: p = models.get_project(conn, project_id) if not p: click.echo(f"Project '{project_id}' not found.", err=True) raise SystemExit(1) tasks = models.list_tasks(conn, project_id=project_id) counts = {} for t in tasks: counts[t["status"]] = counts.get(t["status"], 0) + 1 click.echo(f"Project: {p['id']} — {p['name']} [{p['status']}]") click.echo(f" Path: {p['path']}") if p.get("tech_stack"): click.echo(f" Stack: {', '.join(p['tech_stack'])}") click.echo(f" Tasks: {len(tasks)} total") for s in ["pending", "in_progress", "review", "done", "blocked"]: if counts.get(s, 0) > 0: click.echo(f" {s}: {counts[s]}") if tasks: click.echo("") rows = [[t["id"], t["title"][:40], t["status"], t.get("assigned_role") or "-"] for t in tasks] click.echo(_table(["ID", "Title", "Status", "Role"], rows)) else: summary = models.get_project_summary(conn) if not summary: click.echo("No projects.") return rows = [[s["id"], s["name"][:25], s["status"], str(s["priority"]), str(s["total_tasks"]), str(s["done_tasks"]), str(s["active_tasks"]), str(s["blocked_tasks"])] for s in summary] click.echo(_table( ["ID", "Name", "Status", "Pri", "Total", "Done", "Active", "Blocked"], rows, )) # =========================================================================== # cost # =========================================================================== @cli.command("cost") @click.option("--last", "period", default="7d", help="Period: 7d, 30d, etc.") @click.pass_context def cost(ctx, period): """Show cost summary by project.""" # Parse period like "7d", "30d" period = period.strip().lower() if period.endswith("d"): try: days = int(period[:-1]) except ValueError: click.echo(f"Invalid period: {period}. Use e.g. 7d, 30d.", err=True) raise SystemExit(1) else: try: days = int(period) except ValueError: click.echo(f"Invalid period: {period}. Use e.g. 7d, 30d.", err=True) raise SystemExit(1) conn = ctx.obj["conn"] costs = models.get_cost_summary(conn, days=days) if not costs: click.echo(f"No agent runs in the last {days} days.") return rows = [[c["project_id"], c["project_name"][:25], str(c["runs"]), f"{c['total_tokens']:,}", f"${c['total_cost_usd']:.4f}", f"{c['total_duration_seconds']}s"] for c in costs] click.echo(f"Cost summary (last {days} days):\n") click.echo(_table( ["Project", "Name", "Runs", "Tokens", "Cost", "Time"], rows, )) total = sum(c["total_cost_usd"] for c in costs) click.echo(f"\nTotal: ${total:.4f}") # =========================================================================== # approve # =========================================================================== @cli.command("approve") @click.argument("task_id") @click.option("--followup", is_flag=True, help="Generate follow-up tasks from pipeline results") @click.option("--decision", "decision_text", default=None, help="Record a decision with this text") @click.pass_context def approve_task(ctx, task_id, followup, decision_text): """Approve a task (set status=done). Optionally generate follow-ups.""" from core.followup import generate_followups, resolve_pending_action conn = ctx.obj["conn"] task = models.get_task(conn, task_id) if not task: click.echo(f"Task '{task_id}' not found.", err=True) raise SystemExit(1) models.update_task(conn, task_id, status="done") click.echo(f"Approved: {task_id} → done") if decision_text: models.add_decision( conn, task["project_id"], "decision", decision_text, decision_text, task_id=task_id, ) click.echo(f"Decision recorded.") if followup: click.echo("Generating follow-up tasks...") result = generate_followups(conn, task_id) created = result["created"] pending = result["pending_actions"] if created: click.echo(f"Created {len(created)} follow-up tasks:") for t in created: click.echo(f" {t['id']}: {t['title']} (pri {t['priority']})") for action in pending: click.echo(f"\nPermission issue: {action['description']}") click.echo(" 1. Rerun with --dangerously-skip-permissions") click.echo(" 2. Create task for manual fix") click.echo(" 3. Skip") choice_input = click.prompt("Choice", type=click.Choice(["1", "2", "3"]), default="2") choice_map = {"1": "rerun", "2": "manual_task", "3": "skip"} choice = choice_map[choice_input] result = resolve_pending_action(conn, task_id, action, choice) if choice == "rerun" and result: rr = result.get("rerun_result", {}) if rr.get("success"): click.echo(" Re-run completed successfully.") else: click.echo(f" Re-run failed: {rr.get('error', 'unknown')}") elif choice == "manual_task" and result: click.echo(f" Created: {result['id']}: {result['title']}") elif choice == "skip": click.echo(" Skipped.") if not created and not pending: click.echo("No follow-up tasks generated.") # =========================================================================== # run # =========================================================================== @cli.command("run") @click.argument("task_id") @click.option("--dry-run", is_flag=True, help="Show pipeline plan without executing") @click.option("--allow-write", is_flag=True, help="Allow agents to write files (skip permissions)") @click.pass_context def run_task(ctx, task_id, dry_run, allow_write): """Run a task through the agent pipeline. PM decomposes the task into specialist steps, then the pipeline executes. With --dry-run, shows the plan without running agents. """ from agents.runner import run_agent, run_pipeline conn = ctx.obj["conn"] task = models.get_task(conn, task_id) if not task: click.echo(f"Task '{task_id}' not found.", err=True) raise SystemExit(1) project_id = task["project_id"] is_noninteractive = os.environ.get("KIN_NONINTERACTIVE") == "1" click.echo(f"Task: {task['id']} — {task['title']}") # Step 1: PM decomposes click.echo("Running PM to decompose task...") pm_result = run_agent( conn, "pm", task_id, project_id, model="sonnet", dry_run=dry_run, allow_write=allow_write, noninteractive=is_noninteractive, ) if dry_run: click.echo("\n--- PM Prompt (dry-run) ---") click.echo(pm_result.get("prompt", "")[:2000]) click.echo("\n(Dry-run: PM would produce a pipeline JSON)") return if not pm_result["success"]: click.echo(f"PM failed: {pm_result.get('output', 'unknown error')}", err=True) raise SystemExit(1) # Parse PM output for pipeline output = pm_result.get("output") if isinstance(output, str): try: output = json.loads(output) except json.JSONDecodeError: click.echo(f"PM returned non-JSON output:\n{output[:500]}", err=True) raise SystemExit(1) if not isinstance(output, dict) or "pipeline" not in output: click.echo(f"PM output missing 'pipeline' key:\n{json.dumps(output, indent=2)[:500]}", err=True) raise SystemExit(1) pipeline_steps = output["pipeline"] analysis = output.get("analysis", "") # Save completion_mode from PM output to task (only if neither task nor project has explicit mode) task_current = models.get_task(conn, task_id) update_fields = {} project = models.get_project(conn, project_id) project_mode = project.get("execution_mode") if project else None if not task_current.get("execution_mode") and not project_mode: pm_completion_mode = models.validate_completion_mode( output.get("completion_mode", "review") ) update_fields["execution_mode"] = pm_completion_mode import logging logging.getLogger("kin").info( "PM set completion_mode=%s for task %s", pm_completion_mode, task_id ) # Save category from PM output (only if task has no category yet) if not task_current.get("category"): pm_category = output.get("category") if pm_category and isinstance(pm_category, str): pm_category = pm_category.upper() if pm_category in models.TASK_CATEGORIES: update_fields["category"] = pm_category if update_fields: models.update_task(conn, task_id, **update_fields) click.echo(f"\nAnalysis: {analysis}") click.echo(f"Pipeline ({len(pipeline_steps)} steps):") for i, step in enumerate(pipeline_steps, 1): click.echo(f" {i}. {step['role']} ({step.get('model', 'sonnet')}): {step.get('brief', '')}") if is_noninteractive: click.echo("\n[non-interactive] Auto-executing pipeline...") elif not click.confirm("\nExecute pipeline?"): click.echo("Aborted.") return # Step 2: Execute pipeline click.echo("\nExecuting pipeline...") result = run_pipeline(conn, task_id, pipeline_steps, allow_write=allow_write, noninteractive=is_noninteractive) if result["success"]: click.echo(f"\nPipeline completed: {result['steps_completed']} steps") else: click.echo(f"\nPipeline failed at step: {result.get('error', 'unknown')}", err=True) if result.get("total_cost_usd"): click.echo(f"Cost: ${result['total_cost_usd']:.4f}") if result.get("total_duration_seconds"): click.echo(f"Duration: {result['total_duration_seconds']}s") # =========================================================================== # audit # =========================================================================== @cli.command("audit") @click.argument("project_id") @click.pass_context def audit_backlog(ctx, project_id): """Audit pending tasks — check which are already implemented in the code.""" from agents.runner import run_audit conn = ctx.obj["conn"] p = models.get_project(conn, project_id) if not p: click.echo(f"Project '{project_id}' not found.", err=True) raise SystemExit(1) pending = models.list_tasks(conn, project_id=project_id, status="pending") if not pending: click.echo("No pending tasks to audit.") return click.echo(f"Auditing {len(pending)} pending tasks for {project_id}...") # First pass: get results only (no auto_apply yet) result = run_audit(conn, project_id) if not result["success"]: click.echo(f"Audit failed: {result.get('error', 'unknown')}", err=True) raise SystemExit(1) done = result.get("already_done", []) still = result.get("still_pending", []) unclear = result.get("unclear", []) if done: click.echo(f"\nAlready done ({len(done)}):") for item in done: click.echo(f" {item['id']}: {item.get('reason', '')}") if still: click.echo(f"\nStill pending ({len(still)}):") for item in still: click.echo(f" {item['id']}: {item.get('reason', '')}") if unclear: click.echo(f"\nUnclear ({len(unclear)}):") for item in unclear: click.echo(f" {item['id']}: {item.get('reason', '')}") if result.get("cost_usd"): click.echo(f"\nCost: ${result['cost_usd']:.4f}") if result.get("duration_seconds"): click.echo(f"Duration: {result['duration_seconds']}s") # Apply: mark tasks as done after user confirmation if done and click.confirm(f"\nMark {len(done)} tasks as done?"): for item in done: tid = item.get("id") if tid: t = models.get_task(conn, tid) if t and t["project_id"] == project_id and t["status"] == "pending": models.update_task(conn, tid, status="done") click.echo(f"Marked {len(done)} tasks as done.") # =========================================================================== # bootstrap # =========================================================================== @cli.command("bootstrap") @click.argument("path", type=click.Path(exists=True)) @click.option("--id", "project_id", required=True, help="Short project ID (e.g. vdol)") @click.option("--name", required=True, help="Project display name") @click.option("--vault", "vault_path", type=click.Path(), default=None, help="Obsidian vault path (auto-detected if omitted)") @click.option("-y", "--yes", is_flag=True, help="Skip confirmation") @click.pass_context def bootstrap(ctx, path, project_id, name, vault_path, yes): """Auto-detect project stack, modules, decisions and import into Kin.""" conn = ctx.obj["conn"] project_path = Path(path).expanduser().resolve() # Check if project already exists existing = models.get_project(conn, project_id) if existing: click.echo(f"Project '{project_id}' already exists. Use 'kin project show {project_id}'.", err=True) raise SystemExit(1) # Detect everything click.echo(f"Scanning {project_path} ...") tech_stack = detect_tech_stack(project_path) modules = detect_modules(project_path) decisions = extract_decisions_from_claude_md(project_path, project_id, name) # Obsidian obsidian = None vault_root = find_vault_root(Path(vault_path) if vault_path else None) if vault_root: dir_name = project_path.name obsidian = scan_obsidian(vault_root, project_id, name, dir_name) if not obsidian["tasks"] and not obsidian["decisions"]: obsidian = None # Nothing found, don't clutter output # Preview click.echo("") click.echo(format_preview( project_id, name, str(project_path), tech_stack, modules, decisions, obsidian, )) click.echo("") if not yes: if not click.confirm("Save to kin.db?"): click.echo("Aborted.") return save_to_db(conn, project_id, name, str(project_path), tech_stack, modules, decisions, obsidian) # Summary task_count = 0 dec_count = len(decisions) if obsidian: task_count += len(obsidian.get("tasks", [])) dec_count += len(obsidian.get("decisions", [])) click.echo(f"Saved: 1 project, {len(modules)} modules, " f"{dec_count} decisions, {task_count} tasks.") # =========================================================================== # hook # =========================================================================== @cli.group() def hook(): """Manage post-pipeline hooks.""" @hook.command("add") @click.option("--project", "project_id", required=True, help="Project ID") @click.option("--name", required=True, help="Hook name") @click.option("--event", required=True, help="Event: pipeline_completed, step_completed") @click.option("--command", required=True, help="Shell command to run") @click.option("--module-path", default=None, help="Trigger only when module path matches (fnmatch)") @click.option("--working-dir", default=None, help="Working directory for the command") @click.pass_context def hook_add(ctx, project_id, name, event, command, module_path, working_dir): """Add a post-pipeline hook to a project.""" conn = ctx.obj["conn"] p = models.get_project(conn, project_id) if not p: click.echo(f"Project '{project_id}' not found.", err=True) raise SystemExit(1) h = hooks_module.create_hook( conn, project_id, name, event, command, trigger_module_path=module_path, working_dir=working_dir, ) click.echo(f"Created hook: #{h['id']} {h['name']} [{h['event']}] → {h['command']}") @hook.command("list") @click.option("--project", "project_id", required=True, help="Project ID") @click.pass_context def hook_list(ctx, project_id): """List hooks for a project.""" conn = ctx.obj["conn"] hs = hooks_module.get_hooks(conn, project_id, enabled_only=False) if not hs: click.echo("No hooks found.") return rows = [ [str(h["id"]), h["name"], h["event"], h["command"][:40], h.get("trigger_module_path") or "-", "yes" if h["enabled"] else "no"] for h in hs ] click.echo(_table(["ID", "Name", "Event", "Command", "Module", "Enabled"], rows)) @hook.command("remove") @click.argument("hook_id", type=int) @click.pass_context def hook_remove(ctx, hook_id): """Remove a hook by ID.""" conn = ctx.obj["conn"] row = conn.execute("SELECT * FROM hooks WHERE id = ?", (hook_id,)).fetchone() if not row: click.echo(f"Hook #{hook_id} not found.", err=True) raise SystemExit(1) hooks_module.delete_hook(conn, hook_id) click.echo(f"Removed hook #{hook_id}.") @hook.command("logs") @click.option("--project", "project_id", required=True, help="Project ID") @click.option("--limit", default=20, help="Number of log entries (default: 20)") @click.pass_context def hook_logs(ctx, project_id, limit): """Show recent hook execution logs for a project.""" conn = ctx.obj["conn"] logs = hooks_module.get_hook_logs(conn, project_id=project_id, limit=limit) if not logs: click.echo("No hook logs found.") return rows = [ [str(l["hook_id"]), l.get("task_id") or "-", "ok" if l["success"] else "fail", str(l["exit_code"]), f"{l['duration_seconds']:.1f}s", l["created_at"][:19]] for l in logs ] click.echo(_table(["Hook", "Task", "Result", "Exit", "Duration", "Time"], rows)) @hook.command("setup") @click.option("--project", "project_id", required=True, help="Project ID") @click.option("--scripts-dir", default=None, help="Directory with hook scripts (default: /scripts)") @click.pass_context def hook_setup(ctx, project_id, scripts_dir): """Register standard hooks for a project. Registers: rebuild-frontend (fires on web/frontend/* changes), auto-commit (fires on task_done — git add -A && git commit). Idempotent — skips hooks that already exist. """ conn = ctx.obj["conn"] p = models.get_project(conn, project_id) if not p: click.echo(f"Project '{project_id}' not found.", err=True) raise SystemExit(1) if scripts_dir is None: scripts_dir = str(Path(__file__).parent.parent / "scripts") existing_names = {h["name"] for h in hooks_module.get_hooks(conn, project_id, enabled_only=False)} created = [] if "rebuild-frontend" not in existing_names: rebuild_cmd = str(Path(scripts_dir) / "rebuild-frontend.sh") hooks_module.create_hook( conn, project_id, name="rebuild-frontend", event="pipeline_completed", command=rebuild_cmd, working_dir=p.get("path"), timeout_seconds=300, ) created.append("rebuild-frontend") else: click.echo("Hook 'rebuild-frontend' already exists, skipping.") if "auto-commit" not in existing_names: project_path = str(Path(p.get("path", ".")).expanduser()) hooks_module.create_hook( conn, project_id, name="auto-commit", event="task_done", command='git add -A && git commit -m "kin: {task_id} {title}"', working_dir=project_path, timeout_seconds=30, ) created.append("auto-commit") else: click.echo("Hook 'auto-commit' already exists, skipping.") if created: click.echo(f"Registered hooks: {', '.join(created)}") # =========================================================================== # Entry point # =========================================================================== if __name__ == "__main__": cli()