Fix audit hanging: add auto_apply param + allow_write for tool access

Root cause: claude agent without --dangerously-skip-permissions
hangs on tool permission prompts when stdin=DEVNULL.

Fixes:
- run_audit() now passes allow_write=True so agent can use
  Read/Bash tools without interactive permission prompts
- Added auto_apply param: False for API (result only),
  CLI confirms with user then applies manually
- API explicitly passes auto_apply=False
- Tests for auto_apply=True/False behavior

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Gros Frumos 2026-03-15 18:00:39 +02:00
parent 96509dcafc
commit 9cbb3cec37
4 changed files with 78 additions and 5 deletions

View file

@@ -224,9 +224,13 @@ def run_audit(
conn: sqlite3.Connection, conn: sqlite3.Connection,
project_id: str, project_id: str,
noninteractive: bool = False, noninteractive: bool = False,
auto_apply: bool = False,
) -> dict: ) -> dict:
"""Audit pending tasks against the actual codebase. """Audit pending tasks against the actual codebase.
auto_apply=True: marks already_done tasks as done in DB.
auto_apply=False: returns results only (for API/GUI).
Returns {success, already_done, still_pending, unclear, duration_seconds, ...} Returns {success, already_done, still_pending, unclear, duration_seconds, ...}
""" """
project = models.get_project(conn, project_id) project = models.get_project(conn, project_id)
@@ -281,10 +285,11 @@ def run_audit(
if project_path.is_dir(): if project_path.is_dir():
working_dir = str(project_path) working_dir = str(project_path)
# Run agent # Run agent — allow_write=True so claude can use Read/Bash tools
# without interactive permission prompts (critical for noninteractive mode)
start = time.monotonic() start = time.monotonic()
result = _run_claude(prompt, model="sonnet", working_dir=working_dir, result = _run_claude(prompt, model="sonnet", working_dir=working_dir,
noninteractive=noninteractive) allow_write=True, noninteractive=noninteractive)
duration = int(time.monotonic() - start) duration = int(time.monotonic() - start)
raw_output = result.get("output", "") raw_output = result.get("output", "")
@@ -327,11 +332,25 @@ def run_audit(
"duration_seconds": duration, "duration_seconds": duration,
} }
already_done = parsed.get("already_done", [])
# Auto-apply: mark already_done tasks as done in DB
applied = []
if auto_apply and already_done:
for item in already_done:
tid = item.get("id")
if tid:
t = models.get_task(conn, tid)
if t and t["project_id"] == project_id and t["status"] == "pending":
models.update_task(conn, tid, status="done")
applied.append(tid)
return { return {
"success": True, "success": True,
"already_done": parsed.get("already_done", []), "already_done": already_done,
"still_pending": parsed.get("still_pending", []), "still_pending": parsed.get("still_pending", []),
"unclear": parsed.get("unclear", []), "unclear": parsed.get("unclear", []),
"applied": applied,
"duration_seconds": duration, "duration_seconds": duration,
"tokens_used": result.get("tokens_used"), "tokens_used": result.get("tokens_used"),
"cost_usd": result.get("cost_usd"), "cost_usd": result.get("cost_usd"),

View file

@@ -613,6 +613,7 @@ def audit_backlog(ctx, project_id):
return return
click.echo(f"Auditing {len(pending)} pending tasks for {project_id}...") click.echo(f"Auditing {len(pending)} pending tasks for {project_id}...")
# First pass: get results only (no auto_apply yet)
result = run_audit(conn, project_id) result = run_audit(conn, project_id)
if not result["success"]: if not result["success"]:
@@ -643,9 +644,14 @@ def audit_backlog(ctx, project_id):
if result.get("duration_seconds"): if result.get("duration_seconds"):
click.echo(f"Duration: {result['duration_seconds']}s") click.echo(f"Duration: {result['duration_seconds']}s")
# Apply: mark tasks as done after user confirmation
if done and click.confirm(f"\nMark {len(done)} tasks as done?"): if done and click.confirm(f"\nMark {len(done)} tasks as done?"):
for item in done: for item in done:
models.update_task(conn, item["id"], status="done") tid = item.get("id")
if tid:
t = models.get_task(conn, tid)
if t and t["project_id"] == project_id and t["status"] == "pending":
models.update_task(conn, tid, status="done")
click.echo(f"Marked {len(done)} tasks as done.") click.echo(f"Marked {len(done)} tasks as done.")

View file

@@ -414,3 +414,51 @@ class TestRunAudit:
prompt = mock_run.call_args[0][0][2] # -p argument prompt = mock_run.call_args[0][0][2] # -p argument
assert "VDOL-001" in prompt assert "VDOL-001" in prompt
assert "Fix bug" in prompt assert "Fix bug" in prompt
@patch("agents.runner.subprocess.run")
def test_audit_auto_apply_marks_done(self, mock_run, conn):
"""auto_apply=True should mark already_done tasks as done in DB."""
mock_run.return_value = _mock_claude_success({
"result": json.dumps({
"already_done": [{"id": "VDOL-001", "reason": "Done"}],
"still_pending": [],
"unclear": [],
}),
})
result = run_audit(conn, "vdol", auto_apply=True)
assert result["success"] is True
assert "VDOL-001" in result["applied"]
task = models.get_task(conn, "VDOL-001")
assert task["status"] == "done"
@patch("agents.runner.subprocess.run")
def test_audit_no_auto_apply_keeps_pending(self, mock_run, conn):
"""auto_apply=False should NOT change task status."""
mock_run.return_value = _mock_claude_success({
"result": json.dumps({
"already_done": [{"id": "VDOL-001", "reason": "Done"}],
"still_pending": [],
"unclear": [],
}),
})
result = run_audit(conn, "vdol", auto_apply=False)
assert result["success"] is True
assert result["applied"] == []
task = models.get_task(conn, "VDOL-001")
assert task["status"] == "pending"
@patch("agents.runner.subprocess.run")
def test_audit_uses_dangerously_skip_permissions(self, mock_run, conn):
"""Audit must use --dangerously-skip-permissions for tool access."""
mock_run.return_value = _mock_claude_success({
"result": json.dumps({"already_done": [], "still_pending": [], "unclear": []}),
})
run_audit(conn, "vdol")
cmd = mock_run.call_args[0][0]
assert "--dangerously-skip-permissions" in cmd

View file

@@ -399,7 +399,7 @@ def audit_project(project_id: str):
if not p: if not p:
conn.close() conn.close()
raise HTTPException(404, f"Project '{project_id}' not found") raise HTTPException(404, f"Project '{project_id}' not found")
result = run_audit(conn, project_id, noninteractive=True) result = run_audit(conn, project_id, noninteractive=True, auto_apply=False)
conn.close() conn.close()
return result return result