217 lines
8.5 KiB
Markdown
217 lines
8.5 KiB
Markdown
# KIN-084 — Live Console: архитектурная спека
|
||
|
||
## 1. DDL — таблица pipeline_log
|
||
|
||
```sql
|
||
CREATE TABLE IF NOT EXISTS pipeline_log (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
pipeline_id INTEGER NOT NULL REFERENCES pipelines(id),
|
||
ts TEXT NOT NULL DEFAULT (datetime('now')),
|
||
level TEXT NOT NULL DEFAULT 'INFO',
|
||
message TEXT NOT NULL,
|
||
extra_json TEXT
|
||
);
|
||
|
||
CREATE INDEX IF NOT EXISTS idx_pipeline_log_pipeline_id ON pipeline_log(pipeline_id, id);
|
||
```
|
||
|
||
**Замечание**: Brief указывает `pipeline_id TEXT`, но `pipelines.id` — `INTEGER PRIMARY KEY AUTOINCREMENT`.
|
||
Используем `INTEGER` (соответствует паттерну `department_handoffs.pipeline_id INTEGER`).
|
||
|
||
**Где добавить**:
|
||
1. В `SCHEMA` в `core/db.py` — в конец строки SCHEMA (после `task_attachments`)
|
||
2. В `_migrate()` в `core/db.py` — guard `if "pipeline_log" not in existing_tables:` (по аналогии с `task_attachments`)
|
||
|
||
---
|
||
|
||
## 2. Функции в core/models.py
|
||
|
||
### write_log()
|
||
|
||
```python
|
||
def write_log(
|
||
conn: sqlite3.Connection,
|
||
pipeline_id: int,
|
||
message: str,
|
||
level: str = "INFO",
|
||
extra: dict | list | None = None,
|
||
) -> dict:
|
||
"""Insert a pipeline log entry. Returns inserted row as dict.
|
||
|
||
extra: optional structured data (serialized to JSON string).
|
||
level: INFO | DEBUG | ERROR | WARN
|
||
"""
|
||
extra_json = json.dumps(extra, ensure_ascii=False) if extra is not None else None
|
||
cur = conn.execute(
|
||
"""INSERT INTO pipeline_log (pipeline_id, message, level, extra_json)
|
||
VALUES (?, ?, ?, ?)""",
|
||
(pipeline_id, message, level, extra_json),
|
||
)
|
||
conn.commit()
|
||
row = conn.execute(
|
||
"SELECT * FROM pipeline_log WHERE id = ?", (cur.lastrowid,)
|
||
).fetchone()
|
||
return _row_to_dict(row)
|
||
```
|
||
|
||
### get_pipeline_logs()
|
||
|
||
```python
|
||
def get_pipeline_logs(
|
||
conn: sqlite3.Connection,
|
||
pipeline_id: int,
|
||
since_id: int = 0,
|
||
) -> list[dict]:
|
||
"""Get pipeline log entries after since_id in chronological order.
|
||
|
||
since_id=0 returns all entries (id > 0 matches all).
|
||
"""
|
||
rows = conn.execute(
|
||
"""SELECT * FROM pipeline_log
|
||
WHERE pipeline_id = ? AND id > ?
|
||
ORDER BY id ASC""",
|
||
(pipeline_id, since_id),
|
||
).fetchall()
|
||
return _rows_to_list(rows)
|
||
```
|
||
|
||
**Замечание по extra_json**: `_row_to_dict()` автоматически декодирует JSON-строки начинающиеся с `{` или `[`.
|
||
Так что в ответе API `extra_json` будет уже распарсенным dict/list.
|
||
|
||
---
|
||
|
||
## 3. Точки инструментации в agents/runner.py
|
||
|
||
### Функция run_pipeline() (строки 1210–1835)
|
||
|
||
Все write_log в run_pipeline должны:
|
||
- Быть защищены `if pipeline:` (dry_run не создаёт pipeline)
|
||
- Оборачиваться в `try/except Exception: pass` (не блокировать pipeline)
|
||
|
||
| # | После строки | Событие | Level | extra |
|
||
|---|---|---|---|---|
|
||
| 1 | ~1270 после `update_pipeline(pid=...)` | Pipeline start | INFO | `{route_type, steps_count, mode}` |
|
||
| 2 | ~1280 начало тела цикла `for i, step` | Step N start | INFO | `{role, model, brief_preview}` |
|
||
| 3 | ~1383 после аккумуляции stats | Step N done (success) | INFO | `{tokens, cost_usd, duration_seconds}` |
|
||
| 4 | ~1329 в `except Exception as exc:` | Step exception | ERROR | `{role, exc_type}` |
|
||
| 5 | ~1436-1438 в блоке `if not result["success"]` | Step failed | ERROR | `{role, error}` |
|
||
| 6 | ~1525 в блоке `if blocked_info:` | Step blocked | WARN | `{role, reason}` |
|
||
| 7 | ~1748 после `update_pipeline(completed)` | Pipeline completed | INFO | `{steps, cost_usd, tokens, duration_s}` |
|
||
| 8 | ~1771–1802 финальный статус задачи | Task status | INFO | `{task_status, mode}` |
|
||
|
||
### Функция _execute_department_head_step() (строки 1028–1172)
|
||
|
||
Логируем в `parent_pipeline_id` (существует в момент вызова).
|
||
Guard: `if parent_pipeline_id:`.
|
||
|
||
| # | После строки | Событие | Level | extra |
|
||
|---|---|---|---|---|
|
||
| 9 | ~1059-1066 (dept blocked) | Dept head blocked | WARN | `{role, blocked_reason}` |
|
||
| 10 | ~1100 перед вызовом sub run_pipeline | Sub-pipeline start | INFO | `{dept_name, sub_steps}` |
|
||
| 11 | ~1112 после вызова sub run_pipeline | Sub-pipeline done | INFO/ERROR | `{success, steps_completed}` |
|
||
|
||
**Пример вызова (точка 2)**:
|
||
```python
|
||
try:
|
||
if pipeline:
|
||
brief_preview = (step.get("brief") or "")[:100]
|
||
models.write_log(
|
||
conn, pipeline["id"],
|
||
f"Step {i+1}/{len(steps)} start: role={role}, model={model}",
|
||
level="INFO",
|
||
extra={"role": role, "model": model, "brief_preview": brief_preview},
|
||
)
|
||
except Exception:
|
||
pass
|
||
```
|
||
|
||
**Пример вызова (точка 1 — pipeline start)**:
|
||
```python
|
||
try:
|
||
models.write_log(
|
||
conn, pipeline["id"],
|
||
f"Pipeline start: task={task_id}, steps={len(steps)}, route_type={effective_route_type}, mode={mode}",
|
||
extra={"route_type": effective_route_type, "steps_count": len(steps), "mode": mode},
|
||
)
|
||
except Exception:
|
||
pass
|
||
```
|
||
|
||
---
|
||
|
||
## 4. API Endpoint
|
||
|
||
### GET /api/pipelines/{pipeline_id}/logs
|
||
|
||
**Контракт**:
|
||
- Путь: `/api/pipelines/{pipeline_id}/logs`
|
||
- Query: `since_id: int = 0`
|
||
- Response: `list[PipelineLogEntry]`
|
||
- 404 если pipeline не найден
|
||
|
||
**Схема ответа** (один элемент):
|
||
```json
|
||
{
|
||
"id": 42,
|
||
"pipeline_id": 7,
|
||
"ts": "2026-03-17 14:23:01",
|
||
"level": "INFO",
|
||
"message": "Step 1/3 start: role=backend_dev, model=sonnet",
|
||
"extra_json": {"role": "backend_dev", "model": "sonnet", "brief_preview": "..."}
|
||
}
|
||
```
|
||
|
||
**Реализация в web/api.py**:
|
||
```python
|
||
@app.get("/api/pipelines/{pipeline_id}/logs")
|
||
def get_pipeline_logs(pipeline_id: int, since_id: int = 0):
|
||
"""Get pipeline log entries after since_id (for live console polling)."""
|
||
conn = get_conn()
|
||
row = conn.execute("SELECT id FROM pipelines WHERE id = ?", (pipeline_id,)).fetchone()
|
||
if not row:
|
||
conn.close()
|
||
raise HTTPException(404, f"Pipeline {pipeline_id} not found")
|
||
logs = models.get_pipeline_logs(conn, pipeline_id, since_id=since_id)
|
||
conn.close()
|
||
return logs
|
||
```
|
||
|
||
**Поллинг-протокол (для фронтенда)**:
|
||
1. `since_id=0` — первый запрос, получить всё
|
||
2. `since_id = max(id)` из последнего ответа для инкрементного обновления
|
||
3. Поллить каждые 1-2 секунды пока `pipeline.status == 'running'`
|
||
4. Остановить поллинг когда pipeline завершён
|
||
|
||
---
|
||
|
||
## 5. Тесты (обязательны для core/)
|
||
|
||
### tests/test_db.py
|
||
- SCHEMA-строка содержит CREATE TABLE pipeline_log (convention #387)
|
||
- `_migrate()` создаёт pipeline_log если таблицы нет
|
||
|
||
### tests/test_models.py
|
||
- `write_log()` корректно вставляет запись, возвращает dict
|
||
- `write_log()` с `extra=None` — extra_json = NULL
|
||
- `write_log()` с `extra={dict}` — extra_json декодируется в dict
|
||
- `get_pipeline_logs()` с `since_id=0` возвращает все записи
|
||
- `get_pipeline_logs()` с `since_id=N` возвращает только id > N
|
||
- Записи в хронологическом порядке (ORDER BY id ASC)
|
||
|
||
### tests/test_api.py
|
||
- GET /api/pipelines/{id}/logs → 200, list
|
||
- GET /api/pipelines/{id}/logs?since_id=5 → только id > 5
|
||
- GET /api/pipelines/9999/logs → 404
|
||
|
||
---
|
||
|
||
## 6. Совместимость (#375/#376)
|
||
|
||
`write_log()` пишет ТОЛЬКО в `pipeline_log`, не затрагивает `pipelines.status` и `watchdog`.
|
||
Watchdog (core/watchdog.py) проверяет `pipelines.status = 'running' AND pid IS NOT NULL` — не зависит от pipeline_log.
|
||
Нет конфликтов.
|
||
|
||
**FK guard**: write_log вызывается только ПОСЛЕ `create_pipeline()` (т.е. когда pipeline_id уже в БД).
|
||
PM-шаг (cli/main.py) запускается ДО `run_pipeline()`, поэтому pipeline_id ещё не существует.
|
||
PM-логи недоступны в pipeline_log — это ограничение текущей архитектуры.
|
||
Для KIN-084 MVP: логируем от момента создания pipeline_record до финала.
|