"""Kin follow-up generator.

Analyzes the pipeline output recorded for a task and creates follow-up tasks.
Runs a PM agent to parse the results and produce an actionable task list.
"""

import json
import sqlite3

from core import models
from core.context_builder import format_prompt, PROMPTS_DIR


def _collect_pipeline_output(conn: sqlite3.Connection, task_id: str) -> str:
    """Collect all pipeline step outputs for a task into a single string.

    Reads every ``agent_logs`` row for ``task_id`` ordered by ``created_at``
    and renders each one as a ``=== role [OK|FAILED] ===`` header followed by
    its ``output_summary`` (or ``(no output)``) and a blank line.

    Returns an empty string when the task has no log rows.

    NOTE(review): rows are accessed by column name, so the connection is
    assumed to use a name-addressable row factory (e.g. ``sqlite3.Row``).
    """
    rows = conn.execute(
        """SELECT agent_role, output_summary, success
           FROM agent_logs
           WHERE task_id = ?
           ORDER BY created_at""",
        (task_id,),
    ).fetchall()
    if not rows:
        return ""

    parts = []
    for row in rows:
        status = "OK" if row["success"] else "FAILED"
        parts.append(f"=== {row['agent_role']} [{status}] ===")
        parts.append(row["output_summary"] or "(no output)")
        parts.append("")
    return "\n".join(parts)


def _next_task_id(conn: sqlite3.Connection, project_id: str) -> str:
    """Generate the next sequential task ID for a project.

    IDs have the form ``<PROJECT_ID upper-cased>-<number>``. The result is
    one more than the highest number found among the project's existing
    tasks, zero-padded to at least three digits.
    """
    marker = f"{project_id.upper()}-"
    highest = 0
    for existing in models.list_tasks(conn, project_id=project_id):
        existing_id = existing["id"]
        if not existing_id.startswith(marker):
            continue
        # Slice off the verified prefix instead of splitting on the first
        # "-": a project id that itself contains a hyphen (e.g. "my-proj")
        # would otherwise never parse, and the same "-001" id would be
        # handed out again and again.
        try:
            number = int(existing_id[len(marker):])
        except ValueError:
            # Not a plain sequential id (non-numeric suffix); ignore it.
            continue
        highest = max(highest, number)
    return f"{marker}{highest + 1:03d}"


def generate_followups(
    conn: sqlite3.Connection,
    task_id: str,
    dry_run: bool = False,
) -> list[dict]:
    """Analyze pipeline output and create follow-up tasks.

    1. Collects all ``agent_logs`` output for the task.
    2. Runs the follow-up agent (``_run_claude`` with the "sonnet" model) on
       a prompt built from the project, the task and that output.
    3. Creates one task per proposed item, with ``parent_task_id = task_id``,
       and logs the run via ``models.log_agent_run``.

    Args:
        conn: Open database connection.
        task_id: ID of the task whose pipeline output is analyzed.
        dry_run: When true, do not run the agent or touch the database;
            return ``[{"_dry_run": True, "_prompt": <prompt>}]`` instead.

    Returns:
        The list of created tasks. An empty list is returned when the task
        or its project does not exist, when there is no pipeline output, or
        when the agent output parses to neither a list nor a dict.
    """
    task = models.get_task(conn, task_id)
    if not task:
        return []

    project_id = task["project_id"]
    project = models.get_project(conn, project_id)
    if not project:
        return []

    pipeline_output = _collect_pipeline_output(conn, task_id)
    if not pipeline_output:
        return []

    # Build context for the follow-up agent.
    language = project.get("language", "ru")
    context = {
        "project": {
            "id": project["id"],
            "name": project["name"],
            "path": project["path"],
            "tech_stack": project.get("tech_stack"),
            "language": language,
        },
        "task": {
            "id": task["id"],
            "title": task["title"],
            "status": task["status"],
            "priority": task["priority"],
            "brief": task.get("brief"),
            "spec": task.get("spec"),
        },
        "previous_output": pipeline_output,
    }
    prompt = format_prompt(context, "followup")

    if dry_run:
        return [{"_dry_run": True, "_prompt": prompt}]

    # Run the follow-up agent.
    # NOTE(review): imported locally rather than at module level, presumably
    # to avoid an import cycle with agents.runner; confirm before moving it.
    from agents.runner import _run_claude, _try_parse_json

    result = _run_claude(prompt, model="sonnet")
    output = result.get("output", "")

    # Parse the task list from the agent output.
    parsed = _try_parse_json(output)
    if not isinstance(parsed, list):
        if not isinstance(parsed, dict):
            return []
        # The list may be wrapped in a dict under "tasks" or "followups".
        parsed = parsed.get("tasks") or parsed.get("followups") or []

    # Create tasks in the DB, skipping items that are not titled dicts.
    created = []
    for item in parsed:
        if not isinstance(item, dict) or "title" not in item:
            continue

        new_id = _next_task_id(conn, project_id)
        brief = item.get("brief")
        brief_dict = {"source": f"followup:{task_id}"}
        if item.get("type"):
            brief_dict["route_type"] = item["type"]
        if brief:
            brief_dict["description"] = brief

        new_task = models.create_task(
            conn,
            new_id,
            project_id,
            title=item["title"],
            priority=item.get("priority", 5),
            parent_task_id=task_id,
            brief=brief_dict,
        )
        created.append(new_task)

    # Log the follow-up generation with the ids and titles that were created.
    models.log_agent_run(
        conn,
        project_id,
        "followup_pm",
        "generate_followups",
        task_id=task_id,
        output_summary=json.dumps(
            [{"id": t["id"], "title": t["title"]} for t in created],
            ensure_ascii=False,
        ),
        success=True,
    )

    return created