mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-30 13:01:02 +00:00
feat: stream autoreview progress
This commit is contained in:
@@ -27,10 +27,11 @@ Use when:
|
||||
- For security-audit suppression changes, verify accepted findings remain auditable: suppressed findings stay in structured output, active output keeps an unsuppressible suppression notice, and aggregate findings cannot hide unrelated active risk.
|
||||
- Never switch or override the requested review engine/model. If the review hits model capacity, retry the same command a few times with the same engine/model.
|
||||
- Be patient with large bundles. Structured review can take up to 30 minutes while the model call is active, especially with Codex tools or web search.
|
||||
- Treat heartbeat lines like `review still running: ... elapsed=... pid=...` as healthy progress, not a hang. Let the helper continue while heartbeats are advancing.
|
||||
- Treat heartbeat lines like `review still running: ... elapsed=... pid=...` as healthy progress, not a hang. Let the helper continue while heartbeats are advancing. Pass `--stream-engine-output` when live engine text is useful; Codex filters tool/file chatter and other engines pass raw output through.
|
||||
- Do not kill a review just because it has been quiet for 2-5 minutes, or because it is still running under the 30-minute window. Inspect the process only after missing multiple expected heartbeats, after 30 minutes, or after an obviously failed subprocess; prefer letting the same helper command finish.
|
||||
- Tools are useful in review mode. The helper allows read-only inspection tools and web search by default so reviewers can check dependency contracts, upstream docs, and current behavior.
|
||||
- Security perspective is always included, but it should not cripple legitimate functionality. Report security findings only when the change creates a concrete, actionable risk or removes an important safety check.
|
||||
- For regression provenance, if no blamed PR is traceable, use the blamed commit as the provenance: commit SHA, date, and author username. Do not guess a merger or frame missing PR metadata as a separate finding.
|
||||
- Do not invoke built-in `codex review`, nested reviewers, or reviewer panels from inside the review. The helper builds one bundle, calls one selected engine, validates one structured result, and stops.
|
||||
- Stop as soon as the helper exits 0 with no accepted/actionable findings. Do not run an extra review just to get a nicer "clean" line, a second opinion, or clearer closeout wording.
|
||||
- Treat the helper's successful exit plus absence of actionable findings as the clean review result, even if the underlying Codex CLI output is terse.
|
||||
@@ -168,11 +169,12 @@ The helper:
|
||||
- supports `--engine codex`, `claude`, `droid`, and `copilot`; default is `AUTOREVIEW_ENGINE` or `codex`; Codex should remain the default when nothing is set
|
||||
- use `--mode commit --commit <ref>` for already-committed work, especially clean `main` after landing
|
||||
- should be left in `--mode auto` or forced to `--mode branch` for PR/branch work; do not force `--mode local` after committing
|
||||
- writes only to stdout unless `--output` or `--json-output` is set
|
||||
- writes only to stdout unless `--output` or `--json-output` is set, or `--stream-engine-output` is relaying live engine stderr
|
||||
- supports `--dry-run`, `--parallel-tests`, `--prompt`, `--prompt-file`, `--dataset`, `--no-tools`, `--no-web-search`, and commit refs
|
||||
- supports `--stream-engine-output` or `AUTOREVIEW_STREAM_ENGINE_OUTPUT=1` for live engine text while preserving structured validation; Codex hides tool/file event details, emits compact activity summaries, and reports token usage at turn completion
|
||||
- supports opt-in review panels with `--panel` / `--reviewers`, plus per-engine `--model` and `--thinking`
|
||||
- allows read-only tools and web search by default where the selected CLI supports them; forbids nested review in the prompt; Codex is run through `codex exec` with read-only sandbox and structured output
|
||||
- prints `review still running: <engine> elapsed=<seconds>s pid=<pid>` to stderr at long-running intervals while waiting for the selected review engine
|
||||
- prints `review still running: <engine> elapsed=<seconds>s pid=<pid>` to stderr at long-running intervals while waiting for the selected review engine, unless streamed output or compact Codex activity has been visible recently
|
||||
- prints `autoreview clean: no accepted/actionable findings reported` when the selected review command exits 0
|
||||
- exits nonzero when accepted/actionable findings are present
|
||||
|
||||
|
||||
@@ -6,13 +6,15 @@ import concurrent.futures
|
||||
import copy
|
||||
import json
|
||||
import os
|
||||
import queue
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import textwrap
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
|
||||
|
||||
ENGINES = ("codex", "claude", "droid", "copilot")
|
||||
@@ -100,7 +102,18 @@ def run_with_heartbeat(
|
||||
input_text: str | None = None,
|
||||
label: str,
|
||||
heartbeat_seconds: int = 60,
|
||||
stream_output: bool = False,
|
||||
stream_display: Callable[[str, str], str | None] | None = None,
|
||||
) -> subprocess.CompletedProcess[str]:
|
||||
if stream_output:
|
||||
return run_with_stream(
|
||||
args,
|
||||
cwd,
|
||||
input_text=input_text,
|
||||
label=label,
|
||||
heartbeat_seconds=heartbeat_seconds,
|
||||
stream_display=stream_display,
|
||||
)
|
||||
started = time.monotonic()
|
||||
proc = subprocess.Popen(
|
||||
args,
|
||||
@@ -124,6 +137,82 @@ def run_with_heartbeat(
|
||||
print(f"review still running: {label} elapsed={elapsed}s pid={proc.pid}", file=sys.stderr, flush=True)
|
||||
|
||||
|
||||
def run_with_stream(
|
||||
args: list[str],
|
||||
cwd: Path,
|
||||
*,
|
||||
input_text: str | None,
|
||||
label: str,
|
||||
heartbeat_seconds: int,
|
||||
stream_display: Callable[[str, str], str | None] | None,
|
||||
) -> subprocess.CompletedProcess[str]:
|
||||
started = time.monotonic()
|
||||
proc = subprocess.Popen(
|
||||
args,
|
||||
cwd=cwd,
|
||||
stdin=subprocess.PIPE if input_text is not None else None,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
)
|
||||
events: queue.Queue[tuple[str, str | None]] = queue.Queue()
|
||||
stdout_parts: list[str] = []
|
||||
stderr_parts: list[str] = []
|
||||
|
||||
def read_stream(name: str, stream: Any) -> None:
|
||||
try:
|
||||
for line in iter(stream.readline, ""):
|
||||
events.put((name, line))
|
||||
finally:
|
||||
events.put((name, None))
|
||||
|
||||
def write_stdin() -> None:
|
||||
if proc.stdin is None or input_text is None:
|
||||
return
|
||||
try:
|
||||
proc.stdin.write(input_text)
|
||||
proc.stdin.close()
|
||||
except BrokenPipeError:
|
||||
return
|
||||
|
||||
threads = [
|
||||
threading.Thread(target=read_stream, args=("stdout", proc.stdout), daemon=True),
|
||||
threading.Thread(target=read_stream, args=("stderr", proc.stderr), daemon=True),
|
||||
]
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
stdin_thread = threading.Thread(target=write_stdin, daemon=True)
|
||||
stdin_thread.start()
|
||||
|
||||
open_streams = 2
|
||||
while open_streams:
|
||||
try:
|
||||
name, line = events.get(timeout=heartbeat_seconds)
|
||||
except queue.Empty:
|
||||
elapsed = int(time.monotonic() - started)
|
||||
print(f"review still running: {label} elapsed={elapsed}s pid={proc.pid}", file=sys.stderr, flush=True)
|
||||
continue
|
||||
if line is None:
|
||||
open_streams -= 1
|
||||
continue
|
||||
if name == "stdout":
|
||||
stdout_parts.append(line)
|
||||
else:
|
||||
stderr_parts.append(line)
|
||||
display = stream_display(name, line) if stream_display else line
|
||||
if display:
|
||||
target = sys.stdout if name == "stdout" else sys.stderr
|
||||
target.write(display)
|
||||
target.flush()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
stdin_thread.join(timeout=1)
|
||||
returncode = proc.wait()
|
||||
return subprocess.CompletedProcess(args, returncode, "".join(stdout_parts), "".join(stderr_parts))
|
||||
|
||||
|
||||
def git(repo: Path, *args: str, check: bool = True) -> str:
    """Invoke ``git`` with ``args`` through ``run`` and return the captured stdout.

    ``repo`` is handed to ``run`` as its second positional argument and
    ``check`` is forwarded unchanged.
    """
    command = ["git", *args]
    completed = run(command, repo, check=check)
    return completed.stdout
|
||||
|
||||
@@ -336,9 +425,11 @@ def run_codex(args: argparse.Namespace, repo: Path, prompt: str) -> str:
|
||||
cmd.extend(["--model", args.model])
|
||||
if args.thinking:
|
||||
cmd.extend(["-c", f'model_reasoning_effort="{args.thinking}"'])
|
||||
cmd.append("exec")
|
||||
if args.stream_engine_output:
|
||||
cmd.append("--json")
|
||||
cmd.extend(
|
||||
[
|
||||
"exec",
|
||||
"--ephemeral",
|
||||
"-C",
|
||||
str(repo),
|
||||
@@ -351,7 +442,14 @@ def run_codex(args: argparse.Namespace, repo: Path, prompt: str) -> str:
|
||||
"-",
|
||||
]
|
||||
)
|
||||
result = run_with_heartbeat(cmd, repo, input_text=prompt, label="codex")
|
||||
result = run_with_heartbeat(
|
||||
cmd,
|
||||
repo,
|
||||
input_text=prompt,
|
||||
label="codex",
|
||||
stream_output=args.stream_engine_output,
|
||||
stream_display=CodexStreamDisplay() if args.stream_engine_output else None,
|
||||
)
|
||||
try:
|
||||
output = output_path.read_text()
|
||||
finally:
|
||||
@@ -368,7 +466,7 @@ def run_claude(args: argparse.Namespace, repo: Path, prompt: str) -> str:
|
||||
"--print",
|
||||
"--no-session-persistence",
|
||||
"--output-format",
|
||||
"json",
|
||||
"stream-json" if args.stream_engine_output else "json",
|
||||
"--json-schema",
|
||||
json.dumps(SCHEMA),
|
||||
]
|
||||
@@ -376,11 +474,13 @@ def run_claude(args: argparse.Namespace, repo: Path, prompt: str) -> str:
|
||||
cmd.extend(["--allowedTools", claude_allowed_tools(args)])
|
||||
else:
|
||||
cmd.extend(["--tools", ""])
|
||||
if args.stream_engine_output:
|
||||
cmd.append("--verbose")
|
||||
if args.model:
|
||||
cmd.extend(["--model", args.model])
|
||||
if args.thinking:
|
||||
cmd.extend(["--effort", args.thinking])
|
||||
result = run_with_heartbeat(cmd, repo, input_text=prompt, label="claude")
|
||||
result = run_with_heartbeat(cmd, repo, input_text=prompt, label="claude", stream_output=args.stream_engine_output)
|
||||
if result.returncode != 0:
|
||||
raise SystemExit(f"claude engine failed ({result.returncode})\n{result.stderr or result.stdout}")
|
||||
return result.stdout
|
||||
@@ -405,7 +505,7 @@ def run_droid(args: argparse.Namespace, repo: Path, prompt: str) -> str:
|
||||
cmd.extend(["--model", args.model])
|
||||
if not args.tools:
|
||||
cmd.extend(["--disabled-tools", "*"])
|
||||
result = run_with_heartbeat(cmd, repo, label="droid")
|
||||
result = run_with_heartbeat(cmd, repo, label="droid", stream_output=args.stream_engine_output)
|
||||
prompt_path.unlink(missing_ok=True)
|
||||
if result.returncode != 0:
|
||||
raise SystemExit(f"droid engine failed ({result.returncode})\n{result.stderr or result.stdout}")
|
||||
@@ -430,7 +530,7 @@ def run_copilot(args: argparse.Namespace, repo: Path, prompt: str) -> str:
|
||||
"--output-format",
|
||||
"json",
|
||||
"--stream",
|
||||
"off",
|
||||
"on" if args.stream_engine_output else "off",
|
||||
"--no-ask-user",
|
||||
"--disable-builtin-mcps",
|
||||
]
|
||||
@@ -447,12 +547,68 @@ def run_copilot(args: argparse.Namespace, repo: Path, prompt: str) -> str:
|
||||
)
|
||||
if args.web_search:
|
||||
cmd.append("--allow-all-urls")
|
||||
result = run_with_heartbeat(cmd, Path(tempdir), label="copilot")
|
||||
result = run_with_heartbeat(cmd, Path(tempdir), label="copilot", stream_output=args.stream_engine_output)
|
||||
if result.returncode != 0:
|
||||
raise SystemExit(f"copilot engine failed ({result.returncode})\n{result.stderr or result.stdout}")
|
||||
return result.stdout
|
||||
|
||||
|
||||
class CodexStreamDisplay:
|
||||
def __init__(self, *, activity_seconds: int = 20) -> None:
|
||||
self.activity_seconds = activity_seconds
|
||||
self.hidden_events = 0
|
||||
self.last_visible = time.monotonic()
|
||||
|
||||
def __call__(self, name: str, line: str) -> str | None:
|
||||
if name != "stdout":
|
||||
return line
|
||||
try:
|
||||
event = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
return self.visible(line)
|
||||
event_type = event.get("type")
|
||||
if event_type == "thread.started":
|
||||
return self.visible(f"codex thread: {event.get('thread_id', '<unknown>')}\n")
|
||||
if event_type == "turn.started":
|
||||
return self.visible("codex turn started\n")
|
||||
if event_type == "turn.completed":
|
||||
usage = event.get("usage")
|
||||
message = format_codex_usage(usage) + "\n" if isinstance(usage, dict) else "codex turn completed\n"
|
||||
return self.visible(self.flush_hidden() + message)
|
||||
item = event.get("item")
|
||||
if isinstance(item, dict) and item.get("type") == "agent_message" and isinstance(item.get("text"), str):
|
||||
return self.visible(self.flush_hidden() + item["text"].rstrip() + "\n")
|
||||
return self.hidden_activity()
|
||||
|
||||
def hidden_activity(self) -> str | None:
|
||||
self.hidden_events += 1
|
||||
if time.monotonic() - self.last_visible < self.activity_seconds:
|
||||
return None
|
||||
return self.visible(self.flush_hidden())
|
||||
|
||||
def flush_hidden(self) -> str:
|
||||
if not self.hidden_events:
|
||||
return ""
|
||||
count = self.hidden_events
|
||||
self.hidden_events = 0
|
||||
return f"codex activity: {count} hidden tool/status events\n"
|
||||
|
||||
def visible(self, text: str) -> str:
|
||||
self.last_visible = time.monotonic()
|
||||
return text
|
||||
|
||||
|
||||
def format_codex_usage(usage: dict[str, Any]) -> str:
    """Render the known token counters in ``usage`` as one ``codex usage:`` line.

    Only integer-valued counters are included, in a fixed order. Booleans are
    skipped even though ``bool`` is a subclass of ``int``, so a stray
    ``True``/``False`` is never printed as a token count. Returns
    ``"codex usage: unavailable"`` when no counter qualifies.
    """
    fields = (
        "input_tokens",
        "cached_input_tokens",
        "output_tokens",
        "reasoning_output_tokens",
    )
    parts = []
    for field in fields:
        value = usage.get(field)
        if isinstance(value, int) and not isinstance(value, bool):
            parts.append(f"{field}={value}")
    if not parts:
        return "codex usage: unavailable"
    return "codex usage: " + " ".join(parts)
|
||||
|
||||
|
||||
def claude_allowed_tools(args: argparse.Namespace) -> str:
|
||||
tools = [tool.strip() for tool in args.claude_allowed_tools.split(",") if tool.strip()]
|
||||
if not args.web_search:
|
||||
@@ -673,6 +829,12 @@ def parse_args() -> argparse.Namespace:
|
||||
parser.add_argument("--dataset", action="append", help="Extra evidence file to include in the review bundle.")
|
||||
parser.add_argument("--output", help="Write human output to a file as well as stdout.")
|
||||
parser.add_argument("--json-output", help="Write validated structured review JSON.")
|
||||
parser.add_argument(
|
||||
"--stream-engine-output",
|
||||
action="store_true",
|
||||
default=os.environ.get("AUTOREVIEW_STREAM_ENGINE_OUTPUT") == "1",
|
||||
help="Stream review engine output while preserving buffered output for validation. Codex output is filtered to hide tool/file chatter.",
|
||||
)
|
||||
parser.add_argument("--parallel-tests", help="Run a test command concurrently with review; failure fails the helper.")
|
||||
parser.add_argument("--require-finding", action="append", default=[], help="Require finding text to contain this substring.")
|
||||
parser.add_argument("--expect-findings", action="store_true", help="Treat findings as success; for harness acceptance tests.")
|
||||
|
||||
Reference in New Issue
Block a user