kin/tasks/kin-084-spec.md
2026-03-17 17:26:31 +02:00

8.5 KiB
Raw Blame History

KIN-084 — Live Console: архитектурная спека

1. DDL — таблица pipeline_log

CREATE TABLE IF NOT EXISTS pipeline_log (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    pipeline_id INTEGER NOT NULL REFERENCES pipelines(id),
    ts          TEXT NOT NULL DEFAULT (datetime('now')),
    level       TEXT NOT NULL DEFAULT 'INFO',
    message     TEXT NOT NULL,
    extra_json  TEXT
);

CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id);

Замечание: Brief указывает pipeline_id TEXT, но pipelines.idINTEGER PRIMARY KEY AUTOINCREMENT. Используем INTEGER (соответствует паттерну department_handoffs.pipeline_id INTEGER).

Где добавить:

  1. В SCHEMA в core/db.py — в конец строки SCHEMA (после task_attachments)
  2. В _migrate() в core/db.py — guard if "pipeline_log" not in existing_tables: (по аналогии с task_attachments)

2. Функции в core/models.py

write_log()

def write_log(
    conn: sqlite3.Connection,
    pipeline_id: int,
    message: str,
    level: str = "INFO",
    extra: dict | list | None = None,
) -> dict:
    """Insert a pipeline log entry. Returns inserted row as dict.

    extra: optional structured data (serialized to JSON string).
    level: INFO | DEBUG | ERROR | WARN
    """
    extra_json = json.dumps(extra, ensure_ascii=False) if extra is not None else None
    cur = conn.execute(
        """INSERT INTO pipeline_log (pipeline_id, message, level, extra_json)
           VALUES (?, ?, ?, ?)""",
        (pipeline_id, message, level, extra_json),
    )
    conn.commit()
    row = conn.execute(
        "SELECT * FROM pipeline_log WHERE id = ?", (cur.lastrowid,)
    ).fetchone()
    return _row_to_dict(row)

get_pipeline_logs()

def get_pipeline_logs(
    conn: sqlite3.Connection,
    pipeline_id: int,
    since_id: int = 0,
) -> list[dict]:
    """Get pipeline log entries after since_id in chronological order.

    since_id=0 returns all entries (id > 0 matches all).
    """
    rows = conn.execute(
        """SELECT * FROM pipeline_log
           WHERE pipeline_id = ? AND id > ?
           ORDER BY id ASC""",
        (pipeline_id, since_id),
    ).fetchall()
    return _rows_to_list(rows)

Замечание по extra_json: _row_to_dict() автоматически декодирует JSON-строки начинающиеся с { или [. Так что в ответе API extra_json будет уже распарсенным dict/list.


3. Точки инструментации в agents/runner.py

Функция run_pipeline() (строки 12101835)

Все write_log в run_pipeline должны:

  • Быть защищены if pipeline: (dry_run не создаёт pipeline)
  • Оборачиваться в try/except Exception: pass (не блокировать pipeline)
# После строки Событие Level extra
1 ~1270 после update_pipeline(pid=...) Pipeline start INFO {route_type, steps_count, mode}
2 ~1280 начало тела цикла for i, step Step N start INFO {role, model, brief_preview}
3 ~1383 после аккумуляции stats Step N done (success) INFO {tokens, cost_usd, duration_seconds}
4 ~1329 в except Exception as exc: Step exception ERROR {role, exc_type}
5 ~1436-1438 в блоке if not result["success"] Step failed ERROR {role, error}
6 ~1525 в блоке if blocked_info: Step blocked WARN {role, reason}
7 ~1748 после update_pipeline(completed) Pipeline completed INFO {steps, cost_usd, tokens, duration_s}
8 ~17711802 финальный статус задачи Task status INFO {task_status, mode}

Функция _execute_department_head_step() (строки 10281172)

Логируем в parent_pipeline_id (существует в момент вызова). Guard: if parent_pipeline_id:.

# После строки Событие Level extra
9 ~1059-1066 (dept blocked) Dept head blocked WARN {role, blocked_reason}
10 ~1100 перед вызовом sub run_pipeline Sub-pipeline start INFO {dept_name, sub_steps}
11 ~1112 после вызова sub run_pipeline Sub-pipeline done INFO/ERROR {success, steps_completed}

Пример вызова (точка 2):

try:
    if pipeline:
        brief_preview = (step.get("brief") or "")[:100]
        models.write_log(
            conn, pipeline["id"],
            f"Step {i+1}/{len(steps)} start: role={role}, model={model}",
            level="INFO",
            extra={"role": role, "model": model, "brief_preview": brief_preview},
        )
except Exception:
    pass

Пример вызова (точка 1 — pipeline start):

try:
    models.write_log(
        conn, pipeline["id"],
        f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}",
        extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode},
    )
except Exception:
    pass

4. API Endpoint

GET /api/pipelines/{pipeline_id}/logs

Контракт:

  • Путь: /api/pipelines/{pipeline_id}/logs
  • Query: since_id: int = 0
  • Response: list[PipelineLogEntry]
  • 404 если pipeline не найден

Схема ответа (один элемент):

{
  "id": 42,
  "pipeline_id": 7,
  "ts": "2026-03-17 14:23:01",
  "level": "INFO",
  "message": "Step 1/3 start: role=backend_dev, model=sonnet",
  "extra_json": {"role": "backend_dev", "model": "sonnet", "brief_preview": "..."}
}

Реализация в web/api.py:

@app.get("/api/pipelines/{pipeline_id}/logs")
def get_pipeline_logs(pipeline_id: int, since_id: int = 0):
    """Get pipeline log entries after since_id (for live console polling)."""
    conn = get_conn()
    row = conn.execute("SELECT id FROM pipelines WHERE id = ?", (pipeline_id,)).fetchone()
    if not row:
        conn.close()
        raise HTTPException(404, f"Pipeline {pipeline_id} not found")
    logs = models.get_pipeline_logs(conn, pipeline_id, since_id=since_id)
    conn.close()
    return logs

Поллинг-протокол (для фронтенда):

  1. since_id=0 — первый запрос, получить всё
  2. since_id = max(id) из последнего ответа для инкрементного обновления
  3. Поллить каждые 1-2 секунды пока pipeline.status == 'running'
  4. Остановить поллинг когда pipeline завершён

5. Тесты (обязательны для core/)

tests/test_db.py

  • SCHEMA-строка содержит CREATE TABLE pipeline_log (convention #387)
  • _migrate() создаёт pipeline_log если таблицы нет

tests/test_models.py

  • write_log() корректно вставляет запись, возвращает dict
  • write_log() с extra=None — extra_json = NULL
  • write_log() с extra={dict} — extra_json декодируется в dict
  • get_pipeline_logs() с since_id=0 возвращает все записи
  • get_pipeline_logs() с since_id=N возвращает только id > N
  • Записи в хронологическом порядке (ORDER BY id ASC)

tests/test_api.py

  • GET /api/pipelines/{id}/logs → 200, list
  • GET /api/pipelines/{id}/logs?since_id=5 → только id > 5
  • GET /api/pipelines/9999/logs → 404

6. Совместимость (#375/#376)

write_log() пишет ТОЛЬКО в pipeline_log, не затрагивает pipelines.status и watchdog. Watchdog (core/watchdog.py) проверяет pipelines.status = 'running' AND pid IS NOT NULL — не зависит от pipeline_log. Нет конфликтов.

FK guard: write_log вызывается только ПОСЛЕ create_pipeline() (т.е. когда pipeline_id уже в БД). PM-шаг (cli/main.py) запускается ДО run_pipeline(), поэтому pipeline_id ещё не существует. PM-логи недоступны в pipeline_log — это ограничение текущей архитектуры. Для KIN-084 MVP: логируем от момента создания pipeline_record до финала.