711 lines
26 KiB
Python
711 lines
26 KiB
Python
"""
|
||
Kin bootstrap — auto-detect project tech stack, modules, and decisions.
|
||
Scans project directory, CLAUDE.md, and optionally Obsidian vault.
|
||
Writes results to kin.db via core.models.
|
||
"""
|
||
|
||
import json
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
# Default iCloud Drive location where the Obsidian app stores vaults on macOS.
DEFAULT_VAULT = Path.home() / "Library" / "Mobile Documents" / "iCloud~md~obsidian" / "Documents"
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Tech stack detection
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# package.json dependency → tech label
# (dependency name as it appears in dependencies/devDependencies;
# consumed by _parse_package_json)
_NPM_MARKERS = {
    "vue": "vue3", "nuxt": "nuxt3", "react": "react", "next": "nextjs",
    "svelte": "svelte", "angular": "angular",
    "typescript": "typescript", "vite": "vite", "webpack": "webpack",
    "express": "express", "fastify": "fastify", "koa": "koa",
    "pinia": "pinia", "vuex": "vuex", "redux": "redux",
    "tailwindcss": "tailwind", "prisma": "prisma", "drizzle-orm": "drizzle",
    "pg": "postgresql", "mysql2": "mysql", "better-sqlite3": "sqlite",
    "axios": "axios", "puppeteer": "puppeteer", "playwright": "playwright",
}
|
||
|
||
# Config files → tech label
# (exact filename match anywhere in the tree up to the scan depth;
# consumed by detect_tech_stack)
_FILE_MARKERS = {
    "nuxt.config.ts": "nuxt3", "nuxt.config.js": "nuxt3",
    "vite.config.ts": "vite", "vite.config.js": "vite",
    "tsconfig.json": "typescript",
    "tailwind.config.js": "tailwind", "tailwind.config.ts": "tailwind",
    "docker-compose.yml": "docker", "docker-compose.yaml": "docker",
    "Dockerfile": "docker",
    "go.mod": "go", "Cargo.toml": "rust",
    "requirements.txt": "python", "pyproject.toml": "python",
    "setup.py": "python", "Pipfile": "python",
    ".eslintrc.js": "eslint", ".prettierrc": "prettier",
}
|
||
|
||
|
||
# Directory names never descended into during scans (build output, deps, VCS).
_SKIP_DIRS = {"node_modules", ".git", "dist", ".next", ".nuxt", "__pycache__", ".venv", "venv"}
|
||
|
||
|
||
def detect_tech_stack(project_path: Path) -> list[str]:
    """Detect the project's tech stack from files on disk.

    Walks the tree up to depth 3 (skipping node_modules/.git/dist etc.),
    mapping known config files and dependency manifests to tech labels.
    Falls back to CLAUDE.md heuristics when nothing is found.

    Returns a sorted, de-duplicated list of tech labels.
    """
    found: set[str] = set()

    for candidate in _walk_files(project_path, max_depth=3):
        filename = candidate.name
        marker = _FILE_MARKERS.get(filename)
        if marker is not None:
            found.add(marker)
        if filename == "package.json":
            found.update(_parse_package_json(candidate))
        if filename == "requirements.txt":
            found.update(_parse_requirements_txt(candidate))
        if filename == "go.mod":
            found.add("go")
            try:
                contents = candidate.read_text(errors="replace")
            except OSError:
                pass
            else:
                # Well-known Go web frameworks show up as module paths.
                if "gin-gonic" in contents:
                    found.add("gin")
                if "fiber" in contents:
                    found.add("fiber")

    # No config files at all — try to infer the stack from CLAUDE.md prose.
    if not found:
        found.update(_detect_stack_from_claude_md(project_path))

    return sorted(found)
|
||
|
||
|
||
# CLAUDE.md text → tech labels (for fallback when project files are on a remote server)
# Keys are regex patterns searched against the first 5KB of CLAUDE.md;
# consumed by _detect_stack_from_claude_md.
_CLAUDE_MD_TECH_HINTS = {
    r"(?i)vue[\s.]?3": "vue3", r"(?i)vue[\s.]?2": "vue2",
    r"(?i)\bnuxt\b": "nuxt3", r"(?i)\breact\b": "react",
    r"(?i)\btypescript\b": "typescript", r"(?i)\bvite\b": "vite",
    r"(?i)\btailwind": "tailwind",
    r"(?i)node\.?js": "nodejs", r"(?i)\bexpress\b": "express",
    r"(?i)postgresql|postgres": "postgresql",
    r"(?i)\bsqlite\b": "sqlite", r"(?i)\bmysql\b": "mysql",
    r"(?i)\bdocker\b": "docker",
    r"(?i)\bpython\b": "python", r"(?i)\bfastapi\b": "fastapi",
    r"(?i)\bdjango\b": "django", r"(?i)\bflask\b": "flask",
    # "go" alone is too ambiguous in prose; require a framework/module mention nearby.
    r"(?i)\bgo\b.*(?:gin|fiber|module)": "go",
    r"(?i)\bnginx\b": "nginx",
    r"(?i)\bpinia\b": "pinia", r"(?i)\bvuex\b": "vuex",
}
|
||
|
||
|
||
def _detect_stack_from_claude_md(project_path: Path) -> list[str]:
    """Fallback: infer tech stack from CLAUDE.md text when no config files exist."""
    doc = project_path / "CLAUDE.md"
    if not doc.exists():
        return []
    try:
        # Only the first 5KB is scanned — stack mentions live near the top.
        head = doc.read_text(errors="replace")[:5000]
    except OSError:
        return []
    return [tech for pattern, tech in _CLAUDE_MD_TECH_HINTS.items() if re.search(pattern, head)]
|
||
|
||
|
||
def _walk_files(root: Path, max_depth: int = 3, _depth: int = 0):
    """Yield regular files under *root* up to *max_depth* directory levels deep.

    Skips directories listed in _SKIP_DIRS and hidden directories (names
    starting with "."). Entries are visited in sorted order so output is
    deterministic. Unreadable directories are silently skipped.
    """
    if _depth > max_depth:
        return
    try:
        entries = sorted(root.iterdir())
    except OSError:  # PermissionError is an OSError subclass; one catch suffices
        return
    for entry in entries:
        if entry.is_file():
            yield entry
        elif entry.is_dir() and entry.name not in _SKIP_DIRS and not entry.name.startswith("."):
            yield from _walk_files(entry, max_depth, _depth + 1)
|
||
|
||
|
||
def _parse_package_json(path: Path) -> list[str]:
    """Map package.json dependencies/devDependencies to tech labels via _NPM_MARKERS."""
    try:
        manifest = json.loads(path.read_text(errors="replace"))
    except (json.JSONDecodeError, OSError):
        return []
    # Merge runtime and dev dependencies into one lookup table.
    deps: dict = {}
    deps.update(manifest.get("dependencies", {}))
    deps.update(manifest.get("devDependencies", {}))
    return [tech for dep_name, tech in _NPM_MARKERS.items() if dep_name in deps]
|
||
|
||
|
||
def _parse_requirements_txt(path: Path) -> list[str]:
|
||
"""Extract tech labels from requirements.txt."""
|
||
markers = {
|
||
"fastapi": "fastapi", "flask": "flask", "django": "django",
|
||
"sqlalchemy": "sqlalchemy", "celery": "celery", "redis": "redis",
|
||
"pydantic": "pydantic", "click": "click", "pytest": "pytest",
|
||
}
|
||
stack = []
|
||
try:
|
||
text = path.read_text(errors="replace").lower()
|
||
except OSError:
|
||
return stack
|
||
for pkg, tech in markers.items():
|
||
if pkg in text:
|
||
stack.append(tech)
|
||
return stack
|
||
|
||
|
||
def _is_inside_node_modules(path: Path, root: Path) -> bool:
|
||
rel = path.relative_to(root)
|
||
return "node_modules" in rel.parts
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Module detection
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# File extensions that unambiguously mark a frontend module.
_FRONTEND_EXTS = {".vue", ".jsx", ".tsx", ".svelte"}
# Substrings in JS/TS source text that suggest server-side (backend) code.
_BACKEND_MARKERS = {"express", "fastify", "koa", "router", "controller", "middleware"}
|
||
|
||
|
||
def detect_modules(project_path: Path) -> list[dict]:
    """Scan for modules: checks root subdirs, */src/ patterns, standard names.

    Strategy:
    1. Find all "source root" dirs (src/, app/, lib/ at root or inside top-level dirs)
    2. Each first-level subdir of a source root = a module candidate
    3. Top-level dirs with their own src/ are treated as component roots
       (e.g. frontend/, backend-pg/) — scan THEIR src/ for modules

    Returns module dicts as produced by _analyze_module, de-duplicated by
    module name (first occurrence wins).
    """
    modules: list[dict] = []
    # (dir to scan, component-name hint). The hint is currently unused by the
    # loop below; kept so callers/future code can prefix module names per root.
    scan_dirs: list[tuple[Path, str | None]] = []

    # Direct source dirs in root
    for name in ("src", "app", "lib"):
        d = project_path / name
        if d.is_dir():
            scan_dirs.append((d, None))

    # Top-level component dirs (frontend/, backend/, backend-pg/, server/, client/).
    # These get scanned for src/ inside, or directly if they are known names.
    for child in sorted(project_path.iterdir()):
        if not child.is_dir() or child.name in _SKIP_DIRS or child.name.startswith("."):
            continue
        child_src = child / "src"
        if child_src.is_dir():
            # e.g. frontend/src/, backend-pg/src/ — scan their subdirs
            scan_dirs.append((child_src, child.name))
        elif child.name in ("frontend", "backend", "server", "client", "web", "api"):
            # No src/ but it's a known component dir — scan it directly
            scan_dirs.append((child, child.name))

    seen: set[str] = set()
    for scan_dir, _hint in scan_dirs:  # hint intentionally unused for now
        for child in sorted(scan_dir.iterdir()):
            if not child.is_dir() or child.name in _SKIP_DIRS or child.name.startswith("."):
                continue
            mod = _analyze_module(child, project_path)
            if mod["name"] not in seen:
                seen.add(mod["name"])
                modules.append(mod)

    return modules
|
||
|
||
|
||
def _analyze_module(dir_path: Path, project_root: Path) -> dict:
    """Analyze a directory to determine module type and file count."""
    # Non-hidden regular files anywhere under the module directory.
    visible_files = [
        f for f in dir_path.rglob("*")
        if f.is_file() and not f.name.startswith(".")
    ]
    suffixes = {f.suffix for f in visible_files}
    return {
        "name": dir_path.name,
        "type": _guess_module_type(dir_path, suffixes, visible_files),
        # Project-relative path with a trailing slash to mark it as a directory.
        "path": str(dir_path.relative_to(project_root)) + "/",
        "file_count": len(visible_files),
    }
|
||
|
||
|
||
def _guess_module_type(dir_path: Path, exts: set[str], files: list[Path]) -> str:
    """Guess if module is frontend, backend, shared, or infra.

    Heuristics, in order:
    1. Any frontend-only extension (.vue/.jsx/.tsx/.svelte) → "frontend".
    2. Backend keyword in the first 2KB of up to 20 JS/TS files → "backend".
    3. Well-known infra directory name → "infra".
    4. Otherwise → "shared".
    """
    # Obvious frontend
    if exts & _FRONTEND_EXTS:
        return "frontend"

    # Check file contents for backend markers (sample first 20 files)
    for f in files[:20]:
        if f.suffix in (".ts", ".js", ".mjs"):
            try:
                head = f.read_text(errors="replace")[:2000].lower()
            except OSError:
                continue
            if any(marker in head for marker in _BACKEND_MARKERS):
                return "backend"

    # Infra patterns
    if dir_path.name.lower() in ("infra", "deploy", "scripts", "ci", "docker", "nginx", "config"):
        return "infra"

    # Shared by default if ambiguous. (The original also tested for
    # .ts/.js/.py extensions here, but both branches returned "shared",
    # so the conditional was dead code and has been removed.)
    return "shared"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Decisions from CLAUDE.md
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# (regex, decision type) pairs. Each regex captures the marker keyword in
# group 1 and the decision body in group 2; the lookahead stops the body at
# the next heading/bullet line, a blank line, or end of text. Markers cover
# both English and Russian keywords (ВАЖНО = important, РЕШЕНИЕ = decision,
# ОБХОДНОЙ/ХАК = workaround/hack, СОГЛАШЕНИЕ/ПРАВИЛО = convention/rule).
_DECISION_PATTERNS = [
    (r"(?i)\b(GOTCHA|ВАЖНО|WARNING|ВНИМАНИЕ)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "gotcha"),
    (r"(?i)\b(WORKAROUND|ОБХОДНОЙ|ХАК)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "workaround"),
    (r"(?i)\b(FIXME|БАГИ?)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "gotcha"),
    (r"(?i)\b(РЕШЕНИЕ|DECISION)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "decision"),
    (r"(?i)\b(CONVENTION|СОГЛАШЕНИЕ|ПРАВИЛО)[:\s]+(.*?)(?=\n[#\-]|\n\n|\Z)", "convention"),
]
|
||
|
||
# Section headers that likely contain decisions (English + Russian:
# "решения" = decisions, "грабли" = pitfalls/gotchas, "правила" = rules,
# "нюансы" = nuances).
_DECISION_SECTIONS = [
    r"(?i)known\s+issues?", r"(?i)workaround", r"(?i)gotcha",
    r"(?i)решени[яе]", r"(?i)грабл[ия]",
    r"(?i)conventions?", r"(?i)правила", r"(?i)нюансы",
]
|
||
|
||
# Section headers about UNRELATED services — skip these entirely
|
||
_UNRELATED_SECTION_PATTERNS = [
|
||
r"(?i)jitsi", r"(?i)nextcloud", r"(?i)prosody",
|
||
r"(?i)coturn", r"(?i)turn\b", r"(?i)asterisk",
|
||
r"(?i)ghost\s+блог", r"(?i)onlyoffice",
|
||
r"(?i)git\s+sync", r"(?i)\.env\s+добав",
|
||
r"(?i)goip\s+watcher", r"(?i)tbank\s+monitor", # monitoring services
|
||
r"(?i)фикс\s+удален", # commit-level fixes (not decisions)
|
||
]
|
||
|
||
# Noise patterns — individual items that look like noise, not decisions.
# Applied by _is_noise after stripping markdown bold markers.
_NOISE_PATTERNS = [
    r"^[0-9a-f]{6,40}$",  # commit hashes
    r"^\s*(docker|ssh|scp|git|curl|sudo)\s",  # shell commands
    r"^`[^`]+`$",  # inline code-only items
    r"(?i)(prosody|jitsi|jicofo|jvb|coturn|nextcloud|onlyoffice|ghost)",  # unrelated services
    r"(?i)\.jitsi-meet-cfg",  # jitsi config paths
    r"(?i)(meet\.jitsi|sitemeet\.org)",  # jitsi domains
    r"(?i)(cloud\.vault\.red|office\.vault)",  # nextcloud domains
    r"(?i)JWT_APP_(ID|SECRET)",  # jwt config lines
    r"(?i)XMPP_",  # prosody config
    r"\(коммит\s+`?[0-9a-f]+`?\)",  # "(коммит `a33c2b9`)" commit references
    r"(?i)known_uids|idle_loop|reconnect",  # goip-watcher internals
]
|
||
|
||
|
||
def _is_noise(text: str) -> bool:
    """Check if a decision candidate is noise (commit hash, shell command, unrelated service)."""
    # Strip markdown bold (**...**) so patterns match the bare text.
    bare = re.sub(r"\*\*([^*]*)\*\*", r"\1", text).strip()
    for pattern in _NOISE_PATTERNS:
        if re.search(pattern, bare):
            return True
    return False
|
||
|
||
|
||
def _split_into_sections(text: str) -> list[tuple[str, str]]:
|
||
"""Split markdown into (header, body) pairs by ## headers.
|
||
|
||
Returns list of (header_text, body_text) tuples.
|
||
Anything before the first ## is returned with header="".
|
||
"""
|
||
parts = re.split(r"(?m)^(##\s+.+)$", text)
|
||
sections = []
|
||
current_header = ""
|
||
current_body = parts[0] if parts else ""
|
||
|
||
for i in range(1, len(parts), 2):
|
||
if current_header or current_body.strip():
|
||
sections.append((current_header, current_body))
|
||
current_header = parts[i].strip()
|
||
current_body = parts[i + 1] if i + 1 < len(parts) else ""
|
||
|
||
if current_header or current_body.strip():
|
||
sections.append((current_header, current_body))
|
||
|
||
return sections
|
||
|
||
|
||
def _is_unrelated_section(header: str) -> bool:
    """Check if a section header is about an unrelated service."""
    for pattern in _UNRELATED_SECTION_PATTERNS:
        if re.search(pattern, header):
            return True
    return False
|
||
|
||
|
||
def extract_decisions_from_claude_md(
    project_path: Path,
    project_id: str | None = None,
    project_name: str | None = None,
) -> list[dict]:
    """Parse CLAUDE.md for decisions, gotchas, workarounds.

    Filters out:
    - Sections about unrelated services (Jitsi, Nextcloud, Prosody, etc.)
    - Noise: commit hashes, docker/ssh commands, paths to external services
    - If CLAUDE.md has multi-project sections, only extracts for current project

    NOTE(review): project_id and project_name are accepted but never read in
    this body — the multi-project filtering the docstring mentions appears
    unimplemented; confirm against callers before relying on it.

    Returns a list of {"type", "title", "description", "category"} dicts,
    de-duplicated by title across both extraction passes.
    """
    claude_md = project_path / "CLAUDE.md"
    if not claude_md.exists():
        return []

    try:
        text = claude_md.read_text(errors="replace")
    except OSError:
        return []

    # Split into sections and filter out unrelated ones
    sections = _split_into_sections(text)
    relevant_text = []
    for header, body in sections:
        if _is_unrelated_section(header):
            continue
        relevant_text.append(header + "\n" + body)

    filtered_text = "\n".join(relevant_text)

    decisions = []
    # Dedupe across both passes: the marker-based pass runs first, so its
    # titles win over section-item titles.
    seen_titles = set()

    # Pass 1: marker-based extraction (GOTCHA:, DECISION:, ...) from relevant sections only
    for pattern, dec_type in _DECISION_PATTERNS:
        for m in re.finditer(pattern, filtered_text, re.DOTALL):
            body = m.group(2).strip()
            if not body or len(body) < 10:  # too short to be a real decision
                continue
            lines = body.split("\n")
            # Title = first line, trailing period stripped, capped at 100 chars.
            title = lines[0].strip().rstrip(".")[:100]
            desc = body
            if _is_noise(title) or _is_noise(desc):
                continue
            if title not in seen_titles:
                seen_titles.add(title)
                decisions.append({
                    "type": dec_type,
                    "title": title,
                    "description": desc,
                    "category": _guess_category(title + " " + desc),
                })

    # Pass 2: section-based extraction — find #..#### headers matching decision
    # section names and harvest their list items as gotchas.
    sub_sections = re.split(r"(?m)^(#{1,4}\s+.*?)$", filtered_text)
    for i, section in enumerate(sub_sections):
        if any(re.search(pat, section) for pat in _DECISION_SECTIONS):
            # re.split with a capture group alternates header/content pieces,
            # so the section body is the next element.
            if i + 1 < len(sub_sections):
                content = sub_sections[i + 1].strip()
                for line in content.split("\n"):
                    line = line.strip()
                    # Numbered items (1. **text**) or bullet items
                    item = None
                    if re.match(r"^\d+\.\s+", line):
                        item = re.sub(r"^\d+\.\s+", "", line).strip()
                    elif line.startswith(("- ", "* ", "• ")):
                        item = line.lstrip("-*• ").strip()

                    if not item or len(item) < 10:
                        continue
                    # Clean bold markers for title
                    clean = re.sub(r"\*\*([^*]+)\*\*", r"\1", item)
                    if _is_noise(clean):
                        continue
                    title = clean[:100]
                    if title not in seen_titles:
                        seen_titles.add(title)
                        decisions.append({
                            "type": "gotcha",
                            "title": title,
                            "description": item,
                            "category": _guess_category(item),
                        })

    return decisions
|
||
|
||
|
||
def _guess_category(text: str) -> str:
|
||
"""Best-effort category guess from text content."""
|
||
t = text.lower()
|
||
if any(w in t for w in ("css", "ui", "vue", "компонент", "стил", "layout", "mobile", "safari", "bottom-sheet")):
|
||
return "ui"
|
||
if any(w in t for w in ("api", "endpoint", "rest", "route", "запрос", "fetch")):
|
||
return "api"
|
||
if any(w in t for w in ("sql", "база", "миграц", "postgres", "sqlite", "бд", "schema")):
|
||
return "architecture"
|
||
if any(w in t for w in ("безопас", "security", "xss", "auth", "token", "csrf", "injection")):
|
||
return "security"
|
||
if any(w in t for w in ("docker", "deploy", "nginx", "ci", "cd", "infra", "сервер")):
|
||
return "devops"
|
||
if any(w in t for w in ("performance", "cache", "оптимиз", "lazy", "скорость")):
|
||
return "performance"
|
||
return "architecture"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Obsidian vault scanning
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def find_vault_root(vault_path: Path | None = None) -> Path | None:
|
||
"""Find the Obsidian vault root directory.
|
||
|
||
If vault_path is given but doesn't exist, returns None (don't fallback).
|
||
If vault_path is None, tries the default iCloud Obsidian location.
|
||
"""
|
||
if vault_path is not None:
|
||
return vault_path if vault_path.is_dir() else None
|
||
|
||
# Default: iCloud Obsidian path
|
||
default = DEFAULT_VAULT
|
||
if default.is_dir():
|
||
# Look for a vault inside (usually one level deep)
|
||
for child in default.iterdir():
|
||
if child.is_dir() and not child.name.startswith("."):
|
||
return child
|
||
return None
|
||
|
||
|
||
def scan_obsidian(
|
||
vault_root: Path,
|
||
project_id: str,
|
||
project_name: str,
|
||
project_dir_name: str | None = None,
|
||
) -> dict:
|
||
"""Scan Obsidian vault for project-related notes.
|
||
|
||
Returns {"tasks": [...], "decisions": [...], "files_scanned": int}
|
||
"""
|
||
result = {"tasks": [], "decisions": [], "files_scanned": 0}
|
||
|
||
# Build search terms
|
||
search_terms = {project_id.lower()}
|
||
if project_name:
|
||
search_terms.add(project_name.lower())
|
||
if project_dir_name:
|
||
search_terms.add(project_dir_name.lower())
|
||
|
||
# Find project folder in vault
|
||
project_files: list[Path] = []
|
||
for term in list(search_terms):
|
||
for child in vault_root.iterdir():
|
||
if child.is_dir() and term in child.name.lower():
|
||
for f in child.rglob("*.md"):
|
||
if f not in project_files:
|
||
project_files.append(f)
|
||
|
||
# Also search for files mentioning the project by name
|
||
for md_file in vault_root.glob("*.md"):
|
||
try:
|
||
text = md_file.read_text(errors="replace")[:5000].lower()
|
||
except OSError:
|
||
continue
|
||
if any(term in text for term in search_terms):
|
||
if md_file not in project_files:
|
||
project_files.append(md_file)
|
||
|
||
result["files_scanned"] = len(project_files)
|
||
|
||
for f in project_files:
|
||
try:
|
||
text = f.read_text(errors="replace")
|
||
except OSError:
|
||
continue
|
||
|
||
_extract_obsidian_tasks(text, f.stem, result["tasks"])
|
||
_extract_obsidian_decisions(text, f.stem, result["decisions"])
|
||
|
||
return result
|
||
|
||
|
||
def _extract_obsidian_tasks(text: str, source: str, tasks: list[dict]):
|
||
"""Extract checkbox items from Obsidian markdown."""
|
||
for m in re.finditer(r"^[-*]\s+\[([ xX])\]\s+(.+)$", text, re.MULTILINE):
|
||
done = m.group(1).lower() == "x"
|
||
title = m.group(2).strip()
|
||
# Remove Obsidian wiki-links
|
||
title = re.sub(r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]", r"\1", title)
|
||
if len(title) > 5:
|
||
tasks.append({
|
||
"title": title[:200],
|
||
"done": done,
|
||
"source": source,
|
||
})
|
||
|
||
|
||
def _extract_obsidian_decisions(text: str, source: str, decisions: list[dict]):
    """Extract decisions/gotchas from Obsidian notes into *decisions*."""

    def _push(dec_type: str, title: str, body: str) -> None:
        # Append one decision record tagged with the source note name.
        decisions.append({
            "type": dec_type,
            "title": title,
            "description": body,
            "category": _guess_category(body),
            "source": source,
        })

    # Marker-based extraction using the shared decision patterns.
    for pattern, dec_type in _DECISION_PATTERNS:
        for m in re.finditer(pattern, text, re.DOTALL):
            body = m.group(2).strip()
            if len(body) < 10:  # empty or too short
                continue
            title = body.split("\n")[0].strip()[:100]
            if _is_noise(title) or _is_noise(body):
                continue
            _push(dec_type, title, body)

    # Also look for **ВАЖНО**/**GOTCHA**/**FIXME** inline markers not caught above.
    for m in re.finditer(r"(?i)\*\*(ВАЖНО|GOTCHA|FIXME)\*\*[:\s]*(.*?)(?=\n|$)", text):
        body = m.group(2).strip()
        if len(body) < 10 or _is_noise(body):
            continue
        _push("gotcha", body[:100], body)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Formatting for CLI preview
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def format_preview(
|
||
project_id: str,
|
||
name: str,
|
||
path: str,
|
||
tech_stack: list[str],
|
||
modules: list[dict],
|
||
decisions: list[dict],
|
||
obsidian: dict | None = None,
|
||
) -> str:
|
||
"""Format bootstrap results for user review."""
|
||
lines = [
|
||
f"Project: {project_id} — {name}",
|
||
f"Path: {path}",
|
||
"",
|
||
f"Tech stack: {', '.join(tech_stack) if tech_stack else '(not detected)'}",
|
||
"",
|
||
]
|
||
|
||
if modules:
|
||
lines.append(f"Modules ({len(modules)}):")
|
||
for m in modules:
|
||
lines.append(f" {m['name']} ({m['type']}) — {m['path']} ({m['file_count']} files)")
|
||
else:
|
||
lines.append("Modules: (none detected)")
|
||
lines.append("")
|
||
|
||
if decisions:
|
||
lines.append(f"Decisions from CLAUDE.md ({len(decisions)}):")
|
||
for i, d in enumerate(decisions, 1):
|
||
lines.append(f" #{i} {d['type']}: {d['title']}")
|
||
else:
|
||
lines.append("Decisions from CLAUDE.md: (none found)")
|
||
|
||
if obsidian:
|
||
lines.append("")
|
||
lines.append(f"Obsidian vault ({obsidian['files_scanned']} files scanned):")
|
||
if obsidian["tasks"]:
|
||
pending = [t for t in obsidian["tasks"] if not t["done"]]
|
||
done = [t for t in obsidian["tasks"] if t["done"]]
|
||
lines.append(f" Tasks: {len(pending)} pending, {len(done)} done")
|
||
for t in pending[:10]:
|
||
lines.append(f" [ ] {t['title']}")
|
||
if len(pending) > 10:
|
||
lines.append(f" ... and {len(pending) - 10} more")
|
||
for t in done[:5]:
|
||
lines.append(f" [x] {t['title']}")
|
||
if len(done) > 5:
|
||
lines.append(f" ... and {len(done) - 5} more done")
|
||
else:
|
||
lines.append(" Tasks: (none found)")
|
||
if obsidian["decisions"]:
|
||
lines.append(f" Decisions: {len(obsidian['decisions'])}")
|
||
for d in obsidian["decisions"][:5]:
|
||
lines.append(f" {d['type']}: {d['title']} (from {d['source']})")
|
||
if len(obsidian["decisions"]) > 5:
|
||
lines.append(f" ... and {len(obsidian['decisions']) - 5} more")
|
||
else:
|
||
lines.append(" Decisions: (none found)")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Write to DB
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def save_to_db(
    conn,
    project_id: str,
    name: str,
    path: str,
    tech_stack: list[str],
    modules: list[dict],
    decisions: list[dict],
    obsidian: dict | None = None,
):
    """Save all bootstrap data to kin.db via models.

    Creates the project row, then persists modules, CLAUDE.md decisions,
    and — when an Obsidian scan result is provided — its decisions and tasks.
    """
    # Imported here rather than at module top — presumably to avoid a
    # circular import or defer the DB dependency; confirm before moving.
    from core import models

    # Create project
    claude_md = Path(path).expanduser() / "CLAUDE.md"
    models.create_project(
        conn, project_id, name, path,
        tech_stack=tech_stack,
        claude_md_path=str(claude_md) if claude_md.exists() else None,
    )

    # Add modules
    for m in modules:
        models.add_module(
            conn, project_id, m["name"], m["type"], m["path"],
            description=f"{m['file_count']} files",
        )

    # Add decisions from CLAUDE.md
    for d in decisions:
        models.add_decision(
            conn, project_id, d["type"], d["title"], d["description"],
            category=d.get("category"),
        )

    if obsidian:
        # Add Obsidian decisions, tagged with their source note
        for d in obsidian.get("decisions", []):
            models.add_decision(
                conn, project_id, d["type"], d["title"], d["description"],
                category=d.get("category"),
                tags=[f"obsidian:{d['source']}"],
            )

        # Import Obsidian tasks with sequential ...-OBS-NNN ids
        # (enumerate replaces the original manual task_num counter)
        for task_num, t in enumerate(obsidian.get("tasks", []), start=1):
            task_id = f"{project_id.upper()}-OBS-{task_num:03d}"
            models.create_task(
                conn, task_id, project_id, t["title"],
                status="done" if t["done"] else "pending",
                brief={"source": f"obsidian:{t['source']}"},
            )
|