diff --git a/agents/prompts/pm.md b/agents/prompts/pm.md new file mode 100644 index 0000000..9120f82 --- /dev/null +++ b/agents/prompts/pm.md @@ -0,0 +1,58 @@ +You are a Project Manager for the Kin multi-agent orchestrator. + +Your job: decompose a task into a pipeline of specialist steps. + +## Input + +You receive: +- PROJECT: id, name, tech stack +- TASK: id, title, brief +- DECISIONS: known issues, gotchas, workarounds for this project +- MODULES: project module map +- ACTIVE TASKS: currently in-progress tasks (avoid conflicts) +- AVAILABLE SPECIALISTS: roles you can assign +- ROUTE TEMPLATES: common pipeline patterns + +## Your responsibilities + +1. Analyze the task and determine what type of work is needed +2. Select the right specialists from the available pool +3. Build an ordered pipeline with dependencies +4. Include relevant context hints for each specialist +5. Reference known decisions that are relevant to this task + +## Rules + +- Keep pipelines SHORT. 2-4 steps for most tasks. +- Always end with a tester or reviewer step for quality. +- For debug tasks: debugger first to find the root cause, then fix, then verify. +- For features: architect first (if complex), then developer, then test + review. +- Don't assign specialists who aren't needed. +- If a task is blocked or unclear, say so — don't guess. 
+ +## Output format + +Return ONLY valid JSON (no markdown, no explanation): + +```json +{ + "analysis": "Brief analysis of what needs to be done", + "pipeline": [ + { + "role": "debugger", + "model": "sonnet", + "brief": "What this specialist should do", + "module": "search", + "relevant_decisions": [1, 5, 12] + }, + { + "role": "tester", + "model": "sonnet", + "depends_on": "debugger", + "brief": "Write regression test for the fix" + } + ], + "estimated_steps": 2, + "route_type": "debug" +} +``` diff --git a/agents/prompts/security.md b/agents/prompts/security.md new file mode 100644 index 0000000..cd8af8d --- /dev/null +++ b/agents/prompts/security.md @@ -0,0 +1,73 @@ +You are a Security Engineer performing a security audit. + +## Scope + +Analyze the codebase for security vulnerabilities. Focus on: + +1. **Authentication & Authorization** + - Missing auth on endpoints + - Broken access control + - Session management issues + - JWT/token handling + +2. **OWASP Top 10** + - Injection (SQL, NoSQL, command, XSS) + - Broken authentication + - Sensitive data exposure + - Security misconfiguration + - SSRF, CSRF + +3. **Secrets & Credentials** + - Hardcoded secrets, API keys, passwords + - Secrets in git history + - Unencrypted sensitive data + - .env files exposed + +4. **Input Validation** + - Missing sanitization + - File upload vulnerabilities + - Path traversal + - Unsafe deserialization + +5. 
**Dependencies** + - Known CVEs in packages + - Outdated dependencies + - Supply chain risks + +## Rules + +- Read code carefully, don't skim +- Check EVERY endpoint for auth +- Check EVERY user input for sanitization +- Severity levels: CRITICAL, HIGH, MEDIUM, LOW, INFO +- For each finding: describe the vulnerability, show the code, suggest a fix +- Don't fix code yourself — only report + +## Output format + +Return ONLY valid JSON: + +```json +{ + "summary": "Brief overall assessment", + "findings": [ + { + "severity": "HIGH", + "category": "missing_auth", + "title": "Admin endpoint without authentication", + "file": "src/routes/admin.js", + "line": 42, + "description": "The /api/admin/users endpoint has no auth middleware", + "recommendation": "Add requireAuth middleware before the handler", + "owasp": "A01:2021 Broken Access Control" + } + ], + "stats": { + "files_reviewed": 15, + "critical": 0, + "high": 2, + "medium": 3, + "low": 1 + } +} +``` diff --git a/agents/runner.py b/agents/runner.py new file mode 100644 index 0000000..3cd4f69 --- /dev/null +++ b/agents/runner.py @@ -0,0 +1,311 @@ +""" +Kin agent runner — launches Claude Code as subprocess with role-specific context. +Each agent = separate process with isolated context. +""" + +import json +import sqlite3 +import subprocess +import time +from pathlib import Path +from typing import Any + +from core import models +from core.context_builder import build_context, format_prompt + + +def run_agent( + conn: sqlite3.Connection, + role: str, + task_id: str, + project_id: str, + model: str = "sonnet", + previous_output: str | None = None, + brief_override: str | None = None, + dry_run: bool = False, +) -> dict: + """Run a single Claude Code agent as a subprocess. + + 1. Build context from DB + 2. Format prompt with role template + 3. Run: claude -p "{prompt}" --output-format json + 4. Log result to agent_logs + 5. 
Return {success, output, tokens_used, duration_seconds, cost_usd} + """ + # Build context + ctx = build_context(conn, task_id, role, project_id) + if previous_output: + ctx["previous_output"] = previous_output + if brief_override: + if ctx.get("task"): + ctx["task"]["brief"] = brief_override + + prompt = format_prompt(ctx, role) + + if dry_run: + return { + "success": True, + "output": None, + "prompt": prompt, + "role": role, + "model": model, + "dry_run": True, + } + + # Determine working directory + project = models.get_project(conn, project_id) + working_dir = None + if project and role in ("debugger", "frontend_dev", "backend_dev", "tester", "security"): + project_path = Path(project["path"]).expanduser() + if project_path.is_dir(): + working_dir = str(project_path) + + # Run claude subprocess + start = time.monotonic() + result = _run_claude(prompt, model=model, working_dir=working_dir) + duration = int(time.monotonic() - start) + + # Parse output + output_text = result.get("output", "") + success = result["returncode"] == 0 + parsed_output = _try_parse_json(output_text) + + # Log to DB + models.log_agent_run( + conn, + project_id=project_id, + task_id=task_id, + agent_role=role, + action="execute", + input_summary=f"task={task_id}, model={model}", + output_summary=output_text[:500] if output_text else None, + tokens_used=result.get("tokens_used"), + model=model, + cost_usd=result.get("cost_usd"), + success=success, + error_message=result.get("error") if not success else None, + duration_seconds=duration, + ) + + return { + "success": success, + "output": parsed_output if parsed_output else output_text, + "raw_output": output_text, + "role": role, + "model": model, + "duration_seconds": duration, + "tokens_used": result.get("tokens_used"), + "cost_usd": result.get("cost_usd"), + } + + +def _run_claude( + prompt: str, + model: str = "sonnet", + working_dir: str | None = None, +) -> dict: + """Execute claude CLI as subprocess. 
Returns dict with output, returncode, etc.""" + cmd = [ + "claude", + "-p", prompt, + "--output-format", "json", + "--model", model, + ] + + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=600, # 10 min max + cwd=working_dir, + ) + except FileNotFoundError: + return { + "output": "", + "error": "claude CLI not found in PATH", + "returncode": 127, + } + except subprocess.TimeoutExpired: + return { + "output": "", + "error": "Agent timed out after 600s", + "returncode": 124, + } + + # Try to extract structured data from JSON output + output = proc.stdout or "" + result: dict[str, Any] = { + "output": output, + "error": proc.stderr if proc.returncode != 0 else None, + "returncode": proc.returncode, + } + + # Parse JSON output from claude --output-format json + parsed = _try_parse_json(output) + if isinstance(parsed, dict): + result["tokens_used"] = parsed.get("usage", {}).get("total_tokens") + result["cost_usd"] = parsed.get("cost_usd") + # The actual content is usually in result or content + if "result" in parsed: + result["output"] = parsed["result"] + elif "content" in parsed: + result["output"] = parsed["content"] + + return result + + +def _try_parse_json(text: str) -> Any: + """Try to parse JSON from text. Returns parsed obj or None.""" + text = text.strip() + if not text: + return None + + # Direct parse + try: + return json.loads(text) + except json.JSONDecodeError: + pass + + # Try to find JSON block in markdown code fences + import re + m = re.search(r"```(?:json)?\s*\n(.*?)\n```", text, re.DOTALL) + if m: + try: + return json.loads(m.group(1)) + except json.JSONDecodeError: + pass + + # Try to find first { ... } or [ ... 
] + for start_char, end_char in [("{", "}"), ("[", "]")]: + start = text.find(start_char) + if start >= 0: + # Find matching close + depth = 0 + for i in range(start, len(text)): + if text[i] == start_char: + depth += 1 + elif text[i] == end_char: + depth -= 1 + if depth == 0: + try: + return json.loads(text[start:i + 1]) + except json.JSONDecodeError: + break + return None + + +# --------------------------------------------------------------------------- +# Pipeline executor +# --------------------------------------------------------------------------- + +def run_pipeline( + conn: sqlite3.Connection, + task_id: str, + steps: list[dict], + dry_run: bool = False, +) -> dict: + """Execute a multi-step pipeline of agents. + + steps = [ + {"role": "debugger", "model": "opus", "brief": "..."}, + {"role": "tester", "depends_on": "debugger", "brief": "..."}, + ] + + Returns {success, steps_completed, total_cost, total_tokens, total_duration, results} + """ + task = models.get_task(conn, task_id) + if not task: + return {"success": False, "error": f"Task '{task_id}' not found"} + + project_id = task["project_id"] + + # Determine route type from steps or task brief + route_type = "custom" + if task.get("brief") and isinstance(task["brief"], dict): + route_type = task["brief"].get("route_type", "custom") or "custom" + + # Create pipeline in DB + pipeline = None + if not dry_run: + pipeline = models.create_pipeline( + conn, task_id, project_id, route_type, steps, + ) + models.update_task(conn, task_id, status="in_progress") + + results = [] + total_cost = 0.0 + total_tokens = 0 + total_duration = 0 + previous_output = None + + for i, step in enumerate(steps): + role = step["role"] + model = step.get("model", "sonnet") + brief = step.get("brief") + + result = run_agent( + conn, role, task_id, project_id, + model=model, + previous_output=previous_output, + brief_override=brief, + dry_run=dry_run, + ) + results.append(result) + + if dry_run: + continue + + # Accumulate stats + 
total_cost += result.get("cost_usd") or 0 + total_tokens += result.get("tokens_used") or 0 + total_duration += result.get("duration_seconds") or 0 + + if not result["success"]: + # Pipeline failed — stop and mark as failed + if pipeline: + models.update_pipeline( + conn, pipeline["id"], + status="failed", + total_cost_usd=total_cost, + total_tokens=total_tokens, + total_duration_seconds=total_duration, + ) + models.update_task(conn, task_id, status="blocked") + return { + "success": False, + "error": f"Step {i+1}/{len(steps)} ({role}) failed", + "steps_completed": i, + "results": results, + "total_cost_usd": total_cost, + "total_tokens": total_tokens, + "total_duration_seconds": total_duration, + "pipeline_id": pipeline["id"] if pipeline else None, + } + + # Chain output to next step + previous_output = result.get("raw_output") or result.get("output") + if isinstance(previous_output, (dict, list)): + previous_output = json.dumps(previous_output, ensure_ascii=False) + + # Pipeline completed + if pipeline and not dry_run: + models.update_pipeline( + conn, pipeline["id"], + status="completed", + total_cost_usd=total_cost, + total_tokens=total_tokens, + total_duration_seconds=total_duration, + ) + models.update_task(conn, task_id, status="review") + + return { + "success": True, + "steps_completed": len(steps), + "results": results, + "total_cost_usd": total_cost, + "total_tokens": total_tokens, + "total_duration_seconds": total_duration, + "pipeline_id": pipeline["id"] if pipeline else None, + "dry_run": dry_run, + } diff --git a/agents/specialists.yaml b/agents/specialists.yaml new file mode 100644 index 0000000..4e9342c --- /dev/null +++ b/agents/specialists.yaml @@ -0,0 +1,104 @@ +# Kin specialist pool — roles available for pipeline construction. +# PM selects from this pool based on task type. 
+ +specialists: + pm: + name: "Project Manager" + model: sonnet + tools: [Read, Grep, Glob] + description: "Decomposes tasks, selects specialists, builds pipelines" + permissions: read_only + context_rules: + decisions: all + modules: all + + architect: + name: "Software Architect" + model: sonnet + tools: [Read, Grep, Glob] + description: "Designs solutions, reviews structure, writes specs" + permissions: read_only + context_rules: + decisions: all + modules: all + + debugger: + name: "Debugger" + model: sonnet + tools: [Read, Grep, Glob, Bash] + description: "Finds root causes, reads logs, traces execution" + permissions: read_bash + working_dir: project + context_rules: + decisions: [gotcha, workaround] + + frontend_dev: + name: "Frontend Developer" + model: sonnet + tools: [Read, Write, Edit, Bash, Glob, Grep] + description: "Implements UI: Vue, CSS, components, composables" + permissions: full + working_dir: project + context_rules: + decisions: [gotcha, workaround, convention] + + backend_dev: + name: "Backend Developer" + model: sonnet + tools: [Read, Write, Edit, Bash, Glob, Grep] + description: "Implements API, services, database, business logic" + permissions: full + working_dir: project + context_rules: + decisions: [gotcha, workaround, convention] + + tester: + name: "Tester" + model: sonnet + tools: [Read, Write, Bash, Glob, Grep] + description: "Writes and runs tests, verifies fixes" + permissions: full + working_dir: project + context_rules: + decisions: [] + + reviewer: + name: "Code Reviewer" + model: sonnet + tools: [Read, Grep, Glob] + description: "Reviews code for quality, conventions, bugs" + permissions: read_only + context_rules: + decisions: [convention] + + security: + name: "Security Engineer" + model: sonnet + tools: [Read, Grep, Glob, Bash] + description: "OWASP audit, auth checks, secrets scan, vulnerability analysis" + permissions: read_bash + working_dir: project + context_rules: + decisions_category: security + +# Route templates — 
@cli.command("run")
@click.argument("task_id")
@click.option("--dry-run", is_flag=True, help="Show pipeline plan without executing")
@click.pass_context
def run_task(ctx, task_id, dry_run):
    """Run a task through the agent pipeline.

    PM decomposes the task into specialist steps, then the pipeline executes.
    With --dry-run, shows the plan without running agents.

    Exits with status 1 when the task is unknown, the PM step fails or
    returns an unusable plan, or the pipeline itself fails.
    """
    from agents.runner import run_agent, run_pipeline

    conn = ctx.obj["conn"]
    task = models.get_task(conn, task_id)
    if not task:
        click.echo(f"Task '{task_id}' not found.", err=True)
        raise SystemExit(1)

    project_id = task["project_id"]
    click.echo(f"Task: {task['id']} — {task['title']}")

    # Step 1: PM decomposes
    click.echo("Running PM to decompose task...")
    pm_result = run_agent(
        conn, "pm", task_id, project_id,
        model="sonnet", dry_run=dry_run,
    )

    if dry_run:
        click.echo("\n--- PM Prompt (dry-run) ---")
        click.echo(pm_result.get("prompt", "")[:2000])
        click.echo("\n(Dry-run: PM would produce a pipeline JSON)")
        return

    if not pm_result["success"]:
        # On failure the output is usually empty; prefer the error text when
        # the runner provides one.
        reason = pm_result.get("error") or pm_result.get("output") or "unknown error"
        click.echo(f"PM failed: {reason}", err=True)
        raise SystemExit(1)

    # Parse PM output for pipeline
    output = pm_result.get("output")
    if isinstance(output, str):
        try:
            output = json.loads(output)
        except json.JSONDecodeError:
            click.echo(f"PM returned non-JSON output:\n{output[:500]}", err=True)
            raise SystemExit(1)

    if not isinstance(output, dict) or "pipeline" not in output:
        click.echo(f"PM output missing 'pipeline' key:\n{json.dumps(output, indent=2)[:500]}", err=True)
        raise SystemExit(1)

    pipeline_steps = output["pipeline"]
    # Reject malformed plans here instead of failing with a KeyError below.
    plan_is_valid = (
        isinstance(pipeline_steps, list)
        and bool(pipeline_steps)
        and all(isinstance(step, dict) and step.get("role") for step in pipeline_steps)
    )
    if not plan_is_valid:
        click.echo(
            "PM returned an invalid pipeline "
            "(expected a non-empty list of steps, each with a 'role').",
            err=True,
        )
        raise SystemExit(1)

    analysis = output.get("analysis", "")

    click.echo(f"\nAnalysis: {analysis}")
    click.echo(f"Pipeline ({len(pipeline_steps)} steps):")
    for i, step in enumerate(pipeline_steps, 1):
        click.echo(f"  {i}. {step['role']} ({step.get('model', 'sonnet')}): {step.get('brief', '')}")

    if not click.confirm("\nExecute pipeline?"):
        click.echo("Aborted.")
        return

    # Step 2: Execute pipeline
    click.echo("\nExecuting pipeline...")
    result = run_pipeline(conn, task_id, pipeline_steps)

    if result["success"]:
        click.echo(f"\nPipeline completed: {result['steps_completed']} steps")
    else:
        click.echo(f"\nPipeline failed at step: {result.get('error', 'unknown')}", err=True)

    if result.get("total_cost_usd"):
        click.echo(f"Cost: ${result['total_cost_usd']:.4f}")
    if result.get("total_duration_seconds"):
        click.echo(f"Duration: {result['total_duration_seconds']}s")

    # Report failure through the exit status so scripts can detect it.
    if not result["success"]:
        raise SystemExit(1)
{step['role']} ({step.get('model', 'sonnet')}): {step.get('brief', '')}") + + if not click.confirm("\nExecute pipeline?"): + click.echo("Aborted.") + return + + # Step 2: Execute pipeline + click.echo("\nExecuting pipeline...") + result = run_pipeline(conn, task_id, pipeline_steps) + + if result["success"]: + click.echo(f"\nPipeline completed: {result['steps_completed']} steps") + else: + click.echo(f"\nPipeline failed at step: {result.get('error', 'unknown')}", err=True) + + if result.get("total_cost_usd"): + click.echo(f"Cost: ${result['total_cost_usd']:.4f}") + if result.get("total_duration_seconds"): + click.echo(f"Duration: {result['total_duration_seconds']}s") + + # =========================================================================== # bootstrap # =========================================================================== diff --git a/core/context_builder.py b/core/context_builder.py new file mode 100644 index 0000000..9db1b3b --- /dev/null +++ b/core/context_builder.py @@ -0,0 +1,212 @@ +""" +Kin context builder — assembles role-specific context from DB for agent prompts. +Each role gets only the information it needs, keeping prompts focused. +""" + +import json +import sqlite3 +from pathlib import Path + +from core import models + +PROMPTS_DIR = Path(__file__).parent.parent / "agents" / "prompts" +SPECIALISTS_PATH = Path(__file__).parent.parent / "agents" / "specialists.yaml" + + +def _load_specialists() -> dict: + """Load specialists.yaml (lazy, no pyyaml dependency — simple parser).""" + path = SPECIALISTS_PATH + if not path.exists(): + return {} + import yaml + return yaml.safe_load(path.read_text()) + + +def build_context( + conn: sqlite3.Connection, + task_id: str, + role: str, + project_id: str, +) -> dict: + """Build role-specific context from DB. + + Returns a dict with keys: task, project, and role-specific data. 
+ """ + task = models.get_task(conn, task_id) + project = models.get_project(conn, project_id) + + ctx = { + "task": _slim_task(task) if task else None, + "project": _slim_project(project) if project else None, + "role": role, + } + + if role == "pm": + ctx["modules"] = models.get_modules(conn, project_id) + ctx["decisions"] = models.get_decisions(conn, project_id) + ctx["active_tasks"] = models.list_tasks(conn, project_id=project_id, status="in_progress") + try: + specs = _load_specialists() + ctx["available_specialists"] = list(specs.get("specialists", {}).keys()) + ctx["routes"] = specs.get("routes", {}) + except Exception: + ctx["available_specialists"] = [] + ctx["routes"] = {} + + elif role == "architect": + ctx["modules"] = models.get_modules(conn, project_id) + ctx["decisions"] = models.get_decisions(conn, project_id) + + elif role == "debugger": + ctx["decisions"] = models.get_decisions( + conn, project_id, types=["gotcha", "workaround"], + ) + ctx["module_hint"] = _extract_module_hint(task) + + elif role in ("frontend_dev", "backend_dev"): + ctx["decisions"] = models.get_decisions( + conn, project_id, types=["gotcha", "workaround", "convention"], + ) + + elif role == "reviewer": + ctx["decisions"] = models.get_decisions( + conn, project_id, types=["convention"], + ) + + elif role == "tester": + # Minimal context — just the task spec + pass + + elif role == "security": + ctx["decisions"] = models.get_decisions( + conn, project_id, category="security", + ) + + else: + # Unknown role — give decisions as fallback + ctx["decisions"] = models.get_decisions(conn, project_id, limit=20) + + return ctx + + +def _slim_task(task: dict) -> dict: + """Extract only relevant fields from a task for the prompt.""" + return { + "id": task["id"], + "title": task["title"], + "status": task["status"], + "priority": task["priority"], + "assigned_role": task.get("assigned_role"), + "brief": task.get("brief"), + "spec": task.get("spec"), + } + + +def _slim_project(project: dict) 
-> dict: + """Extract only relevant fields from a project.""" + return { + "id": project["id"], + "name": project["name"], + "path": project["path"], + "tech_stack": project.get("tech_stack"), + } + + +def _extract_module_hint(task: dict | None) -> str | None: + """Try to extract module name from task brief.""" + if not task: + return None + brief = task.get("brief") + if isinstance(brief, dict): + return brief.get("module") + return None + + +def format_prompt(context: dict, role: str, prompt_template: str | None = None) -> str: + """Format a prompt by injecting context into a role template. + + If prompt_template is None, loads from agents/prompts/{role}.md. + """ + if prompt_template is None: + prompt_path = PROMPTS_DIR / f"{role}.md" + if prompt_path.exists(): + prompt_template = prompt_path.read_text() + else: + prompt_template = f"You are a {role}. Complete the task described below." + + sections = [prompt_template, ""] + + # Project info + proj = context.get("project") + if proj: + sections.append(f"## Project: {proj['id']} — {proj['name']}") + if proj.get("tech_stack"): + sections.append(f"Tech stack: {', '.join(proj['tech_stack'])}") + sections.append(f"Path: {proj['path']}") + sections.append("") + + # Task info + task = context.get("task") + if task: + sections.append(f"## Task: {task['id']} — {task['title']}") + sections.append(f"Status: {task['status']}, Priority: {task['priority']}") + if task.get("brief"): + sections.append(f"Brief: {json.dumps(task['brief'], ensure_ascii=False)}") + if task.get("spec"): + sections.append(f"Spec: {json.dumps(task['spec'], ensure_ascii=False)}") + sections.append("") + + # Decisions + decisions = context.get("decisions") + if decisions: + sections.append(f"## Known decisions ({len(decisions)}):") + for d in decisions[:30]: # Cap at 30 to avoid token bloat + tags = f" [{', '.join(d['tags'])}]" if d.get("tags") else "" + sections.append(f"- #{d['id']} [{d['type']}] {d['title']}{tags}") + sections.append("") + + # 
@pytest.fixture
def conn():
    """In-memory DB seeded with one project, two modules, four decisions
    (one per type/category under test) and two tasks."""
    c = init_db(":memory:")
    models.create_project(c, "vdol", "ВДОЛЬ и ПОПЕРЕК", "~/projects/vdolipoperek",
                          tech_stack=["vue3", "typescript", "nodejs"])
    models.add_module(c, "vdol", "search", "frontend", "src/search/")
    models.add_module(c, "vdol", "api", "backend", "src/api/")
    models.add_decision(c, "vdol", "gotcha", "Safari bug",
                        "position:fixed breaks", category="ui", tags=["ios"])
    models.add_decision(c, "vdol", "workaround", "API rate limit",
                        "10 req/s max", category="api")
    models.add_decision(c, "vdol", "convention", "Use WAL mode",
                        "Always use WAL for SQLite", category="architecture")
    models.add_decision(c, "vdol", "decision", "Auth required",
                        "All endpoints need auth", category="security")
    models.create_task(c, "VDOL-001", "vdol", "Fix search filters",
                       brief={"module": "search", "route_type": "debug"})
    models.create_task(c, "VDOL-002", "vdol", "Add payments",
                       status="in_progress")
    yield c
    c.close()


def _decision_types(ctx):
    """Set of decision types present in a built context."""
    return {d["type"] for d in ctx["decisions"]}


class TestBuildContext:
    """Each role must receive exactly the slice of context it is meant to."""

    def test_pm_gets_everything(self, conn):
        ctx = build_context(conn, "VDOL-001", "pm", "vdol")
        assert ctx["task"]["id"] == "VDOL-001"
        assert ctx["project"]["id"] == "vdol"
        assert len(ctx["modules"]) == 2
        assert len(ctx["decisions"]) == 4  # all decisions
        assert len(ctx["active_tasks"]) == 1  # VDOL-002 in_progress
        assert "pm" in ctx["available_specialists"]

    def test_architect_gets_all_decisions_and_modules(self, conn):
        ctx = build_context(conn, "VDOL-001", "architect", "vdol")
        assert len(ctx["modules"]) == 2
        assert len(ctx["decisions"]) == 4

    def test_debugger_gets_only_gotcha_workaround(self, conn):
        ctx = build_context(conn, "VDOL-001", "debugger", "vdol")
        # Equality, not subset: an empty result must fail this test too.
        assert _decision_types(ctx) == {"gotcha", "workaround"}
        assert ctx["module_hint"] == "search"

    def test_frontend_dev_gets_gotcha_workaround_convention(self, conn):
        ctx = build_context(conn, "VDOL-001", "frontend_dev", "vdol")
        types = _decision_types(ctx)
        assert "gotcha" in types
        assert "workaround" in types
        assert "convention" in types
        assert "decision" not in types  # plain decisions excluded

    def test_backend_dev_same_as_frontend(self, conn):
        ctx = build_context(conn, "VDOL-001", "backend_dev", "vdol")
        assert _decision_types(ctx) == {"gotcha", "workaround", "convention"}

    def test_reviewer_gets_only_conventions(self, conn):
        ctx = build_context(conn, "VDOL-001", "reviewer", "vdol")
        assert _decision_types(ctx) == {"convention"}

    def test_tester_gets_minimal_context(self, conn):
        ctx = build_context(conn, "VDOL-001", "tester", "vdol")
        assert ctx["task"] is not None
        assert ctx["project"] is not None
        assert "decisions" not in ctx
        assert "modules" not in ctx

    def test_security_gets_security_decisions(self, conn):
        ctx = build_context(conn, "VDOL-001", "security", "vdol")
        categories = {d.get("category") for d in ctx["decisions"]}
        assert categories == {"security"}

    def test_unknown_role_gets_fallback(self, conn):
        ctx = build_context(conn, "VDOL-001", "unknown_role", "vdol")
        assert "decisions" in ctx
        assert len(ctx["decisions"]) > 0


class TestFormatPrompt:
    """Rendered prompts must contain the template plus the context sections."""

    def test_format_with_template(self, conn):
        ctx = build_context(conn, "VDOL-001", "debugger", "vdol")
        prompt = format_prompt(ctx, "debugger", "You are a debugger. Find bugs.")
        assert "You are a debugger" in prompt
        assert "VDOL-001" in prompt
        assert "Fix search filters" in prompt
        assert "vdol" in prompt
        assert "vue3" in prompt

    def test_format_includes_decisions(self, conn):
        ctx = build_context(conn, "VDOL-001", "debugger", "vdol")
        prompt = format_prompt(ctx, "debugger", "Debug this.")
        assert "Safari bug" in prompt
        assert "API rate limit" in prompt
        # Convention should NOT be here (debugger doesn't get it)
        assert "WAL mode" not in prompt

    def test_format_pm_includes_specialists(self, conn):
        ctx = build_context(conn, "VDOL-001", "pm", "vdol")
        prompt = format_prompt(ctx, "pm", "You are PM.")
        assert "Available specialists" in prompt
        assert "debugger" in prompt
        assert "Active tasks" in prompt
        assert "VDOL-002" in prompt

    def test_format_with_previous_output(self, conn):
        ctx = build_context(conn, "VDOL-001", "tester", "vdol")
        ctx["previous_output"] = "Found race condition in useSearch.ts"
        prompt = format_prompt(ctx, "tester", "Write tests.")
        assert "Previous step output" in prompt
        assert "race condition" in prompt

    def test_format_loads_prompt_file(self, conn):
        ctx = build_context(conn, "VDOL-001", "pm", "vdol")
        prompt = format_prompt(ctx, "pm")  # Should load from agents/prompts/pm.md
        assert "decompose" in prompt.lower() or "pipeline" in prompt.lower()

    def test_format_missing_prompt_file(self, conn):
        ctx = build_context(conn, "VDOL-001", "analyst", "vdol")
        prompt = format_prompt(ctx, "analyst")  # No analyst.md exists
        assert "analyst" in prompt.lower()
@pytest.fixture
def conn():
    """In-memory DB seeded with one project and one task."""
    c = init_db(":memory:")
    models.create_project(c, "vdol", "ВДОЛЬ", "~/projects/vdolipoperek",
                          tech_stack=["vue3"])
    models.create_task(c, "VDOL-001", "vdol", "Fix bug",
                       brief={"route_type": "debug"})
    yield c
    c.close()


def _mock_claude_success(output_data):
    """Create a mock subprocess result with successful claude output.

    Dicts are serialised to JSON; anything else is used as stdout verbatim.
    """
    mock = MagicMock()
    mock.stdout = json.dumps(output_data) if isinstance(output_data, dict) else output_data
    mock.stderr = ""
    mock.returncode = 0
    return mock


def _mock_claude_failure(error_msg):
    """Create a mock subprocess result for a failed claude invocation."""
    mock = MagicMock()
    mock.stdout = ""
    mock.stderr = error_msg
    mock.returncode = 1
    return mock


# ---------------------------------------------------------------------------
# run_agent
# ---------------------------------------------------------------------------

class TestRunAgent:
    @patch("agents.runner.subprocess.run")
    def test_successful_agent_run(self, mock_run, conn):
        mock_run.return_value = _mock_claude_success({
            "result": "Found race condition in useSearch.ts",
            "usage": {"total_tokens": 5000},
            "cost_usd": 0.015,
        })

        result = run_agent(conn, "debugger", "VDOL-001", "vdol")

        assert result["success"] is True
        assert result["role"] == "debugger"
        assert result["model"] == "sonnet"
        assert result["duration_seconds"] >= 0
        # Usage data from the CLI's JSON must reach the caller.
        assert result["tokens_used"] == 5000
        assert result["cost_usd"] == 0.015
        assert result["raw_output"] == "Found race condition in useSearch.ts"

        # Verify claude was called with right args
        call_args = mock_run.call_args
        cmd = call_args[0][0]
        assert "claude" in cmd[0]
        assert "-p" in cmd
        assert "--output-format" in cmd
        assert "json" in cmd

    @patch("agents.runner.subprocess.run")
    def test_failed_agent_run(self, mock_run, conn):
        mock_run.return_value = _mock_claude_failure("API error")

        result = run_agent(conn, "debugger", "VDOL-001", "vdol")

        assert result["success"] is False

        # Should be logged in agent_logs
        logs = conn.execute("SELECT * FROM agent_logs WHERE task_id='VDOL-001'").fetchall()
        assert len(logs) == 1
        assert logs[0]["success"] == 0

    def test_dry_run_returns_prompt(self, conn):
        result = run_agent(conn, "debugger", "VDOL-001", "vdol", dry_run=True)

        assert result["dry_run"] is True
        assert result["prompt"] is not None
        assert "VDOL-001" in result["prompt"]
        assert result["output"] is None

    @patch("agents.runner.subprocess.run")
    def test_agent_logs_to_db(self, mock_run, conn):
        mock_run.return_value = _mock_claude_success({"result": "ok"})

        run_agent(conn, "tester", "VDOL-001", "vdol")

        logs = conn.execute("SELECT * FROM agent_logs WHERE agent_role='tester'").fetchall()
        assert len(logs) == 1
        assert logs[0]["project_id"] == "vdol"

    @patch("agents.runner.subprocess.run")
    def test_previous_output_passed(self, mock_run, conn):
        mock_run.return_value = _mock_claude_success({"result": "tests pass"})

        run_agent(conn, "tester", "VDOL-001", "vdol",
                  previous_output="Found bug in line 42")

        call_args = mock_run.call_args
        prompt = call_args[0][0][2]  # -p argument
        assert "line 42" in prompt


# ---------------------------------------------------------------------------
# run_pipeline
# ---------------------------------------------------------------------------

class TestRunPipeline:
    @patch("agents.runner.subprocess.run")
    def test_successful_pipeline(self, mock_run, conn):
        mock_run.return_value = _mock_claude_success({"result": "done"})

        steps = [
            {"role": "debugger", "brief": "find bug"},
            {"role": "tester", "depends_on": "debugger", "brief": "verify"},
        ]
        result = run_pipeline(conn, "VDOL-001", steps)

        assert result["success"] is True
        assert result["steps_completed"] == 2
        assert len(result["results"]) == 2

        # Pipeline created in DB
        pipe = conn.execute("SELECT * FROM pipelines WHERE task_id='VDOL-001'").fetchone()
        assert pipe is not None
        assert pipe["status"] == "completed"

        # Task updated to review
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "review"

    @patch("agents.runner.subprocess.run")
    def test_pipeline_fails_on_step(self, mock_run, conn):
        # First step succeeds, second fails
        mock_run.side_effect = [
            _mock_claude_success({"result": "found bug"}),
            _mock_claude_failure("compilation error"),
        ]

        steps = [
            {"role": "debugger", "brief": "find"},
            {"role": "frontend_dev", "brief": "fix"},
            {"role": "tester", "brief": "test"},
        ]
        result = run_pipeline(conn, "VDOL-001", steps)

        assert result["success"] is False
        assert result["steps_completed"] == 1  # Only debugger completed
        assert "frontend_dev" in result["error"]
        # The tester step must never have been started.
        assert mock_run.call_count == 2

        # Pipeline marked as failed
        pipe = conn.execute("SELECT * FROM pipelines WHERE task_id='VDOL-001'").fetchone()
        assert pipe["status"] == "failed"

        # Task marked as blocked
        task = models.get_task(conn, "VDOL-001")
        assert task["status"] == "blocked"

    def test_pipeline_dry_run(self, conn):
        steps = [
            {"role": "debugger", "brief": "find"},
            {"role": "tester", "brief": "verify"},
        ]
        result = run_pipeline(conn, "VDOL-001", steps, dry_run=True)

        assert result["dry_run"] is True
        assert result["success"] is True
        assert result["steps_completed"] == 2

        # No pipeline created in DB
        pipes = conn.execute("SELECT * FROM pipelines").fetchall()
        assert len(pipes) == 0

    @patch("agents.runner.subprocess.run")
    def test_pipeline_chains_output(self, mock_run, conn):
        """Output from step N is passed as previous_output to step N+1."""
        mock_run.side_effect = [
            _mock_claude_success({"result": "bug is in line 42"}),
            _mock_claude_success({"result": "test written"}),
        ]

        steps = [
            {"role": "debugger", "brief": "find"},
            {"role": "tester", "brief": "write test"},
        ]
        run_pipeline(conn, "VDOL-001", steps)

        first_prompt = mock_run.call_args_list[0][0][0][2]  # -p argument
        second_prompt = mock_run.call_args_list[1][0][0][2]
        # "bug" alone proves nothing: the task title is "Fix bug", so it is
        # in every prompt. Only the first step's output contains "line 42".
        assert "line 42" not in first_prompt
        assert "line 42" in second_prompt

    def test_pipeline_task_not_found(self, conn):
        result = run_pipeline(conn, "NONEXISTENT", [{"role": "debugger"}])
        assert result["success"] is False
        assert "not found" in result["error"]


# ---------------------------------------------------------------------------
# JSON parsing
# ---------------------------------------------------------------------------

class TestTryParseJson:
    def test_direct_json(self):
        assert _try_parse_json('{"a": 1}') == {"a": 1}

    def test_json_in_code_fence(self):
        text = 'Some text\n```json\n{"a": 1}\n```\nMore text'
        assert _try_parse_json(text) == {"a": 1}

    def test_json_embedded_in_text(self):
        text = 'Here is the result: {"status": "ok", "count": 42} and more'
        result = _try_parse_json(text)
        assert result == {"status": "ok", "count": 42}

    def test_empty_string(self):
        assert _try_parse_json("") is None

    def test_no_json(self):
        assert _try_parse_json("just plain text") is None

    def test_json_array(self):
        assert _try_parse_json('[1, 2, 3]') == [1, 2, 3]