Add backlog audit and task update command
- agents/prompts/backlog_audit.md: QA analyst prompt for checking
which pending tasks are already implemented in the codebase
- agents/runner.py: run_audit() — project-level agent that reads
all pending tasks, inspects code, returns classification
- cli/main.py: kin audit <project_id> — runs audit, offers to mark
done tasks; kin task update <id> --status --priority
- web/api.py: POST /api/projects/{id}/audit (runs audit inline),
POST /api/projects/{id}/audit/apply (batch mark as done)
- Frontend: "Audit backlog" button on ProjectView with results
modal showing already_done/still_pending/unclear categories
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
e755a19633
commit
96509dcafc
9 changed files with 548 additions and 2 deletions
44
agents/prompts/backlog_audit.md
Normal file
44
agents/prompts/backlog_audit.md
Normal file
|
|
@@ -0,0 +1,44 @@
|
|||
You are a QA analyst performing a backlog audit.
|
||||
|
||||
## Your task
|
||||
|
||||
You receive a list of pending tasks and have access to the project's codebase.
|
||||
For EACH task, determine: is the described feature/fix already implemented in the current code?
|
||||
|
||||
## Rules
|
||||
|
||||
- Check actual files, functions, tests — don't guess
|
||||
- Look at: file existence, function names, imports, test coverage, recent git log
|
||||
- Read relevant source files before deciding
|
||||
- If the task describes a feature and you find matching code — it's done
|
||||
- If the task describes a bug fix and you see the fix applied — it's done
|
||||
- If you find partial implementation — mark as "unclear"
|
||||
- If you can't find any related code — it's still pending
|
||||
|
||||
## How to investigate
|
||||
|
||||
1. Read package.json / pyproject.toml for project structure
|
||||
2. List src/ directory to understand file layout
|
||||
3. For each task, search for keywords in the codebase
|
||||
4. Read relevant files to confirm implementation
|
||||
5. Check tests if they exist
|
||||
|
||||
## Output format
|
||||
|
||||
Return ONLY valid JSON:
|
||||
|
||||
```json
|
||||
{
|
||||
"already_done": [
|
||||
{"id": "TASK-001", "reason": "Implemented in src/api.ts:42, function fetchData()"}
|
||||
],
|
||||
"still_pending": [
|
||||
{"id": "TASK-003", "reason": "No matching code found in codebase"}
|
||||
],
|
||||
"unclear": [
|
||||
{"id": "TASK-007", "reason": "Partial implementation in src/utils.ts, needs review"}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Every task from the input list MUST appear in exactly one category.
|
||||
128
agents/runner.py
128
agents/runner.py
|
|
@@ -210,6 +210,134 @@ def _try_parse_json(text: str) -> Any:
|
|||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Backlog audit
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
PROMPTS_DIR = Path(__file__).parent / "prompts"
|
||||
|
||||
_LANG_NAMES = {"ru": "Russian", "en": "English", "es": "Spanish",
|
||||
"de": "German", "fr": "French"}
|
||||
|
||||
|
||||
def run_audit(
    conn: sqlite3.Connection,
    project_id: str,
    noninteractive: bool = False,
) -> dict:
    """Audit pending tasks against the project's actual codebase.

    Builds a prompt from ``prompts/backlog_audit.md`` plus the project
    context and the list of pending tasks, runs the agent inside the
    project directory, and parses its JSON classification. The run is
    always recorded via ``models.log_agent_run`` — including failures.

    Args:
        conn: Open SQLite connection to the application database.
        project_id: ID of the project whose backlog is audited.
        noninteractive: Passed through to the agent runner; when True the
            agent must not prompt for user input.

    Returns:
        On success: ``{"success": True, "already_done": [...],
        "still_pending": [...], "unclear": [...], "duration_seconds": int,
        "tokens_used": ..., "cost_usd": ...}`` (or a short ``message``
        when there was nothing to audit). On failure: ``{"success": False,
        "error": str, ...}`` with ``raw_output`` when available.
    """
    project = models.get_project(conn, project_id)
    if not project:
        return {"success": False, "error": f"Project '{project_id}' not found"}

    pending = models.list_tasks(conn, project_id=project_id, status="pending")
    if not pending:
        # Nothing to audit — report an empty but successful result.
        return {
            "success": True,
            "already_done": [],
            "still_pending": [],
            "unclear": [],
            "message": "No pending tasks to audit",
        }

    # Build prompt: template + project context + task list + language.
    prompt_path = PROMPTS_DIR / "backlog_audit.md"
    # FIX: read_text() previously used the platform default encoding, which
    # fails on non-UTF-8 locales (e.g. Windows code pages) for non-ASCII
    # prompt text; the template is stored as UTF-8.
    template = prompt_path.read_text(encoding="utf-8") if prompt_path.exists() else (
        "You are a QA analyst. Check if pending tasks are already done in the code."
    )

    task_list = [
        {"id": t["id"], "title": t["title"], "brief": t.get("brief")}
        for t in pending
    ]

    sections = [
        template,
        "",
        f"## Project: {project['id']} — {project['name']}",
    ]
    if project.get("tech_stack"):
        sections.append(f"Tech stack: {', '.join(project['tech_stack'])}")
    sections.append(f"Path: {project['path']}")
    sections.append("")
    sections.append(f"## Pending tasks ({len(task_list)}):")
    sections.append(json.dumps(task_list, ensure_ascii=False, indent=2))
    sections.append("")

    # Tell the agent which language to answer in; unknown codes fall back
    # to the raw code string.
    language = project.get("language", "ru")
    lang_name = _LANG_NAMES.get(language, language)
    sections.append("## Language")
    sections.append(f"ALWAYS respond in {lang_name}.")
    sections.append("")

    prompt = "\n".join(sections)

    # Run the agent inside the project directory when it exists so it can
    # inspect the code; otherwise let the runner choose its own cwd.
    working_dir = None
    project_path = Path(project["path"]).expanduser()
    if project_path.is_dir():
        working_dir = str(project_path)

    start = time.monotonic()
    result = _run_claude(prompt, model="sonnet", working_dir=working_dir,
                         noninteractive=noninteractive)
    duration = int(time.monotonic() - start)

    raw_output = result.get("output", "")
    if not isinstance(raw_output, str):
        # Some runner paths return structured output; normalize to a string
        # for logging and JSON parsing below.
        raw_output = json.dumps(raw_output, ensure_ascii=False)
    # ROBUSTNESS: .get() instead of direct indexing — a result dict missing
    # "returncode" is treated as a failed run instead of raising KeyError
    # (every other field of `result` is already read with .get()).
    success = result.get("returncode") == 0

    # Record the run for later inspection, whether it succeeded or not.
    models.log_agent_run(
        conn,
        project_id=project_id,
        task_id=None,
        agent_role="backlog_audit",
        action="audit",
        input_summary=f"project={project_id}, pending_tasks={len(pending)}",
        output_summary=raw_output or None,
        tokens_used=result.get("tokens_used"),
        model="sonnet",
        cost_usd=result.get("cost_usd"),
        success=success,
        error_message=result.get("error") if not success else None,
        duration_seconds=duration,
    )

    if not success:
        return {
            "success": False,
            "error": result.get("error", "Agent failed"),
            "raw_output": raw_output,
            "duration_seconds": duration,
        }

    # The prompt demands pure JSON; surface anything else as an error so the
    # caller can show the raw output.
    parsed = _try_parse_json(raw_output)
    if not isinstance(parsed, dict):
        return {
            "success": False,
            "error": "Agent returned non-JSON output",
            "raw_output": raw_output,
            "duration_seconds": duration,
        }

    return {
        "success": True,
        "already_done": parsed.get("already_done", []),
        "still_pending": parsed.get("still_pending", []),
        "unclear": parsed.get("unclear", []),
        "duration_seconds": duration,
        "tokens_used": result.get("tokens_used"),
        "cost_usd": result.get("cost_usd"),
    }
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pipeline executor
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue