8.5 KiB
KIN-084 — Live Console: архитектурная спека
1. DDL — таблица pipeline_log
CREATE TABLE IF NOT EXISTS pipeline_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id INTEGER NOT NULL REFERENCES pipelines(id),
ts TEXT NOT NULL DEFAULT (datetime('now')),
level TEXT NOT NULL DEFAULT 'INFO',
message TEXT NOT NULL,
extra_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id);
Замечание: Brief указывает pipeline_id TEXT, но pipelines.id — INTEGER PRIMARY KEY AUTOINCREMENT.
Используем INTEGER (соответствует паттерну department_handoffs.pipeline_id INTEGER).
Где добавить:
- В
SCHEMAвcore/db.py— в конец строки SCHEMA (послеtask_attachments) - В
_migrate()вcore/db.py— guardif "pipeline_log" not in existing_tables:(по аналогии сtask_attachments)
2. Функции в core/models.py
write_log()
def write_log(
conn: sqlite3.Connection,
pipeline_id: int,
message: str,
level: str = "INFO",
extra: dict | list | None = None,
) -> dict:
"""Insert a pipeline log entry. Returns inserted row as dict.
extra: optional structured data (serialized to JSON string).
level: INFO | DEBUG | ERROR | WARN
"""
extra_json = json.dumps(extra, ensure_ascii=False) if extra is not None else None
cur = conn.execute(
"""INSERT INTO pipeline_log (pipeline_id, message, level, extra_json)
VALUES (?, ?, ?, ?)""",
(pipeline_id, message, level, extra_json),
)
conn.commit()
row = conn.execute(
"SELECT * FROM pipeline_log WHERE id = ?", (cur.lastrowid,)
).fetchone()
return _row_to_dict(row)
get_pipeline_logs()
def get_pipeline_logs(
conn: sqlite3.Connection,
pipeline_id: int,
since_id: int = 0,
) -> list[dict]:
"""Get pipeline log entries after since_id in chronological order.
since_id=0 returns all entries (id > 0 matches all).
"""
rows = conn.execute(
"""SELECT * FROM pipeline_log
WHERE pipeline_id = ? AND id > ?
ORDER BY id ASC""",
(pipeline_id, since_id),
).fetchall()
return _rows_to_list(rows)
Замечание по extra_json: _row_to_dict() автоматически декодирует JSON-строки начинающиеся с { или [.
Так что в ответе API extra_json будет уже распарсенным dict/list.
3. Точки инструментации в agents/runner.py
Функция run_pipeline() (строки 1210–1835)
Все write_log в run_pipeline должны:
- Быть защищены
if pipeline:(dry_run не создаёт pipeline) - Оборачиваться в
try/except Exception: pass(не блокировать pipeline)
| # | После строки | Событие | Level | extra |
|---|---|---|---|---|
| 1 | ~1270 после update_pipeline(pid=...) |
Pipeline start | INFO | {route_type, steps_count, mode} |
| 2 | ~1280 начало тела цикла for i, step |
Step N start | INFO | {role, model, brief_preview} |
| 3 | ~1383 после аккумуляции stats | Step N done (success) | INFO | {tokens, cost_usd, duration_seconds} |
| 4 | ~1329 в except Exception as exc: |
Step exception | ERROR | {role, exc_type} |
| 5 | ~1436-1438 в блоке if not result["success"] |
Step failed | ERROR | {role, error} |
| 6 | ~1525 в блоке if blocked_info: |
Step blocked | WARN | {role, reason} |
| 7 | ~1748 после update_pipeline(completed) |
Pipeline completed | INFO | {steps, cost_usd, tokens, duration_s} |
| 8 | ~1771–1802 финальный статус задачи | Task status | INFO | {task_status, mode} |
Функция _execute_department_head_step() (строки 1028–1172)
Логируем в parent_pipeline_id (существует в момент вызова).
Guard: if parent_pipeline_id:.
| # | После строки | Событие | Level | extra |
|---|---|---|---|---|
| 9 | ~1059-1066 (dept blocked) | Dept head blocked | WARN | {role, blocked_reason} |
| 10 | ~1100 перед вызовом sub run_pipeline | Sub-pipeline start | INFO | {dept_name, sub_steps} |
| 11 | ~1112 после вызова sub run_pipeline | Sub-pipeline done | INFO/ERROR | {success, steps_completed} |
Пример вызова (точка 2):
try:
if pipeline:
brief_preview = (step.get("brief") or "")[:100]
models.write_log(
conn, pipeline["id"],
f"Step {i+1}/{len(steps)} start: role={role}, model={model}",
level="INFO",
extra={"role": role, "model": model, "brief_preview": brief_preview},
)
except Exception:
pass
Пример вызова (точка 1 — pipeline start):
try:
models.write_log(
conn, pipeline["id"],
f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}",
extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode},
)
except Exception:
pass
4. API Endpoint
GET /api/pipelines/{pipeline_id}/logs
Контракт:
- Путь:
/api/pipelines/{pipeline_id}/logs - Query:
since_id: int = 0 - Response:
list[PipelineLogEntry] - 404 если pipeline не найден
Схема ответа (один элемент):
{
"id": 42,
"pipeline_id": 7,
"ts": "2026-03-17 14:23:01",
"level": "INFO",
"message": "Step 1/3 start: role=backend_dev, model=sonnet",
"extra_json": {"role": "backend_dev", "model": "sonnet", "brief_preview": "..."}
}
Реализация в web/api.py:
@app.get("/api/pipelines/{pipeline_id}/logs")
def get_pipeline_logs(pipeline_id: int, since_id: int = 0):
"""Get pipeline log entries after since_id (for live console polling)."""
conn = get_conn()
row = conn.execute("SELECT id FROM pipelines WHERE id = ?", (pipeline_id,)).fetchone()
if not row:
conn.close()
raise HTTPException(404, f"Pipeline {pipeline_id} not found")
logs = models.get_pipeline_logs(conn, pipeline_id, since_id=since_id)
conn.close()
return logs
Поллинг-протокол (для фронтенда):
since_id=0— первый запрос, получить всёsince_id = max(id)из последнего ответа для инкрементного обновления- Поллить каждые 1-2 секунды пока
pipeline.status == 'running' - Остановить поллинг когда pipeline завершён
5. Тесты (обязательны для core/)
tests/test_db.py
- SCHEMA-строка содержит CREATE TABLE pipeline_log (convention #387)
_migrate()создаёт pipeline_log если таблицы нет
tests/test_models.py
write_log()корректно вставляет запись, возвращает dictwrite_log()сextra=None— extra_json = NULLwrite_log()сextra={dict}— extra_json декодируется в dictget_pipeline_logs()сsince_id=0возвращает все записиget_pipeline_logs()сsince_id=Nвозвращает только id > N- Записи в хронологическом порядке (ORDER BY id ASC)
tests/test_api.py
- GET /api/pipelines/{id}/logs → 200, list
- GET /api/pipelines/{id}/logs?since_id=5 → только id > 5
- GET /api/pipelines/9999/logs → 404
6. Совместимость (#375/#376)
write_log() пишет ТОЛЬКО в pipeline_log, не затрагивает pipelines.status и watchdog.
Watchdog (core/watchdog.py) проверяет pipelines.status = 'running' AND pid IS NOT NULL — не зависит от pipeline_log.
Нет конфликтов.
FK guard: write_log вызывается только ПОСЛЕ create_pipeline() (т.е. когда pipeline_id уже в БД).
PM-шаг (cli/main.py) запускается ДО run_pipeline(), поэтому pipeline_id ещё не существует.
PM-логи недоступны в pipeline_log — это ограничение текущей архитектуры.
Для KIN-084 MVP: логируем от момента создания pipeline_record до финала.