# KIN-084 — Live Console: архитектурная спека ## 1. DDL — таблица pipeline_log ```sql CREATE TABLE IF NOT EXISTS pipeline_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, pipeline_id INTEGER NOT NULL REFERENCES pipelines(id), ts TEXT NOT NULL DEFAULT (datetime('now')), level TEXT NOT NULL DEFAULT 'INFO', message TEXT NOT NULL, extra_json TEXT ); CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id); ``` **Замечание**: Brief указывает `pipeline_id TEXT`, но `pipelines.id` — `INTEGER PRIMARY KEY AUTOINCREMENT`. Используем `INTEGER` (соответствует паттерну `department_handoffs.pipeline_id INTEGER`). **Где добавить**: 1. В `SCHEMA` в `core/db.py` — в конец строки SCHEMA (после `task_attachments`) 2. В `_migrate()` в `core/db.py` — guard `if "pipeline_log" not in existing_tables:` (по аналогии с `task_attachments`) --- ## 2. Функции в core/models.py ### write_log() ```python def write_log( conn: sqlite3.Connection, pipeline_id: int, message: str, level: str = "INFO", extra: dict | list | None = None, ) -> dict: """Insert a pipeline log entry. Returns inserted row as dict. extra: optional structured data (serialized to JSON string). level: INFO | DEBUG | ERROR | WARN """ extra_json = json.dumps(extra, ensure_ascii=False) if extra is not None else None cur = conn.execute( """INSERT INTO pipeline_log (pipeline_id, message, level, extra_json) VALUES (?, ?, ?, ?)""", (pipeline_id, message, level, extra_json), ) conn.commit() row = conn.execute( "SELECT * FROM pipeline_log WHERE id = ?", (cur.lastrowid,) ).fetchone() return _row_to_dict(row) ``` ### get_pipeline_logs() ```python def get_pipeline_logs( conn: sqlite3.Connection, pipeline_id: int, since_id: int = 0, ) -> list[dict]: """Get pipeline log entries after since_id in chronological order. since_id=0 returns all entries (id > 0 matches all). """ rows = conn.execute( """SELECT * FROM pipeline_log WHERE pipeline_id = ? AND id > ? ORDER BY id ASC""", (pipeline_id, since_id), ).fetchall() return _rows_to_list(rows) ``` **Замечание по extra_json**: `_row_to_dict()` автоматически декодирует JSON-строки начинающиеся с `{` или `[`. Так что в ответе API `extra_json` будет уже распарсенным dict/list. --- ## 3. Точки инструментации в agents/runner.py ### Функция run_pipeline() (строки 1210–1835) Все write_log в run_pipeline должны: - Быть защищены `if pipeline:` (dry_run не создаёт pipeline) - Оборачиваться в `try/except Exception: pass` (не блокировать pipeline) | # | После строки | Событие | Level | extra | |---|---|---|---|---| | 1 | ~1270 после `update_pipeline(pid=...)` | Pipeline start | INFO | `{route_type, steps_count, mode}` | | 2 | ~1280 начало тела цикла `for i, step` | Step N start | INFO | `{role, model, brief_preview}` | | 3 | ~1383 после аккумуляции stats | Step N done (success) | INFO | `{tokens, cost_usd, duration_seconds}` | | 4 | ~1329 в `except Exception as exc:` | Step exception | ERROR | `{role, exc_type}` | | 5 | ~1436-1438 в блоке `if not result["success"]` | Step failed | ERROR | `{role, error}` | | 6 | ~1525 в блоке `if blocked_info:` | Step blocked | WARN | `{role, reason}` | | 7 | ~1748 после `update_pipeline(completed)` | Pipeline completed | INFO | `{steps, cost_usd, tokens, duration_s}` | | 8 | ~1771–1802 финальный статус задачи | Task status | INFO | `{task_status, mode}` | ### Функция _execute_department_head_step() (строки 1028–1172) Логируем в `parent_pipeline_id` (существует в момент вызова). Guard: `if parent_pipeline_id:`. | # | После строки | Событие | Level | extra | |---|---|---|---|---| | 9 | ~1059-1066 (dept blocked) | Dept head blocked | WARN | `{role, blocked_reason}` | | 10 | ~1100 перед вызовом sub run_pipeline | Sub-pipeline start | INFO | `{dept_name, sub_steps}` | | 11 | ~1112 после вызова sub run_pipeline | Sub-pipeline done | INFO/ERROR | `{success, steps_completed}` | **Пример вызова (точка 2)**: ```python try: if pipeline: brief_preview = (step.get("brief") or "")[:100] models.write_log( conn, pipeline["id"], f"Step {i+1}/{len(steps)} start: role={role}, model={model}", level="INFO", extra={"role": role, "model": model, "brief_preview": brief_preview}, ) except Exception: pass ``` **Пример вызова (точка 1 — pipeline start)**: ```python try: models.write_log( conn, pipeline["id"], f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}", extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode}, ) except Exception: pass ``` --- ## 4. API Endpoint ### GET /api/pipelines/{pipeline_id}/logs **Контракт**: - Путь: `/api/pipelines/{pipeline_id}/logs` - Query: `since_id: int = 0` - Response: `list[PipelineLogEntry]` - 404 если pipeline не найден **Схема ответа** (один элемент): ```json { "id": 42, "pipeline_id": 7, "ts": "2026-03-17 14:23:01", "level": "INFO", "message": "Step 1/3 start: role=backend_dev, model=sonnet", "extra_json": {"role": "backend_dev", "model": "sonnet", "brief_preview": "..."} } ``` **Реализация в web/api.py**: ```python @app.get("/api/pipelines/{pipeline_id}/logs") def get_pipeline_logs(pipeline_id: int, since_id: int = 0): """Get pipeline log entries after since_id (for live console polling).""" conn = get_conn() row = conn.execute("SELECT id FROM pipelines WHERE id = ?", (pipeline_id,)).fetchone() if not row: conn.close() raise HTTPException(404, f"Pipeline {pipeline_id} not found") logs = models.get_pipeline_logs(conn, pipeline_id, since_id=since_id) conn.close() return logs ``` **Поллинг-протокол (для фронтенда)**: 1. `since_id=0` — первый запрос, получить всё 2. `since_id = max(id)` из последнего ответа для инкрементного обновления 3. Поллить каждые 1-2 секунды пока `pipeline.status == 'running'` 4. Остановить поллинг когда pipeline завершён --- ## 5. Тесты (обязательны для core/) ### tests/test_db.py - SCHEMA-строка содержит CREATE TABLE pipeline_log (convention #387) - `_migrate()` создаёт pipeline_log если таблицы нет ### tests/test_models.py - `write_log()` корректно вставляет запись, возвращает dict - `write_log()` с `extra=None` — extra_json = NULL - `write_log()` с `extra={dict}` — extra_json декодируется в dict - `get_pipeline_logs()` с `since_id=0` возвращает все записи - `get_pipeline_logs()` с `since_id=N` возвращает только id > N - Записи в хронологическом порядке (ORDER BY id ASC) ### tests/test_api.py - GET /api/pipelines/{id}/logs → 200, list - GET /api/pipelines/{id}/logs?since_id=5 → только id > 5 - GET /api/pipelines/9999/logs → 404 --- ## 6. Совместимость (#375/#376) `write_log()` пишет ТОЛЬКО в `pipeline_log`, не затрагивает `pipelines.status` и `watchdog`. Watchdog (core/watchdog.py) проверяет `pipelines.status = 'running' AND pid IS NOT NULL` — не зависит от pipeline_log. Нет конфликтов. **FK guard**: write_log вызывается только ПОСЛЕ `create_pipeline()` (т.е. когда pipeline_id уже в БД). PM-шаг (cli/main.py) запускается ДО `run_pipeline()`, поэтому pipeline_id ещё не существует. PM-логи недоступны в pipeline_log — это ограничение текущей архитектуры. Для KIN-084 MVP: логируем от момента создания pipeline_record до финала.