kin/core/obsidian_sync.py

181 lines
5.6 KiB
Python
Raw Normal View History

"""
Kin: two-way sync with an Obsidian vault.

Export: decisions -> .md files with YAML frontmatter.
Import: checkboxes in .md files -> task status.
"""
import re
import sqlite3
from pathlib import Path
from typing import Optional
from core import models
def _slug(title: str) -> str:
    """Build a file-name-friendly slug from a title.

    The title is lower-cased; every character other than Latin ``a-z``,
    Cyrillic ``а-я`` / ``ё``, digits, whitespace and ``-`` is dropped;
    whitespace runs are collapsed into a single ``-``; the result is cut
    to at most 50 characters.

    Returns an empty string when nothing in the title survives the filter.
    """
    s = title.lower()
    # "ё" (U+0451) lies outside the contiguous а-я range (U+0430-U+044F), so
    # it must be listed explicitly or it is silently stripped from titles.
    s = re.sub(r"[^a-zа-яё0-9\s-]", "", s)
    s = re.sub(r"\s+", "-", s.strip())
    return s[:50]
def _decision_to_md(decision: dict) -> str:
    """Render a decision record as Markdown text with a YAML frontmatter header.

    Reads the keys ``id``, ``project_id``, ``type``, ``title`` and
    ``description`` (required) plus ``category``, ``tags`` and ``created_at``
    (optional). ``tags`` may be a list or a JSON-encoded string; a string that
    is not valid JSON, or that decodes to anything other than a list, is
    rendered as an empty tag list.

    Returns:
        The full file content: the frontmatter block, then a ``# title``
        heading followed by the description.

    Raises:
        KeyError: if one of the required keys is missing.
    """
    import json

    tags = decision.get("tags") or []
    if isinstance(tags, str):
        try:
            tags = json.loads(tags)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            tags = []
        # Valid JSON is not necessarily a list: "null" decodes to None (which
        # would crash the join below) and '"abc"' decodes to a str (which
        # would be split into single characters). Only a list is usable.
        if not isinstance(tags, list):
            tags = []
    tags_str = "[" + ", ".join(str(t) for t in tags) + "]"
    # Keep only the first 10 characters, i.e. the date part of the timestamp.
    created_at = (decision.get("created_at") or "")[:10]
    frontmatter = (
        "---\n"
        f"kin_decision_id: {decision['id']}\n"
        f"project: {decision['project_id']}\n"
        f"type: {decision['type']}\n"
        f"category: {decision.get('category') or ''}\n"
        f"tags: {tags_str}\n"
        f"created_at: {created_at}\n"
        "---\n"
    )
    body = f"\n# {decision['title']}\n\n{decision['description']}\n"
    return frontmatter + body
def _parse_frontmatter(text: str) -> dict:
    """Parse the leading YAML frontmatter of a Markdown text into a flat dict.

    This is a simplified regex-based parser, not a YAML implementation: the
    frontmatter must start on the very first line, each line is split at its
    first colon, and every value is returned as a stripped string (lists,
    quoting and nesting are not interpreted). Lines without a colon are
    ignored. Both LF and CRLF line endings are accepted.

    Returns an empty dict when the text has no frontmatter block.
    """
    # \r?\n so that files saved with Windows line endings are recognised too.
    match = re.match(r"^---\r?\n(.*?)\r?\n---", text, re.DOTALL)
    if not match:
        return {}
    result = {}
    for line in match.group(1).splitlines():
        key, sep, val = line.partition(":")
        if sep:
            result[key.strip()] = val.strip()
    return result
def export_decisions_to_md(
    project_id: str,
    decisions: list[dict],
    vault_path: Path,
) -> list[Path]:
    """Write each decision to its own Markdown file inside the Obsidian vault.

    Files are placed in ``<vault_path>/<project_id>/decisions/`` (created when
    missing) and named ``<id>-<slug of title>.md``. An existing file with the
    same name is overwritten.

    Returns the paths of the written files, in the order of ``decisions``.
    """
    target_dir = vault_path / project_id / "decisions"
    target_dir.mkdir(parents=True, exist_ok=True)

    written: list[Path] = []
    for decision in decisions:
        title_slug = _slug(decision["title"])
        destination = target_dir / f"{decision['id']}-{title_slug}.md"
        destination.write_text(_decision_to_md(decision), encoding="utf-8")
        written.append(destination)
    return written
def parse_task_checkboxes(
    vault_path: Path,
    project_id: str,
) -> list[dict]:
    """Collect task checkboxes from the project's Markdown files in the vault.

    Scans ``*.md`` files (non-recursively) in ``<vault_path>/<project_id>/tasks``
    and then in ``<vault_path>/<project_id>``. A line counts as a task checkbox
    when, after stripping surrounding whitespace, it looks like
    ``- [x] KIN-013 Some title``: a ``-`` or ``*`` bullet, a ``[ ]``, ``[x]``
    or ``[X]`` box, an upper-case task ID, then a non-empty title.

    Missing directories are skipped, and so are files that cannot be read or
    are not valid UTF-8. Files are visited in sorted order within each
    directory so the order of the result is deterministic.

    Returns:
        One dict per matching line, e.g.
        ``{"task_id": "KIN-013", "done": True, "title": "..."}``. The same
        task ID may appear more than once.
    """
    pattern = re.compile(r"^[-*]\s+\[([xX ])\]\s+([A-Z][A-Z0-9]*-\d+)\s+(.+)$")
    results: list[dict] = []
    search_dirs = [
        vault_path / project_id / "tasks",
        vault_path / project_id,
    ]
    for search_dir in search_dirs:
        if not search_dir.is_dir():
            continue
        for md_file in sorted(search_dir.glob("*.md")):
            try:
                text = md_file.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                # UnicodeDecodeError is a ValueError, not an OSError: without
                # it a single badly-encoded note would abort the whole scan.
                continue
            for line in text.splitlines():
                m = pattern.match(line.strip())
                if m:
                    check_char, task_id, title = m.group(1), m.group(2), m.group(3)
                    results.append({
                        "task_id": task_id,
                        "done": check_char.lower() == "x",
                        "title": title.strip(),
                    })
    return results
def sync_obsidian(conn: sqlite3.Connection, project_id: str) -> dict:
    """Run a full Obsidian sync for one project: export decisions, import checkboxes.

    The export and import steps are independent: a failure in either one is
    recorded in ``errors`` instead of being raised, and the other step still
    runs. Import only ever moves tasks to ``"done"``: unchecked boxes, unknown
    task IDs and tasks that belong to another project are ignored.

    Returns:
        {
            "exported_decisions": int,   # files written by the export step
            "tasks_updated": int,        # tasks switched to status "done"
            "errors": list[str],
            "vault_path": str
        }

    Raises:
        ValueError: if the project does not exist or has no
            ``obsidian_vault_path`` configured.
    """
    project = models.get_project(conn, project_id)
    if not project:
        raise ValueError(f"Project '{project_id}' not found")

    vault_path_str: Optional[str] = project.get("obsidian_vault_path")
    if not vault_path_str:
        raise ValueError(f"obsidian_vault_path not set for project '{project_id}'")

    vault_path = Path(vault_path_str)
    errors: list[str] = []

    # --- Export decisions ---
    exported_count = 0
    if vault_path.is_dir():
        try:
            written = export_decisions_to_md(
                project_id,
                models.get_decisions(conn, project_id),
                vault_path,
            )
            exported_count = len(written)
        except Exception as exc:
            errors.append(f"Export error: {exc}")
    else:
        errors.append(f"Vault path does not exist or is not a directory: {vault_path_str}")

    # --- Import checkboxes ---
    tasks_updated = 0
    if vault_path.is_dir():
        try:
            boxes = parse_task_checkboxes(vault_path, project_id)
            checked_ids = (box["task_id"] for box in boxes if box["done"])
            for task_id in checked_ids:
                task = models.get_task(conn, task_id)
                if task is None or task.get("project_id") != project_id:
                    continue
                if task.get("status") == "done":
                    continue
                models.update_task(conn, task_id, status="done")
                # Counted one by one so a mid-loop failure keeps the partial total.
                tasks_updated += 1
        except Exception as exc:
            errors.append(f"Import error: {exc}")

    return {
        "exported_decisions": exported_count,
        "tasks_updated": tasks_updated,
        "errors": errors,
        "vault_path": vault_path_str,
    }