From 36802d2a624f9ea4578de2d3657340eb01604e0c Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Mon, 15 Jun 2026 01:11:53 +0300 Subject: [PATCH 1/2] wire graph-phase index progress (count-first pass1 + pass steps) build_ast_graph emits JCIRAG_PROGRESS kind=graph under --verbose (count-first exact total in pass 1; pass 2-6 step lines). The sync and async subprocess drains route progress events to an on_progress callback via ProgressRelay (parse-first, single-writer). init/increment/ reprocess render a determinate graph-phase bar in default TTY mode (running only after the builder spawns); --quiet silent; --verbose raw. Renderer.apply() is a no-op after stop() (drain-thread safety). Co-Authored-By: Claude --- build_ast_graph.py | 80 +++++++++++- java_codebase_rag/cli.py | 79 ++++++++++-- java_codebase_rag/cli_format.py | 8 ++ java_codebase_rag/cli_progress.py | 22 +++- java_codebase_rag/pipeline.py | 35 +++++- java_codebase_rag/progress.py | 35 ++++++ tests/test_ast_graph_build.py | 100 +++++++++++++++ tests/test_cli_progress_stdout_invariant.py | 4 +- tests/test_java_codebase_rag_cli.py | 131 ++++++++++++++++++++ tests/test_progress.py | 101 +++++++++++++++ 10 files changed, 570 insertions(+), 25 deletions(-) diff --git a/build_ast_graph.py b/build_ast_graph.py index 5fda415..4f10b56 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -25,6 +25,7 @@ from __future__ import annotations import argparse +import contextlib import hashlib import json import logging @@ -84,6 +85,53 @@ def _verbose_stderr_line(content: str) -> None: print(content, file=sys.stderr, flush=True) +def _emit_graph_progress(parts: dict[str, object], *, verbose: bool) -> None: + """Emit one ``JCIRAG_PROGRESS kind=graph …`` line to stderr (gated by verbose). + + The parent process (``pipeline.run_build_ast_graph`` / + ``run_incremental_graph``) passes ``--verbose`` in default AND verbose modes + (only suppressed for ``--quiet``), so this structured progress surfaces in + default mode (where the parent renders it) and verbose mode (raw relay). In + ``--quiet`` the builder is never invoked with ``--verbose`` so nothing is + emitted. Field order is fixed so the parser and tests can pin substrings. + """ + if not verbose: + return + fields = ["kind=graph"] + for key in ("pass", "done", "total", "status", "elapsed_s"): + if key in parts: + fields.append(f"{key}={parts[key]}") + line = "JCIRAG_PROGRESS " + " ".join(fields) + _verbose_stderr_line(line) + + +# Pass-1 per-file tick cadence: bound stderr volume on huge trees without making +# the bar feel stale. A final tick on pass completion carries status=done. +_PASS1_TICK_EVERY = 25 + + +@contextlib.contextmanager +def _graph_pass_progress(pass_label: str, *, verbose: bool): + """Emit ``pass=N/6 status=running`` on entry and ``status=done elapsed_s=…`` + on exit for passes 2–6 (each advances the rendered bar by 1/6). + + Usage: ``with _graph_pass_progress("2/6", verbose=verbose): …`` + """ + if not verbose: + yield + return + _emit_graph_progress({"pass": pass_label, "status": "running"}, verbose=verbose) + t0 = time.time() + try: + yield + finally: + elapsed = time.time() - t0 + _emit_graph_progress( + {"pass": pass_label, "status": "done", "elapsed_s": f"{elapsed:.2f}"}, + verbose=verbose, + ) + + class _VerbosePassHeartbeats: """Emit ``[tag] running … Ns elapsed`` every 5s on stderr while in scope (verbose only).""" @@ -852,6 +900,19 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: n_files = 0 if verbose: _verbose_stderr_line(_PASS1_START) + # Count-first: one filtered walk (no parsing) to set the EXACT total before + # the parse loop ticks. Single-layer ignore → the count is exact, so the + # rendered bar is determinate. For a scoped (incremental) parse the total is + # the scope size; for a full rebuild it is the non-ignored .java count. + if verbose: + if scope_files is not None: + pass1_total = len(scope_files) + else: + pass1_total = sum(1 for _ in iter_java_source_files(root, ignore=ignore)) + _emit_graph_progress( + {"pass": "1/6", "done": 0, "total": pass1_total, "status": "running"}, + verbose=verbose, + ) slow_sec = 0.0 raw_slow = os.environ.get("JAVA_CODEBASE_RAG_TEST_GRAPH_SLOW_SEC", "").strip() if raw_slow: @@ -871,6 +932,11 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: if scope_files is not None and rel not in scope_files: continue n_files += 1 + if verbose and (n_files % _PASS1_TICK_EVERY == 0): + _emit_graph_progress( + {"pass": "1/6", "done": n_files, "status": "running"}, + verbose=verbose, + ) try: content = p.read_bytes() except OSError: @@ -906,6 +972,10 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: if verbose: elapsed = time.time() - t0 + _emit_graph_progress( + {"pass": "1/6", "done": n_files, "status": "done", "elapsed_s": f"{elapsed:.2f}"}, + verbose=verbose, + ) _verbose_stderr_line( f"[graph] pass 1 · parsed {n_files} files in {elapsed:.2f}s: " f"{len(tables.types)} types, {len(tables.members)} members, " @@ -1145,7 +1215,7 @@ def pass2_edges(tables: GraphTables, asts: dict[str, JavaFileAst], *, verbose: b seen_inj: set[tuple[str, str, str, str]] = set() if verbose: _verbose_stderr_line(_PASS2_START) - with _VerbosePassHeartbeats("[graph] pass 2", verbose=verbose): + with _graph_pass_progress("2/6", verbose=verbose), _VerbosePassHeartbeats("[graph] pass 2", verbose=verbose): for fqn, entry in tables.types.items(): ast = asts.get(entry.file_path) if ast is None: @@ -1818,7 +1888,7 @@ def pass3_calls(tables: GraphTables, asts: dict[str, JavaFileAst], *, verbose: b _verbose_stderr_line(_PASS3_START) _build_member_indexes(tables) stats = CallResolutionStats() - with _VerbosePassHeartbeats("[graph] pass 3", verbose=verbose): + with _graph_pass_progress("3/6", verbose=verbose), _VerbosePassHeartbeats("[graph] pass 3", verbose=verbose): for rel_path, file_ast in asts.items(): try: _process_file_calls(file_ast, rel_path, tables, stats) @@ -1972,7 +2042,7 @@ def pass4_routes( meta_chain = collect_annotation_meta_chain(prs) if verbose: _verbose_stderr_line(_PASS4_START) - with _VerbosePassHeartbeats("[graph] pass 4", verbose=verbose): + with _graph_pass_progress("4/6", verbose=verbose), _VerbosePassHeartbeats("[graph] pass 4", verbose=verbose): for ast in asts.values(): stats.routes_skipped_unresolved += ast.routes_skipped_unresolved @@ -2149,7 +2219,7 @@ def _phantom_async_route_id(call: OutgoingCallDecl) -> str: if verbose: _verbose_stderr_line(_PASS5_START) - with _VerbosePassHeartbeats("[graph] pass 5", verbose=verbose): + with _graph_pass_progress("5/6", verbose=verbose), _VerbosePassHeartbeats("[graph] pass 5", verbose=verbose): for member in sorted(tables.members, key=lambda x: x.node_id): if member.decl.is_constructor: continue @@ -2551,7 +2621,7 @@ def _micro_factor(member: MemberEntry | None) -> float: if verbose: _verbose_stderr_line(_PASS6_START) - with _VerbosePassHeartbeats("[graph] pass 6", verbose=verbose): + with _graph_pass_progress("6/6", verbose=verbose), _VerbosePassHeartbeats("[graph] pass 6", verbose=verbose): for row in tables.http_call_rows: if row.match != "unresolved": continue diff --git a/java_codebase_rag/cli.py b/java_codebase_rag/cli.py index 5fca79f..02173cf 100644 --- a/java_codebase_rag/cli.py +++ b/java_codebase_rag/cli.py @@ -131,15 +131,41 @@ def _run_with_pipeline_progress( cfg: ResolvedOperatorConfig, *, quiet: bool, - work: Callable[[], int], + verbose: bool = False, + work: Callable[["PipelineProgress | None"], int], ) -> int: - if quiet: - return int(work()) + """Run ``work`` under the unified progress renderer (default TTY mode only). + + ``work`` receives a :class:`PipelineProgress` whose ``on_progress`` callback + should be forwarded to the graph/vectors pipeline helpers so their + ``JCIRAG_PROGRESS`` events feed the renderer. In ``--quiet`` or ``--verbose`` + mode the context is ``None`` (no Live region: quiet is silent, verbose + raw-relays subprocess output). + """ + if quiet or verbose: + return int(work(None)) + from java_codebase_rag.progress import IndexProgressRenderer, ProgressEvent + + # PR-2 owns the graph task only; vectors/optimize stay pending (PR-3). + phases = ["graph"] + renderer = IndexProgressRenderer(phases) + progress = PipelineProgress(renderer=renderer) + + def on_progress(ev: ProgressEvent) -> None: + renderer.apply(ev) + + progress.on_progress = on_progress + progress.console = renderer._console # noqa: SLF001 — shared with the drain for Live-safe routing + _pipeline_header(subcommand, cfg) t0 = time.perf_counter() code = 0 + # start() always flips _started (the non-TTY fallback is a no-op for Live but + # still needs the flag so apply() routes to the concise-line printer). The + # TTY Live region is entered inside start() only when the console is a TTY. + renderer.start() try: - code = int(work()) + code = int(work(progress)) return code except BaseException as exc: # Keep footer aligned with process outcome (main maps unhandled Exception -> exit 2). @@ -155,9 +181,26 @@ def _run_with_pipeline_progress( code = 2 raise finally: + renderer.stop() _pipeline_footer(subcommand, t0, code) +class PipelineProgress: + """Progress context handed to ``work``: the renderer + a ready ``on_progress``. + + ``on_progress``/``console`` are wired by :func:`_run_with_pipeline_progress` + and should be forwarded to the pipeline helpers' ``on_progress`` / + ``on_progress_console`` parameters. ``console`` is the renderer's stderr + ``rich.Console`` so the subprocess drain routes non-progress lines through + ``console.print`` while the Live region is up (single-writer invariant). + """ + + def __init__(self, *, renderer: "object | None") -> None: + self.renderer = renderer + self.on_progress: "Callable | None" = None + self.console: "object | None" = None + + def _jsonable(value: Any) -> Any: if hasattr(value, "model_dump"): return value.model_dump() @@ -266,7 +309,7 @@ def _cmd_init(args: argparse.Namespace) -> int: return 2 cfg.index_dir.mkdir(parents=True, exist_ok=True) - def work() -> int: + def work(progress: "PipelineProgress | None") -> int: env = cfg.subprocess_env() verbose = bool(args.verbose) coco = run_cocoindex_update( @@ -295,6 +338,8 @@ def work() -> int: verbose=verbose, quiet=bool(args.quiet), env=env, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, ) if g.returncode != 0: _emit( @@ -310,7 +355,9 @@ def work() -> int: _emit({"success": True, "message": "init completed"}) return 0 - return _run_with_pipeline_progress("init", cfg, quiet=bool(args.quiet), work=work) + return _run_with_pipeline_progress( + "init", cfg, quiet=bool(args.quiet), verbose=bool(args.verbose), work=work + ) def _cmd_increment(args: argparse.Namespace) -> int: @@ -323,7 +370,7 @@ def _cmd_increment(args: argparse.Namespace) -> int: if vectors_only: _emit_increment_ladybug_warning() - def work() -> int: + def work(progress: "PipelineProgress | None") -> int: env = cfg.subprocess_env() coco = run_cocoindex_update( env, @@ -356,6 +403,8 @@ def work() -> int: verbose=bool(args.verbose), quiet=bool(args.quiet), env=env, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, ) # Check if incremental fell back to full rebuild @@ -389,7 +438,9 @@ def work() -> int: _emit({"success": True, "message": "increment completed (Lance + graph updated)"}) return 0 - return _run_with_pipeline_progress("increment", cfg, quiet=bool(args.quiet), work=work) + return _run_with_pipeline_progress( + "increment", cfg, quiet=bool(args.quiet), verbose=bool(args.verbose), work=work + ) def _cmd_reprocess(args: argparse.Namespace) -> int: @@ -397,7 +448,7 @@ def _cmd_reprocess(args: argparse.Namespace) -> int: _startup_hints(cfg) cfg.apply_to_os_environ() - def work() -> int: + def work(progress: "PipelineProgress | None") -> int: env = cfg.subprocess_env() verbose = bool(args.verbose) vectors_only = bool(getattr(args, "vectors_only", False)) @@ -443,6 +494,8 @@ def work() -> int: verbose=verbose, quiet=bool(args.quiet), env=env, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, ) if _is_graph_preflight_blocker(g): payload = { @@ -482,7 +535,9 @@ def work() -> int: _emit_reprocess_outcome(payload) return _reprocess_exit_code(payload) - return _run_with_pipeline_progress("reprocess", cfg, quiet=bool(args.quiet), work=work) + return _run_with_pipeline_progress( + "reprocess", cfg, quiet=bool(args.quiet), verbose=bool(args.verbose), work=work + ) def _cmd_install(args: argparse.Namespace) -> int: @@ -537,7 +592,7 @@ def _cmd_erase(args: argparse.Namespace) -> int: print("Aborted.", file=sys.stderr) return 2 - def work() -> int: + def work(progress: "PipelineProgress | None") -> int: env = cfg.subprocess_env() drop = run_cocoindex_drop(env, quiet=bool(args.quiet)) if drop.returncode == 127: @@ -570,7 +625,7 @@ def work() -> int: _emit({"success": True, "message": "erase completed"}) return 0 - return _run_with_pipeline_progress("erase", cfg, quiet=bool(args.quiet), work=work) + return _run_with_pipeline_progress("erase", cfg, quiet=bool(args.quiet), verbose=bool(getattr(args, "verbose", False)), work=work) def _cmd_meta(args: argparse.Namespace) -> int: diff --git a/java_codebase_rag/cli_format.py b/java_codebase_rag/cli_format.py index 6b4f001..a8b7e80 100644 --- a/java_codebase_rag/cli_format.py +++ b/java_codebase_rag/cli_format.py @@ -25,6 +25,14 @@ b'"event": "brownfield-', b"unknown producer source strategy", b"unknown client source strategy", + # Builder verbose heartbeats / pass banners: in default mode the renderer's + # bar subsumes these, so they must NOT also appear as raw lines above the + # Live region. --verbose raw-relay bypasses this filter and still shows them. + b"[graph] pass ", + b"[graph] scoped write ", + b"[graph] writing ", + b"[graph] done ", + b"[increment] ", ) diff --git a/java_codebase_rag/cli_progress.py b/java_codebase_rag/cli_progress.py index f025738..9b31863 100644 --- a/java_codebase_rag/cli_progress.py +++ b/java_codebase_rag/cli_progress.py @@ -3,8 +3,10 @@ import asyncio import sys +from typing import Callable from java_codebase_rag.cli_format import bold_cyan, is_noise_line, styled_check, styled_cross +from java_codebase_rag.progress import CallbackRenderer, ProgressEvent, ProgressRelay def emit_vectors_start() -> None: @@ -61,8 +63,15 @@ async def accumulate_and_relay_subprocess_streams( *, relay: bool, verbose: bool = True, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> tuple[bytes, bytes]: - """Read stdout and stderr until EOF; optionally copy non-noise stderr chunks to stderr.""" + """Read stdout and stderr until EOF; optionally copy non-noise stderr chunks to stderr. + + When ``on_progress`` is set, stderr is drained through a :class:`ProgressRelay` + so ``JCIRAG_PROGRESS`` lines are parsed and routed to ``on_progress`` (and + suppressed from the relay), matching the sync ``pipeline._popen_capturing_stderr``. + """ stdout = proc.stdout stderr = proc.stderr if stdout is None or stderr is None: @@ -70,7 +79,16 @@ async def accumulate_and_relay_subprocess_streams( out_buf = bytearray() err_buf = bytearray() - filt = _AsyncLineFilter() if (relay and not verbose) else None + if on_progress is not None: + filt = ProgressRelay( + CallbackRenderer(on_progress, on_progress_console), + console=on_progress_console, + verbose=verbose, + ) + elif relay and not verbose: + filt = _AsyncLineFilter() + else: + filt = None async def drain_stdout(reader: asyncio.StreamReader, target: bytearray) -> None: while True: diff --git a/java_codebase_rag/pipeline.py b/java_codebase_rag/pipeline.py index 1caa7e5..8e668dd 100644 --- a/java_codebase_rag/pipeline.py +++ b/java_codebase_rag/pipeline.py @@ -9,10 +9,12 @@ import threading import time from pathlib import Path +from typing import Callable from java_codebase_rag.cli_format import Spinner, is_noise_line, stderr_is_tty from java_codebase_rag.cli_progress import emit_vectors_finish, emit_vectors_start from java_codebase_rag.config import cocoindex_subprocess_env_defaults +from java_codebase_rag.progress import CallbackRenderer, ProgressEvent, ProgressRelay COCOINDEX_TARGET = "java_index_flow_lancedb.py:JavaCodeIndexLance" @@ -66,11 +68,28 @@ def _popen_capturing_stderr( proc: subprocess.Popen[bytes], *, verbose: bool = True, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> tuple[str, str, int]: - """Capture stdout/stderr; relay stderr through noise filter (or verbatim in verbose mode).""" + """Capture stdout/stderr; relay stderr through noise filter (or verbatim in verbose mode). + + When ``on_progress`` is set, stderr is drained through a :class:`ProgressRelay` + instead of the bare ``_LineFilter``: progress lines are parsed first, routed to + ``on_progress``, and suppressed from the relay; non-progress lines follow the + relay's routing (``console.print`` while a Live region is up via + ``on_progress_console``, raw ``buffer.write`` in verbose mode). + """ out_buf = bytearray() err_buf = bytearray() - filt = _LineFilter() if not verbose else None + if on_progress is not None: + relay = ProgressRelay( + CallbackRenderer(on_progress, on_progress_console), + console=on_progress_console, + verbose=verbose, + ) + filt: _LineFilter | ProgressRelay | None = relay + else: + filt = _LineFilter() if not verbose else None def drain_out() -> None: assert proc.stdout is not None @@ -259,6 +278,8 @@ def run_build_ast_graph( verbose: bool, quiet: bool = False, env: dict[str, str] | None = None, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> subprocess.CompletedProcess[str]: builder = bundle_dir() / "build_ast_graph.py" if not builder.is_file(): @@ -297,7 +318,9 @@ def run_build_ast_graph( stderr=subprocess.PIPE, bufsize=0, ) - out_s, err_s, code = _popen_capturing_stderr(proc, verbose=verbose) + out_s, err_s, code = _popen_capturing_stderr( + proc, verbose=verbose, on_progress=on_progress, on_progress_console=on_progress_console + ) if not verbose: from java_codebase_rag.cli_format import bold_cyan, styled_check, styled_cross marker = styled_check() if code == 0 else styled_cross() @@ -312,6 +335,8 @@ def run_incremental_graph( verbose: bool, quiet: bool = False, env: dict[str, str] | None = None, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> subprocess.CompletedProcess[str]: """Run incremental graph rebuild by passing --incremental flag to build_ast_graph.py.""" builder = bundle_dir() / "build_ast_graph.py" @@ -352,7 +377,9 @@ def run_incremental_graph( stderr=subprocess.PIPE, bufsize=0, ) - out_s, err_s, code = _popen_capturing_stderr(proc, verbose=verbose) + out_s, err_s, code = _popen_capturing_stderr( + proc, verbose=verbose, on_progress=on_progress, on_progress_console=on_progress_console + ) if not verbose: from java_codebase_rag.cli_format import bold_cyan, styled_check, styled_cross marker = styled_check() if code == 0 else styled_cross() diff --git a/java_codebase_rag/progress.py b/java_codebase_rag/progress.py index 7527836..baa9725 100644 --- a/java_codebase_rag/progress.py +++ b/java_codebase_rag/progress.py @@ -55,6 +55,7 @@ "parse_progress_line", "IndexProgressRenderer", "ProgressRelay", + "CallbackRenderer", ] ProgressKind = Literal["vectors", "graph", "optimize"] @@ -271,7 +272,16 @@ def apply(self, ev: ProgressEvent) -> None: ``status == "failed"`` halts the task and marks the description with a red ``✗`` (rich renders the spinner stopped). On non-TTY consoles this delegates to the throttled concise-line printer. + + Safe to call after :meth:`stop`: once the Live region is torn down a + drain thread may still feed a trailing event; this is a no-op then so + the apply/stop pair is atomic from the caller's view (PR-1 review + Minor #6 / cross-PR risk #10). """ + if not self._started: + # After stop() the Live region is gone and rich tasks are torn down; + # a trailing event from the drain thread must not mutate state. + return if self._fallback: self._fallback_apply(ev) return @@ -375,6 +385,31 @@ def _format_concise( return Text(kind) +class CallbackRenderer: + """Adapter exposing a ``renderer.apply(ev)`` surface to :class:`ProgressRelay`. + + ``ProgressRelay`` calls ``renderer.apply(ev)`` for each parsed progress line. + This adapter forwards to a caller-supplied ``on_progress`` callback (used by + the sync/async subprocess drains in ``pipeline`` / ``cli_progress`` to route + progress events up to the command-level renderer without owning a Live region + themselves). It carries a ``_console`` attribute so the relay can route + non-progress lines through ``console.print`` while a Live region is up. + """ + + def __init__(self, on_progress, console=None) -> None: # type: ignore[no-untyped-def] + self._on_progress = on_progress + self._console = console + + def apply(self, ev: ProgressEvent) -> None: + self._on_progress(ev) + + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + # --------------------------------------------------------------------------- # Relay # --------------------------------------------------------------------------- diff --git a/tests/test_ast_graph_build.py b/tests/test_ast_graph_build.py index b78a1ac..57e06ea 100644 --- a/tests/test_ast_graph_build.py +++ b/tests/test_ast_graph_build.py @@ -404,3 +404,103 @@ def test_pass3_known_external_calls_preserved(ladybug_db_path: Path) -> None: found = [str(r[0]) for r in rows] assert found, "bank fixture should have known-external CALLS rows" assert all(s not in ("phantom", "chained_receiver") for s in found), found + + +# --------------------------------------------------------------------------- +# Graph-phase JCIRAG_PROGRESS emission (PR-2) +# --------------------------------------------------------------------------- + + +def _run_builder_verbose(corpus_root: Path, target_db: Path, *, extra_args: list[str] | None = None) -> subprocess.CompletedProcess: + """Run build_ast_graph.py --verbose and return the CompletedProcess.""" + script = Path(__file__).resolve().parent.parent / "build_ast_graph.py" + cmd = [ + sys.executable, + str(script), + "--source-root", str(corpus_root), + "--ladybug-path", str(target_db), + "--verbose", + *(extra_args or []), + ] + return subprocess.run(cmd, capture_output=True, text=True, timeout=300) + + +def _progress_lines(stderr: str) -> list[str]: + return [ln for ln in stderr.splitlines() if "JCIRAG_PROGRESS kind=graph" in ln] + + +def _count_filtered_java_files(corpus_root: Path) -> int: + from path_filtering import LayeredIgnore, iter_java_source_files + + return sum(1 for _ in iter_java_source_files(corpus_root.resolve(), ignore=LayeredIgnore(corpus_root.resolve()))) + + +def test_build_ast_graph_pass1_emits_per_file_progress(corpus_root: Path, tmp_path: Path) -> None: + """Pass 1 is count-first: a `total=` line precedes the first `done=` tick; ticks advance.""" + target = tmp_path / "p1.lbug" + proc = _run_builder_verbose(corpus_root, target) + assert proc.returncode == 0, f"stderr:\n{proc.stderr}" + lines = _progress_lines(proc.stderr) + # Pass-1 lines carry pass=1/6 with a total and per-file done ticks. + pass1 = [ln for ln in lines if "pass=1/6" in ln] + assert pass1, f"expected pass 1 progress lines, got: {lines!r}" + # The first pass-1 line with a total must precede the first line with a done tick. + totals = [ln for ln in pass1 if "total=" in ln] + dones = [ln for ln in pass1 if "done=" in ln] + assert totals, f"pass 1 must emit a count-first total; lines: {pass1!r}" + assert dones, f"pass 1 must emit per-file done ticks; lines: {pass1!r}" + first_total_idx = lines.index(totals[0]) + first_done_idx = lines.index(dones[0]) + assert first_total_idx <= first_done_idx, "total must precede the first done tick" + # Done ticks advance (monotonic non-decreasing values). + done_vals = [int(re.search(r"done=(\d+)", ln).group(1)) for ln in dones if re.search(r"done=(\d+)", ln)] + assert done_vals == sorted(done_vals), f"done ticks must be monotonic: {done_vals}" + + +def test_build_ast_graph_pass1_total_is_exact_filtered_count(corpus_root: Path, tmp_path: Path) -> None: + """The count-first pass-1 total equals the exact non-ignored .java file count.""" + target = tmp_path / "p1total.lbug" + proc = _run_builder_verbose(corpus_root, target) + assert proc.returncode == 0, f"stderr:\n{proc.stderr}" + lines = _progress_lines(proc.stderr) + pass1_total_lines = [ + ln for ln in lines if "pass=1/6" in ln and "total=" in ln and "done=" in ln + ] + # Fall back to any pass-1 line carrying a total. + if not pass1_total_lines: + pass1_total_lines = [ln for ln in lines if "pass=1/6" in ln and "total=" in ln] + assert pass1_total_lines, f"no pass-1 total line found; lines: {lines!r}" + totals = [int(re.search(r"total=(\d+)", ln).group(1)) for ln in pass1_total_lines if re.search(r"total=(\d+)", ln)] + assert totals, f"could not parse total from: {pass1_total_lines!r}" + expected = _count_filtered_java_files(corpus_root) + assert totals[0] == expected, f"pass-1 total {totals[0]} != filtered count {expected}" + + +def test_build_ast_graph_passes_2_to_6_emit_step_progress(corpus_root: Path, tmp_path: Path) -> None: + """Each of passes 2–6 emits a `pass=N/6` step line on entry/exit.""" + target = tmp_path / "p2to6.lbug" + proc = _run_builder_verbose(corpus_root, target) + assert proc.returncode == 0, f"stderr:\n{proc.stderr}" + lines = _progress_lines(proc.stderr) + for n in range(2, 7): + step_lines = [ln for ln in lines if f"pass={n}/6" in ln] + assert step_lines, f"pass {n}/6 emitted no progress lines; full: {lines!r}" + + +def test_build_ast_graph_quiet_emits_no_progress(corpus_root: Path, tmp_path: Path) -> None: + """Without --verbose the builder emits no JCIRAG_PROGRESS lines.""" + script = Path(__file__).resolve().parent.parent / "build_ast_graph.py" + target = tmp_path / "quiet.lbug" + proc = subprocess.run( + [ + sys.executable, + str(script), + "--source-root", str(corpus_root), + "--ladybug-path", str(target), + ], + capture_output=True, + text=True, + timeout=300, + ) + assert proc.returncode == 0, f"stderr:\n{proc.stderr}" + assert _progress_lines(proc.stderr) == [], "quiet build must not emit JCIRAG_PROGRESS" diff --git a/tests/test_cli_progress_stdout_invariant.py b/tests/test_cli_progress_stdout_invariant.py index 444f826..739f879 100644 --- a/tests/test_cli_progress_stdout_invariant.py +++ b/tests/test_cli_progress_stdout_invariant.py @@ -272,7 +272,7 @@ def capture_footer(_sub: str, _t0: float, code: int) -> None: monkeypatch.setattr(cli_mod, "_pipeline_footer", capture_footer) - def boom() -> int: + def boom(_progress) -> int: raise RuntimeError("simulated handler failure") with pytest.raises(RuntimeError, match="simulated handler failure"): @@ -281,7 +281,7 @@ def boom() -> int: codes.clear() - def exit5() -> int: + def exit5(_progress) -> int: raise SystemExit(5) with pytest.raises(SystemExit) as excinfo: diff --git a/tests/test_java_codebase_rag_cli.py b/tests/test_java_codebase_rag_cli.py index f2b2c29..3b35324 100644 --- a/tests/test_java_codebase_rag_cli.py +++ b/tests/test_java_codebase_rag_cli.py @@ -1230,3 +1230,134 @@ def test_console_script_entry_point_routes_through_wrapper() -> None: assert 'java-codebase-rag = "java_codebase_rag.cli:_console_script_main"' in pyproject assert 'java-codebase-rag = "java-codebase-rag:main"' not in pyproject assert 'java-codebase-rag = "java_codebase_rag.cli:main"' not in pyproject + + +# --------------------------------------------------------------------------- +# PR-2: graph-phase progress wiring (default vs --quiet) +# --------------------------------------------------------------------------- + + +def _make_stub_completed(*, returncode: int = 0, stderr: str = "") -> "subprocess.CompletedProcess[str]": + import subprocess + + return subprocess.CompletedProcess(args=["stub"], returncode=returncode, stdout="", stderr=stderr) + + +def _patch_pipeline_for_graph_progress(monkeypatch: pytest.MonkeyPatch, *, emit_graph: bool) -> None: + """Patch the cocoindex + graph pipeline helpers used by init/increment. + + When ``emit_graph`` is True the patched graph helper invokes the caller's + ``on_progress`` callback with a synthetic ``kind=graph`` event — simulating + what the real subprocess drain would feed the renderer in default mode. + """ + from java_codebase_rag import cli as _cli + from java_codebase_rag import pipeline as _pipeline + + def _fake_cocoindex_update(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None): + return _make_stub_completed(returncode=0) + + def _fake_run_build_ast_graph(*, source_root, ladybug_path, verbose, quiet=False, env=None, on_progress=None, on_progress_console=None): + if emit_graph and on_progress is not None: + from java_codebase_rag.progress import ProgressEvent + + on_progress( + ProgressEvent( + kind="graph", phase=None, pass_="1/6", done=10, total=130, + status="running", elapsed_s=None, + ) + ) + return _make_stub_completed(returncode=0) + + def _fake_run_incremental_graph(*, source_root, ladybug_path, verbose, quiet=False, env=None, on_progress=None, on_progress_console=None): + if emit_graph and on_progress is not None: + from java_codebase_rag.progress import ProgressEvent + + on_progress( + ProgressEvent( + kind="graph", phase=None, pass_="1/6", done=3, total=130, + status="running", elapsed_s=None, + ) + ) + return _make_stub_completed(returncode=0) + + # Patch where cli.py imported them (module-level names in cli). + monkeypatch.setattr(_cli, "run_cocoindex_update", _fake_cocoindex_update) + monkeypatch.setattr(_cli, "run_build_ast_graph", _fake_run_build_ast_graph) + monkeypatch.setattr(_cli, "run_incremental_graph", _fake_run_incremental_graph) + # Also patch the pipeline module attributes in case anything imports there. + monkeypatch.setattr(_pipeline, "run_cocoindex_update", _fake_cocoindex_update) + monkeypatch.setattr(_pipeline, "run_build_ast_graph", _fake_run_build_ast_graph) + monkeypatch.setattr(_pipeline, "run_incremental_graph", _fake_run_incremental_graph) + + +def test_cli_init_default_mode_graph_phase_progress_on_stderr( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """In default mode a graph-phase progress event is parsed and rendered to + stderr; the raw ``JCIRAG_PROGRESS`` line is NOT echoed verbatim.""" + idx = tmp_path / "idx_init_prog" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + _patch_pipeline_for_graph_progress(monkeypatch, emit_graph=True) + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx)] + ) + assert rc == 0 + err = buf.getvalue() + # The raw structured line is consumed by the parser, never raw-relayed. + assert "JCIRAG_PROGRESS kind=graph" not in err + # But graph-phase progress IS rendered (non-TTY concise fallback prints a + # "graph ..." line). The synthetic event had total=130, done=10. + assert "graph" in err.lower() + + +def test_cli_increment_graph_phase_progress( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Symmetric: increment default mode parses and renders graph progress.""" + idx = tmp_path / "idx_inc_prog" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + # init first (quiet) to populate the index dir so increment has state. + _patch_pipeline_for_graph_progress(monkeypatch, emit_graph=False) + init_rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"] + ) + assert init_rc == 0 + # Now increment in default mode with graph progress emitted. + _patch_pipeline_for_graph_progress(monkeypatch, emit_graph=True) + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["increment", "--source-root", str(corpus_root), "--index-dir", str(idx)] + ) + assert rc == 0 + err = buf.getvalue() + assert "JCIRAG_PROGRESS kind=graph" not in err + assert "graph" in err.lower() + + +def test_cli_graph_progress_absent_when_quiet( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """--quiet suppresses all progress stderr; no graph rendering occurs.""" + idx = tmp_path / "idx_quiet_prog" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + _patch_pipeline_for_graph_progress(monkeypatch, emit_graph=True) + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"] + ) + assert rc == 0 + err = buf.getvalue() + assert "JCIRAG_PROGRESS kind=graph" not in err + # In quiet mode there is no header/footer framing either. + assert "java-codebase-rag init" not in err + diff --git a/tests/test_progress.py b/tests/test_progress.py index ba25e83..172a4ee 100644 --- a/tests/test_progress.py +++ b/tests/test_progress.py @@ -311,3 +311,104 @@ def test_non_tty_fallback_emits_concise_lines() -> None: assert "42.1" in final finally: r.stop() + + +# --------------------------------------------------------------------------- +# PR-2: drain-thread safety (apply/stop atomicity) + heartbeat suppression +# --------------------------------------------------------------------------- + + +def test_renderer_apply_is_noop_after_stop() -> None: + """apply() after stop() must not raise and must not mutate task state. + + Once the drain thread is wired (PR-2), a progress event can arrive just as + the main thread calls stop(); apply() must be a safe no-op in that window. + """ + console = Console(file=io.StringIO(), force_terminal=False, width=80) + r = IndexProgressRenderer(["graph"], console=console) + assert r._fallback is True # non-TTY path + r.start() + # A normal event before stop sets state. + r.apply( + ProgressEvent( + kind="graph", phase=None, pass_="1/6", done=10, total=100, status="running", elapsed_s=None + ) + ) + r.stop() + # Feeding events after stop must not raise and must not change state. + for _ in range(5): + r.apply( + ProgressEvent( + kind="graph", phase=None, pass_="1/6", done=50, total=100, status="running", elapsed_s=None + ) + ) + # No exception → pass. The carry-forward total stays at the last-seen value. + assert r._last_total["graph"] == 100 + + +def test_renderer_apply_stop_stress_does_not_crash() -> None: + """Concurrent apply()/stop() from two threads must not raise (drain-thread model).""" + import threading + + console = Console(file=io.StringIO(), force_terminal=False, width=80) + r = IndexProgressRenderer(["graph"], console=console) + stop_flag = threading.Event() + errors: list[BaseException] = [] + + def feeder() -> None: + i = 0 + while not stop_flag.is_set(): + try: + r.apply( + ProgressEvent( + kind="graph", + phase=None, + pass_="1/6", + done=i, + total=1000, + status="running", + elapsed_s=None, + ) + ) + except BaseException as exc: # noqa: BLE001 + errors.append(exc) + i += 1 + + t = threading.Thread(target=feeder, daemon=True) + t.start() + try: + for _ in range(20): + r.start() + stop_flag.clear() + # let the feeder run briefly + for _ in range(100): + pass + r.stop() + finally: + stop_flag.set() + t.join(timeout=2.0) + assert not errors, f"concurrent apply/stop raised: {errors!r}" + + +def test_progress_relay_suppresses_graph_heartbeat_in_live_mode() -> None: + """In default (Live) mode the builder's `[graph] pass N` heartbeat must NOT + leak as a raw line above the Live region — the renderer's bar subsumes it. + The heartbeat only reappears in --verbose raw-relay mode (no Live region). + """ + # Live region active (renderer attached) → heartbeat is noise-suppressed. + stub = _StubRenderer() + buf = io.StringIO() + relay = ProgressRelay( + renderer=stub, console=Console(file=buf, force_terminal=False, force_interactive=False) + ) + relay.feed(b"[graph] pass 1 \xc2\xb7 5s elapsed\n") + relay.feed(b"JCIRAG_PROGRESS kind=graph pass=1/6 done=10 total=100\n") + relay.flush() + # The structured progress line was routed to the renderer (1 apply). + assert len(stub.applied) == 1 + assert stub.applied[0].kind == "graph" + # The heartbeat was not echoed as a raw line above the Live region. The + # renderer's bar subsumes it; assert the heartbeat content is entirely + # absent (not just the ``[graph]`` markup tag, which rich would strip). + assert "5s elapsed" not in buf.getvalue() + From 418413ff47fe3623be408ed7cc3f47afb8115768 Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Mon, 15 Jun 2026 02:01:14 +0300 Subject: [PATCH 2/2] harden PR-2: survive renderer exceptions, exact incremental total, relay factory - ProgressRelay guards renderer.apply() so a render-chain exception can't silently kill the drain thread (try/except + stderr note) - incremental pass-1 total excludes removed files (done no longer undercounts then clamps) - make_relay() factory centralizes the sync+async drain wiring - drop unreachable console fallback in ProgressRelay._route_line erase unused-renderer (review Minor #3) and failing-builder tests (Minor #6) deferred. Co-Authored-By: Claude --- build_ast_graph.py | 26 +++++++++++++--- java_codebase_rag/cli_progress.py | 8 ++--- java_codebase_rag/pipeline.py | 8 ++--- java_codebase_rag/progress.py | 44 ++++++++++++++++++++++++--- tests/test_ast_graph_build.py | 36 +++++++++++++++++++++++ tests/test_progress.py | 49 +++++++++++++++++++++++++++++++ 6 files changed, 152 insertions(+), 19 deletions(-) diff --git a/build_ast_graph.py b/build_ast_graph.py index 4f10b56..6b2c377 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -885,7 +885,14 @@ def _register_type( return entry -def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: set[str] | None = None) -> dict[str, JavaFileAst]: +def pass1_parse( + root: Path, + tables: GraphTables, + *, + verbose: bool, + scope_files: set[str] | None = None, + removed_files: set[str] | None = None, +) -> dict[str, JavaFileAst]: """Walk files, parse them, populate node indexes. Returns path -> AST. Args: @@ -893,6 +900,11 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: tables: GraphTables to populate. verbose: Whether to emit progress output. scope_files: Optional set of relative POSIX paths to parse. If None, parse all files. + removed_files: Optional set of relative POSIX paths that no longer exist + on disk (incremental deletions). These are members of ``scope_files`` + (they were deleted, so they participate in scoped deletion) but are + never visited by the parse walk, so they must be excluded from the + pass-1 total to keep ``done`` from undercounting then two-way-clamping. """ asts: dict[str, JavaFileAst] = {} ignore = LayeredIgnore(root) @@ -903,10 +915,14 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: # Count-first: one filtered walk (no parsing) to set the EXACT total before # the parse loop ticks. Single-layer ignore → the count is exact, so the # rendered bar is determinate. For a scoped (incremental) parse the total is - # the scope size; for a full rebuild it is the non-ignored .java count. + # the number of files that will actually be visited: scope minus any removed + # files (which are members of scope for deletion but gone from disk, so the + # parse walk never ticks them); for a full rebuild it is the non-ignored + # .java count. if verbose: if scope_files is not None: - pass1_total = len(scope_files) + removed = removed_files if removed_files is not None else set() + pass1_total = len(scope_files - removed) else: pass1_total = sum(1 for _ in iter_java_source_files(root, ignore=ignore)) _emit_graph_progress( @@ -3656,7 +3672,9 @@ def incremental_rebuild( _verbose_stderr_line("[increment] rebuilding scoped files (passes 1-4)") tables = GraphTables() - asts = pass1_parse(source_root, tables, verbose=verbose, scope_files=scope_files) + asts = pass1_parse( + source_root, tables, verbose=verbose, scope_files=scope_files, removed_files=removed + ) # Load existing types and members for cross-file resolution (only from unchanged files) _load_existing_types(conn, tables, exclude_files=scope_files) diff --git a/java_codebase_rag/cli_progress.py b/java_codebase_rag/cli_progress.py index 9b31863..0419252 100644 --- a/java_codebase_rag/cli_progress.py +++ b/java_codebase_rag/cli_progress.py @@ -6,7 +6,7 @@ from typing import Callable from java_codebase_rag.cli_format import bold_cyan, is_noise_line, styled_check, styled_cross -from java_codebase_rag.progress import CallbackRenderer, ProgressEvent, ProgressRelay +from java_codebase_rag.progress import ProgressEvent, make_relay def emit_vectors_start() -> None: @@ -80,11 +80,7 @@ async def accumulate_and_relay_subprocess_streams( out_buf = bytearray() err_buf = bytearray() if on_progress is not None: - filt = ProgressRelay( - CallbackRenderer(on_progress, on_progress_console), - console=on_progress_console, - verbose=verbose, - ) + filt = make_relay(on_progress, console=on_progress_console, verbose=verbose) elif relay and not verbose: filt = _AsyncLineFilter() else: diff --git a/java_codebase_rag/pipeline.py b/java_codebase_rag/pipeline.py index 8e668dd..d5eb7a0 100644 --- a/java_codebase_rag/pipeline.py +++ b/java_codebase_rag/pipeline.py @@ -14,7 +14,7 @@ from java_codebase_rag.cli_format import Spinner, is_noise_line, stderr_is_tty from java_codebase_rag.cli_progress import emit_vectors_finish, emit_vectors_start from java_codebase_rag.config import cocoindex_subprocess_env_defaults -from java_codebase_rag.progress import CallbackRenderer, ProgressEvent, ProgressRelay +from java_codebase_rag.progress import ProgressEvent, ProgressRelay, make_relay COCOINDEX_TARGET = "java_index_flow_lancedb.py:JavaCodeIndexLance" @@ -82,10 +82,8 @@ def _popen_capturing_stderr( out_buf = bytearray() err_buf = bytearray() if on_progress is not None: - relay = ProgressRelay( - CallbackRenderer(on_progress, on_progress_console), - console=on_progress_console, - verbose=verbose, + relay = make_relay( + on_progress, console=on_progress_console, verbose=verbose ) filt: _LineFilter | ProgressRelay | None = relay else: diff --git a/java_codebase_rag/progress.py b/java_codebase_rag/progress.py index baa9725..62c8a45 100644 --- a/java_codebase_rag/progress.py +++ b/java_codebase_rag/progress.py @@ -33,7 +33,7 @@ import sys import time from dataclasses import dataclass -from typing import Literal +from typing import Callable, Literal from rich.console import Console from rich.live import Live @@ -56,6 +56,7 @@ "IndexProgressRenderer", "ProgressRelay", "CallbackRenderer", + "make_relay", ] ProgressKind = Literal["vectors", "graph", "optimize"] @@ -410,6 +411,28 @@ def stop(self) -> None: pass +def make_relay( + on_progress: Callable[[ProgressEvent], None], + *, + console: object | None, + verbose: bool, +) -> ProgressRelay: + """Build the standard drain-side :class:`ProgressRelay`. + + Both subprocess drains (``pipeline._popen_capturing_stderr`` sync path and + ``cli_progress.accumulate_and_relay_subprocess_streams`` async path) wire a + ``CallbackRenderer`` that forwards parsed events to the command-level + ``on_progress`` callback, and route non-progress lines through the same + caller-supplied ``console``. Centralizing the construction here keeps the + sync and async wiring identical (PR-3 forks the async path further). + """ + return ProgressRelay( + CallbackRenderer(on_progress, console), + console=console, + verbose=verbose, + ) + + # --------------------------------------------------------------------------- # Relay # --------------------------------------------------------------------------- @@ -466,7 +489,18 @@ def _route_line(self, line: bytes) -> None: # Consumed by the protocol — never echoed to any sink. It is not # noise, so it must not keep the suppression flag armed. self._suppress_next = False - self._renderer.apply(ev) + # Guard the render chain: a drain thread is a daemon, so an + # unhandled exception here dies silently and truncates the captured + # stderr, masking real bugs. Mirror the defensive ``except Exception`` + # around the raw ``buffer.write`` path below: log a one-line note to + # real stderr and keep draining. Never re-raise. + try: + self._renderer.apply(ev) + except Exception as exc: + sys.stderr.write( + f"java-codebase-rag: progress renderer error: {exc}\n" + ) + sys.stderr.flush() return if ev is not None and self._renderer is None: # Parsed as progress but no renderer attached: still reset the flag @@ -485,11 +519,13 @@ def _route_line(self, line: bytes) -> None: self._suppress_next = False text = line.decode("utf-8", errors="replace") if self._renderer is not None and self._live_active: - console = self._console if self._console is not None else self._renderer._console # noqa: SLF001 + # The drains always construct the relay with the caller's console + # (see make_relay), so ``self._console`` is authoritative here. + assert self._console is not None # invariant: drains always supply it # rich.Console over a Live region must suspend/resume to interleave # a one-off line without corrupting the bar redraw; print() handles # this correctly when the Live was started on the same console. - console.print(text, end="") + self._console.print(text, end="") return if self._verbose and self._renderer is None: try: diff --git a/tests/test_ast_graph_build.py b/tests/test_ast_graph_build.py index 57e06ea..3751fad 100644 --- a/tests/test_ast_graph_build.py +++ b/tests/test_ast_graph_build.py @@ -504,3 +504,39 @@ def test_build_ast_graph_quiet_emits_no_progress(corpus_root: Path, tmp_path: Pa ) assert proc.returncode == 0, f"stderr:\n{proc.stderr}" assert _progress_lines(proc.stderr) == [], "quiet build must not emit JCIRAG_PROGRESS" + + +def test_pass1_parse_incremental_total_excludes_removed_files(tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None: + """Incremental pass-1 total must count only files that will actually be visited. + + On an incremental run, ``scope_files`` includes removed files (they were + deleted, so they participate in scoped deletion), but they no longer exist + on disk and are therefore never visited by the parse walk. Counting them in + ``pass1_total`` makes ``done`` undercount then two-way-clamp on completion. + The fix: the total is ``len(scope_files - removed_files)``. + """ + import build_ast_graph + + root = tmp_path / "proj" + java_dir = root / "src/main/java/smoke" + java_dir.mkdir(parents=True) + (java_dir / "Real.java").write_text( + "package smoke;\nclass Real { void go() { } }\n", encoding="utf-8" + ) + tables = build_ast_graph.GraphTables() + # scope includes the real file plus a removed (gone-from-disk) file. + scope_files = {"src/main/java/smoke/Real.java", "src/main/java/smoke/Gone.java"} + removed_files = {"src/main/java/smoke/Gone.java"} + build_ast_graph.pass1_parse( + root, tables, verbose=True, scope_files=scope_files, removed_files=removed_files + ) + captured = capsys.readouterr() + pass1_totals = [ + int(m.group(1)) + for m in re.finditer(r"pass=1/6 done=0 total=(\d+)", captured.err) + ] + assert pass1_totals, f"expected a count-first pass-1 total line; stderr:\n{captured.err}" + # The removed file must NOT be counted: total is 1 (only Real.java), not 2. + assert pass1_totals[0] == 1, ( + f"incremental pass-1 total must exclude removed files; got {pass1_totals[0]}" + ) diff --git a/tests/test_progress.py b/tests/test_progress.py index 172a4ee..872504a 100644 --- a/tests/test_progress.py +++ b/tests/test_progress.py @@ -412,3 +412,52 @@ def test_progress_relay_suppresses_graph_heartbeat_in_live_mode() -> None: # absent (not just the ``[graph]`` markup tag, which rich would strip). assert "5s elapsed" not in buf.getvalue() + +# --------------------------------------------------------------------------- +# PR-2 review hardening: survive a renderer exception in the drain thread +# --------------------------------------------------------------------------- + + +class _ExplodingRenderer: + """A renderer whose apply() raises on every call.""" + + def __init__(self) -> None: + self.applied: list[ProgressEvent] = [] + + def apply(self, ev: ProgressEvent) -> None: + raise RuntimeError("boom") + + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + +def test_progress_relay_survives_renderer_apply_exception(capsys) -> None: + """A renderer exception must NOT propagate out of feed() and must not kill + the drain thread (which would silently truncate the captured stderr). + + Feed a JCIRAG_PROGRESS line (triggers the raise) followed by a normal + non-noise line; assert feed() returns cleanly and the normal line still + routes to the sink (console). The exception is noted to stderr instead. + """ + relay = ProgressRelay( + renderer=_ExplodingRenderer(), + console=Console(file=io.StringIO(), force_terminal=False, force_interactive=False), + ) + buf = io.StringIO() + # Swap in a console backed by a buffer we can inspect for the normal line. + relay._console = Console(file=buf, force_terminal=False, force_interactive=False) # noqa: SLF001 + # The progress line triggers renderer.apply() → RuntimeError("boom"). + relay.feed(b"JCIRAG_PROGRESS kind=graph pass=1/6 done=10 total=100\n") + # A normal non-noise line after the exception must still route to the sink. + relay.feed(b"cocoindex: indexing batch\n") + relay.flush() + out = buf.getvalue() + assert "cocoindex: indexing batch" in out + # The exception was noted to real stderr (capsys captures sys.stderr). + captured = capsys.readouterr() + assert "progress renderer error" in captured.err + assert "boom" in captured.err +