""" Kin follow-up generator — analyzes pipeline output and creates follow-up tasks. Runs a PM agent to parse results and produce actionable task list. Detects permission-blocked items and returns them as pending actions. """ import json import re import sqlite3 from core import models from core.context_builder import format_prompt, PROMPTS_DIR PERMISSION_PATTERNS = [ r"(?i)permission\s+denied", r"(?i)ручное\s+применение", r"(?i)не\s+получил[иа]?\s+разрешени[ея]", r"(?i)cannot\s+write", r"(?i)read[- ]?only", r"(?i)нет\s+прав\s+на\s+запись", r"(?i)manually\s+appl", r"(?i)apply\s+manually", r"(?i)требуется\s+ручн", ] def _next_task_id( conn: sqlite3.Connection, project_id: str, category: str | None = None, ) -> str: """Thin wrapper around models.next_task_id for testability.""" return models.next_task_id(conn, project_id, category=category) def _is_permission_blocked(item: dict) -> bool: """Check if a follow-up item describes a permission/write failure.""" text = f"{item.get('title', '')} {item.get('brief', '')}".lower() return any(re.search(p, text) for p in PERMISSION_PATTERNS) def _collect_pipeline_output(conn: sqlite3.Connection, task_id: str) -> str: """Collect all pipeline step outputs for a task into a single string.""" rows = conn.execute( """SELECT agent_role, output_summary, success FROM agent_logs WHERE task_id = ? ORDER BY created_at""", (task_id,), ).fetchall() if not rows: return "" parts = [] for r in rows: status = "OK" if r["success"] else "FAILED" parts.append(f"=== {r['agent_role']} [{status}] ===") parts.append(r["output_summary"] or "(no output)") parts.append("") return "\n".join(parts) def generate_followups( conn: sqlite3.Connection, task_id: str, dry_run: bool = False, ) -> dict: """Analyze pipeline output and create follow-up tasks. Returns dict: { "created": [task, ...], # tasks created immediately "pending_actions": [action, ...], # items needing user decision } A pending_action looks like: { "type": "permission_fix", "description": "...", "original_item": {...}, # raw item from PM "options": ["rerun", "manual_task", "skip"], } """ task = models.get_task(conn, task_id) if not task: return {"created": [], "pending_actions": []} project_id = task["project_id"] project = models.get_project(conn, project_id) if not project: return {"created": [], "pending_actions": []} pipeline_output = _collect_pipeline_output(conn, task_id) if not pipeline_output: return {"created": [], "pending_actions": []} # Build context for followup agent language = project.get("language", "ru") context = { "project": { "id": project["id"], "name": project["name"], "path": project["path"], "tech_stack": project.get("tech_stack"), "language": language, }, "task": { "id": task["id"], "title": task["title"], "status": task["status"], "priority": task["priority"], "brief": task.get("brief"), "spec": task.get("spec"), }, "previous_output": pipeline_output, } prompt = format_prompt(context, "followup") if dry_run: return {"created": [{"_dry_run": True, "_prompt": prompt}], "pending_actions": []} # Run followup agent from agents.runner import _run_claude, _try_parse_json result = _run_claude(prompt, model="sonnet") output = result.get("output", "") # Parse the task list from output parsed = _try_parse_json(output) if not isinstance(parsed, list): if isinstance(parsed, dict): parsed = parsed.get("tasks") or parsed.get("followups") or [] else: return {"created": [], "pending_actions": []} # Separate permission-blocked items from normal ones created = [] pending_actions = [] for item in parsed: if not isinstance(item, dict) or "title" not in item: continue if _is_permission_blocked(item): pending_actions.append({ "type": "permission_fix", "description": item["title"], "original_item": item, "options": ["rerun", "manual_task", "skip"], }) else: new_id = _next_task_id(conn, project_id, category=task.get("category")) brief_dict = {"source": f"followup:{task_id}"} if item.get("type"): brief_dict["route_type"] = item["type"] if item.get("brief"): brief_dict["description"] = item["brief"] t = models.create_task( conn, new_id, project_id, title=item["title"], priority=item.get("priority", 5), parent_task_id=task_id, brief=brief_dict, category=task.get("category"), ) created.append(t) # Log the followup generation models.log_agent_run( conn, project_id, "followup_pm", "generate_followups", task_id=task_id, output_summary=json.dumps({ "created": [{"id": t["id"], "title": t["title"]} for t in created], "pending": len(pending_actions), }, ensure_ascii=False), success=True, ) return {"created": created, "pending_actions": pending_actions} def resolve_pending_action( conn: sqlite3.Connection, task_id: str, action: dict, choice: str, ) -> dict | None: """Resolve a single pending action. choice: "rerun" | "manual_task" | "skip" Returns created task dict for "manual_task", None otherwise. """ task = models.get_task(conn, task_id) if not task: return None project_id = task["project_id"] item = action.get("original_item", {}) if choice == "skip": return None if choice == "manual_task": new_id = _next_task_id(conn, project_id, category=task.get("category")) brief_dict = {"source": f"followup:{task_id}", "task_type": "manual_escalation"} if item.get("type"): brief_dict["route_type"] = item["type"] if item.get("brief"): brief_dict["description"] = item["brief"] return models.create_task( conn, new_id, project_id, title=item.get("title", "Manual fix required"), priority=item.get("priority", 5), parent_task_id=task_id, brief=brief_dict, category=task.get("category"), ) if choice == "rerun": # Re-run pipeline for the parent task with allow_write from agents.runner import run_pipeline steps = [{"role": item.get("type", "frontend_dev"), "brief": item.get("brief", item.get("title", "")), "model": "sonnet"}] result = run_pipeline(conn, task_id, steps, allow_write=True) return {"rerun_result": result} return None def auto_resolve_pending_actions( conn: sqlite3.Connection, task_id: str, pending_actions: list, ) -> list: """Auto-resolve pending permission actions in auto mode. Strategy: try 'rerun' first; if rerun fails → escalate to 'manual_task'. Returns list of resolution results. """ results = [] for action in pending_actions: result = resolve_pending_action(conn, task_id, action, "rerun") rerun_success = ( isinstance(result, dict) and isinstance(result.get("rerun_result"), dict) and result["rerun_result"].get("success") ) if rerun_success: results.append({"resolved": "rerun", "result": result}) else: # Rerun failed → create manual task for human review manual = resolve_pending_action(conn, task_id, action, "manual_task") results.append({"resolved": "manual_task", "result": manual}) return results