2026-03-16 07:13:32 +02:00
|
|
|
|
"""
|
|
|
|
|
|
Kin — двусторонний sync с Obsidian vault.
|
|
|
|
|
|
|
|
|
|
|
|
Export: decisions → .md-файлы с YAML frontmatter
|
|
|
|
|
|
Import: чекбоксы в .md-файлах → статус задач
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import re
|
|
|
|
|
|
import sqlite3
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
|
|
from core import models
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _slug(title: str) -> str:
|
|
|
|
|
|
"""Генерирует slug из заголовка для имени файла."""
|
|
|
|
|
|
s = title.lower()
|
|
|
|
|
|
s = re.sub(r"[^a-zа-я0-9\s-]", "", s)
|
|
|
|
|
|
s = re.sub(r"\s+", "-", s.strip())
|
|
|
|
|
|
return s[:50]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _decision_to_md(decision: dict) -> str:
|
|
|
|
|
|
"""Форматирует decision как .md файл с YAML frontmatter."""
|
|
|
|
|
|
tags = decision.get("tags") or []
|
|
|
|
|
|
if isinstance(tags, str):
|
|
|
|
|
|
try:
|
|
|
|
|
|
import json
|
|
|
|
|
|
tags = json.loads(tags)
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
tags = []
|
|
|
|
|
|
|
|
|
|
|
|
tags_str = "[" + ", ".join(str(t) for t in tags) + "]"
|
|
|
|
|
|
created_at = (decision.get("created_at") or "")[:10] # только дата
|
|
|
|
|
|
|
|
|
|
|
|
frontmatter = (
|
|
|
|
|
|
"---\n"
|
|
|
|
|
|
f"kin_decision_id: {decision['id']}\n"
|
|
|
|
|
|
f"project: {decision['project_id']}\n"
|
|
|
|
|
|
f"type: {decision['type']}\n"
|
|
|
|
|
|
f"category: {decision.get('category') or ''}\n"
|
|
|
|
|
|
f"tags: {tags_str}\n"
|
|
|
|
|
|
f"created_at: {created_at}\n"
|
|
|
|
|
|
"---\n"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
body = f"\n# {decision['title']}\n\n{decision['description']}\n"
|
|
|
|
|
|
return frontmatter + body
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_frontmatter(text: str) -> dict:
|
|
|
|
|
|
"""Парсит YAML frontmatter из .md файла (упрощённый парсер через re)."""
|
|
|
|
|
|
result = {}
|
|
|
|
|
|
match = re.match(r"^---\n(.*?)\n---", text, re.DOTALL)
|
|
|
|
|
|
if not match:
|
|
|
|
|
|
return result
|
|
|
|
|
|
for line in match.group(1).splitlines():
|
|
|
|
|
|
if ":" in line:
|
|
|
|
|
|
key, _, val = line.partition(":")
|
|
|
|
|
|
result[key.strip()] = val.strip()
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def export_decisions_to_md(
    project_id: str,
    decisions: list[dict],
    vault_path: Path,
) -> list[Path]:
    """Write each decision as a Markdown note inside the Obsidian vault.

    Notes go to ``<vault_path>/<project_id>/decisions/`` (created if missing)
    and are named ``<id>-<slug of title>.md``. An existing file with the same
    name is overwritten.

    Args:
        project_id: Project identifier, used as the vault sub-directory.
        decisions: Decision mappings; each needs at least ``id`` and ``title``
            plus whatever ``_decision_to_md`` requires.
        vault_path: Root directory of the vault.

    Returns:
        Paths of the files written, one per decision, in input order.
    """
    target_dir = vault_path / project_id / "decisions"
    target_dir.mkdir(parents=True, exist_ok=True)

    written: list[Path] = []
    for decision in decisions:
        title_slug = _slug(decision["title"])
        note_path = target_dir / f"{decision['id']}-{title_slug}.md"
        note_path.write_text(_decision_to_md(decision), encoding="utf-8")
        written.append(note_path)
    return written
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_task_checkboxes(
    vault_path: Path,
    project_id: str,
) -> list[dict]:
    """Collect task checkboxes from the project's Markdown notes.

    Scans ``*.md`` files directly inside ``<vault>/<project_id>/tasks/`` and
    then ``<vault>/<project_id>/`` (non-recursive, files in sorted order) for
    list items of the form ``- [x] KIN-013 Title`` or ``* [ ] KIN-013 Title``.
    Files that cannot be read or are not valid UTF-8 are skipped so that one
    broken note does not abort the whole scan.

    Args:
        vault_path: Root directory of the vault.
        project_id: Project identifier, used as the vault sub-directory.

    Returns:
        One entry per matching line, e.g.
        ``[{"task_id": "KIN-013", "done": True, "title": "..."}]``.
    """
    checkbox_re = re.compile(r"^[-*]\s+\[([xX ])\]\s+([A-Z][A-Z0-9]*-\d+)\s+(.+)$")
    project_dir = vault_path / project_id
    results: list[dict] = []

    for search_dir in (project_dir / "tasks", project_dir):
        if not search_dir.is_dir():
            continue
        # sorted() makes the result order independent of directory listing order.
        for md_file in sorted(search_dir.glob("*.md")):
            try:
                text = md_file.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                # UnicodeDecodeError is not an OSError; without it a single
                # non-UTF-8 file would raise out of the scan.
                continue
            for line in text.splitlines():
                found = checkbox_re.match(line.strip())
                if found is None:
                    continue
                mark, task_id, title = found.groups()
                results.append({
                    "task_id": task_id,
                    "done": mark.lower() == "x",
                    "title": title.strip(),
                })

    return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sync_obsidian(conn: sqlite3.Connection, project_id: str) -> dict:
    """Run both sync directions for a project: export decisions, import checkboxes.

    Export writes the project's decisions into the vault as Markdown notes.
    Import reads task checkboxes from the vault and sets the status of every
    checked task that belongs to this project and is not already ``done`` to
    ``done``; unchecked boxes never change a task.

    Failures in either phase are collected in ``errors`` instead of being
    raised, so one phase failing does not prevent the other from running.

    Args:
        conn: Open database connection passed through to ``models``.
        project_id: Identifier of the project to sync.

    Returns:
        {
            "exported_decisions": int,
            "tasks_updated": int,
            "errors": list[str],
            "vault_path": str
        }

    Raises:
        ValueError: If the project does not exist or has no
            ``obsidian_vault_path`` configured.
    """
    project = models.get_project(conn, project_id)
    if not project:
        raise ValueError(f"Project '{project_id}' not found")

    vault_path_str: Optional[str] = project.get("obsidian_vault_path")
    if not vault_path_str:
        raise ValueError(f"obsidian_vault_path not set for project '{project_id}'")

    vault_path = Path(vault_path_str)
    errors: list[str] = []
    exported_count = 0
    tasks_updated = 0

    # --- Export decisions ---
    if vault_path.is_dir():
        try:
            written = export_decisions_to_md(
                project_id,
                models.get_decisions(conn, project_id),
                vault_path,
            )
            exported_count = len(written)
        except Exception as exc:
            errors.append(f"Export error: {exc}")
    else:
        errors.append(f"Vault path does not exist or is not a directory: {vault_path_str}")

    # --- Import checkboxes ---
    if vault_path.is_dir():
        try:
            for entry in parse_task_checkboxes(vault_path, project_id):
                if not entry["done"]:
                    continue
                task = models.get_task(conn, entry["task_id"])
                # Skip unknown IDs and tasks owned by another project.
                if task is None or task.get("project_id") != project_id:
                    continue
                if task.get("status") == "done":
                    continue
                models.update_task(conn, entry["task_id"], status="done")
                tasks_updated += 1
        except Exception as exc:
            errors.append(f"Import error: {exc}")

    return {
        "exported_decisions": exported_count,
        "tasks_updated": tasks_updated,
        "errors": errors,
        "vault_path": vault_path_str,
    }
|