""" Kin bootstrap — auto-detect project tech stack, modules, and decisions. Scans project directory, CLAUDE.md, and optionally Obsidian vault. Writes results to kin.db via core.models. """ import json import re from pathlib import Path from typing import Any DEFAULT_VAULT = Path.home() / "Library" / "Mobile Documents" / "iCloud~md~obsidian" / "Documents" # --------------------------------------------------------------------------- # Tech stack detection # --------------------------------------------------------------------------- # package.json dependency → tech label _NPM_MARKERS = { "vue": "vue3", "nuxt": "nuxt3", "react": "react", "next": "nextjs", "svelte": "svelte", "angular": "angular", "typescript": "typescript", "vite": "vite", "webpack": "webpack", "express": "express", "fastify": "fastify", "koa": "koa", "pinia": "pinia", "vuex": "vuex", "redux": "redux", "tailwindcss": "tailwind", "prisma": "prisma", "drizzle-orm": "drizzle", "pg": "postgresql", "mysql2": "mysql", "better-sqlite3": "sqlite", "axios": "axios", "puppeteer": "puppeteer", "playwright": "playwright", } # Config files → tech label _FILE_MARKERS = { "nuxt.config.ts": "nuxt3", "nuxt.config.js": "nuxt3", "vite.config.ts": "vite", "vite.config.js": "vite", "tsconfig.json": "typescript", "tailwind.config.js": "tailwind", "tailwind.config.ts": "tailwind", "docker-compose.yml": "docker", "docker-compose.yaml": "docker", "Dockerfile": "docker", "go.mod": "go", "Cargo.toml": "rust", "requirements.txt": "python", "pyproject.toml": "python", "setup.py": "python", "Pipfile": "python", ".eslintrc.js": "eslint", ".prettierrc": "prettier", } _SKIP_DIRS = {"node_modules", ".git", "dist", ".next", ".nuxt", "__pycache__", ".venv", "venv"} def detect_tech_stack(project_path: Path) -> list[str]: """Detect tech stack from project files. Searches recursively up to depth 3, skipping node_modules/.git/dist. Falls back to CLAUDE.md heuristics if no files found. """ stack: set[str] = set() # Recursive search for config files and package.json (depth ≤ 3) for fpath in _walk_files(project_path, max_depth=3): fname = fpath.name if fname in _FILE_MARKERS: stack.add(_FILE_MARKERS[fname]) if fname == "package.json": stack.update(_parse_package_json(fpath)) if fname == "requirements.txt": stack.update(_parse_requirements_txt(fpath)) if fname == "go.mod": stack.add("go") try: text = fpath.read_text(errors="replace") if "gin-gonic" in text: stack.add("gin") if "fiber" in text: stack.add("fiber") except OSError: pass # Fallback: extract tech hints from CLAUDE.md if no config files found if not stack: stack.update(_detect_stack_from_claude_md(project_path)) return sorted(stack) # CLAUDE.md text → tech labels (for fallback when project files are on a remote server) _CLAUDE_MD_TECH_HINTS = { r"(?i)vue[\s.]?3": "vue3", r"(?i)vue[\s.]?2": "vue2", r"(?i)\bnuxt\b": "nuxt3", r"(?i)\breact\b": "react", r"(?i)\btypescript\b": "typescript", r"(?i)\bvite\b": "vite", r"(?i)\btailwind": "tailwind", r"(?i)node\.?js": "nodejs", r"(?i)\bexpress\b": "express", r"(?i)postgresql|postgres": "postgresql", r"(?i)\bsqlite\b": "sqlite", r"(?i)\bmysql\b": "mysql", r"(?i)\bdocker\b": "docker", r"(?i)\bpython\b": "python", r"(?i)\bfastapi\b": "fastapi", r"(?i)\bdjango\b": "django", r"(?i)\bflask\b": "flask", r"(?i)\bgo\b.*(?:gin|fiber|module)": "go", r"(?i)\bnginx\b": "nginx", r"(?i)\bpinia\b": "pinia", r"(?i)\bvuex\b": "vuex", } def _detect_stack_from_claude_md(project_path: Path) -> list[str]: """Fallback: infer tech stack from CLAUDE.md text when no config files exist.""" claude_md = project_path / "CLAUDE.md" if not claude_md.exists(): return [] try: text = claude_md.read_text(errors="replace")[:5000] # First 5KB is enough except OSError: return [] stack = [] for pattern, tech in _CLAUDE_MD_TECH_HINTS.items(): if re.search(pattern, text): stack.append(tech) return stack def _walk_files(root: Path, max_depth: int = 3, _depth: int = 0): """Yield files up to max_depth, skipping node_modules/dist/.git.""" if _depth > max_depth: return try: entries = sorted(root.iterdir()) except (OSError, PermissionError): return for entry in entries: if entry.is_file(): yield entry elif entry.is_dir() and entry.name not in _SKIP_DIRS and not entry.name.startswith("."): yield from _walk_files(entry, max_depth, _depth + 1) def _parse_package_json(path: Path) -> list[str]: """Extract tech labels from package.json.""" try: data = json.loads(path.read_text(errors="replace")) except (json.JSONDecodeError, OSError): return [] stack = [] all_deps = {} for key in ("dependencies", "devDependencies"): all_deps.update(data.get(key, {})) for dep_name, tech in _NPM_MARKERS.items(): if dep_name in all_deps: stack.append(tech) return stack def _parse_requirements_txt(path: Path) -> list[str]: """Extract tech labels from requirements.txt.""" markers = { "fastapi": "fastapi", "flask": "flask", "django": "django", "sqlalchemy": "sqlalchemy", "celery": "celery", "redis": "redis", "pydantic": "pydantic", "click": "click", "pytest": "pytest", } stack = [] try: text = path.read_text(errors="replace").lower() except OSError: return stack for pkg, tech in markers.items(): if pkg in text: stack.append(tech) return stack def _is_inside_node_modules(path: Path, root: Path) -> bool: rel = path.relative_to(root) return "node_modules" in rel.parts # --------------------------------------------------------------------------- # Module detection # --------------------------------------------------------------------------- _FRONTEND_EXTS = {".vue", ".jsx", ".tsx", ".svelte"} _BACKEND_MARKERS = {"express", "fastify", "koa", "router", "controller", "middleware"} def detect_modules(project_path: Path) -> list[dict]: """Scan for modules: checks root subdirs, */src/ patterns, standard names. Strategy: 1. Find all "source root" dirs (src/, app/, lib/ at root or inside top-level dirs) 2. Each first-level subdir of a source root = a module candidate 3. Top-level dirs with their own src/ are treated as component roots (e.g. frontend/, backend-pg/) — scan THEIR src/ for modules """ modules = [] scan_dirs: list[tuple[Path, str | None]] = [] # (dir, prefix_hint) # Direct source dirs in root for name in ("src", "app", "lib"): d = project_path / name if d.is_dir(): scan_dirs.append((d, None)) # Top-level component dirs (frontend/, backend/, backend-pg/, server/, client/) # These get scanned for src/ inside, or directly if they contain source files for child in sorted(project_path.iterdir()): if not child.is_dir() or child.name in _SKIP_DIRS or child.name.startswith("."): continue child_src = child / "src" if child_src.is_dir(): # e.g. frontend/src/, backend-pg/src/ — scan their subdirs scan_dirs.append((child_src, child.name)) elif child.name in ("frontend", "backend", "server", "client", "web", "api"): # No src/ but it's a known component dir — scan it directly scan_dirs.append((child, child.name)) seen = set() for scan_dir, prefix in scan_dirs: for child in sorted(scan_dir.iterdir()): if not child.is_dir() or child.name in _SKIP_DIRS or child.name.startswith("."): continue mod = _analyze_module(child, project_path) key = mod["name"] if key not in seen: seen.add(key) modules.append(mod) return modules def _analyze_module(dir_path: Path, project_root: Path) -> dict: """Analyze a directory to determine module type and file count.""" rel_path = str(dir_path.relative_to(project_root)) + "/" files = list(dir_path.rglob("*")) source_files = [f for f in files if f.is_file() and not f.name.startswith(".")] file_count = len(source_files) # Determine type exts = {f.suffix for f in source_files} mod_type = _guess_module_type(dir_path, exts, source_files) return { "name": dir_path.name, "type": mod_type, "path": rel_path, "file_count": file_count, } def _guess_module_type(dir_path: Path, exts: set[str], files: list[Path]) -> str: """Guess if module is frontend, backend, shared, or infra.""" # Obvious frontend if exts & _FRONTEND_EXTS: return "frontend" # Check file contents for backend markers has_backend_marker = False for f in files[:20]: # Sample first 20 files if f.suffix in (".ts", ".js", ".mjs"): try: text = f.read_text(errors="replace")[:2000] text_lower = text.lower() if any(m in text_lower for m in _BACKEND_MARKERS): has_backend_marker = True break except OSError: continue if has_backend_marker: return "backend" # Infra patterns name = dir_path.name.lower() if name in ("infra", "deploy", "scripts", "ci", "docker", "nginx", "config"): return "infra" # Shared by default if ambiguous if exts & {".ts", ".js", ".py"}: return "shared" return "shared" # --------------------------------------------------------------------------- # Decisions from CLAUDE.md # --------------------------------------------------------------------------- _DECISION_PATTERNS = [ (r"(?i)\b(GOTCHA|ВАЖНО|WARNING|ВНИМАНИЕ)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "gotcha"), (r"(?i)\b(WORKAROUND|ОБХОДНОЙ|ХАК)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "workaround"), (r"(?i)\b(FIXME|БАГИ?)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "gotcha"), (r"(?i)\b(РЕШЕНИЕ|DECISION)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "decision"), (r"(?i)\b(CONVENTION|СОГЛАШЕНИЕ|ПРАВИЛО)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "convention"), ] # Section headers that likely contain decisions _DECISION_SECTIONS = [ r"(?i)known\s+issues?", r"(?i)workaround", r"(?i)gotcha", r"(?i)решени[яе]", r"(?i)грабл[ия]", r"(?i)conventions?", r"(?i)правила", r"(?i)нюансы", ] # Section headers about UNRELATED services — skip these entirely _UNRELATED_SECTION_PATTERNS = [ r"(?i)jitsi", r"(?i)nextcloud", r"(?i)prosody", r"(?i)coturn", r"(?i)turn\b", r"(?i)asterisk", r"(?i)ghost\s+блог", r"(?i)onlyoffice", r"(?i)git\s+sync", r"(?i)\.env\s+добав", r"(?i)goip\s+watcher", r"(?i)tbank\s+monitor", # monitoring services r"(?i)фикс\s+удален", # commit-level fixes (not decisions) ] # Noise patterns — individual items that look like noise, not decisions _NOISE_PATTERNS = [ r"^[0-9a-f]{6,40}$", # commit hashes r"^\s*(docker|ssh|scp|git|curl|sudo)\s", # shell commands r"^`[^`]+`$", # inline code-only items r"(?i)(prosody|jitsi|jicofo|jvb|coturn|nextcloud|onlyoffice|ghost)", # unrelated services r"(?i)\.jitsi-meet-cfg", # jitsi config paths r"(?i)(meet\.jitsi|sitemeet\.org)", # jitsi domains r"(?i)(cloud\.vault\.red|office\.vault)", # nextcloud domains r"(?i)JWT_APP_(ID|SECRET)", # jwt config lines r"(?i)XMPP_", # prosody config r"\(коммит\s+`?[0-9a-f]+`?\)", # "(коммит `a33c2b9`)" references r"(?i)known_uids|idle_loop|reconnect", # goip-watcher internals ] def _is_noise(text: str) -> bool: """Check if a decision candidate is noise.""" # Clean markdown bold for matching clean = re.sub(r"\*\*([^*]*)\*\*", r"\1", text).strip() return any(re.search(p, clean) for p in _NOISE_PATTERNS) def _split_into_sections(text: str) -> list[tuple[str, str]]: """Split markdown into (header, body) pairs by ## headers. Returns list of (header_text, body_text) tuples. Anything before the first ## is returned with header="". """ parts = re.split(r"(?m)^(##\s+.+)$", text) sections = [] current_header = "" current_body = parts[0] if parts else "" for i in range(1, len(parts), 2): if current_header or current_body.strip(): sections.append((current_header, current_body)) current_header = parts[i].strip() current_body = parts[i + 1] if i + 1 < len(parts) else "" if current_header or current_body.strip(): sections.append((current_header, current_body)) return sections def _is_unrelated_section(header: str) -> bool: """Check if a section header is about an unrelated service.""" return any(re.search(p, header) for p in _UNRELATED_SECTION_PATTERNS) def extract_decisions_from_claude_md( project_path: Path, project_id: str | None = None, project_name: str | None = None, ) -> list[dict]: """Parse CLAUDE.md for decisions, gotchas, workarounds. Filters out: - Sections about unrelated services (Jitsi, Nextcloud, Prosody, etc.) - Noise: commit hashes, docker/ssh commands, paths to external services - If CLAUDE.md has multi-project sections, only extracts for current project """ claude_md = project_path / "CLAUDE.md" if not claude_md.exists(): return [] try: text = claude_md.read_text(errors="replace") except OSError: return [] # Split into sections and filter out unrelated ones sections = _split_into_sections(text) relevant_text = [] for header, body in sections: if _is_unrelated_section(header): continue relevant_text.append(header + "\n" + body) filtered_text = "\n".join(relevant_text) decisions = [] seen_titles = set() # Pattern-based extraction from relevant sections only for pattern, dec_type in _DECISION_PATTERNS: for m in re.finditer(pattern, filtered_text, re.DOTALL): body = m.group(2).strip() if not body or len(body) < 10: continue lines = body.split("\n") title = lines[0].strip().rstrip(".")[:100] desc = body if _is_noise(title) or _is_noise(desc): continue if title not in seen_titles: seen_titles.add(title) decisions.append({ "type": dec_type, "title": title, "description": desc, "category": _guess_category(title + " " + desc), }) # Section-based extraction: find ### or #### headers matching decision patterns sub_sections = re.split(r"(?m)^(#{1,4}\s+.*?)$", filtered_text) for i, section in enumerate(sub_sections): if any(re.search(pat, section) for pat in _DECISION_SECTIONS): if i + 1 < len(sub_sections): content = sub_sections[i + 1].strip() for line in content.split("\n"): line = line.strip() # Numbered items (1. **text**) or bullet items item = None if re.match(r"^\d+\.\s+", line): item = re.sub(r"^\d+\.\s+", "", line).strip() elif line.startswith(("- ", "* ", "• ")): item = line.lstrip("-*• ").strip() if not item or len(item) < 10: continue # Clean bold markers for title clean = re.sub(r"\*\*([^*]+)\*\*", r"\1", item) if _is_noise(clean): continue title = clean[:100] if title not in seen_titles: seen_titles.add(title) decisions.append({ "type": "gotcha", "title": title, "description": item, "category": _guess_category(item), }) return decisions def _guess_category(text: str) -> str: """Best-effort category guess from text content.""" t = text.lower() if any(w in t for w in ("css", "ui", "vue", "компонент", "стил", "layout", "mobile", "safari", "bottom-sheet")): return "ui" if any(w in t for w in ("api", "endpoint", "rest", "route", "запрос", "fetch")): return "api" if any(w in t for w in ("sql", "база", "миграц", "postgres", "sqlite", "бд", "schema")): return "architecture" if any(w in t for w in ("безопас", "security", "xss", "auth", "token", "csrf", "injection")): return "security" if any(w in t for w in ("docker", "deploy", "nginx", "ci", "cd", "infra", "сервер")): return "devops" if any(w in t for w in ("performance", "cache", "оптимиз", "lazy", "скорость")): return "performance" return "architecture" # --------------------------------------------------------------------------- # Obsidian vault scanning # --------------------------------------------------------------------------- def find_vault_root(vault_path: Path | None = None) -> Path | None: """Find the Obsidian vault root directory. If vault_path is given but doesn't exist, returns None (don't fallback). If vault_path is None, tries the default iCloud Obsidian location. """ if vault_path is not None: return vault_path if vault_path.is_dir() else None # Default: iCloud Obsidian path default = DEFAULT_VAULT if default.is_dir(): # Look for a vault inside (usually one level deep) for child in default.iterdir(): if child.is_dir() and not child.name.startswith("."): return child return None def scan_obsidian( vault_root: Path, project_id: str, project_name: str, project_dir_name: str | None = None, ) -> dict: """Scan Obsidian vault for project-related notes. Returns {"tasks": [...], "decisions": [...], "files_scanned": int} """ result = {"tasks": [], "decisions": [], "files_scanned": 0} # Build search terms search_terms = {project_id.lower()} if project_name: search_terms.add(project_name.lower()) if project_dir_name: search_terms.add(project_dir_name.lower()) # Find project folder in vault project_files: list[Path] = [] for term in list(search_terms): for child in vault_root.iterdir(): if child.is_dir() and term in child.name.lower(): for f in child.rglob("*.md"): if f not in project_files: project_files.append(f) # Also search for files mentioning the project by name for md_file in vault_root.glob("*.md"): try: text = md_file.read_text(errors="replace")[:5000].lower() except OSError: continue if any(term in text for term in search_terms): if md_file not in project_files: project_files.append(md_file) result["files_scanned"] = len(project_files) for f in project_files: try: text = f.read_text(errors="replace") except OSError: continue _extract_obsidian_tasks(text, f.stem, result["tasks"]) _extract_obsidian_decisions(text, f.stem, result["decisions"]) return result def _extract_obsidian_tasks(text: str, source: str, tasks: list[dict]): """Extract checkbox items from Obsidian markdown.""" for m in re.finditer(r"^[-*]\s+\[([ xX])\]\s+(.+)$", text, re.MULTILINE): done = m.group(1).lower() == "x" title = m.group(2).strip() # Remove Obsidian wiki-links title = re.sub(r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]", r"\1", title) if len(title) > 5: tasks.append({ "title": title[:200], "done": done, "source": source, }) def _extract_obsidian_decisions(text: str, source: str, decisions: list[dict]): """Extract decisions/gotchas from Obsidian notes.""" for pattern, dec_type in _DECISION_PATTERNS: for m in re.finditer(pattern, text, re.DOTALL): body = m.group(2).strip() if not body or len(body) < 10: continue title = body.split("\n")[0].strip()[:100] if _is_noise(title) or _is_noise(body): continue decisions.append({ "type": dec_type, "title": title, "description": body, "category": _guess_category(body), "source": source, }) # Also look for ВАЖНО/GOTCHA/FIXME inline markers not caught above for m in re.finditer(r"(?i)\*\*(ВАЖНО|GOTCHA|FIXME)\*\*[:\s]*(.*?)(?=\n|$)", text): body = m.group(2).strip() if not body or len(body) < 10: continue if _is_noise(body): continue decisions.append({ "type": "gotcha", "title": body[:100], "description": body, "category": _guess_category(body), "source": source, }) # --------------------------------------------------------------------------- # Formatting for CLI preview # --------------------------------------------------------------------------- def format_preview( project_id: str, name: str, path: str, tech_stack: list[str], modules: list[dict], decisions: list[dict], obsidian: dict | None = None, ) -> str: """Format bootstrap results for user review.""" lines = [ f"Project: {project_id} — {name}", f"Path: {path}", "", f"Tech stack: {', '.join(tech_stack) if tech_stack else '(not detected)'}", "", ] if modules: lines.append(f"Modules ({len(modules)}):") for m in modules: lines.append(f" {m['name']} ({m['type']}) — {m['path']} ({m['file_count']} files)") else: lines.append("Modules: (none detected)") lines.append("") if decisions: lines.append(f"Decisions from CLAUDE.md ({len(decisions)}):") for i, d in enumerate(decisions, 1): lines.append(f" #{i} {d['type']}: {d['title']}") else: lines.append("Decisions from CLAUDE.md: (none found)") if obsidian: lines.append("") lines.append(f"Obsidian vault ({obsidian['files_scanned']} files scanned):") if obsidian["tasks"]: pending = [t for t in obsidian["tasks"] if not t["done"]] done = [t for t in obsidian["tasks"] if t["done"]] lines.append(f" Tasks: {len(pending)} pending, {len(done)} done") for t in pending[:10]: lines.append(f" [ ] {t['title']}") if len(pending) > 10: lines.append(f" ... and {len(pending) - 10} more") for t in done[:5]: lines.append(f" [x] {t['title']}") if len(done) > 5: lines.append(f" ... and {len(done) - 5} more done") else: lines.append(" Tasks: (none found)") if obsidian["decisions"]: lines.append(f" Decisions: {len(obsidian['decisions'])}") for d in obsidian["decisions"][:5]: lines.append(f" {d['type']}: {d['title']} (from {d['source']})") if len(obsidian["decisions"]) > 5: lines.append(f" ... and {len(obsidian['decisions']) - 5} more") else: lines.append(" Decisions: (none found)") return "\n".join(lines) # --------------------------------------------------------------------------- # Write to DB # --------------------------------------------------------------------------- def save_to_db( conn, project_id: str, name: str, path: str, tech_stack: list[str], modules: list[dict], decisions: list[dict], obsidian: dict | None = None, ): """Save all bootstrap data to kin.db via models.""" from core import models # Create project claude_md = Path(path).expanduser() / "CLAUDE.md" models.create_project( conn, project_id, name, path, tech_stack=tech_stack, claude_md_path=str(claude_md) if claude_md.exists() else None, ) # Add modules for m in modules: models.add_module( conn, project_id, m["name"], m["type"], m["path"], description=f"{m['file_count']} files", ) # Add decisions from CLAUDE.md for d in decisions: models.add_decision( conn, project_id, d["type"], d["title"], d["description"], category=d.get("category"), ) # Add Obsidian decisions if obsidian: for d in obsidian.get("decisions", []): models.add_decision( conn, project_id, d["type"], d["title"], d["description"], category=d.get("category"), tags=[f"obsidian:{d['source']}"], ) # Import Obsidian tasks task_num = 1 for t in obsidian.get("tasks", []): task_id = f"{project_id.upper()}-OBS-{task_num:03d}" status = "done" if t["done"] else "pending" models.create_task( conn, task_id, project_id, t["title"], status=status, brief={"source": f"obsidian:{t['source']}"}, ) task_num += 1