262 lines
8.3 KiB
Python
262 lines
8.3 KiB
Python
"""
|
|
Kin follow-up generator — analyzes pipeline output and creates follow-up tasks.
|
|
Runs a PM agent to parse results and produce actionable task list.
|
|
Detects permission-blocked items and returns them as pending actions.
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
import sqlite3
|
|
|
|
from core import models
|
|
from core.context_builder import format_prompt, PROMPTS_DIR
|
|
|
|
# Case-insensitive regex fragments; a follow-up item whose title/brief matches
# any of them is treated as blocked by a permission/write failure.
# Both English and Russian phrasings are covered.
PERMISSION_PATTERNS = [
    r"(?i)permission\s+denied",
    r"(?i)ручное\s+применение",                 # "manual application"
    r"(?i)не\s+получил[иа]?\s+разрешени[ея]",   # "did not receive permission"
    r"(?i)cannot\s+write",
    r"(?i)read[- ]?only",
    r"(?i)нет\s+прав\s+на\s+запись",            # "no write permissions"
    r"(?i)manually\s+appl",                     # "manually apply/applied/..."
    r"(?i)apply\s+manually",
    r"(?i)требуется\s+ручн",                    # "manual ... required"
]
|
|
|
|
|
|
def _next_task_id(
    conn: sqlite3.Connection,
    project_id: str,
    category: str | None = None,
) -> str:
    """Delegate id allocation to ``models.next_task_id``.

    Kept as a separate module-level function so tests have a single seam
    for task-id generation.

    Args:
        conn: Open database connection, forwarded unchanged.
        project_id: Project the new task will belong to.
        category: Optional task category, forwarded as a keyword argument.

    Returns:
        Whatever ``models.next_task_id`` returns for these arguments.
    """
    allocated_id = models.next_task_id(conn, project_id, category=category)
    return allocated_id
|
|
|
|
|
|
def _is_permission_blocked(item: dict) -> bool:
    """Tell whether a follow-up item reads like a permission/write failure.

    The item's ``title`` and ``brief`` (missing keys count as empty) are
    joined, lower-cased and searched against every entry of
    ``PERMISSION_PATTERNS``; the first hit decides.
    """
    title = item.get("title", "")
    brief = item.get("brief", "")
    haystack = f"{title} {brief}".lower()

    for pattern in PERMISSION_PATTERNS:
        if re.search(pattern, haystack):
            return True
    return False
|
|
|
|
|
|
def _collect_pipeline_output(conn: sqlite3.Connection, task_id: str) -> str:
    """Render every logged pipeline step of a task as one text blob.

    Rows are read from ``agent_logs`` ordered by ``created_at``. Each row
    becomes a ``=== <role> [OK|FAILED] ===`` header line followed by its
    output summary (or ``(no output)`` when the summary is empty/NULL) and
    a blank separator line. Columns are accessed by name, so ``conn`` must
    yield mapping-style rows (e.g. ``sqlite3.Row``).

    Returns:
        The concatenated sections, or an empty string when the task has
        no log rows.
    """
    cursor = conn.execute(
        """SELECT agent_role, output_summary, success
           FROM agent_logs WHERE task_id = ? ORDER BY created_at""",
        (task_id,),
    )

    sections = []
    for row in cursor.fetchall():
        verdict = "OK" if row["success"] else "FAILED"
        summary = row["output_summary"] or "(no output)"
        # header, body and the trailing blank line of one step
        sections.append(f"=== {row['agent_role']} [{verdict}] ===\n{summary}\n")

    # Joining with "\n" puts an empty line between consecutive steps;
    # an empty list naturally yields "".
    return "\n".join(sections)
|
|
|
|
|
|
|
|
def generate_followups(
    conn: sqlite3.Connection,
    task_id: str,
    dry_run: bool = False,
) -> dict:
    """Ask the follow-up PM agent for next steps and materialise them.

    The task's logged pipeline output is rendered into the "followup"
    prompt, the agent is run, and its JSON answer is turned into new tasks.
    Items whose text matches a permission pattern are not created; they
    are handed back for a decision instead.

    Args:
        conn: Open database connection.
        task_id: Task whose pipeline output is analysed; new tasks get it
            as ``parent_task_id``.
        dry_run: When true, nothing is executed or stored; the rendered
            prompt is returned as
            ``{"created": [{"_dry_run": True, "_prompt": ...}], ...}``.

    Returns:
        ``{"created": [task, ...], "pending_actions": [action, ...]}``.
        Both lists are empty when the task or its project is not found, or
        when the agent output parses to neither a list nor a dict.

        Each pending action has the shape::

            {
                "type": "permission_fix",
                "description": "<item title>",
                "original_item": {...},   # raw item from the PM agent
                "options": ["rerun", "manual_task", "skip"],
            }
    """
    task = models.get_task(conn, task_id)
    if not task:
        return {"created": [], "pending_actions": []}

    project_id = task["project_id"]
    project = models.get_project(conn, project_id)
    if not project:
        return {"created": [], "pending_actions": []}

    previous_output = _collect_pipeline_output(conn, task_id)

    # Prompt context for the follow-up agent.
    project_ctx = {
        "id": project["id"],
        "name": project["name"],
        "path": project["path"],
        "tech_stack": project.get("tech_stack"),
        "language": project.get("language", "ru"),
    }
    task_ctx = {
        "id": task["id"],
        "title": task["title"],
        "status": task["status"],
        "priority": task["priority"],
        "brief": task.get("brief"),
        "spec": task.get("spec"),
    }
    prompt = format_prompt(
        {
            "project": project_ctx,
            "task": task_ctx,
            "previous_output": previous_output,
        },
        "followup",
    )

    if dry_run:
        return {
            "created": [{"_dry_run": True, "_prompt": prompt}],
            "pending_actions": [],
        }

    # Imported at call time, as in the original, rather than at module load.
    from agents.runner import _run_claude, _try_parse_json

    agent_result = _run_claude(prompt, model="sonnet")
    parsed = _try_parse_json(agent_result.get("output", ""))

    # Accept a bare list, or a dict wrapping the list under "tasks"
    # (preferred) or "followups". Anything else ends the run early.
    if isinstance(parsed, list):
        items = parsed
    elif isinstance(parsed, dict):
        wrapper_key = next(
            (key for key in ("tasks", "followups") if key in parsed),
            None,
        )
        items = parsed[wrapper_key] if wrapper_key is not None else []
    else:
        return {"created": [], "pending_actions": []}

    # The wrapped value itself may be null / not a list ({"tasks": null}).
    if not isinstance(items, list):
        items = []

    category = task.get("category")
    created = []
    pending_actions = []

    for item in items:
        # Only dict items with a non-empty title are usable.
        if not (isinstance(item, dict) and item.get("title")):
            continue

        if _is_permission_blocked(item):
            # Needs a user (or auto-mode) decision, so no task is created yet.
            pending_actions.append({
                "type": "permission_fix",
                "description": item["title"],
                "original_item": item,
                "options": ["rerun", "manual_task", "skip"],
            })
            continue

        new_id = _next_task_id(conn, project_id, category=category)

        brief_dict = {"source": f"followup:{task_id}"}
        if item.get("type"):
            brief_dict["route_type"] = item["type"]
        if item.get("brief"):
            brief_dict["description"] = item["brief"]

        new_task = models.create_task(
            conn, new_id, project_id,
            title=item["title"],
            priority=item.get("priority", 5),
            parent_task_id=task_id,
            brief=brief_dict,
            category=category,
        )
        created.append(new_task)

    # Record what this run produced.
    summary = {
        "created": [{"id": t["id"], "title": t["title"]} for t in created],
        "pending": len(pending_actions),
    }
    models.log_agent_run(
        conn, project_id, "followup_pm", "generate_followups",
        task_id=task_id,
        output_summary=json.dumps(summary, ensure_ascii=False),
        success=True,
    )

    return {"created": created, "pending_actions": pending_actions}
|
|
|
|
|
|
def resolve_pending_action(
    conn: sqlite3.Connection,
    task_id: str,
    action: dict,
    choice: str,
) -> dict | None:
    """Resolve a single pending action according to the chosen option.

    Args:
        conn: Open database connection.
        task_id: Parent task the pending action belongs to.
        action: Pending-action dict; its ``original_item`` (raw PM item) is
            the source for title, brief, type and priority.
        choice: ``"rerun"``, ``"manual_task"`` or ``"skip"``.

    Returns:
        * ``"manual_task"``: the task returned by ``models.create_task``.
        * ``"rerun"``: ``{"rerun_result": <run_pipeline result>}``.
        * ``"skip"``, an unknown choice, or an unknown ``task_id``: ``None``.
    """
    task = models.get_task(conn, task_id)
    if not task:
        return None

    project_id = task["project_id"]
    # ``original_item`` may be missing or explicitly null; treat both as empty.
    item = action.get("original_item") or {}

    if choice == "skip":
        return None

    if choice == "manual_task":
        new_id = _next_task_id(conn, project_id, category=task.get("category"))
        brief_dict = {
            "source": f"followup:{task_id}",
            "task_type": "manual_escalation",
        }
        if item.get("type"):
            brief_dict["route_type"] = item["type"]
        if item.get("brief"):
            brief_dict["description"] = item["brief"]
        return models.create_task(
            conn, new_id, project_id,
            # ``or`` (not a .get default) so a null/empty title also falls back.
            title=item.get("title") or "Manual fix required",
            priority=item.get("priority", 5),
            parent_task_id=task_id,
            brief=brief_dict,
            category=task.get("category"),
        )

    if choice == "rerun":
        # Re-run the pipeline for the parent task with allow_write.
        from agents.runner import run_pipeline

        # dict.get defaults only apply to missing keys; use truthiness so
        # null/empty "type" or "brief" values still get a usable fallback.
        steps = [{
            "role": item.get("type") or "frontend_dev",
            "brief": item.get("brief") or item.get("title") or "",
            "model": "sonnet",
        }]
        result = run_pipeline(conn, task_id, steps, allow_write=True)
        return {"rerun_result": result}

    return None
|
|
|
|
|
|
def auto_resolve_pending_actions(
    conn: sqlite3.Connection,
    task_id: str,
    pending_actions: list,
) -> list:
    """Resolve pending permission actions without user input (auto mode).

    For every action a ``"rerun"`` is attempted first. The rerun counts as
    successful only when the resolution is a dict whose ``rerun_result`` is
    a dict with a truthy ``success``; otherwise the action is escalated via
    ``"manual_task"``.

    Returns:
        One entry per action, in input order:
        ``{"resolved": "rerun" | "manual_task", "result": <resolution>}``.
    """
    outcomes = []
    for action in pending_actions:
        rerun = resolve_pending_action(conn, task_id, action, "rerun")
        pipeline_result = (
            rerun.get("rerun_result") if isinstance(rerun, dict) else None
        )

        if isinstance(pipeline_result, dict) and pipeline_result.get("success"):
            outcomes.append({"resolved": "rerun", "result": rerun})
            continue

        # Rerun did not succeed, so create a manual task for human review.
        manual = resolve_pending_action(conn, task_id, action, "manual_task")
        outcomes.append({"resolved": "manual_task", "result": manual})

    return outcomes
|