kin/agents/bootstrap.py
johnfrum1234 da4a8aae72 Add bootstrap command — auto-detect project stack, modules, decisions
kin bootstrap <path> --id <id> --name <name> [--vault <path>]

Detects: package.json, requirements.txt, go.mod, config files → tech_stack.
Scans src/app/lib/frontend/backend dirs → modules with type detection.
Parses CLAUDE.md for GOTCHA/WORKAROUND/FIXME/ВАЖНО → decisions.
Scans Obsidian vault for kanban tasks, checkboxes, and decisions.
Preview before save, -y to skip confirmation.
18 bootstrap tests, 57 total passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 13:29:01 +02:00

564 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Kin bootstrap — auto-detect project tech stack, modules, and decisions.
Scans project directory, CLAUDE.md, and optionally Obsidian vault.
Writes results to kin.db via core.models.
"""
import json
import re
from pathlib import Path
from typing import Any
# Default location searched for Obsidian vaults: the iCloud Obsidian
# "Documents" folder under the current user's home directory.
DEFAULT_VAULT = Path.home().joinpath(
    "Library",
    "Mobile Documents",
    "iCloud~md~obsidian",
    "Documents",
)
# ---------------------------------------------------------------------------
# Tech stack detection
# ---------------------------------------------------------------------------
# package.json dependency → tech label
_NPM_MARKERS = {
    # UI frameworks
    "vue": "vue3",
    "nuxt": "nuxt3",
    "react": "react",
    "next": "nextjs",
    "svelte": "svelte",
    "angular": "angular",
    # Language / build tooling
    "typescript": "typescript",
    "vite": "vite",
    "webpack": "webpack",
    # HTTP servers
    "express": "express",
    "fastify": "fastify",
    "koa": "koa",
    # State management
    "pinia": "pinia",
    "vuex": "vuex",
    "redux": "redux",
    # Styling and data access
    "tailwindcss": "tailwind",
    "prisma": "prisma",
    "drizzle-orm": "drizzle",
    # Database drivers
    "pg": "postgresql",
    "mysql2": "mysql",
    "better-sqlite3": "sqlite",
    # HTTP client / browser automation
    "axios": "axios",
    "puppeteer": "puppeteer",
    "playwright": "playwright",
}
# Config files → tech label
_FILE_MARKERS = {
    # JavaScript / TypeScript tooling
    "nuxt.config.ts": "nuxt3",
    "nuxt.config.js": "nuxt3",
    "vite.config.ts": "vite",
    "vite.config.js": "vite",
    "tsconfig.json": "typescript",
    "tailwind.config.js": "tailwind",
    "tailwind.config.ts": "tailwind",
    # Containers
    "docker-compose.yml": "docker",
    "docker-compose.yaml": "docker",
    "Dockerfile": "docker",
    # Other language ecosystems
    "go.mod": "go",
    "Cargo.toml": "rust",
    "requirements.txt": "python",
    "pyproject.toml": "python",
    "setup.py": "python",
    "Pipfile": "python",
    # Linters / formatters
    ".eslintrc.js": "eslint",
    ".prettierrc": "prettier",
}
def detect_tech_stack(project_path: Path) -> list[str]:
    """Return the sorted, de-duplicated tech labels detected in a project.

    Combines four signals: marker config files (in the project root and in a
    fixed set of first-level subdirectories), package.json dependencies,
    requirements.txt contents, and a root go.mod.
    """
    found: set[str] = set()

    # Marker files are looked up in the root and one level deep.
    search_roots = [project_path] + [
        project_path / sub
        for sub in ("frontend", "backend", "server", "client", "app")
    ]
    for marker_name, label in _FILE_MARKERS.items():
        if any((base / marker_name).exists() for base in search_roots):
            found.add(label)

    # package.json (root + subdirs)
    for manifest in _find_package_jsons(project_path):
        found.update(_parse_package_json(manifest))

    # requirements.txt anywhere in the tree, except under node_modules
    for requirements in project_path.glob("**/requirements.txt"):
        if not _is_inside_node_modules(requirements, project_path):
            found.update(_parse_requirements_txt(requirements))

    # go.mod: the language itself plus two web frameworks
    go_mod = project_path / "go.mod"
    if go_mod.exists():
        found.add("go")
        go_text = go_mod.read_text(errors="replace")
        for needle, label in (("gin-gonic", "gin"), ("fiber", "fiber")):
            if needle in go_text:
                found.add(label)

    return sorted(found)
def _find_package_jsons(root: Path) -> list[Path]:
    """Return existing package.json paths in *root* and its direct subdirs.

    ``node_modules`` and dot-directories are not inspected. The root manifest,
    when present, comes first.
    """
    candidate_dirs = [root]
    candidate_dirs.extend(
        entry
        for entry in root.iterdir()
        if entry.is_dir()
        and entry.name != "node_modules"
        and not entry.name.startswith(".")
    )
    manifests = (directory / "package.json" for directory in candidate_dirs)
    return [manifest for manifest in manifests if manifest.exists()]
def _parse_package_json(path: Path) -> list[str]:
    """Extract tech labels from a package.json file.

    Looks at the keys of ``dependencies`` and ``devDependencies`` and maps
    every name listed in ``_NPM_MARKERS`` to its label, in ``_NPM_MARKERS``
    order.

    Returns an empty list when the file cannot be read, is not valid JSON,
    or does not have the expected object shape (e.g. the root is a list or
    a dependency section is ``null``).
    """
    try:
        data = json.loads(path.read_text(errors="replace"))
    except (json.JSONDecodeError, OSError):
        return []
    # A syntactically valid manifest may still have an unexpected root type.
    if not isinstance(data, dict):
        return []

    declared: set[str] = set()
    for section in ("dependencies", "devDependencies"):
        deps = data.get(section)
        # Skip null / string / list sections instead of crashing on them.
        if isinstance(deps, dict):
            declared.update(deps)
    if not declared:
        return []

    return [tech for dep_name, tech in _NPM_MARKERS.items() if dep_name in declared]
def _parse_requirements_txt(path: Path) -> list[str]:
    """Extract tech labels from a requirements.txt file.

    Only the *name* part of each requirement line is inspected, so text in
    comments, version pins, extras and environment markers no longer
    produces false positives. Matching inside the name is still by
    substring, so ``flask-cors`` counts as flask. Option lines (starting
    with ``-``, e.g. ``-e git+...#egg=flask``) are matched as a whole line.

    Returns labels in the order of the internal marker table, or an empty
    list if the file cannot be read.
    """
    markers = {
        "fastapi": "fastapi", "flask": "flask", "django": "django",
        "sqlalchemy": "sqlalchemy", "celery": "celery", "redis": "redis",
        "pydantic": "pydantic", "click": "click", "pytest": "pytest",
    }
    try:
        text = path.read_text(errors="replace").lower()
    except OSError:
        return []

    names: list[str] = []
    for raw_line in text.splitlines():
        # A '#' starts a comment only at line start or after whitespace,
        # which keeps URL fragments such as '#egg=name' intact.
        line = re.sub(r"(^|\s)#.*$", "", raw_line).strip()
        if not line:
            continue
        if line.startswith("-"):
            names.append(line)
        else:
            # Cut at the first version specifier, extra, marker or URL part.
            names.append(re.split(r"[\s=<>!~;@\[]", line, maxsplit=1)[0])

    return [
        tech
        for pkg, tech in markers.items()
        if any(pkg in name for name in names)
    ]
def _is_inside_node_modules(path: Path, root: Path) -> bool:
    """Tell whether *path*, taken relative to *root*, has a ``node_modules`` component.

    Raises ValueError (from ``Path.relative_to``) if *path* is not under *root*.
    """
    return any(part == "node_modules" for part in path.relative_to(root).parts)
# ---------------------------------------------------------------------------
# Module detection
# ---------------------------------------------------------------------------
# File suffixes whose presence marks a module as frontend.
_FRONTEND_EXTS = {
    ".vue",
    ".jsx",
    ".tsx",
    ".svelte",
}
# Lower-case substrings that mark a script file's content as backend code.
_BACKEND_MARKERS = {
    "express",
    "fastify",
    "koa",
    "router",
    "controller",
    "middleware",
}
def detect_modules(project_path: Path) -> list[dict]:
    """List the modules found directly under the project's source directories.

    Every immediate subdirectory of an existing source directory (skipping
    dot-directories and ``node_modules``) is analyzed once and reported as a
    dict produced by ``_analyze_module``.
    """
    # Top-level source dirs first, then the nested "<area>/src" layouts.
    candidates = (
        "src", "app", "lib", "frontend", "backend", "server", "client",
        "frontend/src", "backend/src", "backend-pg/src",
    )
    scan_dirs = [
        project_path / name
        for name in candidates
        if (project_path / name).is_dir()
    ]

    modules = []
    seen = set()
    for scan_dir in scan_dirs:
        for child in sorted(scan_dir.iterdir()):
            is_candidate = (
                child.is_dir()
                and not child.name.startswith(".")
                and child.name != "node_modules"
            )
            if not is_candidate:
                continue
            module = _analyze_module(child, project_path)
            identity = (module["name"], module["path"])
            if identity in seen:
                continue
            seen.add(identity)
            modules.append(module)
    return modules
def _analyze_module(dir_path: Path, project_root: Path) -> dict:
    """Describe one module directory: name, guessed type, path and file count.

    Files are collected recursively. Anything located under a hidden
    directory or ``node_modules`` inside the module is ignored, as are hidden
    files, so vendored packages and tool caches do not inflate the count or
    influence type detection. Files are visited in sorted order so the
    result does not depend on filesystem enumeration order.

    Returns a dict with keys ``name``, ``type``, ``path`` (relative to
    *project_root*, with a trailing slash) and ``file_count``.
    """
    rel_path = str(dir_path.relative_to(project_root)) + "/"

    source_files = []
    for candidate in sorted(dir_path.rglob("*")):
        inner_parts = candidate.relative_to(dir_path).parts
        # inner_parts includes the file name itself, so hidden files are
        # dropped by the same check as hidden directories.
        if any(part == "node_modules" or part.startswith(".") for part in inner_parts):
            continue
        if candidate.is_file():
            source_files.append(candidate)

    exts = {f.suffix for f in source_files}
    return {
        "name": dir_path.name,
        "type": _guess_module_type(dir_path, exts, source_files),
        "path": rel_path,
        "file_count": len(source_files),
    }
def _guess_module_type(dir_path: Path, exts: set[str], files: list[Path]) -> str:
    """Guess whether a module is "frontend", "backend", "infra" or "shared".

    Checks, in order:
      1. any frontend-only file suffix among *exts*  -> "frontend"
      2. a backend marker word in the first 2000 characters of one of the
         first 20 ``.ts``/``.js``/``.mjs`` files     -> "backend"
      3. a well-known infrastructure directory name  -> "infra"
      4. otherwise                                   -> "shared"
    """
    # Obvious frontend
    if exts & _FRONTEND_EXTS:
        return "frontend"

    # Filter by suffix BEFORE sampling, so directories whose first entries
    # are assets or docs still get their script files inspected.
    script_files = [f for f in files if f.suffix in (".ts", ".js", ".mjs")]
    for script in script_files[:20]:
        try:
            head = script.read_text(errors="replace")[:2000].lower()
        except OSError:
            continue  # unreadable file: best-effort, try the next one
        if any(marker in head for marker in _BACKEND_MARKERS):
            return "backend"

    # Infra patterns
    if dir_path.name.lower() in ("infra", "deploy", "scripts", "ci", "docker", "nginx", "config"):
        return "infra"

    # Shared by default if ambiguous
    return "shared"
# ---------------------------------------------------------------------------
# Decisions from CLAUDE.md
# ---------------------------------------------------------------------------
# Each entry is (regex, decision type). Every regex is case-insensitive,
# captures the marker keyword as group 1 and the text after it as group 2;
# the text runs until a line starting with '#' or '-', a blank line, or the
# end of input (callers pass re.DOTALL so it may span several lines).
_DECISION_PATTERNS = [
    (rf"(?i)\b({keywords})" + r"[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", decision_type)
    for keywords, decision_type in (
        ("GOTCHA|ВАЖНО|WARNING|ВНИМАНИЕ", "gotcha"),
        ("WORKAROUND|ОБХОДНОЙ|ХАК", "workaround"),
        ("FIXME|TODO|БАГИ?", "gotcha"),
        ("РЕШЕНИЕ|DECISION", "decision"),
        ("CONVENTION|СОГЛАШЕНИЕ|ПРАВИЛО", "convention"),
    )
]
# Section headers that likely contain decisions
_DECISION_SECTIONS = [
    "(?i)" + fragment
    for fragment in (
        # English headings
        r"known\s+issues?",
        r"workaround",
        r"gotcha",
        # Russian headings: decisions, pitfalls, important
        r"решени[яе]",
        r"грабл[ия]",
        r"важно",
        # conventions / rules / nuances
        r"conventions?",
        r"правила",
        r"нюансы",
    )
]
def extract_decisions_from_claude_md(project_path: Path) -> list[dict]:
    """Parse ``CLAUDE.md`` in *project_path* for decisions, gotchas, workarounds.

    Two passes are made over the file:

    1. Keyword pass: every ``_DECISION_PATTERNS`` match with at least 10
       characters of text becomes an entry; the first line of the text
       (without a trailing period, max 100 chars) is the title.
    2. Section pass: under every markdown header (levels 1-4) whose text
       matches ``_DECISION_SECTIONS``, each bullet line (``- ``, ``* `` or
       ``• ``) longer than 10 characters becomes a "decision" entry.

    Entries are de-duplicated by title across both passes.

    Returns a list of dicts with keys ``type``, ``title``, ``description``
    and ``category``; empty if the file is missing or unreadable.
    """
    claude_md = project_path / "CLAUDE.md"
    if not claude_md.exists():
        return []
    try:
        text = claude_md.read_text(errors="replace")
    except OSError:
        return []

    decisions = []
    seen_titles = set()

    # Pattern-based extraction
    for pattern, dec_type in _DECISION_PATTERNS:
        for m in re.finditer(pattern, text, re.DOTALL):
            body = m.group(2).strip()
            if len(body) < 10:
                continue
            # First line as title, whole body as description
            title = body.split("\n")[0].strip().rstrip(".")[:100]
            if title in seen_titles:
                continue
            seen_titles.add(title)
            decisions.append({
                "type": dec_type,
                "title": title,
                "description": body,
                "category": _guess_category(title + " " + body),
            })

    # Section-based extraction. re.split with one capturing group yields
    # [preamble, header, content, header, content, ...]; pairing odd and
    # even slices guarantees that only real header lines are tested against
    # the section patterns and only their own content is scanned.
    parts = re.split(r"(?m)^(#{1,4}\s+.*?)$", text)
    for header, content in zip(parts[1::2], parts[2::2]):
        if not any(re.search(pat, header) for pat in _DECISION_SECTIONS):
            continue
        for line in content.strip().split("\n"):
            line = line.strip()
            # Only bullet points count; plain paragraphs are skipped.
            if not line.startswith(("- ", "* ", "• ")):
                continue
            item = line.lstrip("-*• ").strip()
            title = item[:100]
            if len(item) > 10 and title not in seen_titles:
                seen_titles.add(title)
                decisions.append({
                    "type": "decision",
                    "title": title,
                    "description": item,
                    "category": _guess_category(item),
                })
    return decisions
def _guess_category(text: str) -> str:
    """Best-effort category guess from text content.

    Categories are tried in a fixed order (ui, api, architecture, security,
    devops, performance) and the first one with a keyword hit wins;
    "architecture" is also the fallback.

    Most keywords are stems and match as substrings ("deploy" hits
    "deployment"). The short acronyms "ui", "ci", "cd" and "rest" must stand
    alone (no letter directly before or after), because as substrings they
    occur inside everyday words such as "build", "decision" or "restart".
    """
    t = text.lower()
    standalone = {"ui", "ci", "cd", "rest"}
    rules = (
        ("ui", ("css", "ui", "vue", "компонент", "стил", "layout", "mobile", "safari", "bottom-sheet")),
        ("api", ("api", "endpoint", "rest", "route", "запрос", "fetch")),
        ("architecture", ("sql", "база", "миграц", "postgres", "sqlite", "бд", "schema")),
        ("security", ("безопас", "security", "xss", "auth", "token", "csrf", "injection")),
        ("devops", ("docker", "deploy", "nginx", "ci", "cd", "infra", "сервер")),
        ("performance", ("performance", "cache", "оптимиз", "lazy", "скорость")),
    )

    def hit(keyword: str) -> bool:
        if keyword in standalone:
            # [^\W\d_] is "any letter"; digits, '_' and punctuation may touch
            # the acronym ("ci/cd", "ui-kit", "ui_state").
            pattern = rf"(?<![^\W\d_]){re.escape(keyword)}(?![^\W\d_])"
            return re.search(pattern, t) is not None
        return keyword in t

    for category, keywords in rules:
        if any(hit(keyword) for keyword in keywords):
            return category
    return "architecture"
# ---------------------------------------------------------------------------
# Obsidian vault scanning
# ---------------------------------------------------------------------------
def find_vault_root(vault_path: Path | None = None) -> Path | None:
"""Find the Obsidian vault root directory.
If vault_path is given but doesn't exist, returns None (don't fallback).
If vault_path is None, tries the default iCloud Obsidian location.
"""
if vault_path is not None:
return vault_path if vault_path.is_dir() else None
# Default: iCloud Obsidian path
default = DEFAULT_VAULT
if default.is_dir():
# Look for a vault inside (usually one level deep)
for child in default.iterdir():
if child.is_dir() and not child.name.startswith("."):
return child
return None
def scan_obsidian(
vault_root: Path,
project_id: str,
project_name: str,
project_dir_name: str | None = None,
) -> dict:
"""Scan Obsidian vault for project-related notes.
Returns {"tasks": [...], "decisions": [...], "files_scanned": int}
"""
result = {"tasks": [], "decisions": [], "files_scanned": 0}
# Build search terms
search_terms = {project_id.lower()}
if project_name:
search_terms.add(project_name.lower())
if project_dir_name:
search_terms.add(project_dir_name.lower())
# Find project folder in vault
project_files: list[Path] = []
for term in list(search_terms):
for child in vault_root.iterdir():
if child.is_dir() and term in child.name.lower():
for f in child.rglob("*.md"):
if f not in project_files:
project_files.append(f)
# Also search for files mentioning the project by name
for md_file in vault_root.glob("*.md"):
try:
text = md_file.read_text(errors="replace")[:5000].lower()
except OSError:
continue
if any(term in text for term in search_terms):
if md_file not in project_files:
project_files.append(md_file)
result["files_scanned"] = len(project_files)
for f in project_files:
try:
text = f.read_text(errors="replace")
except OSError:
continue
_extract_obsidian_tasks(text, f.stem, result["tasks"])
_extract_obsidian_decisions(text, f.stem, result["decisions"])
return result
def _extract_obsidian_tasks(text: str, source: str, tasks: list[dict]) -> None:
    """Append checkbox items found in Obsidian markdown to *tasks*.

    Recognizes ``- [ ] title`` / ``* [x] title`` / ``+ [X] title`` lines,
    including indented sub-tasks. Wiki-links in the title are reduced to
    their target (``[[Page|alias]]`` -> ``Page``). Titles of 5 characters or
    fewer are ignored; longer ones are truncated to 200 characters.

    Each appended dict has keys ``title``, ``done`` and ``source``.
    """
    # Only spaces/tabs are allowed between the parts: with \s the match
    # could run over a line break and take the NEXT line as the title of
    # an empty checkbox.
    checkbox = r"^[ \t]*[-*+][ \t]+\[([ xX])\][ \t]+(.+)$"
    for m in re.finditer(checkbox, text, re.MULTILINE):
        title = m.group(2).strip()
        # Remove Obsidian wiki-links
        title = re.sub(r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]", r"\1", title)
        if len(title) > 5:
            tasks.append({
                "title": title[:200],
                "done": m.group(1).lower() == "x",
                "source": source,
            })
def _extract_obsidian_decisions(text: str, source: str, decisions: list[dict]):
    """Append decisions/gotchas found in an Obsidian note to *decisions*.

    Two passes: the shared ``_DECISION_PATTERNS`` keyword patterns, then
    bold inline markers (``**ВАЖНО**``, ``**GOTCHA**``, ``**FIXME**``), which
    are always recorded as "gotcha". Only texts longer than 10 characters
    are kept. Each appended dict has keys ``type``, ``title``,
    ``description``, ``category`` and ``source``.
    """
    # Pass 1: keyword patterns; the first line of the body is the title.
    for pattern, dec_type in _DECISION_PATTERNS:
        for match in re.finditer(pattern, text, re.DOTALL):
            description = match.group(2).strip()
            if len(description) <= 10:
                continue
            first_line = description.split("\n")[0].strip()
            decisions.append({
                "type": dec_type,
                "title": first_line[:100],
                "description": description,
                "category": _guess_category(description),
                "source": source,
            })

    # Pass 2: bold inline markers, limited to the rest of the same line.
    inline_marker = r"(?i)\*\*(ВАЖНО|GOTCHA|FIXME)\*\*[:\s]*(.*?)(?=\n|$)"
    for match in re.finditer(inline_marker, text):
        description = match.group(2).strip()
        if len(description) <= 10:
            continue
        decisions.append({
            "type": "gotcha",
            "title": description[:100],
            "description": description,
            "category": _guess_category(description),
            "source": source,
        })
# ---------------------------------------------------------------------------
# Formatting for CLI preview
# ---------------------------------------------------------------------------
def format_preview(
project_id: str,
name: str,
path: str,
tech_stack: list[str],
modules: list[dict],
decisions: list[dict],
obsidian: dict | None = None,
) -> str:
"""Format bootstrap results for user review."""
lines = [
f"Project: {project_id}{name}",
f"Path: {path}",
"",
f"Tech stack: {', '.join(tech_stack) if tech_stack else '(not detected)'}",
"",
]
if modules:
lines.append(f"Modules ({len(modules)}):")
for m in modules:
lines.append(f" {m['name']} ({m['type']}) — {m['path']} ({m['file_count']} files)")
else:
lines.append("Modules: (none detected)")
lines.append("")
if decisions:
lines.append(f"Decisions from CLAUDE.md ({len(decisions)}):")
for i, d in enumerate(decisions, 1):
lines.append(f" #{i} {d['type']}: {d['title']}")
else:
lines.append("Decisions from CLAUDE.md: (none found)")
if obsidian:
lines.append("")
lines.append(f"Obsidian vault ({obsidian['files_scanned']} files scanned):")
if obsidian["tasks"]:
pending = [t for t in obsidian["tasks"] if not t["done"]]
done = [t for t in obsidian["tasks"] if t["done"]]
lines.append(f" Tasks: {len(pending)} pending, {len(done)} done")
for t in pending[:10]:
lines.append(f" [ ] {t['title']}")
if len(pending) > 10:
lines.append(f" ... and {len(pending) - 10} more")
for t in done[:5]:
lines.append(f" [x] {t['title']}")
if len(done) > 5:
lines.append(f" ... and {len(done) - 5} more done")
else:
lines.append(" Tasks: (none found)")
if obsidian["decisions"]:
lines.append(f" Decisions: {len(obsidian['decisions'])}")
for d in obsidian["decisions"][:5]:
lines.append(f" {d['type']}: {d['title']} (from {d['source']})")
if len(obsidian["decisions"]) > 5:
lines.append(f" ... and {len(obsidian['decisions']) - 5} more")
else:
lines.append(" Decisions: (none found)")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Write to DB
# ---------------------------------------------------------------------------
def save_to_db(
    conn,
    project_id: str,
    name: str,
    path: str,
    tech_stack: list[str],
    modules: list[dict],
    decisions: list[dict],
    obsidian: dict | None = None,
):
    """Save all bootstrap data to kin.db via ``core.models``.

    Creates the project record, then its modules, the CLAUDE.md decisions
    and, when *obsidian* is given, the Obsidian decisions (tagged with
    their source note) and tasks. Obsidian tasks get sequential ids of the
    form ``<PROJECT_ID>-OBS-001`` in list order.
    """
    from core import models

    # Project record; the CLAUDE.md path is stored only if the file exists.
    claude_md = Path(path).expanduser() / "CLAUDE.md"
    claude_md_path = str(claude_md) if claude_md.exists() else None
    models.create_project(
        conn, project_id, name, path,
        tech_stack=tech_stack,
        claude_md_path=claude_md_path,
    )

    for module in modules:
        models.add_module(
            conn, project_id, module["name"], module["type"], module["path"],
            description=f"{module['file_count']} files",
        )

    for decision in decisions:
        models.add_decision(
            conn, project_id, decision["type"], decision["title"], decision["description"],
            category=decision.get("category"),
        )

    if not obsidian:
        return

    for decision in obsidian.get("decisions", []):
        models.add_decision(
            conn, project_id, decision["type"], decision["title"], decision["description"],
            category=decision.get("category"),
            tags=[f"obsidian:{decision['source']}"],
        )

    for number, task in enumerate(obsidian.get("tasks", []), start=1):
        models.create_task(
            conn, f"{project_id.upper()}-OBS-{number:03d}", project_id, task["title"],
            status="done" if task["done"] else "pending",
            brief={"source": f"obsidian:{task['source']}"},
        )