diff --git a/README.md b/README.md index 3707696..1f2b68a 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,8 @@ java-codebase-rag install --non-interactive --agent claude-code After `pip install --upgrade java-codebase-rag`, run `java-codebase-rag update` to refresh shipped artifacts and catch up the index (Lance + graph). +All indexing lifecycle commands (`init`, `increment`, `reprocess`, `install`, `update`) show a unified `Vectors → Optimize → Graph` progress bar on stderr during the index build (powered by `rich`); pass `--quiet` to suppress it. + ### Manual registration If you prefer manual configuration, see [`docs/JAVA-CODEBASE-RAG-CLI.md`](./docs/JAVA-CODEBASE-RAG-CLI.md) for the full CLI reference. diff --git a/build_ast_graph.py b/build_ast_graph.py index 5fda415..6b2c377 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -25,6 +25,7 @@ from __future__ import annotations import argparse +import contextlib import hashlib import json import logging @@ -84,6 +85,53 @@ def _verbose_stderr_line(content: str) -> None: print(content, file=sys.stderr, flush=True) +def _emit_graph_progress(parts: dict[str, object], *, verbose: bool) -> None: + """Emit one ``JCIRAG_PROGRESS kind=graph …`` line to stderr (gated by verbose). + + The parent process (``pipeline.run_build_ast_graph`` / + ``run_incremental_graph``) passes ``--verbose`` in default AND verbose modes + (only suppressed for ``--quiet``), so this structured progress surfaces in + default mode (where the parent renders it) and verbose mode (raw relay). In + ``--quiet`` the builder is never invoked with ``--verbose`` so nothing is + emitted. Field order is fixed so the parser and tests can pin substrings. 
+ """ + if not verbose: + return + fields = ["kind=graph"] + for key in ("pass", "done", "total", "status", "elapsed_s"): + if key in parts: + fields.append(f"{key}={parts[key]}") + line = "JCIRAG_PROGRESS " + " ".join(fields) + _verbose_stderr_line(line) + + +# Pass-1 per-file tick cadence: bound stderr volume on huge trees without making +# the bar feel stale. A final tick on pass completion carries status=done. +_PASS1_TICK_EVERY = 25 + + +@contextlib.contextmanager +def _graph_pass_progress(pass_label: str, *, verbose: bool): + """Emit ``pass=N/6 status=running`` on entry and ``status=done elapsed_s=…`` + on exit for passes 2–6 (each advances the rendered bar by 1/6). + + Usage: ``with _graph_pass_progress("2/6", verbose=verbose): …`` + """ + if not verbose: + yield + return + _emit_graph_progress({"pass": pass_label, "status": "running"}, verbose=verbose) + t0 = time.time() + try: + yield + finally: + elapsed = time.time() - t0 + _emit_graph_progress( + {"pass": pass_label, "status": "done", "elapsed_s": f"{elapsed:.2f}"}, + verbose=verbose, + ) + + class _VerbosePassHeartbeats: """Emit ``[tag] running … Ns elapsed`` every 5s on stderr while in scope (verbose only).""" @@ -837,7 +885,14 @@ def _register_type( return entry -def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: set[str] | None = None) -> dict[str, JavaFileAst]: +def pass1_parse( + root: Path, + tables: GraphTables, + *, + verbose: bool, + scope_files: set[str] | None = None, + removed_files: set[str] | None = None, +) -> dict[str, JavaFileAst]: """Walk files, parse them, populate node indexes. Returns path -> AST. Args: @@ -845,6 +900,11 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: tables: GraphTables to populate. verbose: Whether to emit progress output. scope_files: Optional set of relative POSIX paths to parse. If None, parse all files. 
+ removed_files: Optional set of relative POSIX paths that no longer exist + on disk (incremental deletions). These are members of ``scope_files`` + (they were deleted, so they participate in scoped deletion) but are + never visited by the parse walk, so they must be excluded from the + pass-1 total to keep ``done`` from undercounting then two-way-clamping. """ asts: dict[str, JavaFileAst] = {} ignore = LayeredIgnore(root) @@ -852,6 +912,23 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: n_files = 0 if verbose: _verbose_stderr_line(_PASS1_START) + # Count-first: one filtered walk (no parsing) to set the EXACT total before + # the parse loop ticks. Single-layer ignore → the count is exact, so the + # rendered bar is determinate. For a scoped (incremental) parse the total is + # the number of files that will actually be visited: scope minus any removed + # files (which are members of scope for deletion but gone from disk, so the + # parse walk never ticks them); for a full rebuild it is the non-ignored + # .java count. 
+ if verbose: + if scope_files is not None: + removed = removed_files if removed_files is not None else set() + pass1_total = len(scope_files - removed) + else: + pass1_total = sum(1 for _ in iter_java_source_files(root, ignore=ignore)) + _emit_graph_progress( + {"pass": "1/6", "done": 0, "total": pass1_total, "status": "running"}, + verbose=verbose, + ) slow_sec = 0.0 raw_slow = os.environ.get("JAVA_CODEBASE_RAG_TEST_GRAPH_SLOW_SEC", "").strip() if raw_slow: @@ -871,6 +948,11 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: if scope_files is not None and rel not in scope_files: continue n_files += 1 + if verbose and (n_files % _PASS1_TICK_EVERY == 0): + _emit_graph_progress( + {"pass": "1/6", "done": n_files, "status": "running"}, + verbose=verbose, + ) try: content = p.read_bytes() except OSError: @@ -906,6 +988,10 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: if verbose: elapsed = time.time() - t0 + _emit_graph_progress( + {"pass": "1/6", "done": n_files, "status": "done", "elapsed_s": f"{elapsed:.2f}"}, + verbose=verbose, + ) _verbose_stderr_line( f"[graph] pass 1 · parsed {n_files} files in {elapsed:.2f}s: " f"{len(tables.types)} types, {len(tables.members)} members, " @@ -1145,7 +1231,7 @@ def pass2_edges(tables: GraphTables, asts: dict[str, JavaFileAst], *, verbose: b seen_inj: set[tuple[str, str, str, str]] = set() if verbose: _verbose_stderr_line(_PASS2_START) - with _VerbosePassHeartbeats("[graph] pass 2", verbose=verbose): + with _graph_pass_progress("2/6", verbose=verbose), _VerbosePassHeartbeats("[graph] pass 2", verbose=verbose): for fqn, entry in tables.types.items(): ast = asts.get(entry.file_path) if ast is None: @@ -1818,7 +1904,7 @@ def pass3_calls(tables: GraphTables, asts: dict[str, JavaFileAst], *, verbose: b _verbose_stderr_line(_PASS3_START) _build_member_indexes(tables) stats = CallResolutionStats() - with _VerbosePassHeartbeats("[graph] pass 3", verbose=verbose): + with 
_graph_pass_progress("3/6", verbose=verbose), _VerbosePassHeartbeats("[graph] pass 3", verbose=verbose): for rel_path, file_ast in asts.items(): try: _process_file_calls(file_ast, rel_path, tables, stats) @@ -1972,7 +2058,7 @@ def pass4_routes( meta_chain = collect_annotation_meta_chain(prs) if verbose: _verbose_stderr_line(_PASS4_START) - with _VerbosePassHeartbeats("[graph] pass 4", verbose=verbose): + with _graph_pass_progress("4/6", verbose=verbose), _VerbosePassHeartbeats("[graph] pass 4", verbose=verbose): for ast in asts.values(): stats.routes_skipped_unresolved += ast.routes_skipped_unresolved @@ -2149,7 +2235,7 @@ def _phantom_async_route_id(call: OutgoingCallDecl) -> str: if verbose: _verbose_stderr_line(_PASS5_START) - with _VerbosePassHeartbeats("[graph] pass 5", verbose=verbose): + with _graph_pass_progress("5/6", verbose=verbose), _VerbosePassHeartbeats("[graph] pass 5", verbose=verbose): for member in sorted(tables.members, key=lambda x: x.node_id): if member.decl.is_constructor: continue @@ -2551,7 +2637,7 @@ def _micro_factor(member: MemberEntry | None) -> float: if verbose: _verbose_stderr_line(_PASS6_START) - with _VerbosePassHeartbeats("[graph] pass 6", verbose=verbose): + with _graph_pass_progress("6/6", verbose=verbose), _VerbosePassHeartbeats("[graph] pass 6", verbose=verbose): for row in tables.http_call_rows: if row.match != "unresolved": continue @@ -3586,7 +3672,9 @@ def incremental_rebuild( _verbose_stderr_line("[increment] rebuilding scoped files (passes 1-4)") tables = GraphTables() - asts = pass1_parse(source_root, tables, verbose=verbose, scope_files=scope_files) + asts = pass1_parse( + source_root, tables, verbose=verbose, scope_files=scope_files, removed_files=removed + ) # Load existing types and members for cross-file resolution (only from unchanged files) _load_existing_types(conn, tables, exclude_files=scope_files) diff --git a/docs/JAVA-CODEBASE-RAG-CLI.md b/docs/JAVA-CODEBASE-RAG-CLI.md index c761adc..06a6dd3 100644 --- 
a/docs/JAVA-CODEBASE-RAG-CLI.md +++ b/docs/JAVA-CODEBASE-RAG-CLI.md @@ -43,6 +43,8 @@ java-codebase-rag install --scope user - `--agent {claude-code,qwen-code,gigacode}` — Agent host to configure (can be passed multiple times). - `--scope {project,user}` — Installation scope (default: `project`). Project scope writes to `./` in the project repo; user scope writes to `~/./` (globally available). - `--model MODEL` — Embedding model path or `auto` (default: `auto`, downloads `sentence-transformers/all-MiniLM-L6-v2` on first run). +- `--quiet` / `-q` — Suppress the indexing progress stream on stderr (wizard prompts unchanged). +- `--verbose` / `-v` — Raw-relay subprocess output during the indexing sub-step (no progress bar). **Exit codes:** - `0` — Success (all stages completed). @@ -55,7 +57,7 @@ java-codebase-rag install --scope user 3. Agent host selection — Claude Code, Qwen Code, GigaCode (multi-select). 4. Install scope — project or user. 5. MCP entrypoint resolution + artifact deployment — config, skill, agent files. -6. Index + finish — YAML generation, `.gitignore` update, `init`. +6. Index + finish — YAML generation, `.gitignore` update, `init`. Stage 6's indexing sub-step renders the unified `Vectors → Optimize → Graph` progress on **stderr** (see [Indexing progress](#indexing-progress-stderr)); the wizard's conversational stdout is unchanged. **Re-running `install`:** If `.java-codebase-rag.yml` exists, the installer shows current values and offers "Update" (pre-filled) or "Start fresh". Existing MCP entries are updated in-place (merged, not duplicated). Skill/agent files trigger overwrite confirmation. @@ -78,13 +80,14 @@ java-codebase-rag update --force **Flags:** - `--force` — Overwrite all artifacts even if content matches. - `--dry-run` — Print changes without writing files. +- `--quiet` / `-q` — Suppress the indexing progress stream on stderr (wizard stdout unchanged). 
+- `--verbose` / `-v` — Raw-relay subprocess output during the indexing sub-step (no progress bar). **Behavior:** - Detects previously configured agent hosts (scans both project-level and user-level config files). - Refreshes skill and agent files (versioned assets from the package). - Updates MCP entrypoint path if `java-codebase-rag-mcp` has moved. -- Runs an incremental index update (Lance + graph) if an index exists — same as `java-codebase-rag increment`. -- Skips MCP config if the entry already exists and is correct. +- Runs an incremental index update (Lance + graph) if an index exists — same as `java-codebase-rag increment`. The indexing sub-step renders the unified `Vectors → Optimize → Graph` progress on **stderr** (see [Indexing progress](#indexing-progress-stderr)); it no longer runs silently. **Exit codes:** - `0` — Success. @@ -95,7 +98,7 @@ java-codebase-rag update --force - **TTY:** human-readable `pprint` of the payload on stdout (except **successful selective `reprocess`** with `--vectors-only` / `--graph-only`, which prints `Rebuilt:` / `Skipped:` lines instead of dumping the full dict). - **Piped / non-TTY:** **single JSON object** per invocation on stdout (no trailing noise). Use this in scripts and CI. -- **Lifecycle stderr:** `init`, `increment`, `reprocess`, and `erase` stream subprocess progress (and relayed child stdout) to **stderr**; pass **`--quiet`** to suppress that stream. **stdout** stays the JSON/pprint payload only. +- **Lifecycle stderr:** `init`, `increment`, `reprocess`, `install`, `update`, and `erase` stream subprocess progress (and relayed child stdout) to **stderr**; pass **`--quiet`** to suppress that stream. **stdout** stays the JSON/pprint payload (`init`/`increment`/`reprocess`/`erase`) or the wizard conversational text (`install`/`update`) only. 
Example: @@ -103,6 +106,37 @@ Example: java-codebase-rag meta --source-root /path/to/java/repo --index-dir /path/to/.java-codebase-rag | jq .ontology_version ``` +### Indexing progress (stderr) + +All five lifecycle commands that build the index (`init`, `increment`, `reprocess`, `install`, `update`) render the **same unified progress** on **stderr** during indexing: a header line, a three-phase list `Vectors → Optimize → Graph`, and a footer line. The phase list is the single source of truth for "what's happening right now": + +- **Vectors** — the `cocoindex update` Lance catch-up / full reprocess. +- **Optimize** — the serialized Lance table compaction that runs after a successful vectors phase. +- **Graph** — the `build_ast_graph.py` Kuzu/LadybugDB build (full or incremental). + +**Determinate vs indeterminate per command:** + +| Phase | Determinate? | +| ----- | ------------ | +| `Vectors` (full `init` / `reprocess`) | Approximately determinate — a pre-walk estimates the file count; the bar **clamps to 100% on completion** (the pre-walk overstates by ignored/empty files). | +| `Vectors` (incremental `increment` / `update`) | Indeterminate — CocoIndex's `memo=True` cache only calls the per-file function for changed files, so no denominator is known up front. A pulsing bar plus a "files touched: N" counter. | +| `Optimize` | Always indeterminate (no item count exposed by Lance compaction). | +| `Graph` (full `init` / `reprocess`) | Determinate — pass 1 does a count-first filtered walk for an exact total; passes 2–6 are five further known steps (six passes in all). | +| `Graph` (incremental `increment` / `update`) | Determinate when it runs; falls back to a full rebuild on schema change. | + +**Flags, TTY, and failure:** + +| Mode | Behaviour | +| ---- | --------- | +| TTY (default) | `rich` `Live` region — the multi-line phase display (spinner + bar + `%` + ETA). 
| +| Non-TTY / CI | `rich` auto-disables; concise throttled stderr lines (~every 5 s per phase + a terminal line) so CI logs still show progress. | +| `--quiet` / `-q` | Suppresses the entire progress stream (no header, phases, or footer). The stdout payload is unchanged. | +| `--verbose` / `-v` | Bypasses parsing; relays raw subprocess output verbatim (Lance warnings, brownfield events, the raw `JCIRAG_PROGRESS` protocol lines). No `Live` region. | +| Phase failure | The failing phase renders a red `✗`; the footer carries `(exit=N)`. The `rich` `Live` region is torn down cleanly so the error stays visible. | +| Missing `cocoindex` / builder binary | The pre-spawn stub emits a `status=failed` line; no phase is left hung at `running`. | + +> **Behaviour change (this release).** `install` and `update` now emit their indexing progress on **stderr** (previously `install` printed indexing chatter to stdout, and `update` ran the whole indexing step with `quiet=True` — completely silent). The wizard conversational stdout for both commands is otherwise unchanged. `update`'s previously-ignored `--quiet` / `--verbose` flags, and `install`'s previously-ignored `--verbose` flag, are now wired through (`install` already honored `--quiet`). + ## Environment variables (summary) | Variable | Role | diff --git a/java_codebase_rag/cli.py b/java_codebase_rag/cli.py index 5fca79f..58a44b2 100644 --- a/java_codebase_rag/cli.py +++ b/java_codebase_rag/cli.py @@ -131,15 +131,41 @@ def _run_with_pipeline_progress( cfg: ResolvedOperatorConfig, *, quiet: bool, - work: Callable[[], int], + verbose: bool = False, + work: Callable[["PipelineProgress | None"], int], ) -> int: - if quiet: - return int(work()) + """Run ``work`` under the unified progress renderer (default TTY mode only). + + ``work`` receives a :class:`PipelineProgress` whose ``on_progress`` callback + should be forwarded to the graph/vectors pipeline helpers so their + ``JCIRAG_PROGRESS`` events feed the renderer. 
In ``--quiet`` or ``--verbose`` + mode the context is ``None`` (no Live region: quiet is silent, verbose + raw-relays subprocess output). + """ + if quiet or verbose: + return int(work(None)) + from java_codebase_rag.progress import build_index_progress_context + + # PR-3 owns all three tasks in order: Vectors → Optimize → Graph. The vectors + # task is fed by the cocoindex child's per-file ticks + approximate total + # (subprocess transport, parsed by ProgressRelay); the optimize task is fed + # in-process by lance_optimize; the graph task is fed by the build_ast_graph + # child (subprocess transport). A task only becomes visible/running once its + # first event arrives. + renderer, on_progress, console = build_index_progress_context() + progress = PipelineProgress(renderer=renderer) + progress.on_progress = on_progress + progress.console = console + _pipeline_header(subcommand, cfg) t0 = time.perf_counter() code = 0 + # start() always flips _started (the non-TTY fallback is a no-op for Live but + # still needs the flag so apply() routes to the concise-line printer). The + # TTY Live region is entered inside start() only when the console is a TTY. + renderer.start() try: - code = int(work()) + code = int(work(progress)) return code except BaseException as exc: # Keep footer aligned with process outcome (main maps unhandled Exception -> exit 2). @@ -155,9 +181,26 @@ def _run_with_pipeline_progress( code = 2 raise finally: + renderer.stop() _pipeline_footer(subcommand, t0, code) +class PipelineProgress: + """Progress context handed to ``work``: the renderer + a ready ``on_progress``. + + ``on_progress``/``console`` are wired by :func:`_run_with_pipeline_progress` + and should be forwarded to the pipeline helpers' ``on_progress`` / + ``on_progress_console`` parameters. ``console`` is the renderer's stderr + ``rich.Console`` so the subprocess drain routes non-progress lines through + ``console.print`` while the Live region is up (single-writer invariant). 
+ """ + + def __init__(self, *, renderer: "object | None") -> None: + self.renderer = renderer + self.on_progress: "Callable | None" = None + self.console: "object | None" = None + + def _jsonable(value: Any) -> Any: if hasattr(value, "model_dump"): return value.model_dump() @@ -266,7 +309,7 @@ def _cmd_init(args: argparse.Namespace) -> int: return 2 cfg.index_dir.mkdir(parents=True, exist_ok=True) - def work() -> int: + def work(progress: "PipelineProgress | None") -> int: env = cfg.subprocess_env() verbose = bool(args.verbose) coco = run_cocoindex_update( @@ -275,6 +318,8 @@ def work() -> int: quiet=bool(args.quiet), verbose=verbose, lance_project_root=None if args.quiet else cfg.source_root, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, ) if coco.returncode != 0: _emit( @@ -295,6 +340,8 @@ def work() -> int: verbose=verbose, quiet=bool(args.quiet), env=env, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, ) if g.returncode != 0: _emit( @@ -310,7 +357,9 @@ def work() -> int: _emit({"success": True, "message": "init completed"}) return 0 - return _run_with_pipeline_progress("init", cfg, quiet=bool(args.quiet), work=work) + return _run_with_pipeline_progress( + "init", cfg, quiet=bool(args.quiet), verbose=bool(args.verbose), work=work + ) def _cmd_increment(args: argparse.Namespace) -> int: @@ -323,7 +372,7 @@ def _cmd_increment(args: argparse.Namespace) -> int: if vectors_only: _emit_increment_ladybug_warning() - def work() -> int: + def work(progress: "PipelineProgress | None") -> int: env = cfg.subprocess_env() coco = run_cocoindex_update( env, @@ -331,6 +380,8 @@ def work() -> int: quiet=bool(args.quiet), verbose=bool(args.verbose), lance_project_root=None if args.quiet else cfg.source_root, + on_progress=progress.on_progress if progress is not None else None, + 
on_progress_console=progress.console if progress is not None else None, ) if coco.returncode != 0: _emit( @@ -356,6 +407,8 @@ def work() -> int: verbose=bool(args.verbose), quiet=bool(args.quiet), env=env, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, ) # Check if incremental fell back to full rebuild @@ -389,7 +442,9 @@ def work() -> int: _emit({"success": True, "message": "increment completed (Lance + graph updated)"}) return 0 - return _run_with_pipeline_progress("increment", cfg, quiet=bool(args.quiet), work=work) + return _run_with_pipeline_progress( + "increment", cfg, quiet=bool(args.quiet), verbose=bool(args.verbose), work=work + ) def _cmd_reprocess(args: argparse.Namespace) -> int: @@ -397,14 +452,18 @@ def _cmd_reprocess(args: argparse.Namespace) -> int: _startup_hints(cfg) cfg.apply_to_os_environ() - def work() -> int: + def work(progress: "PipelineProgress | None") -> int: env = cfg.subprocess_env() verbose = bool(args.verbose) vectors_only = bool(getattr(args, "vectors_only", False)) graph_only = bool(getattr(args, "graph_only", False)) if vectors_only: - coco = run_cocoindex_update(env, full_reprocess=True, quiet=bool(args.quiet), verbose=verbose) + coco = run_cocoindex_update( + env, full_reprocess=True, quiet=bool(args.quiet), verbose=verbose, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, + ) if _is_cocoindex_preflight_blocker(coco): payload: dict[str, Any] = { "success": False, @@ -443,6 +502,8 @@ def work() -> int: verbose=verbose, quiet=bool(args.quiet), env=env, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, ) if _is_graph_preflight_blocker(g): payload = { @@ -477,12 +538,21 @@ def work() -> int: import server # lazy: pulls 
sentence_transformers/torch/lancedb/kuzu - result = asyncio.run(server.run_refresh_pipeline(quiet=bool(args.quiet), verbose=verbose)) + result = asyncio.run( + server.run_refresh_pipeline( + quiet=bool(args.quiet), + verbose=verbose, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, + ) + ) payload = result.model_dump() _emit_reprocess_outcome(payload) return _reprocess_exit_code(payload) - return _run_with_pipeline_progress("reprocess", cfg, quiet=bool(args.quiet), work=work) + return _run_with_pipeline_progress( + "reprocess", cfg, quiet=bool(args.quiet), verbose=bool(args.verbose), work=work + ) def _cmd_install(args: argparse.Namespace) -> int: @@ -495,6 +565,7 @@ def _cmd_install(args: argparse.Namespace) -> int: model=args.model, source_root=None, # None means cwd; installer confirms interactively quiet=bool(args.quiet), + verbose=bool(args.verbose), ) @@ -504,6 +575,8 @@ def _cmd_update(args: argparse.Namespace) -> int: return run_update( force=bool(args.force), dry_run=bool(args.dry_run), + quiet=bool(args.quiet), + verbose=bool(args.verbose), ) @@ -537,7 +610,7 @@ def _cmd_erase(args: argparse.Namespace) -> int: print("Aborted.", file=sys.stderr) return 2 - def work() -> int: + def work(progress: "PipelineProgress | None") -> int: env = cfg.subprocess_env() drop = run_cocoindex_drop(env, quiet=bool(args.quiet)) if drop.returncode == 127: @@ -570,7 +643,7 @@ def work() -> int: _emit({"success": True, "message": "erase completed"}) return 0 - return _run_with_pipeline_progress("erase", cfg, quiet=bool(args.quiet), work=work) + return _run_with_pipeline_progress("erase", cfg, quiet=bool(args.quiet), verbose=bool(getattr(args, "verbose", False)), work=work) def _cmd_meta(args: argparse.Namespace) -> int: diff --git a/java_codebase_rag/cli_format.py b/java_codebase_rag/cli_format.py index 6b4f001..73c5dee 100644 --- a/java_codebase_rag/cli_format.py +++ 
b/java_codebase_rag/cli_format.py @@ -1,10 +1,7 @@ """TTY-aware ANSI formatting for CLI stderr progress.""" from __future__ import annotations -import itertools import sys -import threading -import time _RESET = "\033[0m" _BOLD = "\033[1m" @@ -16,8 +13,6 @@ CHECK = "✓" CROSS = "✗" -_SPINNER_FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" - _NOISE_CONTAINS: tuple[bytes, ...] = ( b"lance::", b"FutureWarning", @@ -25,6 +20,14 @@ b'"event": "brownfield-', b"unknown producer source strategy", b"unknown client source strategy", + # Builder verbose heartbeats / pass banners: in default mode the renderer's + # bar subsumes these, so they must NOT also appear as raw lines above the + # Live region. --verbose raw-relay bypasses this filter and still shows them. + b"[graph] pass ", + b"[graph] scoped write ", + b"[graph] writing ", + b"[graph] done ", + b"[increment] ", ) @@ -80,33 +83,3 @@ def styled_check() -> str: def styled_cross() -> str: return red(CROSS) if stderr_is_tty() else CROSS - - -class Spinner: - """Braille spinner that overwrites the current stderr line until stopped.""" - - def __init__(self, label: str) -> None: - self._label = label - self._stop = threading.Event() - self._thread: threading.Thread | None = None - - def start(self) -> None: - self._thread = threading.Thread(target=self._run, name="spinner", daemon=True) - self._thread.start() - - def stop(self) -> None: - self._stop.set() - if self._thread is not None: - self._thread.join(timeout=2.0) - sys.stderr.buffer.write(b"\r\x1b[2K") - sys.stderr.buffer.flush() - - def _run(self) -> None: - frames = itertools.cycle(_SPINNER_FRAMES) - t0 = time.monotonic() - while not self._stop.wait(0.3): - elapsed = time.monotonic() - t0 - frame = next(frames) - line = f"\r{frame} {self._label} · {elapsed:.0f}s" - sys.stderr.buffer.write(line.encode()) - sys.stderr.buffer.flush() diff --git a/java_codebase_rag/cli_progress.py b/java_codebase_rag/cli_progress.py index f025738..8d4e518 100644 --- a/java_codebase_rag/cli_progress.py +++ 
b/java_codebase_rag/cli_progress.py @@ -3,26 +3,10 @@ import asyncio import sys +from typing import Callable -from java_codebase_rag.cli_format import bold_cyan, is_noise_line, styled_check, styled_cross - - -def emit_vectors_start() -> None: - print( - bold_cyan("[vectors]") + " running · cocoindex update", - file=sys.stderr, - flush=True, - ) - - -def emit_vectors_finish(*, elapsed_s: float, exit_code: int) -> None: - marker = styled_check() if exit_code == 0 else styled_cross() - print( - f"{marker} {bold_cyan('[vectors]')} finished · {elapsed_s:.2f}s" - + (f" (exit={exit_code})" if exit_code != 0 else ""), - file=sys.stderr, - flush=True, - ) +from java_codebase_rag.cli_format import is_noise_line +from java_codebase_rag.progress import ProgressEvent, make_relay class _AsyncLineFilter: @@ -61,8 +45,15 @@ async def accumulate_and_relay_subprocess_streams( *, relay: bool, verbose: bool = True, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> tuple[bytes, bytes]: - """Read stdout and stderr until EOF; optionally copy non-noise stderr chunks to stderr.""" + """Read stdout and stderr until EOF; optionally copy non-noise stderr chunks to stderr. + + When ``on_progress`` is set, stderr is drained through a :class:`ProgressRelay` + so ``JCIRAG_PROGRESS`` lines are parsed and routed to ``on_progress`` (and + suppressed from the relay), matching the sync ``pipeline._popen_capturing_stderr``. 
+ """ stdout = proc.stdout stderr = proc.stderr if stdout is None or stderr is None: @@ -70,7 +61,12 @@ async def accumulate_and_relay_subprocess_streams( out_buf = bytearray() err_buf = bytearray() - filt = _AsyncLineFilter() if (relay and not verbose) else None + if on_progress is not None: + filt = make_relay(on_progress, console=on_progress_console, verbose=verbose) + elif relay and not verbose: + filt = _AsyncLineFilter() + else: + filt = None async def drain_stdout(reader: asyncio.StreamReader, target: bytearray) -> None: while True: diff --git a/java_codebase_rag/installer.py b/java_codebase_rag/installer.py index 3a80e8f..fc948e8 100644 --- a/java_codebase_rag/installer.py +++ b/java_codebase_rag/installer.py @@ -14,6 +14,7 @@ import shutil import sys import tempfile +import time from dataclasses import dataclass from pathlib import Path from typing import Literal, NamedTuple @@ -806,6 +807,38 @@ def update_gitignore(cwd: Path) -> None: gitignore_path.write_text("\n".join(lines), encoding="utf-8") +def _index_progress_header(subcommand: str, source_root: Path, index_dir: Path) -> None: + """Print the stderr header framing the indexing sub-step (install/update). + + Mirrors the operator commands' ``_pipeline_header`` but lives in the + installer because the wizard's stdout framing differs. This brackets ONLY + the indexing sub-step — the wizard's prompts stay outside it on stdout. 
+ """ + from java_codebase_rag.cli_format import bold + + print( + bold( + f"java-codebase-rag {subcommand} · source={source_root.resolve()} " + f"· index={index_dir.resolve()}" + ), + file=sys.stderr, + flush=True, + ) + + +def _index_progress_footer(subcommand: str, started: float, *, ok: bool) -> None: + """Print the stderr footer closing the indexing sub-step framing.""" + from java_codebase_rag.cli_format import bold, styled_check, styled_cross + + elapsed = time.perf_counter() - started + marker = styled_check() if ok else styled_cross() + print( + f"{marker} {bold(f'java-codebase-rag {subcommand} · finished in {elapsed:.2f}s')}", + file=sys.stderr, + flush=True, + ) + + def run_init_if_needed( source_root: Path, index_dir: Path, @@ -813,15 +846,25 @@ def run_init_if_needed( *, non_interactive: bool, quiet: bool, + verbose: bool = False, ) -> bool: """Run init if index directory has no artifacts. Return True if init was run. + The indexing sub-step (CocoIndex update + AST graph build) renders the + unified ``Vectors → Optimize → Graph`` progress on **stderr** in default + mode (same renderer the operator commands use); the wizard's conversational + stdout is untouched by this function. ``--quiet`` is silent; ``--verbose`` + raw-relays subprocess output. The indexing chatter that used to print to + stdout (``Creating index…`` / ``Index created successfully.``) now lives + on stderr framing so stdout stays the wizard payload. + Args: source_root: Source root directory index_dir: Index directory path model: Embedding model path or "auto" non_interactive: If True, suppress prompts - quiet: If True, suppress output + quiet: If True, suppress progress output + verbose: If True, raw-relay subprocess output (no Live region) Returns: True if init was run, False if skipped @@ -837,36 +880,71 @@ def run_init_if_needed( print("Index already exists. 
Run `java-codebase-rag reprocess` to rebuild.") return False - print("Creating index...") cfg = resolve_operator_config( source_root=source_root, cli_index_dir=None, # use default (/.java-codebase-rag) cli_embedding_model=model if model != "auto" else None, ) cfg.apply_to_os_environ() - env = cfg.subprocess_env() - # Run CocoIndex update - coco = run_cocoindex_update(env, full_reprocess=False, quiet=quiet) - if coco.returncode != 0: - print(f"Error: CocoIndex update failed with code {coco.returncode}") - return False - - # Run AST graph build - g = run_build_ast_graph( - source_root=cfg.source_root, - ladybug_path=cfg.ladybug_path, - verbose=not quiet, - quiet=quiet, - env=env, - ) - if g.returncode != 0: - print(f"Error: AST graph build failed with code {g.returncode}") - return False - - print("Index created successfully.") - return True + # Indexing sub-step: render unified progress on stderr in default mode only + # (quiet = silent; verbose = raw relay, no Live region). The renderer wraps + # just this sub-step, not the surrounding wizard. 
+ on_progress, on_progress_console = None, None + renderer = None + if not quiet and not verbose: + from java_codebase_rag.progress import build_index_progress_context + + renderer, on_progress, on_progress_console = build_index_progress_context() + + started = time.perf_counter() + if renderer is not None: + _index_progress_header("install", cfg.source_root, cfg.index_dir) + renderer.start() + index_ok = True + try: + coco = run_cocoindex_update( + env, + full_reprocess=False, + quiet=quiet, + verbose=verbose, + on_progress=on_progress, + on_progress_console=on_progress_console, + ) + if coco.returncode != 0: + print( + f"Error: CocoIndex update failed with code {coco.returncode}", + file=sys.stderr, + ) + index_ok = False + else: + g = run_build_ast_graph( + source_root=cfg.source_root, + ladybug_path=cfg.ladybug_path, + verbose=verbose, + quiet=quiet, + env=env, + on_progress=on_progress, + on_progress_console=on_progress_console, + ) + if g.returncode != 0: + print( + f"Error: AST graph build failed with code {g.returncode}", + file=sys.stderr, + ) + index_ok = False + except BaseException: + # An exception from cocoindex/graph means the index did not succeed; + # flip the footer marker before re-raising so it renders a red cross + # (mirrors cli._run_with_pipeline_progress's BaseException handler). + index_ok = False + raise + finally: + if renderer is not None: + renderer.stop() + _index_progress_footer("install", started, ok=index_ok) + return index_ok def handle_rerun(cwd: Path, *, non_interactive: bool) -> dict | None: @@ -1201,13 +1279,25 @@ def run_update( force: bool, dry_run: bool, cwd: Path | None = None, + quiet: bool = False, + verbose: bool = False, ) -> int: """Run the update pipeline. Returns exit code. + The indexing sub-step (Lance catch-up + incremental graph) renders the + unified ``Vectors → Optimize → Graph`` progress on **stderr** in default + mode and no longer runs with ``quiet=True`` (the reason ``update`` was + silent). 
``--quiet`` is silent; ``--verbose`` raw-relays subprocess output. + The wizard's host-detection / refresh / summary stdout is preserved; only + the indexing chatter that used to print to stdout moves onto the stderr + renderer framing. + Args: force: If True, overwrite all artifacts even if matching dry_run: If True, print changes without writing cwd: Current working directory (defaults to Path.cwd()) + quiet: If True, suppress progress output + verbose: If True, raw-relay subprocess output (no Live region) Returns: Exit code (0=success, 1=partial, 2=fatal) @@ -1282,30 +1372,74 @@ def run_update( # The "graph not implemented" warning belongs only on the vectors-only path # (increment --vectors-only), where the graph step is deliberately skipped. if not dry_run: - print("\nUpdating index (Lance + graph)...") cfg.apply_to_os_environ() env = cfg.subprocess_env() - coco = run_cocoindex_update(env, full_reprocess=False, quiet=True) - if coco.returncode != 0: - print(f"Error: Lance index update failed with code {coco.returncode}") - return 1 - - g = run_incremental_graph( - source_root=cfg.source_root, - ladybug_path=cfg.ladybug_path, - verbose=False, - quiet=True, - env=env, - ) - if g.returncode != 0: - # Artifacts above already refreshed; the graph catch-up is best-effort - # here. Surface a truthful, actionable message instead of leaving the - # graph silently stale or claiming the feature is unimplemented. - print( - f"\nWarning: incremental graph update failed (exit {g.returncode}). " - "Run `java-codebase-rag reprocess` for a full rebuild." + # Indexing sub-step: render unified progress on stderr in default mode + # only (quiet = silent; verbose = raw relay). No longer runs quiet=True + # — that was why `update` was silent. The renderer wraps just this + # sub-step; the wizard's summary stdout below is outside it. 
+ on_progress, on_progress_console = None, None + renderer = None + if not quiet and not verbose: + from java_codebase_rag.progress import build_index_progress_context + + renderer, on_progress, on_progress_console = build_index_progress_context() + + started = time.perf_counter() + if renderer is not None: + _index_progress_header("update", cfg.source_root, cfg.index_dir) + renderer.start() + index_ok = True + try: + coco = run_cocoindex_update( + env, + full_reprocess=False, + quiet=quiet, + verbose=verbose, + on_progress=on_progress, + on_progress_console=on_progress_console, ) + if coco.returncode != 0: + print( + f"Error: Lance index update failed with code {coco.returncode}", + file=sys.stderr, + ) + index_ok = False + else: + g = run_incremental_graph( + source_root=cfg.source_root, + ladybug_path=cfg.ladybug_path, + verbose=verbose, + quiet=quiet, + env=env, + on_progress=on_progress, + on_progress_console=on_progress_console, + ) + if g.returncode != 0: + # The graph catch-up is best-effort: `update`'s primary job + # is refreshing shipped artifacts + vectors (cocoindex). A + # graph failure surfaces a truthful, actionable Warning on + # stderr but does NOT flip index_ok (which drives both the + # footer marker and the return code) — exit 0 with a green + # check + the Warning line carrying the graph caveat. + print( + f"\nWarning: incremental graph update failed (exit {g.returncode}). " + "Run `java-codebase-rag reprocess` for a full rebuild.", + file=sys.stderr, + ) + except BaseException: + # An exception from cocoindex/graph means the index did not succeed; + # flip the footer marker before re-raising so it renders a red cross + # (mirrors cli._run_with_pipeline_progress's BaseException handler). 
+ index_ok = False + raise + finally: + if renderer is not None: + renderer.stop() + _index_progress_footer("update", started, ok=index_ok) + if not index_ok: + return 1 else: print("\nWould run incremental index update (Lance + graph).") @@ -1325,6 +1459,7 @@ def run_install( model: str | None, source_root: Path | None = None, quiet: bool = False, + verbose: bool = False, ) -> int: """Run the install pipeline. Returns exit code. @@ -1335,6 +1470,7 @@ def run_install( model: Model from CLI flag source_root: Source root path (defaults to cwd if None) quiet: If True, suppress output + verbose: If True, raw-relay subprocess indexing output (no Live region) Returns: Exit code (0=success, 1=partial, 2=fatal) @@ -1433,6 +1569,7 @@ def run_install( resolved_model, non_interactive=non_interactive, quiet=quiet, + verbose=verbose, ) return 0 diff --git a/java_codebase_rag/lance_optimize.py b/java_codebase_rag/lance_optimize.py index cb435dd..b16bebd 100644 --- a/java_codebase_rag/lance_optimize.py +++ b/java_codebase_rag/lance_optimize.py @@ -21,7 +21,14 @@ import asyncio import sys +import time from pathlib import Path +from typing import Callable, Literal + +# Mirrors ``ProgressStatus`` in ``progress.py``; kept local (rather than imported) +# so this module never pays the ``rich`` cost at import time — see +# ``_make_optimize_event``. +_OptimizeStatus = Literal["running", "done", "failed"] # Single source of truth for the three Lance table names created by the flow. # Keep in sync with ``search_lancedb.TABLES`` (the values there mirror these). @@ -31,6 +38,33 @@ "yamlconfigindex_yaml_config", ) + +def _make_optimize_event( + *, + status: _OptimizeStatus, + elapsed_s: float | None = None, +): + """Build a ``ProgressEvent(kind="optimize", …)`` lazily (progress is parent-side). 
+ + ``lance_optimize`` runs in-process in the parent (called by + ``pipeline._maybe_run_serialized_optimize`` and + ``server.run_refresh_pipeline``); it routes progress to the renderer via the + in-process ``on_progress`` callback — NOT via stderr (which would corrupt + the Live region). The import is local so the flow (which imports + ``LANCE_TABLE_NAMES`` at definition time) never pays the ``rich`` cost. + """ + from java_codebase_rag.progress import ProgressEvent + + return ProgressEvent( + kind="optimize", + phase=None, + pass_=None, + done=None, + total=None, + status=status, + elapsed_s=elapsed_s, + ) + # Commit conflicts are transient; a handful of exponential-backoff retries is # enough because, post-flow, there are no concurrent writers — only successive # optimize/compaction passes within this single serialized call can still @@ -60,7 +94,12 @@ async def _list_table_names(db: object) -> set[str]: return set(await db.table_names()) -async def optimize_lance_tables(index_dir: Path, *, quiet: bool = False) -> dict[str, str]: +async def optimize_lance_tables( + index_dir: Path, + *, + quiet: bool = False, + on_progress: Callable | None = None, +) -> dict[str, str]: """Optimize all known Lance tables under *index_dir*, serially, with retry. Runs ``table.optimize()`` for each name in :data:`LANCE_TABLE_NAMES` that @@ -73,6 +112,13 @@ async def optimize_lance_tables(index_dir: Path, *, quiet: bool = False) -> dict index_dir: directory holding the Lance tables (the flow's LanceDB URI). quiet: when True, suppress the per-table success/skip info lines on stderr (errors are always logged). + on_progress: optional in-process progress callback (the parent's + renderer ``on_progress``). When given, emits + ``ProgressEvent(kind="optimize", status="running")`` on entry and a + terminal ``status="done"``/``"failed"`` event on exit (covers BOTH + call sites: ``pipeline._maybe_run_serialized_optimize`` and + ``server.run_refresh_pipeline``). 
Progress events are in-process only — NEVER printed to + stderr (that would corrupt the Live region). Returns: Mapping of table name → status. Values are ``"ok"``, ``"skipped"`` @@ -82,67 +128,95 @@ async def optimize_lance_tables(index_dir: Path, *, quiet: bool = False) -> dict # not pay the lancedb import cost at flow-definition time. import lancedb + if on_progress is not None: + on_progress(_make_optimize_event(status="running")) + t0 = time.perf_counter() results: dict[str, str] = {} - db = await lancedb.connect_async(str(index_dir)) + failed = False try: + db = await lancedb.connect_async(str(index_dir)) try: - existing = await _list_table_names(db) - except Exception as exc: - print( - f"java-codebase-rag: optimize: failed to list tables in " - f"{index_dir}: {exc}", - file=sys.stderr, - ) - return {name: f"error: list failed: {exc}" for name in LANCE_TABLE_NAMES} - - for name in LANCE_TABLE_NAMES: - if name not in existing: - results[name] = "skipped" - if not quiet: - print( - f"java-codebase-rag: optimize: {name} absent, skipped", - file=sys.stderr, - ) - continue try: - table = await db.open_table(name) + existing = await _list_table_names(db) except Exception as exc: - results[name] = f"error: open failed: {exc}" print( - f"java-codebase-rag: optimize: {name} open failed: {exc}", + f"java-codebase-rag: optimize: failed to list tables in " + f"{index_dir}: {exc}", file=sys.stderr, ) - continue - - last_exc: BaseException | None = None - for attempt in range(_MAX_ATTEMPTS): + failed = True + return {name: f"error: list failed: {exc}" for name in LANCE_TABLE_NAMES} + + for name in LANCE_TABLE_NAMES: + if name not in existing: + results[name] = "skipped" + if not quiet: + print( + f"java-codebase-rag: optimize: {name} absent, skipped", + file=sys.stderr, + ) + continue try: - await table.optimize() - last_exc = None - break + table = await db.open_table(name) except Exception as exc: - last_exc = exc - if _is_retryable(exc) and attempt < _MAX_ATTEMPTS - 1: - await
asyncio.sleep(_BASE_BACKOFF_S * (2**attempt)) - continue - # Non-retryable, or retries exhausted: stop the loop and - # surface below — do not swallow silently. - break - - if last_exc is None: - results[name] = "ok" - if not quiet: + results[name] = f"error: open failed: {exc}" + failed = True print( - f"java-codebase-rag: optimize: {name} ok", + f"java-codebase-rag: optimize: {name} open failed: {exc}", file=sys.stderr, ) - else: - results[name] = f"error: {last_exc}" - print( - f"java-codebase-rag: optimize: {name} failed: {last_exc}", - file=sys.stderr, - ) + continue + + last_exc: BaseException | None = None + for attempt in range(_MAX_ATTEMPTS): + try: + await table.optimize() + last_exc = None + break + except Exception as exc: + last_exc = exc + if _is_retryable(exc) and attempt < _MAX_ATTEMPTS - 1: + await asyncio.sleep(_BASE_BACKOFF_S * (2**attempt)) + continue + # Non-retryable, or retries exhausted: stop the loop and + # surface below — do not swallow silently. + break + + if last_exc is None: + results[name] = "ok" + if not quiet: + print( + f"java-codebase-rag: optimize: {name} ok", + file=sys.stderr, + ) + else: + results[name] = f"error: {last_exc}" + failed = True + print( + f"java-codebase-rag: optimize: {name} failed: {last_exc}", + file=sys.stderr, + ) + finally: + # ``AsyncConnection.close`` is a *sync* method in lancedb 0.30.x. + db.close() + return results + except Exception: + # An unexpected exception (e.g. ``connect_async`` raised, or a table- + # independent failure) must still flip the terminal event to failed so + # the renderer's task doesn't render a green check on a crash. Re-raise + # after marking — the caller (``_maybe_run_serialized_optimize`` / + # ``run_refresh_pipeline``) treats optimize failure as non-fatal and + # logs it, but the renderer must reflect the truth. + failed = True + raise finally: - # ``AsyncConnection.close`` is a *sync* method in lancedb 0.30.x. 
- db.close() - return results + # Always emit a terminal optimize event so the renderer's task never + # hangs at "running" — even on exception (the parent treats a failed + # optimize as non-fatal: the index is still searchable un-compacted). + if on_progress is not None: + on_progress( + _make_optimize_event( + status="failed" if failed else "done", + elapsed_s=time.perf_counter() - t0, + ) + ) diff --git a/java_codebase_rag/pipeline.py b/java_codebase_rag/pipeline.py index 1caa7e5..fe6376a 100644 --- a/java_codebase_rag/pipeline.py +++ b/java_codebase_rag/pipeline.py @@ -9,10 +9,11 @@ import threading import time from pathlib import Path +from typing import Callable -from java_codebase_rag.cli_format import Spinner, is_noise_line, stderr_is_tty -from java_codebase_rag.cli_progress import emit_vectors_finish, emit_vectors_start +from java_codebase_rag.cli_format import is_noise_line from java_codebase_rag.config import cocoindex_subprocess_env_defaults +from java_codebase_rag.progress import ProgressEvent, ProgressRelay, make_relay COCOINDEX_TARGET = "java_index_flow_lancedb.py:JavaCodeIndexLance" @@ -66,11 +67,26 @@ def _popen_capturing_stderr( proc: subprocess.Popen[bytes], *, verbose: bool = True, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> tuple[str, str, int]: - """Capture stdout/stderr; relay stderr through noise filter (or verbatim in verbose mode).""" + """Capture stdout/stderr; relay stderr through noise filter (or verbatim in verbose mode). + + When ``on_progress`` is set, stderr is drained through a :class:`ProgressRelay` + instead of the bare ``_LineFilter``: progress lines are parsed first, routed to + ``on_progress``, and suppressed from the relay; non-progress lines follow the + relay's routing (``console.print`` while a Live region is up via + ``on_progress_console``, raw ``buffer.write`` in verbose mode). 
+ """ out_buf = bytearray() err_buf = bytearray() - filt = _LineFilter() if not verbose else None + if on_progress is not None: + relay = make_relay( + on_progress, console=on_progress_console, verbose=verbose + ) + filt: _LineFilter | ProgressRelay | None = relay + else: + filt = _LineFilter() if not verbose else None def drain_out() -> None: assert proc.stdout is not None @@ -112,6 +128,8 @@ def run_cocoindex_update( quiet: bool, verbose: bool = True, lance_project_root: Path | None = None, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> subprocess.CompletedProcess[str]: result = _run_cocoindex_update_impl( env, @@ -119,19 +137,25 @@ def run_cocoindex_update( quiet=quiet, verbose=verbose, lance_project_root=lance_project_root, + on_progress=on_progress, + on_progress_console=on_progress_console, ) # After cocoindex returns exit 0 there are no concurrent writers, so this # is the safe window to compact the Lance tables. The flow disabled its # in-flight background optimize (see java_index_flow_lancedb.py), making # this serialized pass the sole optimizer. Optimize failure does not flip # the cocoindex CompletedProcess (a successful index is still usable, just - # not compacted); the outcome is logged to stderr only. + # not compacted); the outcome is logged to stderr only. Thread the + # in-process on_progress so the optimize phase renders via the same + # renderer (the flow cannot emit it — it runs in the child). if result.returncode == 0: - _maybe_run_serialized_optimize(env, quiet=quiet) + _maybe_run_serialized_optimize(env, quiet=quiet, on_progress=on_progress) return result -def _maybe_run_serialized_optimize(env: dict[str, str], *, quiet: bool) -> None: +def _maybe_run_serialized_optimize( + env: dict[str, str], *, quiet: bool, on_progress: Callable | None = None +) -> None: """Resolve the index dir from *env* and run the serialized Lance optimize. 
The flow's lifespan reads ``JAVA_CODEBASE_RAG_INDEX_DIR`` (set by the CLI / @@ -150,7 +174,7 @@ def _maybe_run_serialized_optimize(env: dict[str, str], *, quiet: bool) -> None: try: from java_codebase_rag.lance_optimize import optimize_lance_tables - asyncio.run(optimize_lance_tables(Path(idx_raw), quiet=quiet)) + asyncio.run(optimize_lance_tables(Path(idx_raw), quiet=quiet, on_progress=on_progress)) except Exception as exc: # Never crash the CLI on an optimize failure — surface on stderr only. print(f"java-codebase-rag: optimize failed: {exc}", file=sys.stderr) @@ -163,9 +187,15 @@ def _run_cocoindex_update_impl( quiet: bool, verbose: bool = True, lance_project_root: Path | None = None, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> subprocess.CompletedProcess[str]: exe = cocoindex_bin() if not exe.is_file(): + # 127 pre-spawn stub: never mark the vectors task running — emit a + # terminal failed event so the renderer doesn't leave it hung. 
+ if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return subprocess.CompletedProcess( args=[str(exe)], returncode=127, @@ -175,6 +205,8 @@ def _run_cocoindex_update_impl( bd = bundle_dir() flow = bd / "java_index_flow_lancedb.py" if not flow.is_file(): + if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return subprocess.CompletedProcess( args=[], returncode=126, @@ -201,17 +233,10 @@ def _run_cocoindex_update_impl( text=True, ) - emit_progress = lance_project_root is not None - use_spinner = emit_progress and stderr_is_tty() - if emit_progress and not use_spinner: - emit_vectors_start() - spinner: Spinner | None = None - if use_spinner: - spinner = Spinner("[vectors] running · cocoindex update") - spinner.start() t0 = time.perf_counter() code = -1 out_s, err_s = "", "" + proc: subprocess.Popen[bytes] | None = None try: proc = subprocess.Popen( cmd, @@ -221,12 +246,27 @@ def _run_cocoindex_update_impl( stderr=subprocess.PIPE, bufsize=0, ) - out_s, err_s, code = _popen_capturing_stderr(proc, verbose=verbose) + # Vectors task is marked running only AFTER Popen succeeds — the flow's + # per-file ticks + approximate total stream in from the child via the + # relay (parsed by ProgressRelay, routed to on_progress). + out_s, err_s, code = _popen_capturing_stderr( + proc, verbose=verbose, on_progress=on_progress, on_progress_console=on_progress_console + ) finally: - if spinner is not None: - spinner.stop() - if emit_progress: - emit_vectors_finish(elapsed_s=time.perf_counter() - t0, exit_code=code) + # The flow cannot emit the terminal vectors event (no "all files done" + # hook in cocoindex flows), so the PARENT emits it here based on the + # cocoindex exit code. This drives clamp-on-completion + the phase + # transition to Optimize. 
Emitted even on a spawn failure (code stays + # -1 → failed) so the renderer's task never hangs at running. + if on_progress is not None: + elapsed = time.perf_counter() - t0 + status = "done" if code == 0 else "failed" + on_progress( + ProgressEvent( + kind="vectors", phase=None, pass_=None, done=None, total=None, + status=status, elapsed_s=elapsed, + ) + ) return subprocess.CompletedProcess(args=cmd, returncode=code, stdout=out_s, stderr=err_s) @@ -259,6 +299,8 @@ def run_build_ast_graph( verbose: bool, quiet: bool = False, env: dict[str, str] | None = None, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> subprocess.CompletedProcess[str]: builder = bundle_dir() / "build_ast_graph.py" if not builder.is_file(): @@ -297,7 +339,9 @@ def run_build_ast_graph( stderr=subprocess.PIPE, bufsize=0, ) - out_s, err_s, code = _popen_capturing_stderr(proc, verbose=verbose) + out_s, err_s, code = _popen_capturing_stderr( + proc, verbose=verbose, on_progress=on_progress, on_progress_console=on_progress_console + ) if not verbose: from java_codebase_rag.cli_format import bold_cyan, styled_check, styled_cross marker = styled_check() if code == 0 else styled_cross() @@ -312,6 +356,8 @@ def run_incremental_graph( verbose: bool, quiet: bool = False, env: dict[str, str] | None = None, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> subprocess.CompletedProcess[str]: """Run incremental graph rebuild by passing --incremental flag to build_ast_graph.py.""" builder = bundle_dir() / "build_ast_graph.py" @@ -352,7 +398,9 @@ def run_incremental_graph( stderr=subprocess.PIPE, bufsize=0, ) - out_s, err_s, code = _popen_capturing_stderr(proc, verbose=verbose) + out_s, err_s, code = _popen_capturing_stderr( + proc, verbose=verbose, on_progress=on_progress, on_progress_console=on_progress_console + ) if not verbose: from java_codebase_rag.cli_format import bold_cyan, 
styled_check, styled_cross marker = styled_check() if code == 0 else styled_cross() diff --git a/java_codebase_rag/progress.py b/java_codebase_rag/progress.py index 7527836..c07df5f 100644 --- a/java_codebase_rag/progress.py +++ b/java_codebase_rag/progress.py @@ -33,7 +33,7 @@ import sys import time from dataclasses import dataclass -from typing import Literal +from typing import Callable, Literal from rich.console import Console from rich.live import Live @@ -55,6 +55,9 @@ "parse_progress_line", "IndexProgressRenderer", "ProgressRelay", + "CallbackRenderer", + "make_relay", + "build_index_progress_context", ] ProgressKind = Literal["vectors", "graph", "optimize"] @@ -271,7 +274,16 @@ def apply(self, ev: ProgressEvent) -> None: ``status == "failed"`` halts the task and marks the description with a red ``✗`` (rich renders the spinner stopped). On non-TTY consoles this delegates to the throttled concise-line printer. + + Safe to call after :meth:`stop`: once the Live region is torn down a + drain thread may still feed a trailing event; this is a no-op then so + the apply/stop pair is atomic from the caller's view (PR-1 review + Minor #6 / cross-PR risk #10). """ + if not self._started: + # After stop() the Live region is gone and rich tasks are torn down; + # a trailing event from the drain thread must not mutate state. + return if self._fallback: self._fallback_apply(ev) return @@ -375,6 +387,85 @@ def _format_concise( return Text(kind) +class CallbackRenderer: + """Adapter exposing a ``renderer.apply(ev)`` surface to :class:`ProgressRelay`. + + ``ProgressRelay`` calls ``renderer.apply(ev)`` for each parsed progress line. + This adapter forwards to a caller-supplied ``on_progress`` callback (used by + the sync/async subprocess drains in ``pipeline`` / ``cli_progress`` to route + progress events up to the command-level renderer without owning a Live region + themselves). 
It carries a ``_console`` attribute so the relay can route + non-progress lines through ``console.print`` while a Live region is up. + """ + + def __init__(self, on_progress, console=None) -> None: # type: ignore[no-untyped-def] + self._on_progress = on_progress + self._console = console + + def apply(self, ev: ProgressEvent) -> None: + self._on_progress(ev) + + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + +def make_relay( + on_progress: Callable[[ProgressEvent], None], + *, + console: object | None, + verbose: bool, +) -> ProgressRelay: + """Build the standard drain-side :class:`ProgressRelay`. + + Both subprocess drains (``pipeline._popen_capturing_stderr`` sync path and + ``cli_progress.accumulate_and_relay_subprocess_streams`` async path) wire a + ``CallbackRenderer`` that forwards parsed events to the command-level + ``on_progress`` callback, and route non-progress lines through the same + caller-supplied ``console``. Centralizing the construction here keeps the + sync and async wiring identical (PR-3 forks the async path further). + """ + return ProgressRelay( + CallbackRenderer(on_progress, console), + console=console, + verbose=verbose, + ) + + +# The canonical phase order shared by every lifecycle command that renders +# progress. The operator commands (init/increment/reprocess) and the installer +# sub-steps (install/update indexing) all render this same list so the +# Vectors → Optimize → Graph shape is uniform across the CLI. +_INDEX_PHASES = ["vectors", "optimize", "graph"] + + +def build_index_progress_context( + phases: list[str] | None = None, +) -> tuple["IndexProgressRenderer", Callable[[ProgressEvent], None], "Console"]: + """Construct the shared ``(renderer, on_progress, console)`` triple. 
+ + Both ``cli._run_with_pipeline_progress`` (operator commands, default TTY + mode) and the installer's indexing sub-step (``installer.run_init_if_needed`` + / ``run_update``) use this so the phase list, the callback wiring, and the + single-writer console are defined in exactly one place. The returned + ``on_progress`` forwards each event to ``renderer.apply``; ``console`` is + the renderer's stderr ``rich.Console`` so the subprocess drain routes + non-progress lines through ``console.print`` while a Live region is up. + + The caller owns ``renderer.start()``/``stop()`` lifecycle. In ``--quiet`` + or ``--verbose`` mode the caller simply does not call this helper (quiet + is silent; verbose raw-relays). + """ + renderer = IndexProgressRenderer(phases if phases is not None else _INDEX_PHASES) + + def on_progress(ev: ProgressEvent) -> None: + renderer.apply(ev) + + return renderer, on_progress, renderer._console # noqa: SLF001 — shared console for the drain + + # --------------------------------------------------------------------------- # Relay # --------------------------------------------------------------------------- @@ -431,7 +522,18 @@ def _route_line(self, line: bytes) -> None: # Consumed by the protocol — never echoed to any sink. It is not # noise, so it must not keep the suppression flag armed. self._suppress_next = False - self._renderer.apply(ev) + # Guard the render chain: a drain thread is a daemon, so an + # unhandled exception here dies silently and truncates the captured + # stderr, masking real bugs. Mirror the defensive ``except Exception`` + # around the raw ``buffer.write`` path below: log a one-line note to + # real stderr and keep draining. Never re-raise. 
+ try: + self._renderer.apply(ev) + except Exception as exc: + sys.stderr.write( + f"java-codebase-rag: progress renderer error: {exc}\n" + ) + sys.stderr.flush() return if ev is not None and self._renderer is None: # Parsed as progress but no renderer attached: still reset the flag @@ -450,11 +552,13 @@ def _route_line(self, line: bytes) -> None: self._suppress_next = False text = line.decode("utf-8", errors="replace") if self._renderer is not None and self._live_active: - console = self._console if self._console is not None else self._renderer._console # noqa: SLF001 + # The drains always construct the relay with the caller's console + # (see make_relay), so ``self._console`` is authoritative here. + assert self._console is not None # invariant: drains always supply it # rich.Console over a Live region must suspend/resume to interleave # a one-off line without corrupting the bar redraw; print() handles # this correctly when the Live was started on the same console. - console.print(text, end="") + self._console.print(text, end="") return if self._verbose and self._renderer is None: try: diff --git a/java_index_flow_lancedb.py b/java_index_flow_lancedb.py index 6213616..10425a2 100644 --- a/java_index_flow_lancedb.py +++ b/java_index_flow_lancedb.py @@ -18,10 +18,13 @@ import inspect import os +import sys +import threading import uuid from collections.abc import AsyncIterator from contextlib import asynccontextmanager from dataclasses import dataclass +from fnmatch import fnmatch from pathlib import Path from typing import Annotated, Any @@ -84,6 +87,138 @@ _NUM_TXN_BEFORE_OPTIMIZE = 10**12 +# --- Vectors-phase progress emission (JCIRAG_PROGRESS kind=vectors) ----------- +# +# The flow runs in a CHILD cocoindex process; it prints structured progress to +# its stderr and the parent (pipeline._popen_capturing_stderr / +# cli_progress.accumulate_and_relay_subprocess_streams) parses it via +# ProgressRelay and feeds the renderer. 
The flow CANNOT know when all files are +# done (cocoindex offers no "all files done" hook in the flow), so it emits: +# - ONE ``total=N status=running`` line from ``app_main`` (approximate +# pre-walk: matcher includes + LayeredIgnore), and +# - per-file ``done=k status=running`` ticks (throttled every ~25 files) from +# ``process_*_file`` (shared atomic counter). +# The PARENT emits the terminal ``status=done``/``failed`` vectors event on +# cocoindex exit (drives clamp-on-completion + phase transition to Optimize). + +# Per-file tick cadence: bound stderr volume on huge trees without making the +# bar feel stale. Every 25th file (and the modulo boundary is enough — the +# parent clamps to total on the terminal event anyway). +_VECTORS_TICK_EVERY = 25 + +# Thread-safe counter: cocoindex may call process_*_file concurrently +# (mount_each parallelism is implementation-defined). A module-level lock guards +# both the counter and the emission so two threads never interleave a tick. +_vectors_done_lock = threading.Lock() +_vectors_done_count = 0 + + +def _emit_vectors_progress( + *, + done: int | None = None, + total: int | None = None, + status: str = "running", + elapsed_s: float | None = None, +) -> None: + """Emit one ``JCIRAG_PROGRESS kind=vectors …`` line to stderr (flushed). + + Field order is fixed (kind, done, total, status, elapsed_s) so the parser + and tests can pin substrings. Omitted fields are simply absent. + """ + fields = ["kind=vectors"] + if done is not None: + fields.append(f"done={done}") + if total is not None: + fields.append(f"total={total}") + fields.append(f"status={status}") + if elapsed_s is not None: + fields.append(f"elapsed_s={elapsed_s:.2f}") + print("JCIRAG_PROGRESS " + " ".join(fields), file=sys.stderr, flush=True) + + +def _tick_vectors_done() -> None: + """Increment the shared per-file counter and emit a throttled ``done=k`` tick. + + Called once per successfully-processed file (after the ignore / empty + early-returns). 
The tick is emitted every ``_VECTORS_TICK_EVERY`` files so + stderr volume stays bounded on huge trees; the parent clamps to total on + the terminal event, so the exact tick cadence is not load-bearing. + """ + global _vectors_done_count + with _vectors_done_lock: + _vectors_done_count += 1 + n = _vectors_done_count + if n % _VECTORS_TICK_EVERY != 0: + return + # Emit under the lock: the comment on ``_vectors_done_lock`` promises it guards both + # the counter AND the emission, so two concurrent ticks can't emit their + # ``done=N`` lines out of order. Contention is negligible (fires every + # ~25 files). + _emit_vectors_progress(done=n, status="running") + + +def _approximate_vectors_total(project_root: Path) -> int: + """Reproduce the matchers' include globs + LayeredIgnore for an approximate total. + + The flow applies two filtering layers: (1) ``PatternFilePathMatcher`` + excludes at walk time via ``LayeredIgnore.cocoindex_excluded_patterns()``, + then (2) ``LayeredIgnore.is_ignored()`` plus an early-return for empty / + undecodable files inside each ``process_*_file``. Files that early-return + never tick, so this pre-walk OVERSTATES the total by the empty / undecodable + count (ignored files are filtered here too). The parent clamps the bar to 100% on the terminal ``status=done`` + event, so the over-count cannot stall the bar. + + Mirrors the three ``localfs.walk_dir`` matchers in ``app_main``: + - ``**/*.java`` + - ``**/src/main/resources/db/migration/*.sql`` + - ``**/src/main/resources/application*.yml`` and ``.yaml`` + """ + ignore = LayeredIgnore(project_root) + excluded = ignore.cocoindex_excluded_patterns() + + def _excluded(rel_posix: str) -> bool: + return any(fnmatch(rel_posix, pat) for pat in excluded) + + total = 0 + for dirpath, dirnames, filenames in os.walk(project_root): + # Prune the same universal nuisance dirs as iter_java_source_files / + # cocoindex walk.
(build-output pruning is matcher-dependent in the + # real walk; for an APPROXIMATE total this cheap prune is sufficient + # — the clamp absorbs any residual divergence.) + dirnames[:] = [ + d for d in dirnames if d not in (".git", ".hg", ".svn", "node_modules", ".venv", "venv") + ] + for fn in filenames: + full = Path(dirpath) / fn + try: + rel = full.resolve().relative_to(project_root).as_posix() + except ValueError: + continue + if _excluded(rel): + continue + # Java: **/*.java + if fn.endswith(".java"): + if not ignore.is_ignored(full)[0]: + total += 1 + continue + # SQL: **/src/main/resources/db/migration/*.sql + if fn.endswith(".sql") and "/db/migration/" in rel: + if not ignore.is_ignored(full)[0]: + total += 1 + continue + # YAML: **/src/main/resources/application*.yml / .yaml + # NOTE: ``fn`` is the bare filename (e.g. ``application-cloud.yml``), so + # the prefix predicate must be ``fn.startswith("application")`` — + # ``"/application" in fn`` was always False (no leading slash in a bare + # name) and under-counted every application YAML, driving the pre-walk + # total below the actual done count. The ``rel``-based + # ``"/src/main/resources/"`` gate stays (full path component). 
+ if fn.endswith((".yml", ".yaml")) and fn.startswith("application") and "/src/main/resources/" in rel: + if not ignore.is_ignored(full)[0]: + total += 1 + return total + + @dataclass class JavaLanceChunk: id: str @@ -187,6 +322,8 @@ async def process_java_file( if not content.strip(): return + _tick_vectors_done() + language = detect_code_language(filename=file.file_path.path.name) or "text" cs, mn, ov = JAVA_CHUNK chunks = splitter.split( @@ -251,6 +388,8 @@ async def process_sql_file( if not content.strip(): return + _tick_vectors_done() + language = "sql" cs, mn, ov = SQL_CHUNK chunks = splitter.split( @@ -295,6 +434,8 @@ async def process_yaml_file( if not content.strip(): return + _tick_vectors_done() + ext = file.file_path.path.suffix.lower() language = "yaml" if ext in (".yml", ".yaml") else "text" cs, mn, ov = YAML_CHUNK @@ -362,6 +503,20 @@ async def app_main() -> None: project_root = coco.use_context(PROJECT_ROOT) _ignore = LayeredIgnore(project_root) _walk_excludes = _ignore.cocoindex_excluded_patterns() + # Emit ONE approximate total so the parent's renderer can show a determinate + # bar (clamps to 100% on the terminal vectors event the parent emits on + # cocoindex exit). Approximate — ignored / empty files over-state it; see + # ``_approximate_vectors_total``. ``--full-reprocess`` only: on incremental + # catch-up the @coco.fn(memo=True) cache skips unchanged files, so no total + # is knowable up front → the parent renders indeterminate from the absence. + try: + total = _approximate_vectors_total(project_root) + if total > 0: + _emit_vectors_progress(total=total, status="running") + except Exception: + # The pre-walk must never break indexing — a failure here just means + # the parent falls back to indeterminate. Swallow and continue. 
+ pass java_files = localfs.walk_dir( PROJECT_ROOT, recursive=True, diff --git a/server.py b/server.py index c151f94..526c4aa 100644 --- a/server.py +++ b/server.py @@ -13,9 +13,8 @@ from index_common import SBERT_MODEL from java_codebase_rag.cli_progress import ( accumulate_and_relay_subprocess_streams, - emit_vectors_finish, - emit_vectors_start, ) +from java_codebase_rag.progress import ProgressEvent from java_codebase_rag._fdlimit import raise_fd_limit from java_codebase_rag.config import ( cocoindex_subprocess_env_defaults, @@ -270,10 +269,20 @@ def list_code_index_tables_payload() -> IndexInfoOutput: ) -async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> RefreshIndexOutput: +async def run_refresh_pipeline( + *, + quiet: bool = False, + verbose: bool = True, + on_progress=None, + on_progress_console: object | None = None, +) -> RefreshIndexOutput: root = _project_root() cocoindex_bin = Path(sys.executable).parent / "cocoindex" if not cocoindex_bin.is_file(): + # 127 pre-spawn: emit a terminal failed vectors event so the renderer's + # task doesn't hang at running (matches the sync pipeline path). 
+ if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return RefreshIndexOutput( success=False, message=f"cocoindex not found next to Python: {cocoindex_bin}", @@ -286,6 +295,8 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> if fallback.is_file(): flow_path = fallback else: + if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return RefreshIndexOutput( success=False, message=f"java_index_flow_lancedb.py not found under {root} nor {bundle_dir}", @@ -308,13 +319,14 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> ) out_b, err_b = await proc.communicate() except Exception as exc: + if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return RefreshIndexOutput( success=False, message=f"spawn failed: {exc!s}", phases_run=[], ) else: - emit_vectors_start() t0 = time.perf_counter() code_c = -1 try: @@ -329,16 +341,30 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - out_b, err_b = await accumulate_and_relay_subprocess_streams(proc, relay=True, verbose=verbose) + # The vectors task is fed by the child's per-file ticks + the + # approximate total line, parsed by the ProgressRelay inside the + # async drain and routed to on_progress. 
+ out_b, err_b = await accumulate_and_relay_subprocess_streams( + proc, relay=True, verbose=verbose, + on_progress=on_progress, on_progress_console=on_progress_console, + ) code_c = proc.returncode if proc.returncode is not None else -1 except Exception as exc: + if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return RefreshIndexOutput( success=False, message=f"spawn failed: {exc!s}", phases_run=[], ) finally: - emit_vectors_finish(elapsed_s=time.perf_counter() - t0, exit_code=code_c) + # The parent emits the terminal vectors event (the flow can't — no + # "all files done" hook). Drives clamp-on-completion + phase + # transition to Optimize. + if on_progress is not None: + elapsed = time.perf_counter() - t0 + status = "done" if code_c == 0 else "failed" + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status=status, elapsed_s=elapsed)) assert proc is not None out = out_b.decode(errors="replace") err = err_b.decode(errors="replace") @@ -366,7 +392,7 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> idx_dir = Path(idx_raw) else: idx_dir = (root / ".java-codebase-rag").resolve() - await optimize_lance_tables(idx_dir, quiet=quiet) + await optimize_lance_tables(idx_dir, quiet=quiet, on_progress=on_progress) except Exception as exc: optimize_error = f"lance optimize failed: {exc}" print(f"java-codebase-rag: {optimize_error}", file=sys.stderr) @@ -394,7 +420,10 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> if quiet: gout_b, gerr_b = await gproc.communicate() else: - gout_b, gerr_b = await accumulate_and_relay_subprocess_streams(gproc, relay=True, verbose=verbose) + gout_b, gerr_b = await accumulate_and_relay_subprocess_streams( + gproc, relay=True, verbose=verbose, + on_progress=on_progress, on_progress_console=on_progress_console, + ) graph_code = 
gproc.returncode graph_out = gout_b.decode(errors="replace") graph_err = gerr_b.decode(errors="replace") diff --git a/tests/test_ast_graph_build.py b/tests/test_ast_graph_build.py index b78a1ac..3751fad 100644 --- a/tests/test_ast_graph_build.py +++ b/tests/test_ast_graph_build.py @@ -404,3 +404,139 @@ def test_pass3_known_external_calls_preserved(ladybug_db_path: Path) -> None: found = [str(r[0]) for r in rows] assert found, "bank fixture should have known-external CALLS rows" assert all(s not in ("phantom", "chained_receiver") for s in found), found + + +# --------------------------------------------------------------------------- +# Graph-phase JCIRAG_PROGRESS emission (PR-2) +# --------------------------------------------------------------------------- + + +def _run_builder_verbose(corpus_root: Path, target_db: Path, *, extra_args: list[str] | None = None) -> subprocess.CompletedProcess: + """Run build_ast_graph.py --verbose and return the CompletedProcess.""" + script = Path(__file__).resolve().parent.parent / "build_ast_graph.py" + cmd = [ + sys.executable, + str(script), + "--source-root", str(corpus_root), + "--ladybug-path", str(target_db), + "--verbose", + *(extra_args or []), + ] + return subprocess.run(cmd, capture_output=True, text=True, timeout=300) + + +def _progress_lines(stderr: str) -> list[str]: + return [ln for ln in stderr.splitlines() if "JCIRAG_PROGRESS kind=graph" in ln] + + +def _count_filtered_java_files(corpus_root: Path) -> int: + from path_filtering import LayeredIgnore, iter_java_source_files + + return sum(1 for _ in iter_java_source_files(corpus_root.resolve(), ignore=LayeredIgnore(corpus_root.resolve()))) + + +def test_build_ast_graph_pass1_emits_per_file_progress(corpus_root: Path, tmp_path: Path) -> None: + """Pass 1 is count-first: a `total=` line precedes the first `done=` tick; ticks advance.""" + target = tmp_path / "p1.lbug" + proc = _run_builder_verbose(corpus_root, target) + assert proc.returncode == 0, 
f"stderr:\n{proc.stderr}" + lines = _progress_lines(proc.stderr) + # Pass-1 lines carry pass=1/6 with a total and per-file done ticks. + pass1 = [ln for ln in lines if "pass=1/6" in ln] + assert pass1, f"expected pass 1 progress lines, got: {lines!r}" + # The first pass-1 line with a total must precede the first line with a done tick. + totals = [ln for ln in pass1 if "total=" in ln] + dones = [ln for ln in pass1 if "done=" in ln] + assert totals, f"pass 1 must emit a count-first total; lines: {pass1!r}" + assert dones, f"pass 1 must emit per-file done ticks; lines: {pass1!r}" + first_total_idx = lines.index(totals[0]) + first_done_idx = lines.index(dones[0]) + assert first_total_idx <= first_done_idx, "total must precede the first done tick" + # Done ticks advance (monotonic non-decreasing values). + done_vals = [int(re.search(r"done=(\d+)", ln).group(1)) for ln in dones if re.search(r"done=(\d+)", ln)] + assert done_vals == sorted(done_vals), f"done ticks must be monotonic: {done_vals}" + + +def test_build_ast_graph_pass1_total_is_exact_filtered_count(corpus_root: Path, tmp_path: Path) -> None: + """The count-first pass-1 total equals the exact non-ignored .java file count.""" + target = tmp_path / "p1total.lbug" + proc = _run_builder_verbose(corpus_root, target) + assert proc.returncode == 0, f"stderr:\n{proc.stderr}" + lines = _progress_lines(proc.stderr) + pass1_total_lines = [ + ln for ln in lines if "pass=1/6" in ln and "total=" in ln and "done=" in ln + ] + # Fall back to any pass-1 line carrying a total. 
+ if not pass1_total_lines: + pass1_total_lines = [ln for ln in lines if "pass=1/6" in ln and "total=" in ln] + assert pass1_total_lines, f"no pass-1 total line found; lines: {lines!r}" + totals = [int(re.search(r"total=(\d+)", ln).group(1)) for ln in pass1_total_lines if re.search(r"total=(\d+)", ln)] + assert totals, f"could not parse total from: {pass1_total_lines!r}" + expected = _count_filtered_java_files(corpus_root) + assert totals[0] == expected, f"pass-1 total {totals[0]} != filtered count {expected}" + + +def test_build_ast_graph_passes_2_to_6_emit_step_progress(corpus_root: Path, tmp_path: Path) -> None: + """Each of passes 2–6 emits a `pass=N/6` step line on entry/exit.""" + target = tmp_path / "p2to6.lbug" + proc = _run_builder_verbose(corpus_root, target) + assert proc.returncode == 0, f"stderr:\n{proc.stderr}" + lines = _progress_lines(proc.stderr) + for n in range(2, 7): + step_lines = [ln for ln in lines if f"pass={n}/6" in ln] + assert step_lines, f"pass {n}/6 emitted no progress lines; full: {lines!r}" + + +def test_build_ast_graph_quiet_emits_no_progress(corpus_root: Path, tmp_path: Path) -> None: + """Without --verbose the builder emits no JCIRAG_PROGRESS lines.""" + script = Path(__file__).resolve().parent.parent / "build_ast_graph.py" + target = tmp_path / "quiet.lbug" + proc = subprocess.run( + [ + sys.executable, + str(script), + "--source-root", str(corpus_root), + "--ladybug-path", str(target), + ], + capture_output=True, + text=True, + timeout=300, + ) + assert proc.returncode == 0, f"stderr:\n{proc.stderr}" + assert _progress_lines(proc.stderr) == [], "quiet build must not emit JCIRAG_PROGRESS" + + +def test_pass1_parse_incremental_total_excludes_removed_files(tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None: + """Incremental pass-1 total must count only files that will actually be visited. 
+ + On an incremental run, ``scope_files`` includes removed files (they were + deleted, so they participate in scoped deletion), but they no longer exist + on disk and are therefore never visited by the parse walk. Counting them in + ``pass1_total`` makes ``done`` undercount then two-way-clamp on completion. + The fix: the total is ``len(scope_files - removed_files)``. + """ + import build_ast_graph + + root = tmp_path / "proj" + java_dir = root / "src/main/java/smoke" + java_dir.mkdir(parents=True) + (java_dir / "Real.java").write_text( + "package smoke;\nclass Real { void go() { } }\n", encoding="utf-8" + ) + tables = build_ast_graph.GraphTables() + # scope includes the real file plus a removed (gone-from-disk) file. + scope_files = {"src/main/java/smoke/Real.java", "src/main/java/smoke/Gone.java"} + removed_files = {"src/main/java/smoke/Gone.java"} + build_ast_graph.pass1_parse( + root, tables, verbose=True, scope_files=scope_files, removed_files=removed_files + ) + captured = capsys.readouterr() + pass1_totals = [ + int(m.group(1)) + for m in re.finditer(r"pass=1/6 done=0 total=(\d+)", captured.err) + ] + assert pass1_totals, f"expected a count-first pass-1 total line; stderr:\n{captured.err}" + # The removed file must NOT be counted: total is 1 (only Real.java), not 2. 
+ assert pass1_totals[0] == 1, ( + f"incremental pass-1 total must exclude removed files; got {pass1_totals[0]}" + ) diff --git a/tests/test_cli_progress_stdout_invariant.py b/tests/test_cli_progress_stdout_invariant.py index 444f826..81fd670 100644 --- a/tests/test_cli_progress_stdout_invariant.py +++ b/tests/test_cli_progress_stdout_invariant.py @@ -160,7 +160,7 @@ def test_cli_lifecycle_stdout_invariant_reprocess( baseline = (_FIXTURE_DIR / "reprocess_quiet_success.stdout.txt").read_text(encoding="utf-8") - async def fake_refresh(*, quiet: bool = False, verbose: bool = True) -> RefreshIndexOutput: + async def fake_refresh(*, quiet: bool = False, verbose: bool = True, on_progress=None, on_progress_console=None) -> RefreshIndexOutput: _ = quiet _ = verbose return RefreshIndexOutput( @@ -272,7 +272,7 @@ def capture_footer(_sub: str, _t0: float, code: int) -> None: monkeypatch.setattr(cli_mod, "_pipeline_footer", capture_footer) - def boom() -> int: + def boom(_progress) -> int: raise RuntimeError("simulated handler failure") with pytest.raises(RuntimeError, match="simulated handler failure"): @@ -281,7 +281,7 @@ def boom() -> int: codes.clear() - def exit5() -> int: + def exit5(_progress) -> int: raise SystemExit(5) with pytest.raises(SystemExit) as excinfo: diff --git a/tests/test_installer.py b/tests/test_installer.py index 1e2e1df..15c3a42 100644 --- a/tests/test_installer.py +++ b/tests/test_installer.py @@ -1271,7 +1271,8 @@ def test_update_honors_yaml_source_root_for_nested_config_dir( # resolved JAVA_CODEBASE_RAG_SOURCE_ROOT / _INDEX_DIR. 
captured: dict = {} - def capture_coco(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None): + def capture_coco(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None, + on_progress=None, on_progress_console=None): captured["env"] = env return CompletedProcess(["cocoindex"], 0) @@ -1401,3 +1402,294 @@ def test_update_missing_mcp_binary_returns_partial_failure(self, tmp_path, monke result = run_update(force=False, dry_run=False, cwd=tmp_path) # Should return partial failure (1) because artifact refresh failed assert result == 1 + + +# --------------------------------------------------------------------------- +# PR-4 — install/update unified index progress (stderr renderer) +# --------------------------------------------------------------------------- + + +def _patch_pipeline_for_progress(monkeypatch, *, emit: bool = True) -> dict: + """Patch the three pipeline helpers the installer uses to emit progress. + + Records the ``quiet``/``verbose`` kwargs each was called with so tests can + assert the installer no longer forces ``quiet=True``. Returns the call log. 
+ """ + import subprocess + from java_codebase_rag import pipeline as _pipeline + + calls: dict = {"coco": [], "graph": [], "incremental": []} + + def _coco(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None, + on_progress=None, on_progress_console=None): + calls["coco"].append({"quiet": quiet, "verbose": verbose}) + if emit and on_progress is not None: + from java_codebase_rag.progress import ProgressEvent + on_progress(ProgressEvent( + kind="vectors", phase=None, pass_=None, done=1, total=10, + status="running", elapsed_s=None)) + return subprocess.CompletedProcess(args=["stub"], returncode=0, stdout="", stderr="") + + def _graph(*, source_root, ladybug_path, verbose, quiet=False, env=None, + on_progress=None, on_progress_console=None): + calls["graph"].append({"quiet": quiet, "verbose": verbose}) + if emit and on_progress is not None: + from java_codebase_rag.progress import ProgressEvent + on_progress(ProgressEvent( + kind="graph", phase=None, pass_="1/6", done=1, total=10, + status="running", elapsed_s=None)) + return subprocess.CompletedProcess(args=["stub"], returncode=0, stdout="", stderr="") + + def _incremental(*, source_root, ladybug_path, verbose, quiet=False, env=None, + on_progress=None, on_progress_console=None): + calls["incremental"].append({"quiet": quiet, "verbose": verbose}) + if emit and on_progress is not None: + from java_codebase_rag.progress import ProgressEvent + on_progress(ProgressEvent( + kind="graph", phase=None, pass_="1/6", done=1, total=10, + status="running", elapsed_s=None)) + return subprocess.CompletedProcess(args=["stub"], returncode=0, stdout="", stderr="") + + monkeypatch.setattr(_pipeline, "run_cocoindex_update", _coco) + monkeypatch.setattr(_pipeline, "run_build_ast_graph", _graph) + monkeypatch.setattr(_pipeline, "run_incremental_graph", _incremental) + return calls + + +class TestPR4IndexProgress: + """PR-4: install/update emit unified index progress on stderr.""" + + def _setup_repo(self, tmp_path, 
monkeypatch): + """Copy the bank-chat fixture and stub MCP discovery for install/update. + + Also writes a configured ``.mcp.json`` so ``update`` (which requires a + prior ``install`` per its docstring) detects a configured host and + reaches its indexing sub-step. + """ + import shutil + bank_chat = Path("tests/bank-chat-system") + if not bank_chat.is_dir(): + pytest.skip("bank-chat-system fixture not found") + shutil.copytree(bank_chat, tmp_path / "bank-chat") + cwd = tmp_path / "bank-chat" + (cwd / ".git").mkdir() + # A configured host entry — the state `update` expects post-install. + (cwd / ".mcp.json").write_text( + json.dumps( + { + "mcpServers": { + "java-codebase-rag": { + "command": "/fake/bin/java-codebase-rag-mcp", + "type": "stdio", + } + } + } + ), + encoding="utf-8", + ) + monkeypatch.setattr(shutil, "which", lambda x: "/fake/bin/java-codebase-rag-mcp") + monkeypatch.setattr( + "java_codebase_rag.installer._read_package_artifact", + lambda path: "PACKAGE CONTENT", + ) + monkeypatch.chdir(cwd) + return cwd + + def test_install_emits_indexing_progress_on_stderr(self, tmp_path, monkeypatch): + """install drives the renderer from the patched pipeline helpers; the + JCIRAG_PROGRESS event is consumed by the parser and surfaces as a + rendered progress line on stderr. Wizard stdout prompts remain on + stdout.""" + import io + import contextlib + from java_codebase_rag.installer import run_install + + cwd = self._setup_repo(tmp_path, monkeypatch) + _patch_pipeline_for_progress(monkeypatch, emit=True) + + out, err = io.StringIO(), io.StringIO() + with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err): + rc = run_install( + non_interactive=True, + agents=["claude-code"], + scope="project", + model="auto", + source_root=cwd, + quiet=False, + ) + assert rc == 0 + err_text = err.getvalue() + out_text = out.getvalue() + # The raw structured protocol line is parsed, never raw-relayed. 
+ assert "JCIRAG_PROGRESS kind=vectors" not in err_text + # But indexing progress IS rendered on stderr (non-TTY concise fallback + # prints a "vectors ..." line; the patched coco helper emitted a vectors + # event). A graph event is emitted by the patched graph helper too. + assert "vectors" in err_text.lower() + # The wizard's conversational stdout is preserved (it writes the YAML + # config path when not quiet). + assert "Configuration written" in out_text or ".java-codebase-rag.yml" in out_text + + def test_update_emits_indexing_progress_on_stderr(self, tmp_path, monkeypatch): + """update is no longer silent: the patched cocoindex + incremental + graph helpers drive the renderer, and progress surfaces on stderr.""" + import io + import contextlib + from java_codebase_rag.installer import run_update + + cwd = self._setup_repo(tmp_path, monkeypatch) + # A configured host + a real-looking index so run_update reaches indexing. + index_dir = cwd / ".java-codebase-rag" + index_dir.mkdir(exist_ok=True) + (index_dir / "code_graph.lbug").write_text("", encoding="utf-8") + + _patch_pipeline_for_progress(monkeypatch, emit=True) + monkeypatch.delenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", raising=False) + monkeypatch.delenv("JAVA_CODEBASE_RAG_INDEX_DIR", raising=False) + + out, err = io.StringIO(), io.StringIO() + with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err): + rc = run_update(force=False, dry_run=False, cwd=cwd) + assert rc in (0, 1) + err_text = err.getvalue() + # Progress reached the renderer (coco + incremental both emitted). + assert "JCIRAG_PROGRESS kind=vectors" not in err_text + assert "vectors" in err_text.lower() + + def test_update_runs_indexing_without_quiet_true(self, tmp_path, monkeypatch): + """Regression: update no longer forces quiet=True on the indexing + helpers (the reason it was silent today). 
In the default path both + helpers are called with quiet=False.""" + from java_codebase_rag.installer import run_update + + cwd = self._setup_repo(tmp_path, monkeypatch) + index_dir = cwd / ".java-codebase-rag" + index_dir.mkdir(exist_ok=True) + (index_dir / "code_graph.lbug").write_text("", encoding="utf-8") + + calls = _patch_pipeline_for_progress(monkeypatch, emit=False) + monkeypatch.delenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", raising=False) + monkeypatch.delenv("JAVA_CODEBASE_RAG_INDEX_DIR", raising=False) + + rc = run_update(force=False, dry_run=False, cwd=cwd) + assert rc in (0, 1) + # Both indexing helpers ran and were NOT silenced. + assert calls["coco"], "run_cocoindex_update was not called" + assert calls["incremental"], "run_incremental_graph was not called" + assert calls["coco"][-1]["quiet"] is False + assert calls["incremental"][-1]["quiet"] is False + + def test_install_update_stdout_contract_preserved(self, tmp_path, monkeypatch): + """The wizard's human-readable stdout shape is unchanged: NO + JCIRAG_PROGRESS line leaks to stdout, and the indexing chatter that + used to live on stdout ("Creating index..." / "Updating index...") + no longer appears there.""" + import io + import contextlib + from java_codebase_rag.installer import run_install, run_update + + cwd = self._setup_repo(tmp_path, monkeypatch) + _patch_pipeline_for_progress(monkeypatch, emit=True) + + # --- install --- + out, err = io.StringIO(), io.StringIO() + with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err): + run_install( + non_interactive=True, agents=["claude-code"], scope="project", + model="auto", source_root=cwd, quiet=False, + ) + install_out = out.getvalue() + # No structured progress line on stdout (stdout is the wizard payload). + assert "JCIRAG_PROGRESS" not in install_out + # The old stdout indexing chatter is gone (moved to stderr framing). + assert "Creating index..." not in install_out + assert "Index created successfully." 
not in install_out + + # --- update --- + index_dir = cwd / ".java-codebase-rag" + index_dir.mkdir(exist_ok=True) + (index_dir / "code_graph.lbug").write_text("", encoding="utf-8") + _patch_pipeline_for_progress(monkeypatch, emit=True) + monkeypatch.delenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", raising=False) + monkeypatch.delenv("JAVA_CODEBASE_RAG_INDEX_DIR", raising=False) + + out2, err2 = io.StringIO(), io.StringIO() + with contextlib.redirect_stdout(out2), contextlib.redirect_stderr(err2): + run_update(force=False, dry_run=False, cwd=cwd) + update_out = out2.getvalue() + assert "JCIRAG_PROGRESS" not in update_out + # The old stdout indexing chatter moved off stdout. + assert "Updating index (Lance + graph)..." not in update_out + + def test_update_graph_catchup_failure_is_best_effort_exit_0(self, tmp_path, monkeypatch): + """run_update's graph catch-up is best-effort: a graph-only failure must + NOT flip the exit code. Vectors (cocoindex) succeeded, so exit 0 with a + Warning on stderr carrying the graph caveat — matches the original + semantics and the output/UX-only scope of PR-4.""" + import io + import contextlib + import subprocess + from java_codebase_rag.installer import run_update + + cwd = self._setup_repo(tmp_path, monkeypatch) + index_dir = cwd / ".java-codebase-rag" + index_dir.mkdir(exist_ok=True) + (index_dir / "code_graph.lbug").write_text("", encoding="utf-8") + monkeypatch.delenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", raising=False) + monkeypatch.delenv("JAVA_CODEBASE_RAG_INDEX_DIR", raising=False) + + # Patch at the installer import site (java_codebase_rag.pipeline). + # cocoindex succeeds; the incremental graph returns a non-zero exit. 
+ def coco_ok(env, *, full_reprocess, quiet, verbose=True, + lance_project_root=None, on_progress=None, on_progress_console=None): + return subprocess.CompletedProcess(args=["stub"], returncode=0, stdout="", stderr="") + + def graph_fail(**kwargs): + return subprocess.CompletedProcess(args=["stub"], returncode=3, stdout="", stderr="") + + monkeypatch.setattr("java_codebase_rag.pipeline.run_cocoindex_update", coco_ok) + monkeypatch.setattr("java_codebase_rag.pipeline.run_incremental_graph", graph_fail) + + out, err = io.StringIO(), io.StringIO() + with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err): + rc = run_update(force=False, dry_run=False, cwd=cwd) + + assert rc == 0, f"graph-only failure must be best-effort (exit 0), got {rc}" + err_text = err.getvalue() + assert "Warning:" in err_text + assert "incremental graph update failed" in err_text + + def test_install_indexing_exception_renders_failed_footer(self, tmp_path, monkeypatch): + """If run_cocoindex_update raises during install's indexing sub-step, + the renderer bracket must render a failed (red cross) footer before the + exception propagates — not a green check right before the traceback. 
+ Mirrors cli._run_with_pipeline_progress's BaseException handler.""" + import io + import contextlib + from java_codebase_rag import cli_format + from java_codebase_rag.installer import run_install + + cwd = self._setup_repo(tmp_path, monkeypatch) + + def boom(env, *, full_reprocess, quiet, verbose=True, + lance_project_root=None, on_progress=None, on_progress_console=None): + raise RuntimeError("boom from cocoindex") + + monkeypatch.setattr("java_codebase_rag.pipeline.run_cocoindex_update", boom) + + out, err = io.StringIO(), io.StringIO() + with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err): + with pytest.raises(RuntimeError, match="boom from cocoindex"): + run_install( + non_interactive=True, + agents=["claude-code"], + scope="project", + model="auto", + source_root=cwd, + quiet=False, + ) + + err_text = err.getvalue() + # The footer rendered the failure marker (red cross), not the green check. + assert cli_format.styled_cross() in err_text + assert cli_format.styled_check() not in err_text diff --git a/tests/test_java_codebase_rag_cli.py b/tests/test_java_codebase_rag_cli.py index f2b2c29..cbac80d 100644 --- a/tests/test_java_codebase_rag_cli.py +++ b/tests/test_java_codebase_rag_cli.py @@ -579,7 +579,7 @@ def test_reprocess_graph_only_then_increment_graph_is_noop( hash_file.write_text(json.dumps(data), encoding="utf-8") # Stub cocoindex so increment exercises ONLY its graph stage. 
- def _noop_coco(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None): + def _noop_coco(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None, on_progress=None, on_progress_console=None): return subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="") monkeypatch.setattr(cli_mod, "run_cocoindex_update", _noop_coco) @@ -1017,7 +1017,7 @@ def test_reprocess_no_flag_cocoindex_failure_records_vectors_only( monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(tmp_path)) - async def fake_refresh(*, quiet: bool = False, verbose: bool = True) -> server_mod.RefreshIndexOutput: + async def fake_refresh(*, quiet: bool = False, verbose: bool = True, on_progress=None, on_progress_console=None) -> server_mod.RefreshIndexOutput: return server_mod.RefreshIndexOutput( success=False, exit_code=1, @@ -1230,3 +1230,227 @@ def test_console_script_entry_point_routes_through_wrapper() -> None: assert 'java-codebase-rag = "java_codebase_rag.cli:_console_script_main"' in pyproject assert 'java-codebase-rag = "java-codebase-rag:main"' not in pyproject assert 'java-codebase-rag = "java_codebase_rag.cli:main"' not in pyproject + + +# --------------------------------------------------------------------------- +# PR-2: graph-phase progress wiring (default vs --quiet) +# --------------------------------------------------------------------------- + + +def _make_stub_completed(*, returncode: int = 0, stderr: str = "") -> "subprocess.CompletedProcess[str]": + import subprocess + + return subprocess.CompletedProcess(args=["stub"], returncode=returncode, stdout="", stderr=stderr) + + +def _patch_pipeline_for_graph_progress(monkeypatch: pytest.MonkeyPatch, *, emit_graph: bool) -> None: + """Patch the cocoindex + graph pipeline helpers used by init/increment. 
+ + When ``emit_graph`` is True the patched graph helper invokes the caller's + ``on_progress`` callback with a synthetic ``kind=graph`` event — simulating + what the real subprocess drain would feed the renderer in default mode. + """ + from java_codebase_rag import cli as _cli + from java_codebase_rag import pipeline as _pipeline + + def _fake_cocoindex_update(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None, on_progress=None, on_progress_console=None): + return _make_stub_completed(returncode=0) + + def _fake_run_build_ast_graph(*, source_root, ladybug_path, verbose, quiet=False, env=None, on_progress=None, on_progress_console=None): + if emit_graph and on_progress is not None: + from java_codebase_rag.progress import ProgressEvent + + on_progress( + ProgressEvent( + kind="graph", phase=None, pass_="1/6", done=10, total=130, + status="running", elapsed_s=None, + ) + ) + return _make_stub_completed(returncode=0) + + def _fake_run_incremental_graph(*, source_root, ladybug_path, verbose, quiet=False, env=None, on_progress=None, on_progress_console=None): + if emit_graph and on_progress is not None: + from java_codebase_rag.progress import ProgressEvent + + on_progress( + ProgressEvent( + kind="graph", phase=None, pass_="1/6", done=3, total=130, + status="running", elapsed_s=None, + ) + ) + return _make_stub_completed(returncode=0) + + # Patch where cli.py imported them (module-level names in cli). + monkeypatch.setattr(_cli, "run_cocoindex_update", _fake_cocoindex_update) + monkeypatch.setattr(_cli, "run_build_ast_graph", _fake_run_build_ast_graph) + monkeypatch.setattr(_cli, "run_incremental_graph", _fake_run_incremental_graph) + # Also patch the pipeline module attributes in case anything imports there. 
+ monkeypatch.setattr(_pipeline, "run_cocoindex_update", _fake_cocoindex_update) + monkeypatch.setattr(_pipeline, "run_build_ast_graph", _fake_run_build_ast_graph) + monkeypatch.setattr(_pipeline, "run_incremental_graph", _fake_run_incremental_graph) + + +def test_cli_init_default_mode_graph_phase_progress_on_stderr( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """In default mode a graph-phase progress event is parsed and rendered to + stderr; the raw ``JCIRAG_PROGRESS`` line is NOT echoed verbatim.""" + idx = tmp_path / "idx_init_prog" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + _patch_pipeline_for_graph_progress(monkeypatch, emit_graph=True) + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx)] + ) + assert rc == 0 + err = buf.getvalue() + # The raw structured line is consumed by the parser, never raw-relayed. + assert "JCIRAG_PROGRESS kind=graph" not in err + # But graph-phase progress IS rendered (non-TTY concise fallback prints a + # "graph ..." line). The synthetic event had total=130, done=10. + assert "graph" in err.lower() + + +def test_cli_increment_graph_phase_progress( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Symmetric: increment default mode parses and renders graph progress.""" + idx = tmp_path / "idx_inc_prog" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + # init first (quiet) to populate the index dir so increment has state. 
+ _patch_pipeline_for_graph_progress(monkeypatch, emit_graph=False) + init_rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"] + ) + assert init_rc == 0 + # Now increment in default mode with graph progress emitted. + _patch_pipeline_for_graph_progress(monkeypatch, emit_graph=True) + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["increment", "--source-root", str(corpus_root), "--index-dir", str(idx)] + ) + assert rc == 0 + err = buf.getvalue() + assert "JCIRAG_PROGRESS kind=graph" not in err + assert "graph" in err.lower() + + +def test_cli_graph_progress_absent_when_quiet( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """--quiet suppresses all progress stderr; no graph rendering occurs.""" + idx = tmp_path / "idx_quiet_prog" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + _patch_pipeline_for_graph_progress(monkeypatch, emit_graph=True) + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"] + ) + assert rc == 0 + err = buf.getvalue() + assert "JCIRAG_PROGRESS kind=graph" not in err + # In quiet mode there is no header/footer framing either. + assert "java-codebase-rag init" not in err + + +# --------------------------------------------------------------------------- +# PR-4 — wire --quiet/--verbose through update / install +# --------------------------------------------------------------------------- + + +def test_cmd_update_forwards_quiet_flag( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """`_cmd_update --quiet` forwards quiet=True to run_update. + + Until PR-4 _cmd_update ignored both --quiet and --verbose entirely. 
+ """ + import java_codebase_rag.installer as _installer + + captured: dict = {} + + def _fake_run_update(*, force=False, dry_run=False, cwd=None, + quiet=False, verbose=False): + captured["quiet"] = quiet + captured["verbose"] = verbose + captured["force"] = force + captured["dry_run"] = dry_run + return 0 + + monkeypatch.setattr(_installer, "run_update", _fake_run_update) + monkeypatch.chdir(tmp_path) + + rc = cli_mod.main(["update", "--quiet"]) + assert rc == 0 + assert captured["quiet"] is True + + +def test_cmd_update_forwards_verbose_flag( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """`_cmd_update --verbose` forwards verbose=True to run_update.""" + import java_codebase_rag.installer as _installer + + captured: dict = {} + + def _fake_run_update(*, force=False, dry_run=False, cwd=None, + quiet=False, verbose=False): + captured["quiet"] = quiet + captured["verbose"] = verbose + return 0 + + monkeypatch.setattr(_installer, "run_update", _fake_run_update) + monkeypatch.chdir(tmp_path) + + rc = cli_mod.main(["update", "--verbose"]) + assert rc == 0 + assert captured["verbose"] is True + # And the default path (no flag) forwards both as False. + rc2 = cli_mod.main(["update"]) + assert rc2 == 0 + assert captured["quiet"] is False + assert captured["verbose"] is False + + +def test_cmd_install_forwards_verbose_flag( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """`_cmd_install --verbose` forwards verbose=True to run_install. + + Until PR-4 _cmd_install wired only --quiet through. 
+ """ + import java_codebase_rag.installer as _installer + + captured: dict = {} + + def _fake_run_install(*, non_interactive, agents, scope, model, + source_root=None, quiet=False, verbose=False): + captured["quiet"] = quiet + captured["verbose"] = verbose + captured["non_interactive"] = non_interactive + return 0 + + monkeypatch.setattr(_installer, "run_install", _fake_run_install) + monkeypatch.chdir(tmp_path) + + rc = cli_mod.main( + ["install", "--non-interactive", "--agent", "claude-code", "--verbose"] + ) + assert rc == 0 + assert captured["verbose"] is True + # quiet still flows through too. + rc2 = cli_mod.main( + ["install", "--non-interactive", "--agent", "claude-code", "--quiet"] + ) + assert rc2 == 0 + assert captured["quiet"] is True + diff --git a/tests/test_progress.py b/tests/test_progress.py index ba25e83..872504a 100644 --- a/tests/test_progress.py +++ b/tests/test_progress.py @@ -311,3 +311,153 @@ def test_non_tty_fallback_emits_concise_lines() -> None: assert "42.1" in final finally: r.stop() + + +# --------------------------------------------------------------------------- +# PR-2: drain-thread safety (apply/stop atomicity) + heartbeat suppression +# --------------------------------------------------------------------------- + + +def test_renderer_apply_is_noop_after_stop() -> None: + """apply() after stop() must not raise and must not mutate task state. + + Once the drain thread is wired (PR-2), a progress event can arrive just as + the main thread calls stop(); apply() must be a safe no-op in that window. + """ + console = Console(file=io.StringIO(), force_terminal=False, width=80) + r = IndexProgressRenderer(["graph"], console=console) + assert r._fallback is True # non-TTY path + r.start() + # A normal event before stop sets state. 
+ r.apply( + ProgressEvent( + kind="graph", phase=None, pass_="1/6", done=10, total=100, status="running", elapsed_s=None + ) + ) + r.stop() + # Feeding events after stop must not raise and must not change state. + for _ in range(5): + r.apply( + ProgressEvent( + kind="graph", phase=None, pass_="1/6", done=50, total=100, status="running", elapsed_s=None + ) + ) + # No exception → pass. The carry-forward total stays at the last-seen value. + assert r._last_total["graph"] == 100 + + +def test_renderer_apply_stop_stress_does_not_crash() -> None: + """Concurrent apply()/stop() from two threads must not raise (drain-thread model).""" + import threading + + console = Console(file=io.StringIO(), force_terminal=False, width=80) + r = IndexProgressRenderer(["graph"], console=console) + stop_flag = threading.Event() + errors: list[BaseException] = [] + + def feeder() -> None: + i = 0 + while not stop_flag.is_set(): + try: + r.apply( + ProgressEvent( + kind="graph", + phase=None, + pass_="1/6", + done=i, + total=1000, + status="running", + elapsed_s=None, + ) + ) + except BaseException as exc: # noqa: BLE001 + errors.append(exc) + i += 1 + + t = threading.Thread(target=feeder, daemon=True) + t.start() + try: + for _ in range(20): + r.start() + stop_flag.clear() + # let the feeder run briefly + for _ in range(100): + pass + r.stop() + finally: + stop_flag.set() + t.join(timeout=2.0) + assert not errors, f"concurrent apply/stop raised: {errors!r}" + + +def test_progress_relay_suppresses_graph_heartbeat_in_live_mode() -> None: + """In default (Live) mode the builder's `[graph] pass N` heartbeat must NOT + leak as a raw line above the Live region — the renderer's bar subsumes it. + The heartbeat only reappears in --verbose raw-relay mode (no Live region). + """ + # Live region active (renderer attached) → heartbeat is noise-suppressed. 
+ stub = _StubRenderer() + buf = io.StringIO() + relay = ProgressRelay( + renderer=stub, console=Console(file=buf, force_terminal=False, force_interactive=False) + ) + relay.feed(b"[graph] pass 1 \xc2\xb7 5s elapsed\n") + relay.feed(b"JCIRAG_PROGRESS kind=graph pass=1/6 done=10 total=100\n") + relay.flush() + # The structured progress line was routed to the renderer (1 apply). + assert len(stub.applied) == 1 + assert stub.applied[0].kind == "graph" + # The heartbeat was not echoed as a raw line above the Live region. The + # renderer's bar subsumes it; assert the heartbeat content is entirely + # absent (not just the ``[graph]`` markup tag, which rich would strip). + assert "5s elapsed" not in buf.getvalue() + + +# --------------------------------------------------------------------------- +# PR-2 review hardening: survive a renderer exception in the drain thread +# --------------------------------------------------------------------------- + + +class _ExplodingRenderer: + """A renderer whose apply() raises on every call.""" + + def __init__(self) -> None: + self.applied: list[ProgressEvent] = [] + + def apply(self, ev: ProgressEvent) -> None: + raise RuntimeError("boom") + + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + +def test_progress_relay_survives_renderer_apply_exception(capsys) -> None: + """A renderer exception must NOT propagate out of feed() and must not kill + the drain thread (which would silently truncate the captured stderr). + + Feed a JCIRAG_PROGRESS line (triggers the raise) followed by a normal + non-noise line; assert feed() returns cleanly and the normal line still + routes to the sink (console). The exception is noted to stderr instead. + """ + relay = ProgressRelay( + renderer=_ExplodingRenderer(), + console=Console(file=io.StringIO(), force_terminal=False, force_interactive=False), + ) + buf = io.StringIO() + # Swap in a console backed by a buffer we can inspect for the normal line. 
+ relay._console = Console(file=buf, force_terminal=False, force_interactive=False) # noqa: SLF001 + # The progress line triggers renderer.apply() → RuntimeError("boom"). + relay.feed(b"JCIRAG_PROGRESS kind=graph pass=1/6 done=10 total=100\n") + # A normal non-noise line after the exception must still route to the sink. + relay.feed(b"cocoindex: indexing batch\n") + relay.flush() + out = buf.getvalue() + assert "cocoindex: indexing batch" in out + # The exception was noted to real stderr (capsys captures sys.stderr). + captured = capsys.readouterr() + assert "progress renderer error" in captured.err + assert "boom" in captured.err + diff --git a/tests/test_vectors_progress.py b/tests/test_vectors_progress.py new file mode 100644 index 0000000..8c9a754 --- /dev/null +++ b/tests/test_vectors_progress.py @@ -0,0 +1,349 @@ +"""PR-3: vectors-phase + optimize-phase index progress. + +Two tests are HEAVY-gated (``JAVA_CODEBASE_RAG_RUN_HEAVY=1``) because they run +a real ``cocoindex update`` against the bank-chat-system corpus (embedding model +cached). The remaining tests are LIGHT: they exercise the renderer against a +synthetic event stream or patch the pipeline helpers so no cocoindex/torch loads. 
+""" +from __future__ import annotations + +import io +import os +import re +import subprocess +import sys +from pathlib import Path + +import pytest +from rich.console import Console + +from java_codebase_rag.progress import IndexProgressRenderer, ProgressEvent + +_PREFIX = "JCIRAG_PROGRESS" + +HEAVY = os.environ.get("JAVA_CODEBASE_RAG_RUN_HEAVY", "").strip().lower() in ("1", "true", "yes") + + +# --------------------------------------------------------------------------- +# Light: renderer clamp-on-completion + indeterminate (tests 2, 3, 4) +# --------------------------------------------------------------------------- + + +def _renderer(*, terminal: bool = True) -> IndexProgressRenderer: + """Build a renderer over a buffer; force_terminal=True exercises the Live path.""" + buf = io.StringIO() + console = Console(file=buf, force_terminal=terminal, width=120, color_system=None) + return IndexProgressRenderer(["vectors", "optimize", "graph"], console=console) + + +def test_vectors_progress_clamps_on_completion() -> None: + """total=100, done=80, then parent status=done -> completed clamps to 100.""" + r = _renderer() + r.start() + try: + r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=80, total=100, status="running", elapsed_s=None)) + assert r._progress.tasks[r._task_ids["vectors"]].completed == 80 + # Parent emits the terminal event (the flow cannot — no "all files done" + # hook). done=None here; the clamp must still snap completed to total. 
+ r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=100, status="done", elapsed_s=1.0)) + task = r._progress.tasks[r._task_ids["vectors"]] + assert task.completed == 100 + assert task.finished + finally: + r.stop() + + +def test_vectors_progress_approximate_total_overstates_then_clamps() -> None: + """Approximate pre-walk overstates total (100) but done only reaches 95; the + parent's status=done still clamps to 100 (no 95% stall).""" + r = _renderer() + r.start() + try: + r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=100, status="running", elapsed_s=None)) + r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=95, total=100, status="running", elapsed_s=None)) + assert r._progress.tasks[r._task_ids["vectors"]].completed == 95 + r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=95, total=100, status="done", elapsed_s=42.1)) + assert r._progress.tasks[r._task_ids["vectors"]].completed == 100 + assert r._progress.tasks[r._task_ids["vectors"]].finished + finally: + r.stop() + + +def test_vectors_incremental_renders_indeterminate() -> None: + """No total event (incremental catch-up) -> task stays indeterminate (total None).""" + r = _renderer() + r.start() + try: + # Only a done tick, no total — mirrors incremental catch-up where the + # memo cache skips unchanged files and no total is knowable up front. 
+ r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=3, total=None, status="running", elapsed_s=None)) + task = r._progress.tasks[r._task_ids["vectors"]] + assert task.total is None + finally: + r.stop() + + +# --------------------------------------------------------------------------- +# Light: CLI-level vectors/optimize progress (tests 6, 7) +# --------------------------------------------------------------------------- + + +def _make_stub_completed(*, returncode: int = 0, stderr: str = "") -> "subprocess.CompletedProcess[str]": + return subprocess.CompletedProcess(args=["stub"], returncode=returncode, stdout="", stderr=stderr) + + +def _patch_pipeline_for_vectors_progress(monkeypatch: pytest.MonkeyPatch, *, emit_vectors: bool) -> None: + """Patch cocoindex + graph helpers so init/increment run without heavy deps. + + When ``emit_vectors`` is True the patched cocoindex helper invokes the + caller's ``on_progress`` with a synthetic ``kind=vectors`` event — simulating + what the real subprocess drain would feed the renderer in default mode. 
+ """ + from java_codebase_rag import cli as _cli + from java_codebase_rag import pipeline as _pipeline + + def _fake_cocoindex_update(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None, on_progress=None, on_progress_console=None): + if emit_vectors and on_progress is not None: + on_progress( + ProgressEvent(kind="vectors", phase=None, pass_=None, done=10, total=130, status="running", elapsed_s=None) + ) + return _make_stub_completed(returncode=0) + + def _fake_run_build_ast_graph(*, source_root, ladybug_path, verbose, quiet=False, env=None, on_progress=None, on_progress_console=None): + return _make_stub_completed(returncode=0) + + def _fake_run_incremental_graph(*, source_root, ladybug_path, verbose, quiet=False, env=None, on_progress=None, on_progress_console=None): + return _make_stub_completed(returncode=0) + + monkeypatch.setattr(_cli, "run_cocoindex_update", _fake_cocoindex_update) + monkeypatch.setattr(_cli, "run_build_ast_graph", _fake_run_build_ast_graph) + monkeypatch.setattr(_cli, "run_incremental_graph", _fake_run_incremental_graph) + monkeypatch.setattr(_pipeline, "run_cocoindex_update", _fake_cocoindex_update) + monkeypatch.setattr(_pipeline, "run_build_ast_graph", _fake_run_build_ast_graph) + monkeypatch.setattr(_pipeline, "run_incremental_graph", _fake_run_incremental_graph) + + +def test_cli_init_vectors_phase_progress_on_stderr( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """In default mode a vectors-phase progress event is parsed and rendered to + stderr; the raw ``JCIRAG_PROGRESS`` line is NOT echoed verbatim.""" + from java_codebase_rag import cli as cli_mod + + idx = tmp_path / "idx_vectors_prog" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + _patch_pipeline_for_vectors_progress(monkeypatch, emit_vectors=True) + buf = io.StringIO() + import contextlib + + with 
contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx)] + ) + assert rc == 0 + err = buf.getvalue() + # The raw structured line is consumed by the parser, never raw-relayed. + assert "JCIRAG_PROGRESS kind=vectors" not in err + # But vectors-phase progress IS rendered (non-TTY concise fallback prints a + # "vectors ..." line). The synthetic event had done=10, total=130. + assert "vectors" in err.lower() + + +def test_cli_reprocess_optimize_phase_progress( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """The optimize phase renders when optimize_lance_tables emits kind=optimize. + + Patches server.run_refresh_pipeline to drive the renderer's on_progress with + a synthetic optimize event (mirrors the in-process emission). Asserts the + phase renders and the raw line is not echoed.""" + import contextlib + + from java_codebase_rag import cli as cli_mod + import server + + idx = tmp_path / "idx_optimize_prog" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + + async def _fake_refresh(*, quiet=False, verbose=True, on_progress=None, on_progress_console=None): + # Emit a synthetic optimize event (mirrors lance_optimize's in-process + # emission) so the renderer's optimize task is exercised. 
+ if on_progress is not None: + on_progress(ProgressEvent(kind="optimize", phase=None, pass_=None, done=None, total=None, status="running", elapsed_s=None)) + on_progress(ProgressEvent(kind="optimize", phase=None, pass_=None, done=None, total=None, status="done", elapsed_s=1.2)) + from server import RefreshIndexOutput + + return RefreshIndexOutput(success=True, message=None, phases_run=["vectors", "graph"]) + + monkeypatch.setattr(server, "run_refresh_pipeline", _fake_refresh) + + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["reprocess", "--source-root", str(corpus_root), "--index-dir", str(idx)] + ) + assert rc == 0 + err = buf.getvalue() + # The raw structured line is consumed by the parser, never raw-relayed. + assert "JCIRAG_PROGRESS kind=optimize" not in err + # The optimize phase rendered (non-TTY concise fallback prints an + # "optimize done ..." line for the terminal event). + assert "optimize" in err.lower() + + +# --------------------------------------------------------------------------- +# Light: retirement guard (test 8) +# --------------------------------------------------------------------------- + + +def test_spinner_removed_and_emit_vectors_helpers_removed() -> None: + """Spinner and emit_vectors_start/_finish are no longer importable; no + remaining references anywhere in the production tree.""" + from java_codebase_rag import cli_format, cli_progress + + # The retired symbols must not be importable. + assert not hasattr(cli_format, "Spinner"), "Spinner should have been removed from cli_format" + assert not hasattr(cli_progress, "emit_vectors_start"), "emit_vectors_start should have been removed" + assert not hasattr(cli_progress, "emit_vectors_finish"), "emit_vectors_finish should have been removed" + # And no remaining references anywhere in the production tree. 
+ repo_root = Path(__file__).resolve().parent.parent + offenders: list[str] = [] + for py in (repo_root / "java_codebase_rag").rglob("*.py"): + text = py.read_text(encoding="utf-8") + # Word-boundary match for the retired Spinner class (not rich's SpinnerColumn). + if re.search(r"\bSpinner\b", text): + offenders.append(str(py)) + server_py = repo_root / "server.py" + if server_py.is_file(): + text = server_py.read_text(encoding="utf-8") + if re.search(r"\bSpinner\b", text) or "emit_vectors_start" in text or "emit_vectors_finish" in text: + offenders.append(str(server_py)) + assert not offenders, f"retired symbols still referenced in: {offenders}" + + +# --------------------------------------------------------------------------- +# Heavy: real cocoindex flow emission + pre-walk divergence (tests 1, 5) +# --------------------------------------------------------------------------- + + +def _cocoindex_bin() -> Path: + return Path(sys.executable).parent / "cocoindex" + + +def _require_cocoindex_runtime_deps() -> None: + try: + import tree_sitter_java # noqa: F401 + except ImportError as exc: + pytest.skip(f"heavy e2e needs project deps: {exc}") + + +pytestmark_heavy = pytest.mark.skipif( + not HEAVY, + reason="set JAVA_CODEBASE_RAG_RUN_HEAVY=1 to run the cocoindex vectors-flow test", +) + + +def _run_cocoindex_update(corpus_root: Path, index_dir: Path) -> subprocess.CompletedProcess: + """Run a real ``cocoindex update --full-reprocess`` and return the result.""" + _require_cocoindex_runtime_deps() + cocoindex_bin = _cocoindex_bin() + if not cocoindex_bin.is_file(): + pytest.skip(f"cocoindex not installed in venv: {cocoindex_bin}") + bundle_dir = Path(__file__).resolve().parent.parent + flow = (bundle_dir / "java_index_flow_lancedb.py").resolve() + start = Path(corpus_root).resolve() + relp = os.path.relpath(str(flow), start=str(start)) + relp = Path(relp).as_posix() + app_spec = f"{relp}:JavaCodeIndexLance" + env = { + **os.environ, + "JAVA_CODEBASE_RAG_INDEX_DIR": 
str(index_dir.resolve()), + "JAVA_CODEBASE_RAG_SOURCE_ROOT": str(Path(corpus_root).resolve()), + } + return subprocess.run( + [str(cocoindex_bin), "update", app_spec, "--full-reprocess", "-f"], + cwd=str(corpus_root), + env=env, + capture_output=True, + text=True, + timeout=900, + ) + + +@pytestmark_heavy +def test_flow_emits_vectors_progress_per_file(corpus_root: Path, tmp_path: Path) -> None: + """A real ``cocoindex update`` emits ``JCIRAG_PROGRESS kind=vectors`` lines + in captured stderr: the one-shot approximate ``total=`` line from app_main + plus per-file ``done=`` ticks (the gating spike, promoted to a regression test).""" + index_dir = tmp_path / ".java-codebase-rag" + index_dir.mkdir(parents=True) + proc = _run_cocoindex_update(corpus_root, index_dir) + assert proc.returncode == 0, f"cocoindex failed: stdout={proc.stdout}\nstderr={proc.stderr}" + lines = [ln for ln in proc.stderr.splitlines() if "JCIRAG_PROGRESS kind=vectors" in ln] + assert lines, f"expected vectors progress lines, got stderr:\n{proc.stderr}" + # The one-shot approximate total line from app_main. + totals = [ln for ln in lines if "total=" in ln and "done=" not in ln] + assert totals, f"expected a one-shot total line; lines: {lines!r}" + # Per-file done ticks (throttled every ~25 files). + ticks = [ln for ln in lines if "done=" in ln] + assert ticks, f"expected per-file done ticks; lines: {lines!r}" + # Done ticks are monotonic non-decreasing. + done_vals = [int(m.group(1)) for ln in ticks if (m := re.search(r"done=(\d+)", ln))] + assert done_vals, f"could not parse done values from: {ticks!r}" + assert done_vals == sorted(done_vals), f"done ticks must be monotonic: {done_vals}" + + +@pytestmark_heavy +def test_pre_walk_total_divergence_bounded(corpus_root: Path, tmp_path: Path) -> None: + """On the fixture, the approximate pre-walk total exactly equals the number of + files actually processed (the spike measured gap == 0). 
+ + The TRUE processed count is read from the LanceDB tables (distinct + ``filename`` values across the three tables) rather than the throttled + ``done=k`` tick stream: ticks fire only every 25th file, so ``max(done)`` + lands on a multiple of 25 and would mask the real divergence. The accepted + over-count on larger trees is the ignored / empty / undecodable file count + (the renderer clamps to total on completion regardless).""" + index_dir = tmp_path / ".java-codebase-rag" + index_dir.mkdir(parents=True) + proc = _run_cocoindex_update(corpus_root, index_dir) + assert proc.returncode == 0, f"cocoindex failed: stdout={proc.stdout}\nstderr={proc.stderr}" + lines = [ln for ln in proc.stderr.splitlines() if "JCIRAG_PROGRESS kind=vectors" in ln] + # The one-shot approximate total. + total_lines = [ln for ln in lines if "total=" in ln and "done=" not in ln] + assert total_lines, f"expected a total line; lines: {lines!r}" + total_match = re.search(r"total=(\d+)", total_lines[0]) + assert total_match, f"could not parse total from: {total_lines[0]!r}" + pre_walk_total = int(total_match.group(1)) + # The tick stream is throttled (every 25th file), so it cannot yield the + # true processed count. Read the ground truth from the LanceDB tables: + # distinct filename values across the three tables the flow populated. 
+ import lancedb + + db = lancedb.connect(str(index_dir.resolve())) + actual_done = 0 + for tname in ("javacodeindex_java_code", "sqlschemaindex_sql_schema", "yamlconfigindex_yaml_config"): + try: + tbl = db.open_table(tname) + except Exception as exc: # pragma: no cover - table missing only on a broken flow + raise AssertionError(f"Lance table {tname} missing after flow: {exc}") from exc + rows = tbl.search().select(["filename"]).limit(1_000_000).to_list() + actual_done += len({r["filename"] for r in rows if r.get("filename") is not None}) + # On this fixture the pre-walk matches exactly (gap == 0): all counted + # files are non-empty / decodable / not-ignored, and the YAML predicate + # was fixed to include application*.yml. The accepted over-count on larger + # trees is the ignored / empty file count (the renderer clamps regardless). + gap = pre_walk_total - actual_done + assert gap >= 0, ( + f"pre-walk total {pre_walk_total} < actual done {actual_done} (under-count: " + f"a matcher predicate is dropping files the flow still processes)" + ) + assert gap == 0, ( + f"pre-walk over-count on fixture: pre_walk_total={pre_walk_total} " + f"actual_done={actual_done} gap={gap} (expected 0; the over-count is the " + f"ignored/empty/undecodable file count — the renderer clamps regardless)" + )