180 lines
5.6 KiB
Python
180 lines
5.6 KiB
Python
"""
Kin — two-way sync with an Obsidian vault.

Export: decisions → .md files with YAML frontmatter
Import: checkboxes in .md files → task status
"""
|
||
|
||
import re
|
||
import sqlite3
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
from core import models
|
||
|
||
|
||
def _slug(title: str) -> str:
|
||
"""Генерирует slug из заголовка для имени файла."""
|
||
s = title.lower()
|
||
s = re.sub(r"[^a-zа-я0-9\s-]", "", s)
|
||
s = re.sub(r"\s+", "-", s.strip())
|
||
return s[:50]
|
||
|
||
|
||
def _decision_to_md(decision: dict) -> str:
|
||
"""Форматирует decision как .md файл с YAML frontmatter."""
|
||
tags = decision.get("tags") or []
|
||
if isinstance(tags, str):
|
||
try:
|
||
import json
|
||
tags = json.loads(tags)
|
||
except Exception:
|
||
tags = []
|
||
|
||
tags_str = "[" + ", ".join(str(t) for t in tags) + "]"
|
||
created_at = (decision.get("created_at") or "")[:10] # только дата
|
||
|
||
frontmatter = (
|
||
"---\n"
|
||
f"kin_decision_id: {decision['id']}\n"
|
||
f"project: {decision['project_id']}\n"
|
||
f"type: {decision['type']}\n"
|
||
f"category: {decision.get('category') or ''}\n"
|
||
f"tags: {tags_str}\n"
|
||
f"created_at: {created_at}\n"
|
||
"---\n"
|
||
)
|
||
|
||
body = f"\n# {decision['title']}\n\n{decision['description']}\n"
|
||
return frontmatter + body
|
||
|
||
|
||
def _parse_frontmatter(text: str) -> dict:
|
||
"""Парсит YAML frontmatter из .md файла (упрощённый парсер через re)."""
|
||
result = {}
|
||
match = re.match(r"^---\n(.*?)\n---", text, re.DOTALL)
|
||
if not match:
|
||
return result
|
||
for line in match.group(1).splitlines():
|
||
if ":" in line:
|
||
key, _, val = line.partition(":")
|
||
result[key.strip()] = val.strip()
|
||
return result
|
||
|
||
|
||
def export_decisions_to_md(
    project_id: str,
    decisions: list[dict],
    vault_path: Path,
) -> list[Path]:
    """Write every decision to its own Markdown file inside the vault.

    Files are placed in ``<vault_path>/<project_id>/decisions`` (created when
    missing) and named ``<id>-<slug>.md``. A file that already exists under
    the same name is overwritten.

    Args:
        project_id: Project identifier, used as the sub-directory name.
        decisions: Decision mappings; each must provide ``id`` and ``title``
            plus whatever the Markdown renderer reads.
        vault_path: Root directory of the vault.

    Returns:
        Paths of the files written, in the order of ``decisions``.
    """
    target_dir = vault_path / project_id / "decisions"
    target_dir.mkdir(parents=True, exist_ok=True)

    written: list[Path] = []
    for decision in decisions:
        name_suffix = _slug(decision["title"])
        destination = target_dir / f"{decision['id']}-{name_suffix}.md"
        destination.write_text(_decision_to_md(decision), encoding="utf-8")
        written.append(destination)
    return written
|
||
|
||
|
||
def parse_task_checkboxes(
    vault_path: Path,
    project_id: str,
) -> list[dict]:
    """Collect Markdown checkboxes that carry a task ID.

    Scans the ``*.md`` files located directly (non-recursively) in
    ``<vault_path>/<project_id>/tasks`` and ``<vault_path>/<project_id>``.
    A line is recognised when, after stripping surrounding whitespace, it
    looks like ``- [x] KIN-013 Some title`` (bullet ``-`` or ``*``, box
    ``x``, ``X`` or a space, then a task ID and a non-empty title).

    Files that cannot be read or are not valid UTF-8 are skipped, so a single
    broken file does not abort the whole scan. Files are visited in sorted
    order within each directory so the result order is deterministic.

    Args:
        vault_path: Root directory of the vault.
        project_id: Project identifier, used as the sub-directory name.

    Returns:
        One entry per matching line, e.g.
        ``[{"task_id": "KIN-013", "done": True, "title": "..."}]``.
    """
    pattern = re.compile(r"^[-*]\s+\[([xX ])\]\s+([A-Z][A-Z0-9]*-(?:[A-Z][A-Z0-9]*-)?\d+)\s+(.+)$")
    results: list[dict] = []

    search_dirs = [
        vault_path / project_id / "tasks",
        vault_path / project_id,
    ]

    for search_dir in search_dirs:
        if not search_dir.is_dir():
            continue
        for md_file in sorted(search_dir.glob("*.md")):
            try:
                # utf-8-sig decodes plain UTF-8 as well and drops a leading
                # BOM, which would otherwise block a match on the first line.
                text = md_file.read_text(encoding="utf-8-sig")
            except (OSError, UnicodeDecodeError):
                # Unreadable or non-UTF-8 file: skip it, keep scanning.
                continue
            for line in text.splitlines():
                m = pattern.match(line.strip())
                if m:
                    check_char, task_id, title = m.group(1), m.group(2), m.group(3)
                    results.append({
                        "task_id": task_id,
                        "done": check_char.lower() == "x",
                        "title": title.strip(),
                    })

    return results
|
||
|
||
|
||
def sync_obsidian(conn: sqlite3.Connection, project_id: str) -> dict:
    """Run a full sync for one project: export decisions, then import checkboxes.

    Both phases run only when the project's vault path is an existing
    directory; otherwise a single error entry is recorded. An exception
    raised inside a phase is caught and reported in ``errors`` rather than
    propagated. On import, only checked boxes are applied: a task belonging
    to this project whose status is not already ``"done"`` is set to
    ``"done"``.

    Args:
        conn: Open database connection passed through to ``models``.
        project_id: Identifier of the project to sync.

    Returns:
        {
            "exported_decisions": int,
            "tasks_updated": int,
            "errors": list[str],
            "vault_path": str
        }

    Raises:
        ValueError: If the project is not found or has no
            ``obsidian_vault_path`` set.
    """
    project = models.get_project(conn, project_id)
    if not project:
        raise ValueError(f"Project '{project_id}' not found")

    vault_path_str: Optional[str] = project.get("obsidian_vault_path")
    if not vault_path_str:
        raise ValueError(f"obsidian_vault_path not set for project '{project_id}'")

    vault_path = Path(vault_path_str)
    errors: list[str] = []

    # --- Export decisions ---
    exported_count = 0
    if vault_path.is_dir():
        try:
            project_decisions = models.get_decisions(conn, project_id)
            exported_count = len(
                export_decisions_to_md(project_id, project_decisions, vault_path)
            )
        except Exception as e:
            errors.append(f"Export error: {e}")
    else:
        errors.append(f"Vault path does not exist or is not a directory: {vault_path_str}")

    # --- Import checkboxes ---
    tasks_updated = 0
    if vault_path.is_dir():
        try:
            for entry in parse_task_checkboxes(vault_path, project_id):
                if not entry["done"]:
                    continue
                task_id = entry["task_id"]
                task = models.get_task(conn, task_id)
                # Skip IDs that are unknown or belong to another project.
                if task is None or task.get("project_id") != project_id:
                    continue
                if task.get("status") == "done":
                    continue
                models.update_task(conn, task_id, status="done")
                tasks_updated += 1
        except Exception as e:
            errors.append(f"Import error: {e}")

    summary = {
        "exported_decisions": exported_count,
        "tasks_updated": tasks_updated,
        "errors": errors,
        "vault_path": vault_path_str,
    }
    return summary
|