# kin/agents/bootstrap.py
"""
Kin bootstrap auto-detect project tech stack, modules, and decisions.
Scans project directory, CLAUDE.md, and optionally Obsidian vault.
Writes results to kin.db via core.models.
"""
import json
import re
from pathlib import Path
from typing import Any
DEFAULT_VAULT = Path.home() / "Library" / "Mobile Documents" / "iCloud~md~obsidian" / "Documents"
# ---------------------------------------------------------------------------
# Tech stack detection
# ---------------------------------------------------------------------------
# package.json dependency → tech label
# Keys are npm package names looked up in dependencies/devDependencies;
# values are the normalized tech labels stored in kin.db.
_NPM_MARKERS = {
    "vue": "vue3", "nuxt": "nuxt3", "react": "react", "next": "nextjs",
    "svelte": "svelte", "angular": "angular",
    "typescript": "typescript", "vite": "vite", "webpack": "webpack",
    "express": "express", "fastify": "fastify", "koa": "koa",
    "pinia": "pinia", "vuex": "vuex", "redux": "redux",
    "tailwindcss": "tailwind", "prisma": "prisma", "drizzle-orm": "drizzle",
    "pg": "postgresql", "mysql2": "mysql", "better-sqlite3": "sqlite",
    "axios": "axios", "puppeteer": "puppeteer", "playwright": "playwright",
}

# Config files → tech label
# Mere presence of the file name (anywhere up to depth 3) implies the tech.
_FILE_MARKERS = {
    "nuxt.config.ts": "nuxt3", "nuxt.config.js": "nuxt3",
    "vite.config.ts": "vite", "vite.config.js": "vite",
    "tsconfig.json": "typescript",
    "tailwind.config.js": "tailwind", "tailwind.config.ts": "tailwind",
    "docker-compose.yml": "docker", "docker-compose.yaml": "docker",
    "Dockerfile": "docker",
    "go.mod": "go", "Cargo.toml": "rust",
    "requirements.txt": "python", "pyproject.toml": "python",
    "setup.py": "python", "Pipfile": "python",
    ".eslintrc.js": "eslint", ".prettierrc": "prettier",
}

# Directory names never descended into during scans (build output, deps, VCS).
_SKIP_DIRS = {"node_modules", ".git", "dist", ".next", ".nuxt", "__pycache__", ".venv", "venv"}
def detect_tech_stack(project_path: Path) -> list[str]:
    """Detect tech stack from project files.

    Searches recursively up to depth 3, skipping node_modules/.git/dist.
    Falls back to CLAUDE.md heuristics if no files found.
    """
    found: set[str] = set()
    for candidate in _walk_files(project_path, max_depth=3):
        fname = candidate.name
        marker = _FILE_MARKERS.get(fname)
        if marker is not None:
            found.add(marker)
        # File names below are mutually exclusive, so elif chaining is safe.
        if fname == "package.json":
            found.update(_parse_package_json(candidate))
        elif fname == "requirements.txt":
            found.update(_parse_requirements_txt(candidate))
        elif fname == "go.mod":
            found.add("go")
            try:
                contents = candidate.read_text(errors="replace")
            except OSError:
                pass
            else:
                # Peek inside go.mod for well-known web frameworks.
                for needle, label in (("gin-gonic", "gin"), ("fiber", "fiber")):
                    if needle in contents:
                        found.add(label)
    if not found:
        # No config files visible (e.g. project lives on a remote server):
        # fall back to text hints in CLAUDE.md.
        found.update(_detect_stack_from_claude_md(project_path))
    return sorted(found)
# CLAUDE.md text → tech labels (for fallback when project files are on a remote server)
# Keys are case-insensitive regexes matched against the first 5 KB of CLAUDE.md.
_CLAUDE_MD_TECH_HINTS = {
    r"(?i)vue[\s.]?3": "vue3", r"(?i)vue[\s.]?2": "vue2",
    r"(?i)\bnuxt\b": "nuxt3", r"(?i)\breact\b": "react",
    r"(?i)\btypescript\b": "typescript", r"(?i)\bvite\b": "vite",
    r"(?i)\btailwind": "tailwind",
    r"(?i)node\.?js": "nodejs", r"(?i)\bexpress\b": "express",
    r"(?i)postgresql|postgres": "postgresql",
    r"(?i)\bsqlite\b": "sqlite", r"(?i)\bmysql\b": "mysql",
    r"(?i)\bdocker\b": "docker",
    r"(?i)\bpython\b": "python", r"(?i)\bfastapi\b": "fastapi",
    r"(?i)\bdjango\b": "django", r"(?i)\bflask\b": "flask",
    # Bare "go" is too ambiguous — require a framework/module mention nearby.
    r"(?i)\bgo\b.*(?:gin|fiber|module)": "go",
    r"(?i)\bnginx\b": "nginx",
    r"(?i)\bpinia\b": "pinia", r"(?i)\bvuex\b": "vuex",
}
def _detect_stack_from_claude_md(project_path: Path) -> list[str]:
    """Fallback: infer tech stack from CLAUDE.md text when no config files exist."""
    md_file = project_path / "CLAUDE.md"
    if not md_file.exists():
        return []
    try:
        # Only the first 5 KB is scanned — tech mentions cluster near the top.
        snippet = md_file.read_text(errors="replace")[:5000]
    except OSError:
        return []
    return [
        tech
        for pattern, tech in _CLAUDE_MD_TECH_HINTS.items()
        if re.search(pattern, snippet)
    ]
def _walk_files(root: Path, max_depth: int = 3, _depth: int = 0):
    """Yield files up to max_depth, skipping node_modules/dist/.git."""
    if _depth > max_depth:
        return
    try:
        children = sorted(root.iterdir())
    except (OSError, PermissionError):
        # Unreadable directory — silently skip, best-effort scan.
        return
    for child in children:
        if child.is_file():
            yield child
            continue
        # Recurse only into visible, non-excluded directories.
        if child.is_dir() and child.name not in _SKIP_DIRS and not child.name.startswith("."):
            yield from _walk_files(child, max_depth, _depth + 1)
def _parse_package_json(path: Path) -> list[str]:
    """Extract tech labels from package.json."""
    try:
        manifest = json.loads(path.read_text(errors="replace"))
    except (json.JSONDecodeError, OSError):
        return []
    # Merge runtime and dev dependencies into one lookup table.
    combined: dict = {}
    for section in ("dependencies", "devDependencies"):
        combined.update(manifest.get(section, {}))
    # Preserve _NPM_MARKERS order in the result.
    return [label for dep, label in _NPM_MARKERS.items() if dep in combined]
def _parse_requirements_txt(path: Path) -> list[str]:
"""Extract tech labels from requirements.txt."""
markers = {
"fastapi": "fastapi", "flask": "flask", "django": "django",
"sqlalchemy": "sqlalchemy", "celery": "celery", "redis": "redis",
"pydantic": "pydantic", "click": "click", "pytest": "pytest",
}
stack = []
try:
text = path.read_text(errors="replace").lower()
except OSError:
return stack
for pkg, tech in markers.items():
if pkg in text:
stack.append(tech)
return stack
def _is_inside_node_modules(path: Path, root: Path) -> bool:
rel = path.relative_to(root)
return "node_modules" in rel.parts
# ---------------------------------------------------------------------------
# Module detection
# ---------------------------------------------------------------------------
# File extensions that unambiguously indicate frontend code.
_FRONTEND_EXTS = {".vue", ".jsx", ".tsx", ".svelte"}
# Substrings searched (lower-cased) in sampled JS/TS sources to flag backend code.
_BACKEND_MARKERS = {"express", "fastify", "koa", "router", "controller", "middleware"}
def detect_modules(project_path: Path) -> list[dict]:
    """Scan for modules: checks root subdirs, */src/ patterns, standard names.

    Strategy:
    1. Find all "source root" dirs (src/, app/, lib/ at root or inside top-level dirs)
    2. Each first-level subdir of a source root = a module candidate
    3. Top-level dirs with their own src/ are treated as component roots
       (e.g. frontend/, backend-pg/) — scan THEIR src/ for modules
    """
    roots: list[tuple[Path, str | None]] = []  # (dir to scan, component prefix hint)

    # Plain source dirs directly under the project root.
    for candidate in ("src", "app", "lib"):
        direct = project_path / candidate
        if direct.is_dir():
            roots.append((direct, None))

    # Top-level component dirs (frontend/, backend/, backend-pg/, server/, ...).
    for entry in sorted(project_path.iterdir()):
        if not entry.is_dir() or entry.name in _SKIP_DIRS or entry.name.startswith("."):
            continue
        nested_src = entry / "src"
        if nested_src.is_dir():
            # e.g. frontend/src/, backend-pg/src/ — scan their subdirs.
            roots.append((nested_src, entry.name))
        elif entry.name in ("frontend", "backend", "server", "client", "web", "api"):
            # No src/ but it's a known component dir — scan it directly.
            roots.append((entry, entry.name))

    found: list[dict] = []
    known: set[tuple[str, str]] = set()
    for scan_dir, _hint in roots:
        for sub in sorted(scan_dir.iterdir()):
            if not sub.is_dir() or sub.name in _SKIP_DIRS or sub.name.startswith("."):
                continue
            info = _analyze_module(sub, project_path)
            dedupe_key = (info["name"], info["path"])
            if dedupe_key not in known:
                known.add(dedupe_key)
                found.append(info)
    return found
def _analyze_module(dir_path: Path, project_root: Path) -> dict:
    """Analyze a directory to determine module type and file count."""
    # Hidden files are excluded from both the count and the type guess.
    visible = [
        f for f in dir_path.rglob("*")
        if f.is_file() and not f.name.startswith(".")
    ]
    suffixes = {f.suffix for f in visible}
    return {
        "name": dir_path.name,
        "type": _guess_module_type(dir_path, suffixes, visible),
        "path": str(dir_path.relative_to(project_root)) + "/",
        "file_count": len(visible),
    }
def _guess_module_type(dir_path: Path, exts: set[str], files: list[Path]) -> str:
    """Guess if module is frontend, backend, shared, or infra."""
    # .vue/.jsx/.tsx/.svelte files are a dead giveaway for frontend code.
    if not exts.isdisjoint(_FRONTEND_EXTS):
        return "frontend"
    # Sample the first 20 JS/TS files for server-side keywords.
    for sample in files[:20]:
        if sample.suffix not in (".ts", ".js", ".mjs"):
            continue
        try:
            head = sample.read_text(errors="replace")[:2000].lower()
        except OSError:
            continue
        if any(marker in head for marker in _BACKEND_MARKERS):
            return "backend"
    # Infra-flavoured directory names.
    if dir_path.name.lower() in ("infra", "deploy", "scripts", "ci", "docker", "nginx", "config"):
        return "infra"
    # Everything else (including plain ts/js/py code) defaults to shared.
    return "shared"
# ---------------------------------------------------------------------------
# Decisions from CLAUDE.md
# ---------------------------------------------------------------------------
# Marker-word patterns: group(1) is the marker word, group(2) the decision
# body, which runs until the next heading/bullet line, a blank line, or EOF.
_DECISION_PATTERNS = [
    (r"(?i)\b(GOTCHA|ВАЖНО|WARNING|ВНИМАНИЕ)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "gotcha"),
    (r"(?i)\b(WORKAROUND|ОБХОДНОЙ|ХАК)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "workaround"),
    (r"(?i)\b(FIXME|БАГИ?)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "gotcha"),
    (r"(?i)\b(РЕШЕНИЕ|DECISION)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "decision"),
    (r"(?i)\b(CONVENTION|СОГЛАШЕНИЕ|ПРАВИЛО)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "convention"),
]

# Section headers that likely contain decisions
_DECISION_SECTIONS = [
    r"(?i)known\s+issues?", r"(?i)workaround", r"(?i)gotcha",
    r"(?i)решени[яе]", r"(?i)грабл[ия]",
    r"(?i)conventions?", r"(?i)правила", r"(?i)нюансы",
]

# Section headers about UNRELATED services — skip these entirely
_UNRELATED_SECTION_PATTERNS = [
    r"(?i)jitsi", r"(?i)nextcloud", r"(?i)prosody",
    r"(?i)coturn", r"(?i)turn\b", r"(?i)asterisk",
    r"(?i)ghost\s+блог", r"(?i)onlyoffice",
    r"(?i)git\s+sync", r"(?i)\.env\s+добав",
    r"(?i)goip\s+watcher", r"(?i)tbank\s+monitor",  # monitoring services
    r"(?i)фикс\s+удален",  # commit-level fixes (not decisions)
]

# Noise patterns — individual items that look like noise, not decisions
_NOISE_PATTERNS = [
    r"^[0-9a-f]{6,40}$",  # commit hashes
    r"^\s*(docker|ssh|scp|git|curl|sudo)\s",  # shell commands
    r"^`[^`]+`$",  # inline code-only items
    r"(?i)(prosody|jitsi|jicofo|jvb|coturn|nextcloud|onlyoffice|ghost)",  # unrelated services
    r"(?i)\.jitsi-meet-cfg",  # jitsi config paths
    r"(?i)(meet\.jitsi|sitemeet\.org)",  # jitsi domains
    r"(?i)(cloud\.vault\.red|office\.vault)",  # nextcloud domains
    r"(?i)JWT_APP_(ID|SECRET)",  # jwt config lines
    r"(?i)XMPP_",  # prosody config
    r"\(коммит\s+`?[0-9a-f]+`?\)",  # "(коммит `a33c2b9`)" references
    r"(?i)known_uids|idle_loop|reconnect",  # goip-watcher internals
]
def _is_noise(text: str) -> bool:
    """Check if a decision candidate is noise."""
    # Strip **bold** markers so patterns match the underlying text.
    stripped = re.sub(r"\*\*([^*]*)\*\*", r"\1", text).strip()
    for pattern in _NOISE_PATTERNS:
        if re.search(pattern, stripped):
            return True
    return False
def _split_into_sections(text: str) -> list[tuple[str, str]]:
"""Split markdown into (header, body) pairs by ## headers.
Returns list of (header_text, body_text) tuples.
Anything before the first ## is returned with header="".
"""
parts = re.split(r"(?m)^(##\s+.+)$", text)
sections = []
current_header = ""
current_body = parts[0] if parts else ""
for i in range(1, len(parts), 2):
if current_header or current_body.strip():
sections.append((current_header, current_body))
current_header = parts[i].strip()
current_body = parts[i + 1] if i + 1 < len(parts) else ""
if current_header or current_body.strip():
sections.append((current_header, current_body))
return sections
def _is_unrelated_section(header: str) -> bool:
    """Check if a section header is about an unrelated service."""
    for pattern in _UNRELATED_SECTION_PATTERNS:
        if re.search(pattern, header):
            return True
    return False
def extract_decisions_from_claude_md(
    project_path: Path,
    project_id: str | None = None,
    project_name: str | None = None,
) -> list[dict]:
    """Parse CLAUDE.md for decisions, gotchas, workarounds.

    Filters out:
    - Sections about unrelated services (Jitsi, Nextcloud, Prosody, etc.)
    - Noise: commit hashes, docker/ssh commands, paths to external services
    - If CLAUDE.md has multi-project sections, only extracts for current project

    Returns dicts with keys: type, title, description, category.

    NOTE(review): project_id/project_name are accepted but not referenced in
    this body — presumably reserved for per-project filtering; confirm intent.
    """
    claude_md = project_path / "CLAUDE.md"
    if not claude_md.exists():
        return []
    try:
        text = claude_md.read_text(errors="replace")
    except OSError:
        return []

    # Drop whole ## sections about unrelated services before any matching.
    sections = _split_into_sections(text)
    relevant_text = []
    for header, body in sections:
        if _is_unrelated_section(header):
            continue
        relevant_text.append(header + "\n" + body)
    filtered_text = "\n".join(relevant_text)

    decisions = []
    seen_titles = set()  # dedupe by title across both extraction passes

    # Pass 1: marker-word patterns (GOTCHA:/WORKAROUND:/DECISION:/...).
    for pattern, dec_type in _DECISION_PATTERNS:
        for m in re.finditer(pattern, filtered_text, re.DOTALL):
            body = m.group(2).strip()
            if not body or len(body) < 10:  # skip empty/trivial matches
                continue
            lines = body.split("\n")
            title = lines[0].strip().rstrip(".")[:100]
            desc = body
            if _is_noise(title) or _is_noise(desc):
                continue
            if title not in seen_titles:
                seen_titles.add(title)
                decisions.append({
                    "type": dec_type,
                    "title": title,
                    "description": desc,
                    "category": _guess_category(title + " " + desc),
                })

    # Pass 2: sections whose headers look decision-like ("Known issues", ...);
    # each numbered or bulleted item inside them becomes a gotcha.
    sub_sections = re.split(r"(?m)^(#{1,4}\s+.*?)$", filtered_text)
    for i, section in enumerate(sub_sections):
        if any(re.search(pat, section) for pat in _DECISION_SECTIONS):
            if i + 1 < len(sub_sections):
                content = sub_sections[i + 1].strip()
                for line in content.split("\n"):
                    line = line.strip()
                    item = None
                    if re.match(r"^\d+\.\s+", line):
                        # Numbered items: "1. **text**"
                        item = re.sub(r"^\d+\.\s+", "", line).strip()
                    elif line.startswith(("- ", "* ", "• ")):
                        # BUG FIX: the tuple previously contained "" which made
                        # startswith() always True, turning EVERY line into a
                        # bullet candidate; the lstrip set below shows "• " was
                        # the intended third prefix.
                        item = line.lstrip("-*• ").strip()
                    if not item or len(item) < 10:
                        continue
                    # Clean bold markers for the title.
                    clean = re.sub(r"\*\*([^*]+)\*\*", r"\1", item)
                    if _is_noise(clean):
                        continue
                    title = clean[:100]
                    if title not in seen_titles:
                        seen_titles.add(title)
                        decisions.append({
                            "type": "gotcha",
                            "title": title,
                            "description": item,
                            "category": _guess_category(item),
                        })
    return decisions
def _guess_category(text: str) -> str:
"""Best-effort category guess from text content."""
t = text.lower()
if any(w in t for w in ("css", "ui", "vue", "компонент", "стил", "layout", "mobile", "safari", "bottom-sheet")):
return "ui"
if any(w in t for w in ("api", "endpoint", "rest", "route", "запрос", "fetch")):
return "api"
if any(w in t for w in ("sql", "база", "миграц", "postgres", "sqlite", "бд", "schema")):
return "architecture"
if any(w in t for w in ("безопас", "security", "xss", "auth", "token", "csrf", "injection")):
return "security"
if any(w in t for w in ("docker", "deploy", "nginx", "ci", "cd", "infra", "сервер")):
return "devops"
if any(w in t for w in ("performance", "cache", "оптимиз", "lazy", "скорость")):
return "performance"
return "architecture"
# ---------------------------------------------------------------------------
# Obsidian vault scanning
# ---------------------------------------------------------------------------
def find_vault_root(vault_path: Path | None = None) -> Path | None:
"""Find the Obsidian vault root directory.
If vault_path is given but doesn't exist, returns None (don't fallback).
If vault_path is None, tries the default iCloud Obsidian location.
"""
if vault_path is not None:
return vault_path if vault_path.is_dir() else None
# Default: iCloud Obsidian path
default = DEFAULT_VAULT
if default.is_dir():
# Look for a vault inside (usually one level deep)
for child in default.iterdir():
if child.is_dir() and not child.name.startswith("."):
return child
return None
def scan_obsidian(
    vault_root: Path,
    project_id: str,
    project_name: str,
    project_dir_name: str | None = None,
) -> dict:
    """Scan Obsidian vault for project-related notes.

    Returns {"tasks": [...], "decisions": [...], "files_scanned": int}
    """
    report = {"tasks": [], "decisions": [], "files_scanned": 0}

    # Lower-cased identifiers to look for in folder names and note text.
    terms = {project_id.lower()}
    if project_name:
        terms.add(project_name.lower())
    if project_dir_name:
        terms.add(project_dir_name.lower())

    matched: list[Path] = []
    # Pass 1: vault folders whose name contains one of the terms.
    for term in list(terms):
        for entry in vault_root.iterdir():
            if entry.is_dir() and term in entry.name.lower():
                for note in entry.rglob("*.md"):
                    if note not in matched:
                        matched.append(note)
    # Pass 2: top-level notes that mention the project in their first 5 KB.
    for note in vault_root.glob("*.md"):
        try:
            head = note.read_text(errors="replace")[:5000].lower()
        except OSError:
            continue
        if any(term in head for term in terms) and note not in matched:
            matched.append(note)

    report["files_scanned"] = len(matched)
    for note in matched:
        try:
            contents = note.read_text(errors="replace")
        except OSError:
            continue
        _extract_obsidian_tasks(contents, note.stem, report["tasks"])
        _extract_obsidian_decisions(contents, note.stem, report["decisions"])
    return report
def _extract_obsidian_tasks(text: str, source: str, tasks: list[dict]):
"""Extract checkbox items from Obsidian markdown."""
for m in re.finditer(r"^[-*]\s+\[([ xX])\]\s+(.+)$", text, re.MULTILINE):
done = m.group(1).lower() == "x"
title = m.group(2).strip()
# Remove Obsidian wiki-links
title = re.sub(r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]", r"\1", title)
if len(title) > 5:
tasks.append({
"title": title[:200],
"done": done,
"source": source,
})
def _extract_obsidian_decisions(text: str, source: str, decisions: list[dict]):
    """Extract decisions/gotchas from Obsidian notes, appending to *decisions*."""
    # Pass 1: the shared CLAUDE.md-style marker patterns.
    for pattern, dec_type in _DECISION_PATTERNS:
        for match in re.finditer(pattern, text, re.DOTALL):
            payload = match.group(2).strip()
            if len(payload) < 10:  # also covers the empty-payload case
                continue
            heading = payload.split("\n")[0].strip()[:100]
            if _is_noise(heading) or _is_noise(payload):
                continue
            decisions.append({
                "type": dec_type,
                "title": heading,
                "description": payload,
                "category": _guess_category(payload),
                "source": source,
            })
    # Pass 2: inline **ВАЖНО**/**GOTCHA**/**FIXME** markers not caught above.
    for match in re.finditer(r"(?i)\*\*(ВАЖНО|GOTCHA|FIXME)\*\*[:\s]*(.*?)(?=\n|$)", text):
        payload = match.group(2).strip()
        if len(payload) < 10:
            continue
        if _is_noise(payload):
            continue
        decisions.append({
            "type": "gotcha",
            "title": payload[:100],
            "description": payload,
            "category": _guess_category(payload),
            "source": source,
        })
# ---------------------------------------------------------------------------
# Formatting for CLI preview
# ---------------------------------------------------------------------------
def format_preview(
project_id: str,
name: str,
path: str,
tech_stack: list[str],
modules: list[dict],
decisions: list[dict],
obsidian: dict | None = None,
) -> str:
"""Format bootstrap results for user review."""
lines = [
f"Project: {project_id}{name}",
f"Path: {path}",
"",
f"Tech stack: {', '.join(tech_stack) if tech_stack else '(not detected)'}",
"",
]
if modules:
lines.append(f"Modules ({len(modules)}):")
for m in modules:
lines.append(f" {m['name']} ({m['type']}) — {m['path']} ({m['file_count']} files)")
else:
lines.append("Modules: (none detected)")
lines.append("")
if decisions:
lines.append(f"Decisions from CLAUDE.md ({len(decisions)}):")
for i, d in enumerate(decisions, 1):
lines.append(f" #{i} {d['type']}: {d['title']}")
else:
lines.append("Decisions from CLAUDE.md: (none found)")
if obsidian:
lines.append("")
lines.append(f"Obsidian vault ({obsidian['files_scanned']} files scanned):")
if obsidian["tasks"]:
pending = [t for t in obsidian["tasks"] if not t["done"]]
done = [t for t in obsidian["tasks"] if t["done"]]
lines.append(f" Tasks: {len(pending)} pending, {len(done)} done")
for t in pending[:10]:
lines.append(f" [ ] {t['title']}")
if len(pending) > 10:
lines.append(f" ... and {len(pending) - 10} more")
for t in done[:5]:
lines.append(f" [x] {t['title']}")
if len(done) > 5:
lines.append(f" ... and {len(done) - 5} more done")
else:
lines.append(" Tasks: (none found)")
if obsidian["decisions"]:
lines.append(f" Decisions: {len(obsidian['decisions'])}")
for d in obsidian["decisions"][:5]:
lines.append(f" {d['type']}: {d['title']} (from {d['source']})")
if len(obsidian["decisions"]) > 5:
lines.append(f" ... and {len(obsidian['decisions']) - 5} more")
else:
lines.append(" Decisions: (none found)")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Write to DB
# ---------------------------------------------------------------------------
def save_to_db(
    conn,
    project_id: str,
    name: str,
    path: str,
    tech_stack: list[str],
    modules: list[dict],
    decisions: list[dict],
    obsidian: dict | None = None,
):
    """Save all bootstrap data to kin.db via models.

    Args:
        conn: open kin.db connection, passed straight through to core.models.
        project_id: stable project identifier (also embedded in generated
            Obsidian task IDs, upper-cased).
        name: human-readable project name.
        path: project directory; "~" is expanded only when probing CLAUDE.md.
        tech_stack: labels from detect_tech_stack().
        modules: dicts from detect_modules() (name/type/path/file_count).
        decisions: dicts from extract_decisions_from_claude_md().
        obsidian: optional result of scan_obsidian(); when given, its
            decisions and tasks are imported as well.
    """
    from core import models
    # Create project (record the CLAUDE.md path only if the file exists).
    claude_md = Path(path).expanduser() / "CLAUDE.md"
    models.create_project(
        conn, project_id, name, path,
        tech_stack=tech_stack,
        claude_md_path=str(claude_md) if claude_md.exists() else None,
    )
    # Add modules
    for m in modules:
        models.add_module(
            conn, project_id, m["name"], m["type"], m["path"],
            description=f"{m['file_count']} files",
        )
    # Add decisions from CLAUDE.md
    for d in decisions:
        models.add_decision(
            conn, project_id, d["type"], d["title"], d["description"],
            category=d.get("category"),
        )
    # Add Obsidian decisions (tagged with their source note)
    if obsidian:
        for d in obsidian.get("decisions", []):
            models.add_decision(
                conn, project_id, d["type"], d["title"], d["description"],
                category=d.get("category"),
                tags=[f"obsidian:{d['source']}"],
            )
        # Import Obsidian tasks with sequential IDs like "PID-OBS-001".
        # NOTE(review): this loop is placed inside the `if obsidian:` guard —
        # obsidian defaults to None and obsidian.get() would fail otherwise.
        task_num = 1
        for t in obsidian.get("tasks", []):
            task_id = f"{project_id.upper()}-OBS-{task_num:03d}"
            status = "done" if t["done"] else "pending"
            models.create_task(
                conn, task_id, project_id, t["title"],
                status=status,
                brief={"source": f"obsidian:{t['source']}"},
            )
            task_num += 1