kin: KIN-091 Improvements from market research: (1) Revise button with a feedback loop, (2) auto-test before review: the agent runs the tests itself and fixes failures before review, (3) spec-driven workflow for new projects (constitution → spec → plan → tasks), (4) git worktrees for parallel agents without conflicts, (5) auto-trigger the pipeline when a task is created with the "auto" label

Gros Frumos 2026-03-16 22:35:31 +02:00
parent 0cc063d47a
commit 0ccd451b4b
14 changed files with 1660 additions and 18 deletions


@@ -0,0 +1,37 @@
You are a Constitution Agent for a software project.
Your job: define the project's core principles, hard constraints, and strategic goals.
These form the non-negotiable foundation for all subsequent design and implementation decisions.
## Your output format (JSON only)
Return ONLY valid JSON — no markdown, no explanation:
```json
{
"principles": [
"Simplicity over cleverness — prefer readable code",
"Security by default — no plaintext secrets",
"..."
],
"constraints": [
"Must use Python 3.11+",
"No external paid APIs without fallback",
"..."
],
"goals": [
"Enable solo developer to ship features 10x faster via AI agents",
"..."
]
}
```
## Instructions
1. Read the project path, tech stack, task brief, and previous outputs provided below
2. Analyze existing CLAUDE.md, README, or design documents if available
3. Infer principles from existing code style and patterns
4. Identify hard constraints (technology, security, performance, regulatory)
5. Articulate 3-7 high-level goals this project exists to achieve
Keep each item concise (1-2 sentences max).

agents/prompts/spec.md Normal file

@@ -0,0 +1,45 @@
You are a Specification Agent for a software project.
Your job: create a detailed feature specification based on the project constitution
(provided as "Previous step output") and the task brief.
## Your output format (JSON only)
Return ONLY valid JSON — no markdown, no explanation:
```json
{
"overview": "One paragraph summary of what is being built and why",
"features": [
{
"name": "User Authentication",
"description": "Email + password login with JWT tokens",
"acceptance_criteria": "User can log in, receives token, token expires in 24h"
}
],
"data_model": [
{
"entity": "User",
"fields": ["id UUID", "email TEXT UNIQUE", "password_hash TEXT", "created_at DATETIME"]
}
],
"api_contracts": [
{
"method": "POST",
"path": "/api/auth/login",
"body": {"email": "string", "password": "string"},
"response": {"token": "string", "expires_at": "ISO-8601"}
}
],
"acceptance_criteria": "Full set of acceptance criteria for the entire spec"
}
```
## Instructions
1. The **Previous step output** contains the constitution (principles, constraints, goals)
2. Respect ALL constraints from the constitution — do not violate them
3. Design features that advance the stated goals
4. Keep the data model minimal — only what is needed
5. API contracts must be consistent with existing project patterns
6. Acceptance criteria must be testable and specific


@@ -0,0 +1,43 @@
You are a Task Decomposer Agent for a software project.
Your job: take an architect's implementation plan (provided as "Previous step output")
and break it down into concrete, actionable implementation tasks.
## Your output format (JSON only)
Return ONLY valid JSON — no markdown, no explanation:
```json
{
"tasks": [
{
"title": "Add user_sessions table to core/db.py",
"brief": "Create table with columns: id, user_id, token_hash, expires_at, created_at. Add migration in _migrate().",
"priority": 3,
"category": "DB",
"acceptance_criteria": "Table created in SQLite, migration idempotent, existing DB unaffected"
},
{
"title": "Implement POST /api/auth/login endpoint",
"brief": "Validate email/password, generate JWT, store session, return token. Use bcrypt for password verification.",
"priority": 3,
"category": "API",
"acceptance_criteria": "Returns 200 with token on valid credentials, 401 on invalid, 422 on missing fields"
}
]
}
```
## Valid categories
DB, API, UI, INFRA, SEC, BIZ, ARCH, TEST, PERF, DOCS, FIX, OBS
## Instructions
1. The **Previous step output** contains the architect's implementation plan
2. Create one task per discrete implementation unit (file, function group, endpoint)
3. Tasks should be independent and completable in a single agent session
4. Priority: 1 = critical, 3 = normal, 5 = low
5. Each task must have clear, testable acceptance criteria
6. Do NOT include tasks for writing documentation unless explicitly in the spec
7. Aim for 3-10 tasks — if you need more, group related items


@@ -54,6 +54,19 @@ def _build_claude_env() -> dict:
seen.add(d)
deduped.append(d)
env["PATH"] = ":".join(deduped)
# Ensure SSH agent is available for agents that connect via SSH.
# Under launchd, SSH_AUTH_SOCK is not inherited — detect macOS system socket.
if "SSH_AUTH_SOCK" not in env:
import glob
socks = glob.glob("/private/tmp/com.apple.launchd.*/Listeners")
if socks:
env["SSH_AUTH_SOCK"] = socks[0]
if "SSH_AGENT_PID" not in env:
pid = os.environ.get("SSH_AGENT_PID")
if pid:
env["SSH_AGENT_PID"] = pid
return env
@@ -127,6 +140,7 @@ def run_agent(
dry_run: bool = False,
allow_write: bool = False,
noninteractive: bool = False,
working_dir_override: str | None = None,
) -> dict:
"""Run a single Claude Code agent as a subprocess.
@@ -161,7 +175,9 @@ def run_agent(
working_dir = None
# Operations projects have no local path — sysadmin works via SSH
is_operations = project and project.get("project_type") == "operations"
if not is_operations and project and role in ("debugger", "frontend_dev", "backend_dev", "tester", "security"):
if working_dir_override:
working_dir = working_dir_override
elif not is_operations and project and role in ("debugger", "frontend_dev", "backend_dev", "tester", "security", "constitution", "spec", "task_decomposer"):
project_path = Path(project["path"]).expanduser()
if project_path.is_dir():
working_dir = str(project_path)
@@ -685,6 +701,151 @@ def _save_sysadmin_output(
}
# ---------------------------------------------------------------------------
# Auto-test: detect test failure in agent output
# ---------------------------------------------------------------------------
_TEST_FAILURE_PATTERNS = [
r"\bFAILED\b",
r"\bFAIL\b",
r"\d+\s+failed",
r"test(?:s)?\s+failed",
r"assert(?:ion)?\s*(error|failed)",
r"exception(?:s)?\s+occurred",
r"returncode\s*!=\s*0",
r"Error:\s",
r"ERRORS?\b",
]
_TEST_SUCCESS_PATTERNS = [
r"no\s+failures",
r"all\s+tests?\s+pass",
r"0\s+failed",
r"passed.*no\s+errors",
]
def _is_test_failure(result: dict) -> bool:
"""Return True if agent output indicates test failures.
Checks for failure keywords, guarding against false positives from
explicit success phrases (e.g. 'no failures').
"""
output = result.get("raw_output") or result.get("output") or ""
if not isinstance(output, str):
output = json.dumps(output, ensure_ascii=False)
for p in _TEST_SUCCESS_PATTERNS:
if re.search(p, output, re.IGNORECASE):
return False
for p in _TEST_FAILURE_PATTERNS:
if re.search(p, output, re.IGNORECASE):
return True
return False
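The heuristic above can be sketched standalone. The trimmed pattern lists below are assumptions mirroring the fuller sets in this diff; the point is the ordering — success phrases are checked first, so output like "10 passed, 0 failed" never trips the failure patterns:

```python
import re

# Trimmed stand-ins for the pattern lists in this diff (assumed subset).
FAILURE = [r"\bFAILED\b", r"\d+\s+failed"]
SUCCESS = [r"no\s+failures", r"0\s+failed", r"all\s+tests?\s+pass"]

def is_test_failure(output: str) -> bool:
    # Explicit success phrases win, so "0 failed" is not read as a failure.
    if any(re.search(p, output, re.IGNORECASE) for p in SUCCESS):
        return False
    return any(re.search(p, output, re.IGNORECASE) for p in FAILURE)

assert is_test_failure("2 tests FAILED")
assert not is_test_failure("10 passed, 0 failed")
```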
# ---------------------------------------------------------------------------
# Auto-test runner: run project tests via `make test`
# ---------------------------------------------------------------------------
# Roles that trigger auto-test when project.auto_test_enabled is set
_AUTO_TEST_ROLES = {"backend_dev", "frontend_dev"}
def _run_project_tests(project_path: str, timeout: int = 120) -> dict:
"""Run `make test` in project_path. Returns {success, output, returncode}.
Never raises: all errors are captured and returned in output.
"""
env = _build_claude_env()
make_cmd = shutil.which("make", path=env["PATH"]) or "make"
try:
result = subprocess.run(
[make_cmd, "test"],
cwd=project_path,
capture_output=True,
text=True,
timeout=timeout,
env=env,
)
output = (result.stdout or "") + (result.stderr or "")
return {"success": result.returncode == 0, "output": output, "returncode": result.returncode}
except subprocess.TimeoutExpired:
return {"success": False, "output": f"make test timed out after {timeout}s", "returncode": 124}
except FileNotFoundError:
return {"success": False, "output": "make not found — no Makefile or make not in PATH", "returncode": 127}
except Exception as exc:
return {"success": False, "output": f"Test run error: {exc}", "returncode": -1}
# ---------------------------------------------------------------------------
# Decomposer output: create child tasks from task_decomposer JSON
# ---------------------------------------------------------------------------
def _save_decomposer_output(
conn: sqlite3.Connection,
project_id: str,
parent_task_id: str,
result: dict,
) -> dict:
"""Parse task_decomposer output and create child tasks in DB.
Expected output format: {tasks: [{title, brief, priority, category, acceptance_criteria}]}
Idempotent: skips tasks with same parent_task_id + title (case-insensitive).
Returns {created: int, skipped: int}.
"""
raw = result.get("raw_output") or result.get("output") or ""
if isinstance(raw, (dict, list)):
raw = json.dumps(raw, ensure_ascii=False)
parsed = _try_parse_json(raw)
if not isinstance(parsed, dict):
return {"created": 0, "skipped": 0, "error": "non-JSON decomposer output"}
task_list = parsed.get("tasks", [])
if not isinstance(task_list, list):
return {"created": 0, "skipped": 0, "error": "invalid tasks format"}
created = 0
skipped = 0
for item in task_list:
if not isinstance(item, dict):
continue
title = (item.get("title") or "").strip()
if not title:
continue
# Idempotency: skip if same parent + title already exists
existing = conn.execute(
"""SELECT id FROM tasks
WHERE parent_task_id = ? AND lower(trim(title)) = lower(trim(?))""",
(parent_task_id, title),
).fetchone()
if existing:
skipped += 1
continue
category = (item.get("category") or "").strip().upper()
if category not in models.TASK_CATEGORIES:
category = None
task_id = models.next_task_id(conn, project_id, category=category)
brief_text = item.get("brief") or ""
models.create_task(
conn,
task_id,
project_id,
title,
priority=item.get("priority", 5),
brief={"text": brief_text, "source": f"decomposer:{parent_task_id}"},
category=category,
acceptance_criteria=item.get("acceptance_criteria"),
parent_task_id=parent_task_id,
)
created += 1
return {"created": created, "skipped": skipped}
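The idempotency check above can be exercised in isolation against an in-memory database. The table and column names match the diff; everything else (the minimal schema, the sample rows) is made up for illustration:

```python
import sqlite3

# Minimal stand-in schema; the real tasks table has more columns.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT, parent_task_id TEXT, title TEXT)")
conn.execute("INSERT INTO tasks VALUES ('T-1', 'P-1', 'Add Login Endpoint')")

def exists(parent_id: str, title: str) -> bool:
    # Same query as _save_decomposer_output: dedup on parent + title,
    # ignoring case and surrounding whitespace.
    row = conn.execute(
        """SELECT id FROM tasks
           WHERE parent_task_id = ? AND lower(trim(title)) = lower(trim(?))""",
        (parent_id, title),
    ).fetchone()
    return row is not None

assert exists("P-1", "  add login endpoint ")   # case/whitespace-insensitive hit
assert not exists("P-2", "Add Login Endpoint")  # different parent: no match
```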
# ---------------------------------------------------------------------------
# Auto-learning: extract decisions from pipeline results
# ---------------------------------------------------------------------------
@@ -866,6 +1027,26 @@ def run_pipeline(
model = step.get("model", "sonnet")
brief = step.get("brief")
# Worktree isolation: opt-in per project, for write-capable roles
_WORKTREE_ROLES = {"backend_dev", "frontend_dev", "debugger"}
worktree_path = None
project_for_wt = models.get_project(conn, task["project_id"]) if not dry_run else None
use_worktree = (
not dry_run
and role in _WORKTREE_ROLES
and project_for_wt
and project_for_wt.get("worktrees_enabled")
and project_for_wt.get("path")
)
if use_worktree:
try:
from core.worktree import create_worktree, ensure_gitignore
p_path = str(Path(project_for_wt["path"]).expanduser())
ensure_gitignore(p_path)
worktree_path = create_worktree(p_path, task_id, role)
except Exception:
worktree_path = None # Fall back to normal execution
try:
result = run_agent(
conn, role, task_id, project_id,
@@ -875,6 +1056,7 @@ def run_pipeline(
dry_run=dry_run,
allow_write=allow_write,
noninteractive=noninteractive,
working_dir_override=worktree_path,
)
except Exception as exc:
exc_msg = f"Step {i+1}/{len(steps)} ({role}) raised exception: {exc}"
@@ -999,6 +1181,44 @@ def run_pipeline(
"pipeline_id": pipeline["id"] if pipeline else None,
}
# Worktree merge/cleanup after successful step
if worktree_path and result["success"] and not dry_run:
try:
from core.worktree import merge_worktree, cleanup_worktree
p_path = str(Path(project_for_wt["path"]).expanduser())
merge_result = merge_worktree(worktree_path, p_path)
if not merge_result["success"]:
conflicts = merge_result.get("conflicts", [])
conflict_msg = f"Worktree merge conflict in files: {', '.join(conflicts)}" if conflicts else "Worktree merge failed"
models.update_task(conn, task_id, status="blocked", blocked_reason=conflict_msg)
cleanup_worktree(worktree_path, p_path)
if pipeline:
models.update_pipeline(conn, pipeline["id"], status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration)
return {
"success": False,
"error": conflict_msg,
"steps_completed": i,
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
cleanup_worktree(worktree_path, p_path)
except Exception:
pass # Worktree errors must never block pipeline
elif worktree_path and not dry_run:
# Step failed — cleanup worktree without merging
try:
from core.worktree import cleanup_worktree
p_path = str(Path(project_for_wt["path"]).expanduser())
cleanup_worktree(worktree_path, p_path)
except Exception:
pass
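The `core.worktree` module itself is not shown in this commit, so here is a hedged sketch of the git operations that `create_worktree` / `merge_worktree` / `cleanup_worktree` plausibly wrap — the branch naming and `.worktrees/` location are assumptions, not taken from the source:

```python
import pathlib
import subprocess
import tempfile

def git(*args, cwd):
    # check=True raises immediately on any git error.
    subprocess.run(["git", *args], cwd=cwd, check=True,
                   capture_output=True, text=True)

repo = pathlib.Path(tempfile.mkdtemp())
git("init", "-b", "main", cwd=repo)
git("config", "user.email", "agent@example.com", cwd=repo)
git("config", "user.name", "agent", cwd=repo)
(repo / "app.py").write_text("print('v1')\n")
git("add", ".", cwd=repo)
git("commit", "-m", "init", cwd=repo)

# create_worktree: an isolated checkout on a per-task branch.
wt = repo / ".worktrees" / "KIN-091-backend_dev"
git("worktree", "add", "-b", "agent/KIN-091", str(wt), cwd=repo)

# The agent edits and commits inside the worktree only.
(wt / "app.py").write_text("print('v2')\n")
git("add", ".", cwd=wt)
git("commit", "-m", "agent change", cwd=wt)

# merge_worktree: bring the task branch back into main; a conflict
# here is what would set the task to 'blocked' in the pipeline.
git("merge", "agent/KIN-091", cwd=repo)

# cleanup_worktree: drop the checkout first, then the branch.
git("worktree", "remove", "--force", str(wt), cwd=repo)
git("branch", "-D", "agent/KIN-091", cwd=repo)

assert (repo / "app.py").read_text() == "print('v2')\n"
```

This also shows why cleanup must precede branch deletion: git refuses to delete a branch that is still checked out in a worktree.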
results.append(result)
# Semantic blocked: agent ran successfully but returned status='blocked'
@@ -1056,6 +1276,137 @@ def run_pipeline(
except Exception:
pass # Never block pipeline on sysadmin save errors
# Save decomposer output: create child tasks from task_decomposer JSON
if role == "task_decomposer" and result["success"] and not dry_run:
try:
_save_decomposer_output(conn, project_id, task_id, result)
except Exception:
pass # Never block pipeline on decomposer save errors
# Project-level auto-test: run `make test` after backend_dev/frontend_dev steps.
# Enabled per project via auto_test_enabled flag (opt-in).
# On failure, loop fixer up to KIN_AUTO_TEST_MAX_ATTEMPTS times, then block.
if (
not dry_run
and role in _AUTO_TEST_ROLES
and result["success"]
and project_for_wt
and project_for_wt.get("auto_test_enabled")
and project_for_wt.get("path")
):
max_auto_test_attempts = int(os.environ.get("KIN_AUTO_TEST_MAX_ATTEMPTS") or 3)
p_path_str = str(Path(project_for_wt["path"]).expanduser())
test_run = _run_project_tests(p_path_str)
results.append({"role": "_auto_test", "success": test_run["success"],
"output": test_run["output"], "_project_test": True})
auto_test_attempt = 0
while not test_run["success"] and auto_test_attempt < max_auto_test_attempts:
auto_test_attempt += 1
fix_context = (
f"Automated project test run (make test) failed after your changes.\n"
f"Test output:\n{test_run['output'][:4000]}\n"
f"Fix the failing tests. Do NOT modify test files."
)
fix_result = run_agent(
conn, role, task_id, project_id,
model=model,
previous_output=fix_context,
dry_run=False,
allow_write=allow_write,
noninteractive=noninteractive,
)
total_cost += fix_result.get("cost_usd") or 0
total_tokens += fix_result.get("tokens_used") or 0
total_duration += fix_result.get("duration_seconds") or 0
results.append({**fix_result, "_auto_test_fix_attempt": auto_test_attempt})
test_run = _run_project_tests(p_path_str)
results.append({"role": "_auto_test", "success": test_run["success"],
"output": test_run["output"], "_project_test": True,
"_attempt": auto_test_attempt})
if not test_run["success"]:
block_reason = (
f"Auto-test (make test) failed after {auto_test_attempt} fix attempt(s). "
f"Last output: {test_run['output'][:500]}"
)
models.update_task(conn, task_id, status="blocked", blocked_reason=block_reason)
if pipeline:
models.update_pipeline(conn, pipeline["id"], status="failed",
total_cost_usd=total_cost,
total_tokens=total_tokens,
total_duration_seconds=total_duration)
return {
"success": False,
"error": block_reason,
"steps_completed": i,
"results": results,
"total_cost_usd": total_cost,
"total_tokens": total_tokens,
"total_duration_seconds": total_duration,
"pipeline_id": pipeline["id"] if pipeline else None,
}
# Auto-test loop: if tester step has auto_fix=true and tests failed,
# call fix_role agent and re-run tester up to max_attempts times.
if (
not dry_run
and step.get("auto_fix")
and role == "tester"
and result["success"]
and _is_test_failure(result)
):
max_attempts = int(step.get("max_attempts", 3))
fix_role = step.get("fix_role", "backend_dev")
fix_model = step.get("fix_model", model)
attempt = 0
while attempt < max_attempts and _is_test_failure(result):
attempt += 1
tester_output = result.get("raw_output") or result.get("output") or ""
if isinstance(tester_output, (dict, list)):
tester_output = json.dumps(tester_output, ensure_ascii=False)
# Run fixer
fix_result = run_agent(
conn, fix_role, task_id, project_id,
model=fix_model,
previous_output=tester_output,
dry_run=False,
allow_write=allow_write,
noninteractive=noninteractive,
)
total_cost += fix_result.get("cost_usd") or 0
total_tokens += fix_result.get("tokens_used") or 0
total_duration += fix_result.get("duration_seconds") or 0
results.append({**fix_result, "_auto_fix_attempt": attempt})
# Re-run tester
fix_output = fix_result.get("raw_output") or fix_result.get("output") or ""
if isinstance(fix_output, (dict, list)):
fix_output = json.dumps(fix_output, ensure_ascii=False)
retest = run_agent(
conn, role, task_id, project_id,
model=model,
previous_output=fix_output,
dry_run=False,
allow_write=allow_write,
noninteractive=noninteractive,
)
total_cost += retest.get("cost_usd") or 0
total_tokens += retest.get("tokens_used") or 0
total_duration += retest.get("duration_seconds") or 0
result = retest
results.append({**result, "_auto_retest_attempt": attempt})
# Save final test result regardless of outcome
try:
final_output = result.get("raw_output") or result.get("output") or ""
models.update_task(conn, task_id, test_result={
"output": final_output if isinstance(final_output, str) else str(final_output),
"auto_fix_attempts": attempt,
"passed": not _is_test_failure(result),
})
except Exception:
pass
# Chain output to next step
previous_output = result.get("raw_output") or result.get("output")
if isinstance(previous_output, (dict, list)):


@@ -111,6 +111,46 @@ specialists:
codebase_diff: "array of { file, line_hint, issue, suggestion }"
notes: string
constitution:
name: "Constitution Agent"
model: sonnet
tools: [Read, Grep, Glob]
description: "Defines project principles, constraints, and non-negotiables. First step in spec-driven workflow."
permissions: read_only
context_rules:
decisions: all
output_schema:
principles: "array of strings"
constraints: "array of strings"
goals: "array of strings"
spec:
name: "Spec Agent"
model: sonnet
tools: [Read, Grep, Glob]
description: "Creates detailed feature specification from constitution output. Second step in spec-driven workflow."
permissions: read_only
context_rules:
decisions: all
output_schema:
overview: string
features: "array of { name, description, acceptance_criteria }"
data_model: "array of { entity, fields }"
api_contracts: "array of { method, path, body, response }"
acceptance_criteria: string
task_decomposer:
name: "Task Decomposer"
model: sonnet
tools: [Read, Grep, Glob]
description: "Decomposes architect output into concrete implementation tasks. Creates child tasks in DB."
permissions: read_only
context_rules:
decisions: all
modules: all
output_schema:
tasks: "array of { title, brief, priority, category, acceptance_criteria }"
# Route templates — PM uses these to build pipelines
routes:
debug:
@@ -144,3 +184,7 @@ routes:
infra_debug:
steps: [sysadmin, debugger, reviewer]
description: "SSH diagnose → find root cause → verify fix plan"
spec_driven:
steps: [constitution, spec, architect, task_decomposer]
description: "Constitution → spec → implementation plan → decompose into tasks"