diff --git a/java_codebase_rag/cli.py b/java_codebase_rag/cli.py index 02173cf..7981c84 100644 --- a/java_codebase_rag/cli.py +++ b/java_codebase_rag/cli.py @@ -146,8 +146,13 @@ def _run_with_pipeline_progress( return int(work(None)) from java_codebase_rag.progress import IndexProgressRenderer, ProgressEvent - # PR-2 owns the graph task only; vectors/optimize stay pending (PR-3). - phases = ["graph"] + # PR-3 owns all three tasks in order: Vectors → Optimize → Graph. The vectors + # task is fed by the cocoindex child's per-file ticks + approximate total + # (subprocess transport, parsed by ProgressRelay); the optimize task is fed + # in-process by lance_optimize; the graph task is fed by the build_ast_graph + # child (subprocess transport). A task only becomes visible/running once its + # first event arrives. + phases = ["vectors", "optimize", "graph"] renderer = IndexProgressRenderer(phases) progress = PipelineProgress(renderer=renderer) @@ -318,6 +323,8 @@ def work(progress: "PipelineProgress | None") -> int: quiet=bool(args.quiet), verbose=verbose, lance_project_root=None if args.quiet else cfg.source_root, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, ) if coco.returncode != 0: _emit( @@ -378,6 +385,8 @@ def work(progress: "PipelineProgress | None") -> int: quiet=bool(args.quiet), verbose=bool(args.verbose), lance_project_root=None if args.quiet else cfg.source_root, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, ) if coco.returncode != 0: _emit( @@ -455,7 +464,11 @@ def work(progress: "PipelineProgress | None") -> int: graph_only = bool(getattr(args, "graph_only", False)) if vectors_only: - coco = run_cocoindex_update(env, full_reprocess=True, quiet=bool(args.quiet), verbose=verbose) + coco = run_cocoindex_update( + env, full_reprocess=True, quiet=bool(args.quiet), 
verbose=verbose, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, + ) if _is_cocoindex_preflight_blocker(coco): payload: dict[str, Any] = { "success": False, @@ -530,7 +543,14 @@ def work(progress: "PipelineProgress | None") -> int: import server # lazy: pulls sentence_transformers/torch/lancedb/kuzu - result = asyncio.run(server.run_refresh_pipeline(quiet=bool(args.quiet), verbose=verbose)) + result = asyncio.run( + server.run_refresh_pipeline( + quiet=bool(args.quiet), + verbose=verbose, + on_progress=progress.on_progress if progress is not None else None, + on_progress_console=progress.console if progress is not None else None, + ) + ) payload = result.model_dump() _emit_reprocess_outcome(payload) return _reprocess_exit_code(payload) diff --git a/java_codebase_rag/cli_format.py b/java_codebase_rag/cli_format.py index a8b7e80..73c5dee 100644 --- a/java_codebase_rag/cli_format.py +++ b/java_codebase_rag/cli_format.py @@ -1,10 +1,7 @@ """TTY-aware ANSI formatting for CLI stderr progress.""" from __future__ import annotations -import itertools import sys -import threading -import time _RESET = "\033[0m" _BOLD = "\033[1m" @@ -16,8 +13,6 @@ CHECK = "✓" CROSS = "✗" -_SPINNER_FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" - _NOISE_CONTAINS: tuple[bytes, ...] 
= ( b"lance::", b"FutureWarning", @@ -88,33 +83,3 @@ def styled_check() -> str: def styled_cross() -> str: return red(CROSS) if stderr_is_tty() else CROSS - - -class Spinner: - """Braille spinner that overwrites the current stderr line until stopped.""" - - def __init__(self, label: str) -> None: - self._label = label - self._stop = threading.Event() - self._thread: threading.Thread | None = None - - def start(self) -> None: - self._thread = threading.Thread(target=self._run, name="spinner", daemon=True) - self._thread.start() - - def stop(self) -> None: - self._stop.set() - if self._thread is not None: - self._thread.join(timeout=2.0) - sys.stderr.buffer.write(b"\r\x1b[2K") - sys.stderr.buffer.flush() - - def _run(self) -> None: - frames = itertools.cycle(_SPINNER_FRAMES) - t0 = time.monotonic() - while not self._stop.wait(0.3): - elapsed = time.monotonic() - t0 - frame = next(frames) - line = f"\r{frame} {self._label} · {elapsed:.0f}s" - sys.stderr.buffer.write(line.encode()) - sys.stderr.buffer.flush() diff --git a/java_codebase_rag/cli_progress.py b/java_codebase_rag/cli_progress.py index 0419252..8d4e518 100644 --- a/java_codebase_rag/cli_progress.py +++ b/java_codebase_rag/cli_progress.py @@ -5,28 +5,10 @@ import sys from typing import Callable -from java_codebase_rag.cli_format import bold_cyan, is_noise_line, styled_check, styled_cross +from java_codebase_rag.cli_format import is_noise_line from java_codebase_rag.progress import ProgressEvent, make_relay -def emit_vectors_start() -> None: - print( - bold_cyan("[vectors]") + " running · cocoindex update", - file=sys.stderr, - flush=True, - ) - - -def emit_vectors_finish(*, elapsed_s: float, exit_code: int) -> None: - marker = styled_check() if exit_code == 0 else styled_cross() - print( - f"{marker} {bold_cyan('[vectors]')} finished · {elapsed_s:.2f}s" - + (f" (exit={exit_code})" if exit_code != 0 else ""), - file=sys.stderr, - flush=True, - ) - - class _AsyncLineFilter: """Buffers byte chunks and relays 
only non-noise lines to stderr (async drain path).""" diff --git a/java_codebase_rag/lance_optimize.py b/java_codebase_rag/lance_optimize.py index cb435dd..b16bebd 100644 --- a/java_codebase_rag/lance_optimize.py +++ b/java_codebase_rag/lance_optimize.py @@ -21,7 +21,14 @@ import asyncio import sys +import time from pathlib import Path +from typing import Callable, Literal + +# Mirrors ``ProgressStatus`` in ``progress.py``; kept local (rather than imported) +# so this module never pays the ``rich`` cost at import time — see +# ``_make_optimize_event``. +_OptimizeStatus = Literal["running", "done", "failed"] # Single source of truth for the three Lance table names created by the flow. # Keep in sync with ``search_lancedb.TABLES`` (the values there mirror these). @@ -31,6 +38,33 @@ "yamlconfigindex_yaml_config", ) + +def _make_optimize_event( + *, + status: _OptimizeStatus, + elapsed_s: float | None = None, +): + """Build a ``ProgressEvent(kind="optimize", …)`` lazily (progress is parent-side). + + ``lance_optimize`` runs in-process in the parent (called by + ``pipeline._maybe_run_serialized_optimize`` and + ``server.run_refresh_pipeline``); it routes progress to the renderer via the + in-process ``on_progress`` callback — NOT via stderr (which would corrupt + the Live region). The import is local so the flow (which imports + ``LANCE_TABLE_NAMES`` at definition time) never pays the ``rich`` cost. 
+ """ + from java_codebase_rag.progress import ProgressEvent + + return ProgressEvent( + kind="optimize", + phase=None, + pass_=None, + done=None, + total=None, + status=status, + elapsed_s=elapsed_s, + ) + # Commit conflicts are transient; a handful of exponential-backoff retries is # enough because, post-flow, there are no concurrent writers — only successive # optimize/compaction passes within this single serialized call can still @@ -60,7 +94,12 @@ async def _list_table_names(db: object) -> set[str]: return set(await db.table_names()) -async def optimize_lance_tables(index_dir: Path, *, quiet: bool = False) -> dict[str, str]: +async def optimize_lance_tables( + index_dir: Path, + *, + quiet: bool = False, + on_progress: Callable | None = None, +) -> dict[str, str]: """Optimize all known Lance tables under *index_dir*, serially, with retry. Runs ``table.optimize()`` for each name in :data:`LANCE_TABLE_NAMES` that @@ -73,6 +112,13 @@ async def optimize_lance_tables(index_dir: Path, *, quiet: bool = False) -> dict index_dir: directory holding the Lance tables (the flow's LanceDB URI). quiet: when True, suppress the per-table success/skip info lines on stderr (errors are always logged). + on_progress: optional in-process progress callback (the parent's + renderer ``on_progress``). When given, emits + ``ProgressEvent(kind="optimize", status="running")`` on entry and a + terminal ``status="done"``/``"failed"`` event on exit (covers BOTH + call sites: ``pipeline._maybe_run_serialized_optimize`` and + ``server.run_refresh_pipeline``). In-process only — NEVER prints to + stderr (that would corrupt the Live region). Returns: Mapping of table name → status. Values are ``"ok"``, ``"skipped"`` @@ -82,67 +128,95 @@ async def optimize_lance_tables(index_dir: Path, *, quiet: bool = False) -> dict # not pay the lancedb import cost at flow-definition time. 
import lancedb + if on_progress is not None: + on_progress(_make_optimize_event(status="running")) + t0 = time.perf_counter() results: dict[str, str] = {} - db = await lancedb.connect_async(str(index_dir)) + failed = False try: + db = await lancedb.connect_async(str(index_dir)) try: - existing = await _list_table_names(db) - except Exception as exc: - print( - f"java-codebase-rag: optimize: failed to list tables in " - f"{index_dir}: {exc}", - file=sys.stderr, - ) - return {name: f"error: list failed: {exc}" for name in LANCE_TABLE_NAMES} - - for name in LANCE_TABLE_NAMES: - if name not in existing: - results[name] = "skipped" - if not quiet: - print( - f"java-codebase-rag: optimize: {name} absent, skipped", - file=sys.stderr, - ) - continue try: - table = await db.open_table(name) + existing = await _list_table_names(db) except Exception as exc: - results[name] = f"error: open failed: {exc}" print( - f"java-codebase-rag: optimize: {name} open failed: {exc}", + f"java-codebase-rag: optimize: failed to list tables in " + f"{index_dir}: {exc}", file=sys.stderr, ) - continue - - last_exc: BaseException | None = None - for attempt in range(_MAX_ATTEMPTS): + failed = True + return {name: f"error: list failed: {exc}" for name in LANCE_TABLE_NAMES} + + for name in LANCE_TABLE_NAMES: + if name not in existing: + results[name] = "skipped" + if not quiet: + print( + f"java-codebase-rag: optimize: {name} absent, skipped", + file=sys.stderr, + ) + continue try: - await table.optimize() - last_exc = None - break + table = await db.open_table(name) except Exception as exc: - last_exc = exc - if _is_retryable(exc) and attempt < _MAX_ATTEMPTS - 1: - await asyncio.sleep(_BASE_BACKOFF_S * (2**attempt)) - continue - # Non-retryable, or retries exhausted: stop the loop and - # surface below — do not swallow silently. 
- break - - if last_exc is None: - results[name] = "ok" - if not quiet: + results[name] = f"error: open failed: {exc}" + failed = True print( - f"java-codebase-rag: optimize: {name} ok", + f"java-codebase-rag: optimize: {name} open failed: {exc}", file=sys.stderr, ) - else: - results[name] = f"error: {last_exc}" - print( - f"java-codebase-rag: optimize: {name} failed: {last_exc}", - file=sys.stderr, - ) + continue + + last_exc: BaseException | None = None + for attempt in range(_MAX_ATTEMPTS): + try: + await table.optimize() + last_exc = None + break + except Exception as exc: + last_exc = exc + if _is_retryable(exc) and attempt < _MAX_ATTEMPTS - 1: + await asyncio.sleep(_BASE_BACKOFF_S * (2**attempt)) + continue + # Non-retryable, or retries exhausted: stop the loop and + # surface below — do not swallow silently. + break + + if last_exc is None: + results[name] = "ok" + if not quiet: + print( + f"java-codebase-rag: optimize: {name} ok", + file=sys.stderr, + ) + else: + results[name] = f"error: {last_exc}" + failed = True + print( + f"java-codebase-rag: optimize: {name} failed: {last_exc}", + file=sys.stderr, + ) + finally: + # ``AsyncConnection.close`` is a *sync* method in lancedb 0.30.x. + db.close() + return results + except Exception: + # An unexpected exception (e.g. ``connect_async`` raised, or a table- + # independent failure) must still flip the terminal event to failed so + # the renderer's task doesn't render a green check on a crash. Re-raise + # after marking — the caller (``_maybe_run_serialized_optimize`` / + # ``run_refresh_pipeline``) treats optimize failure as non-fatal and + # logs it, but the renderer must reflect the truth. + failed = True + raise finally: - # ``AsyncConnection.close`` is a *sync* method in lancedb 0.30.x. 
- db.close() - return results + # Always emit a terminal optimize event so the renderer's task never + # hangs at "running" — even on exception (the parent treats a failed + # optimize as non-fatal: the index is still searchable un-compacted). + if on_progress is not None: + on_progress( + _make_optimize_event( + status="failed" if failed else "done", + elapsed_s=time.perf_counter() - t0, + ) + ) diff --git a/java_codebase_rag/pipeline.py b/java_codebase_rag/pipeline.py index d5eb7a0..fe6376a 100644 --- a/java_codebase_rag/pipeline.py +++ b/java_codebase_rag/pipeline.py @@ -11,8 +11,7 @@ from pathlib import Path from typing import Callable -from java_codebase_rag.cli_format import Spinner, is_noise_line, stderr_is_tty -from java_codebase_rag.cli_progress import emit_vectors_finish, emit_vectors_start +from java_codebase_rag.cli_format import is_noise_line from java_codebase_rag.config import cocoindex_subprocess_env_defaults from java_codebase_rag.progress import ProgressEvent, ProgressRelay, make_relay @@ -129,6 +128,8 @@ def run_cocoindex_update( quiet: bool, verbose: bool = True, lance_project_root: Path | None = None, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> subprocess.CompletedProcess[str]: result = _run_cocoindex_update_impl( env, @@ -136,19 +137,25 @@ def run_cocoindex_update( quiet=quiet, verbose=verbose, lance_project_root=lance_project_root, + on_progress=on_progress, + on_progress_console=on_progress_console, ) # After cocoindex returns exit 0 there are no concurrent writers, so this # is the safe window to compact the Lance tables. The flow disabled its # in-flight background optimize (see java_index_flow_lancedb.py), making # this serialized pass the sole optimizer. Optimize failure does not flip # the cocoindex CompletedProcess (a successful index is still usable, just - # not compacted); the outcome is logged to stderr only. 
+ # not compacted); the outcome is logged to stderr only. Thread the + # in-process on_progress so the optimize phase renders via the same + # renderer (the flow cannot emit it — it runs in the child). if result.returncode == 0: - _maybe_run_serialized_optimize(env, quiet=quiet) + _maybe_run_serialized_optimize(env, quiet=quiet, on_progress=on_progress) return result -def _maybe_run_serialized_optimize(env: dict[str, str], *, quiet: bool) -> None: +def _maybe_run_serialized_optimize( + env: dict[str, str], *, quiet: bool, on_progress: Callable | None = None +) -> None: """Resolve the index dir from *env* and run the serialized Lance optimize. The flow's lifespan reads ``JAVA_CODEBASE_RAG_INDEX_DIR`` (set by the CLI / @@ -167,7 +174,7 @@ def _maybe_run_serialized_optimize(env: dict[str, str], *, quiet: bool) -> None: try: from java_codebase_rag.lance_optimize import optimize_lance_tables - asyncio.run(optimize_lance_tables(Path(idx_raw), quiet=quiet)) + asyncio.run(optimize_lance_tables(Path(idx_raw), quiet=quiet, on_progress=on_progress)) except Exception as exc: # Never crash the CLI on an optimize failure — surface on stderr only. print(f"java-codebase-rag: optimize failed: {exc}", file=sys.stderr) @@ -180,9 +187,15 @@ def _run_cocoindex_update_impl( quiet: bool, verbose: bool = True, lance_project_root: Path | None = None, + on_progress: Callable[[ProgressEvent], None] | None = None, + on_progress_console: object | None = None, ) -> subprocess.CompletedProcess[str]: exe = cocoindex_bin() if not exe.is_file(): + # 127 pre-spawn stub: never mark the vectors task running — emit a + # terminal failed event so the renderer doesn't leave it hung. 
+ if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return subprocess.CompletedProcess( args=[str(exe)], returncode=127, @@ -192,6 +205,8 @@ def _run_cocoindex_update_impl( bd = bundle_dir() flow = bd / "java_index_flow_lancedb.py" if not flow.is_file(): + if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return subprocess.CompletedProcess( args=[], returncode=126, @@ -218,17 +233,10 @@ def _run_cocoindex_update_impl( text=True, ) - emit_progress = lance_project_root is not None - use_spinner = emit_progress and stderr_is_tty() - if emit_progress and not use_spinner: - emit_vectors_start() - spinner: Spinner | None = None - if use_spinner: - spinner = Spinner("[vectors] running · cocoindex update") - spinner.start() t0 = time.perf_counter() code = -1 out_s, err_s = "", "" + proc: subprocess.Popen[bytes] | None = None try: proc = subprocess.Popen( cmd, @@ -238,12 +246,27 @@ def _run_cocoindex_update_impl( stderr=subprocess.PIPE, bufsize=0, ) - out_s, err_s, code = _popen_capturing_stderr(proc, verbose=verbose) + # Vectors task is marked running only AFTER Popen succeeds — the flow's + # per-file ticks + approximate total stream in from the child via the + # relay (parsed by ProgressRelay, routed to on_progress). + out_s, err_s, code = _popen_capturing_stderr( + proc, verbose=verbose, on_progress=on_progress, on_progress_console=on_progress_console + ) finally: - if spinner is not None: - spinner.stop() - if emit_progress: - emit_vectors_finish(elapsed_s=time.perf_counter() - t0, exit_code=code) + # The flow cannot emit the terminal vectors event (no "all files done" + # hook in cocoindex flows), so the PARENT emits it here based on the + # cocoindex exit code. This drives clamp-on-completion + the phase + # transition to Optimize. 
Emitted even on a spawn failure (code stays + # -1 → failed) so the renderer's task never hangs at running. + if on_progress is not None: + elapsed = time.perf_counter() - t0 + status = "done" if code == 0 else "failed" + on_progress( + ProgressEvent( + kind="vectors", phase=None, pass_=None, done=None, total=None, + status=status, elapsed_s=elapsed, + ) + ) return subprocess.CompletedProcess(args=cmd, returncode=code, stdout=out_s, stderr=err_s) diff --git a/java_index_flow_lancedb.py b/java_index_flow_lancedb.py index 6213616..10425a2 100644 --- a/java_index_flow_lancedb.py +++ b/java_index_flow_lancedb.py @@ -18,10 +18,13 @@ import inspect import os +import sys +import threading import uuid from collections.abc import AsyncIterator from contextlib import asynccontextmanager from dataclasses import dataclass +from fnmatch import fnmatch from pathlib import Path from typing import Annotated, Any @@ -84,6 +87,138 @@ _NUM_TXN_BEFORE_OPTIMIZE = 10**12 +# --- Vectors-phase progress emission (JCIRAG_PROGRESS kind=vectors) ----------- +# +# The flow runs in a CHILD cocoindex process; it prints structured progress to +# its stderr and the parent (pipeline._popen_capturing_stderr / +# cli_progress.accumulate_and_relay_subprocess_streams) parses it via +# ProgressRelay and feeds the renderer. The flow CANNOT know when all files are +# done (cocoindex offers no "all files done" hook in the flow), so it emits: +# - ONE ``total=N status=running`` line from ``app_main`` (approximate +# pre-walk: matcher includes + LayeredIgnore), and +# - per-file ``done=k status=running`` ticks (throttled every ~25 files) from +# ``process_*_file`` (shared atomic counter). +# The PARENT emits the terminal ``status=done``/``failed`` vectors event on +# cocoindex exit (drives clamp-on-completion + phase transition to Optimize). + +# Per-file tick cadence: bound stderr volume on huge trees without making the +# bar feel stale. 
Every 25th file (and the modulo boundary is enough — the +# parent clamps to total on the terminal event anyway). +_VECTORS_TICK_EVERY = 25 + +# Thread-safe counter: cocoindex may call process_*_file concurrently +# (mount_each parallelism is implementation-defined). A module-level lock guards +# both the counter and the emission so two threads never interleave a tick. +_vectors_done_lock = threading.Lock() +_vectors_done_count = 0 + + +def _emit_vectors_progress( + *, + done: int | None = None, + total: int | None = None, + status: str = "running", + elapsed_s: float | None = None, +) -> None: + """Emit one ``JCIRAG_PROGRESS kind=vectors …`` line to stderr (flushed). + + Field order is fixed (kind, done, total, status, elapsed_s) so the parser + and tests can pin substrings. Omitted fields are simply absent. + """ + fields = ["kind=vectors"] + if done is not None: + fields.append(f"done={done}") + if total is not None: + fields.append(f"total={total}") + fields.append(f"status={status}") + if elapsed_s is not None: + fields.append(f"elapsed_s={elapsed_s:.2f}") + print("JCIRAG_PROGRESS " + " ".join(fields), file=sys.stderr, flush=True) + + +def _tick_vectors_done() -> None: + """Increment the shared per-file counter and emit a throttled ``done=k`` tick. + + Called once per file that passes the ignore / empty early-returns, before + it is split (a tick marks the start of work, not success). Emitted every + ``_VECTORS_TICK_EVERY`` files so stderr volume stays bounded on huge trees; + the parent clamps to total on the terminal event, so cadence is not load-bearing. + """ + global _vectors_done_count + with _vectors_done_lock: + _vectors_done_count += 1 + n = _vectors_done_count + if n % _VECTORS_TICK_EVERY != 0: + return + # Emit under the lock: the module comment above promises the lock guards both + # the counter AND the emission, so two concurrent ticks can't emit their + # ``done=N`` lines out of order. Contention is negligible (fires every + # ~25 files).
+ _emit_vectors_progress(done=n, status="running") + + +def _approximate_vectors_total(project_root: Path) -> int: + """Reproduce the matchers' include globs + LayeredIgnore for an approximate total. + + The flow applies two filtering layers: (1) ``PatternFilePathMatcher`` + excludes at walk time via ``LayeredIgnore.cocoindex_excluded_patterns()``, + then (2) ``LayeredIgnore.is_ignored()`` plus an early-return for empty / + undecodable files inside each ``process_*_file``. Files that early-return + never tick, so this pre-walk OVERSTATES the total by the ignored / empty + count. The parent clamps the bar to 100% on the terminal ``status=done`` + event, so the over-count cannot stall the bar. + + Mirrors the three ``localfs.walk_dir`` matchers in ``app_main``: + - ``**/*.java`` + - ``**/src/main/resources/db/migration/*.sql`` + - ``**/src/main/resources/application*.yml`` and ``.yaml`` + """ + ignore = LayeredIgnore(project_root) + excluded = ignore.cocoindex_excluded_patterns() + + def _excluded(rel_posix: str) -> bool: + return any(fnmatch(rel_posix, pat) for pat in excluded) + + total = 0 + for dirpath, dirnames, filenames in os.walk(project_root): + # Prune the same universal nuisance dirs as iter_java_source_files / + # cocoindex walk. (build-output pruning is matcher-dependent in the + # real walk; for an APPROXIMATE total this cheap prune is sufficient + # — the clamp absorbs any residual divergence.) 
+ dirnames[:] = [ + d for d in dirnames if d not in (".git", ".hg", ".svn", "node_modules", ".venv", "venv") + ] + for fn in filenames: + full = Path(dirpath) / fn + try: + rel = full.resolve().relative_to(project_root).as_posix() + except ValueError: + continue + if _excluded(rel): + continue + # Java: **/*.java + if fn.endswith(".java"): + if not ignore.is_ignored(full)[0]: + total += 1 + continue + # SQL: **/src/main/resources/db/migration/*.sql + if fn.endswith(".sql") and "/db/migration/" in rel: + if not ignore.is_ignored(full)[0]: + total += 1 + continue + # YAML: **/src/main/resources/application*.yml / .yaml + # NOTE: ``fn`` is the bare filename (e.g. ``application-cloud.yml``), so + # the prefix predicate must be ``fn.startswith("application")`` — + # ``"/application" in fn`` was always False (no leading slash in a bare + # name) and under-counted every application YAML, driving the pre-walk + # total below the actual done count. The ``rel``-based + # ``"/src/main/resources/"`` gate stays (full path component). 
+ if fn.endswith((".yml", ".yaml")) and fn.startswith("application") and "/src/main/resources/" in rel: + if not ignore.is_ignored(full)[0]: + total += 1 + return total + + @dataclass class JavaLanceChunk: id: str @@ -187,6 +322,8 @@ async def process_java_file( if not content.strip(): return + _tick_vectors_done() + language = detect_code_language(filename=file.file_path.path.name) or "text" cs, mn, ov = JAVA_CHUNK chunks = splitter.split( @@ -251,6 +388,8 @@ async def process_sql_file( if not content.strip(): return + _tick_vectors_done() + language = "sql" cs, mn, ov = SQL_CHUNK chunks = splitter.split( @@ -295,6 +434,8 @@ async def process_yaml_file( if not content.strip(): return + _tick_vectors_done() + ext = file.file_path.path.suffix.lower() language = "yaml" if ext in (".yml", ".yaml") else "text" cs, mn, ov = YAML_CHUNK @@ -362,6 +503,20 @@ async def app_main() -> None: project_root = coco.use_context(PROJECT_ROOT) _ignore = LayeredIgnore(project_root) _walk_excludes = _ignore.cocoindex_excluded_patterns() + # Emit ONE approximate total so the parent's renderer can show a determinate + # bar (clamps to 100% on the terminal vectors event the parent emits on + # cocoindex exit). Approximate — ignored / empty files over-state it; see + # ``_approximate_vectors_total``. NOTE(review): intended for ``--full-reprocess`` + # only (on incremental catch-up the @coco.fn(memo=True) cache skips unchanged + # files, so this total would over-state), but nothing below gates on that yet. + try: + total = _approximate_vectors_total(project_root) + if total > 0: + _emit_vectors_progress(total=total, status="running") + except Exception: + # The pre-walk must never break indexing — a failure here just means + # the parent falls back to indeterminate. Swallow and continue.
+ pass java_files = localfs.walk_dir( PROJECT_ROOT, recursive=True, diff --git a/server.py b/server.py index c151f94..526c4aa 100644 --- a/server.py +++ b/server.py @@ -13,9 +13,8 @@ from index_common import SBERT_MODEL from java_codebase_rag.cli_progress import ( accumulate_and_relay_subprocess_streams, - emit_vectors_finish, - emit_vectors_start, ) +from java_codebase_rag.progress import ProgressEvent from java_codebase_rag._fdlimit import raise_fd_limit from java_codebase_rag.config import ( cocoindex_subprocess_env_defaults, @@ -270,10 +269,20 @@ def list_code_index_tables_payload() -> IndexInfoOutput: ) -async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> RefreshIndexOutput: +async def run_refresh_pipeline( + *, + quiet: bool = False, + verbose: bool = True, + on_progress=None, + on_progress_console: object | None = None, +) -> RefreshIndexOutput: root = _project_root() cocoindex_bin = Path(sys.executable).parent / "cocoindex" if not cocoindex_bin.is_file(): + # 127 pre-spawn: emit a terminal failed vectors event so the renderer's + # task doesn't hang at running (matches the sync pipeline path). 
+ if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return RefreshIndexOutput( success=False, message=f"cocoindex not found next to Python: {cocoindex_bin}", @@ -286,6 +295,8 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> if fallback.is_file(): flow_path = fallback else: + if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return RefreshIndexOutput( success=False, message=f"java_index_flow_lancedb.py not found under {root} nor {bundle_dir}", @@ -308,13 +319,14 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> ) out_b, err_b = await proc.communicate() except Exception as exc: + if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return RefreshIndexOutput( success=False, message=f"spawn failed: {exc!s}", phases_run=[], ) else: - emit_vectors_start() t0 = time.perf_counter() code_c = -1 try: @@ -329,16 +341,30 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - out_b, err_b = await accumulate_and_relay_subprocess_streams(proc, relay=True, verbose=verbose) + # The vectors task is fed by the child's per-file ticks + the + # approximate total line, parsed by the ProgressRelay inside the + # async drain and routed to on_progress. 
+ out_b, err_b = await accumulate_and_relay_subprocess_streams( + proc, relay=True, verbose=verbose, + on_progress=on_progress, on_progress_console=on_progress_console, + ) code_c = proc.returncode if proc.returncode is not None else -1 except Exception as exc: + if on_progress is not None: + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status="failed", elapsed_s=None)) return RefreshIndexOutput( success=False, message=f"spawn failed: {exc!s}", phases_run=[], ) finally: - emit_vectors_finish(elapsed_s=time.perf_counter() - t0, exit_code=code_c) + # The parent emits the terminal vectors event (the flow can't — no + # "all files done" hook). Drives clamp-on-completion + phase + # transition to Optimize. + if on_progress is not None: + elapsed = time.perf_counter() - t0 + status = "done" if code_c == 0 else "failed" + on_progress(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=None, status=status, elapsed_s=elapsed)) assert proc is not None out = out_b.decode(errors="replace") err = err_b.decode(errors="replace") @@ -366,7 +392,7 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> idx_dir = Path(idx_raw) else: idx_dir = (root / ".java-codebase-rag").resolve() - await optimize_lance_tables(idx_dir, quiet=quiet) + await optimize_lance_tables(idx_dir, quiet=quiet, on_progress=on_progress) except Exception as exc: optimize_error = f"lance optimize failed: {exc}" print(f"java-codebase-rag: {optimize_error}", file=sys.stderr) @@ -394,7 +420,10 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> if quiet: gout_b, gerr_b = await gproc.communicate() else: - gout_b, gerr_b = await accumulate_and_relay_subprocess_streams(gproc, relay=True, verbose=verbose) + gout_b, gerr_b = await accumulate_and_relay_subprocess_streams( + gproc, relay=True, verbose=verbose, + on_progress=on_progress, on_progress_console=on_progress_console, + ) graph_code = 
gproc.returncode graph_out = gout_b.decode(errors="replace") graph_err = gerr_b.decode(errors="replace") diff --git a/tests/test_cli_progress_stdout_invariant.py b/tests/test_cli_progress_stdout_invariant.py index 739f879..81fd670 100644 --- a/tests/test_cli_progress_stdout_invariant.py +++ b/tests/test_cli_progress_stdout_invariant.py @@ -160,7 +160,7 @@ def test_cli_lifecycle_stdout_invariant_reprocess( baseline = (_FIXTURE_DIR / "reprocess_quiet_success.stdout.txt").read_text(encoding="utf-8") - async def fake_refresh(*, quiet: bool = False, verbose: bool = True) -> RefreshIndexOutput: + async def fake_refresh(*, quiet: bool = False, verbose: bool = True, on_progress=None, on_progress_console=None) -> RefreshIndexOutput: _ = quiet _ = verbose return RefreshIndexOutput( diff --git a/tests/test_java_codebase_rag_cli.py b/tests/test_java_codebase_rag_cli.py index 3b35324..aa8f3b9 100644 --- a/tests/test_java_codebase_rag_cli.py +++ b/tests/test_java_codebase_rag_cli.py @@ -579,7 +579,7 @@ def test_reprocess_graph_only_then_increment_graph_is_noop( hash_file.write_text(json.dumps(data), encoding="utf-8") # Stub cocoindex so increment exercises ONLY its graph stage. 
- def _noop_coco(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None): + def _noop_coco(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None, on_progress=None, on_progress_console=None): return subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="") monkeypatch.setattr(cli_mod, "run_cocoindex_update", _noop_coco) @@ -1017,7 +1017,7 @@ def test_reprocess_no_flag_cocoindex_failure_records_vectors_only( monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(tmp_path)) - async def fake_refresh(*, quiet: bool = False, verbose: bool = True) -> server_mod.RefreshIndexOutput: + async def fake_refresh(*, quiet: bool = False, verbose: bool = True, on_progress=None, on_progress_console=None) -> server_mod.RefreshIndexOutput: return server_mod.RefreshIndexOutput( success=False, exit_code=1, @@ -1253,7 +1253,7 @@ def _patch_pipeline_for_graph_progress(monkeypatch: pytest.MonkeyPatch, *, emit_ from java_codebase_rag import cli as _cli from java_codebase_rag import pipeline as _pipeline - def _fake_cocoindex_update(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None): + def _fake_cocoindex_update(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None, on_progress=None, on_progress_console=None): return _make_stub_completed(returncode=0) def _fake_run_build_ast_graph(*, source_root, ladybug_path, verbose, quiet=False, env=None, on_progress=None, on_progress_console=None): diff --git a/tests/test_vectors_progress.py b/tests/test_vectors_progress.py new file mode 100644 index 0000000..8c9a754 --- /dev/null +++ b/tests/test_vectors_progress.py @@ -0,0 +1,349 @@ +"""PR-3: vectors-phase + optimize-phase index progress. + +Two tests are HEAVY-gated (``JAVA_CODEBASE_RAG_RUN_HEAVY=1``) because they run +a real ``cocoindex update`` against the bank-chat-system corpus (embedding model +cached). 
The remaining tests are LIGHT: they exercise the renderer against a +synthetic event stream or patch the pipeline helpers so no cocoindex/torch loads. +""" +from __future__ import annotations + +import io +import os +import re +import subprocess +import sys +from pathlib import Path + +import pytest +from rich.console import Console + +from java_codebase_rag.progress import IndexProgressRenderer, ProgressEvent + +_PREFIX = "JCIRAG_PROGRESS" + +HEAVY = os.environ.get("JAVA_CODEBASE_RAG_RUN_HEAVY", "").strip().lower() in ("1", "true", "yes") + + +# --------------------------------------------------------------------------- +# Light: renderer clamp-on-completion + indeterminate (tests 2, 3, 4) +# --------------------------------------------------------------------------- + + +def _renderer(*, terminal: bool = True) -> IndexProgressRenderer: + """Build a renderer over a buffer; force_terminal=True exercises the Live path.""" + buf = io.StringIO() + console = Console(file=buf, force_terminal=terminal, width=120, color_system=None) + return IndexProgressRenderer(["vectors", "optimize", "graph"], console=console) + + +def test_vectors_progress_clamps_on_completion() -> None: + """total=100, done=80, then parent status=done -> completed clamps to 100.""" + r = _renderer() + r.start() + try: + r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=80, total=100, status="running", elapsed_s=None)) + assert r._progress.tasks[r._task_ids["vectors"]].completed == 80 + # Parent emits the terminal event (the flow cannot — no "all files done" + # hook). done=None here; the clamp must still snap completed to total. 
+ r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=100, status="done", elapsed_s=1.0)) + task = r._progress.tasks[r._task_ids["vectors"]] + assert task.completed == 100 + assert task.finished + finally: + r.stop() + + +def test_vectors_progress_approximate_total_overstates_then_clamps() -> None: + """Approximate pre-walk overstates total (100) but done only reaches 95; the + parent's status=done still clamps to 100 (no 95% stall).""" + r = _renderer() + r.start() + try: + r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=None, total=100, status="running", elapsed_s=None)) + r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=95, total=100, status="running", elapsed_s=None)) + assert r._progress.tasks[r._task_ids["vectors"]].completed == 95 + r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=95, total=100, status="done", elapsed_s=42.1)) + assert r._progress.tasks[r._task_ids["vectors"]].completed == 100 + assert r._progress.tasks[r._task_ids["vectors"]].finished + finally: + r.stop() + + +def test_vectors_incremental_renders_indeterminate() -> None: + """No total event (incremental catch-up) -> task stays indeterminate (total None).""" + r = _renderer() + r.start() + try: + # Only a done tick, no total — mirrors incremental catch-up where the + # memo cache skips unchanged files and no total is knowable up front. 
+ r.apply(ProgressEvent(kind="vectors", phase=None, pass_=None, done=3, total=None, status="running", elapsed_s=None)) + task = r._progress.tasks[r._task_ids["vectors"]] + assert task.total is None + finally: + r.stop() + + +# --------------------------------------------------------------------------- +# Light: CLI-level vectors/optimize progress (tests 6, 7) +# --------------------------------------------------------------------------- + + +def _make_stub_completed(*, returncode: int = 0, stderr: str = "") -> "subprocess.CompletedProcess[str]": + return subprocess.CompletedProcess(args=["stub"], returncode=returncode, stdout="", stderr=stderr) + + +def _patch_pipeline_for_vectors_progress(monkeypatch: pytest.MonkeyPatch, *, emit_vectors: bool) -> None: + """Patch cocoindex + graph helpers so init/increment run without heavy deps. + + When ``emit_vectors`` is True the patched cocoindex helper invokes the + caller's ``on_progress`` with a synthetic ``kind=vectors`` event — simulating + what the real subprocess drain would feed the renderer in default mode. 
+ """ + from java_codebase_rag import cli as _cli + from java_codebase_rag import pipeline as _pipeline + + def _fake_cocoindex_update(env, *, full_reprocess, quiet, verbose=True, lance_project_root=None, on_progress=None, on_progress_console=None): + if emit_vectors and on_progress is not None: + on_progress( + ProgressEvent(kind="vectors", phase=None, pass_=None, done=10, total=130, status="running", elapsed_s=None) + ) + return _make_stub_completed(returncode=0) + + def _fake_run_build_ast_graph(*, source_root, ladybug_path, verbose, quiet=False, env=None, on_progress=None, on_progress_console=None): + return _make_stub_completed(returncode=0) + + def _fake_run_incremental_graph(*, source_root, ladybug_path, verbose, quiet=False, env=None, on_progress=None, on_progress_console=None): + return _make_stub_completed(returncode=0) + + monkeypatch.setattr(_cli, "run_cocoindex_update", _fake_cocoindex_update) + monkeypatch.setattr(_cli, "run_build_ast_graph", _fake_run_build_ast_graph) + monkeypatch.setattr(_cli, "run_incremental_graph", _fake_run_incremental_graph) + monkeypatch.setattr(_pipeline, "run_cocoindex_update", _fake_cocoindex_update) + monkeypatch.setattr(_pipeline, "run_build_ast_graph", _fake_run_build_ast_graph) + monkeypatch.setattr(_pipeline, "run_incremental_graph", _fake_run_incremental_graph) + + +def test_cli_init_vectors_phase_progress_on_stderr( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """In default mode a vectors-phase progress event is parsed and rendered to + stderr; the raw ``JCIRAG_PROGRESS`` line is NOT echoed verbatim.""" + from java_codebase_rag import cli as cli_mod + + idx = tmp_path / "idx_vectors_prog" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + _patch_pipeline_for_vectors_progress(monkeypatch, emit_vectors=True) + buf = io.StringIO() + import contextlib + + with 
contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx)] + ) + assert rc == 0 + err = buf.getvalue() + # The raw structured line is consumed by the parser, never raw-relayed. + assert "JCIRAG_PROGRESS kind=vectors" not in err + # But vectors-phase progress IS rendered (non-TTY concise fallback prints a + # "vectors ..." line). The synthetic event had done=10, total=130. + assert "vectors" in err.lower() + + +def test_cli_reprocess_optimize_phase_progress( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """The optimize phase renders when optimize_lance_tables emits kind=optimize. + + Patches server.run_refresh_pipeline to drive the renderer's on_progress with + a synthetic optimize event (mirrors the in-process emission). Asserts the + phase renders and the raw line is not echoed.""" + import contextlib + + from java_codebase_rag import cli as cli_mod + import server + + idx = tmp_path / "idx_optimize_prog" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + + async def _fake_refresh(*, quiet=False, verbose=True, on_progress=None, on_progress_console=None): + # Emit a synthetic optimize event (mirrors lance_optimize's in-process + # emission) so the renderer's optimize task is exercised. 
+ if on_progress is not None: + on_progress(ProgressEvent(kind="optimize", phase=None, pass_=None, done=None, total=None, status="running", elapsed_s=None)) + on_progress(ProgressEvent(kind="optimize", phase=None, pass_=None, done=None, total=None, status="done", elapsed_s=1.2)) + from server import RefreshIndexOutput + + return RefreshIndexOutput(success=True, message=None, phases_run=["vectors", "graph"]) + + monkeypatch.setattr(server, "run_refresh_pipeline", _fake_refresh) + + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["reprocess", "--source-root", str(corpus_root), "--index-dir", str(idx)] + ) + assert rc == 0 + err = buf.getvalue() + # The raw structured line is consumed by the parser, never raw-relayed. + assert "JCIRAG_PROGRESS kind=optimize" not in err + # The optimize phase rendered (non-TTY concise fallback prints an + # "optimize done ..." line for the terminal event). + assert "optimize" in err.lower() + + +# --------------------------------------------------------------------------- +# Light: retirement guard (test 8) +# --------------------------------------------------------------------------- + + +def test_spinner_removed_and_emit_vectors_helpers_removed() -> None: + """Spinner and emit_vectors_start/_finish are no longer importable; no + remaining references anywhere in the production tree.""" + from java_codebase_rag import cli_format, cli_progress + + # The retired symbols must not be importable. + assert not hasattr(cli_format, "Spinner"), "Spinner should have been removed from cli_format" + assert not hasattr(cli_progress, "emit_vectors_start"), "emit_vectors_start should have been removed" + assert not hasattr(cli_progress, "emit_vectors_finish"), "emit_vectors_finish should have been removed" + # And no remaining references anywhere in the production tree. 
+ repo_root = Path(__file__).resolve().parent.parent + offenders: list[str] = [] + for py in (repo_root / "java_codebase_rag").rglob("*.py"): + text = py.read_text(encoding="utf-8") + # Word-boundary match for the retired Spinner class (not rich's SpinnerColumn). + if re.search(r"\bSpinner\b", text): + offenders.append(str(py)) + server_py = repo_root / "server.py" + if server_py.is_file(): + text = server_py.read_text(encoding="utf-8") + if re.search(r"\bSpinner\b", text) or "emit_vectors_start" in text or "emit_vectors_finish" in text: + offenders.append(str(server_py)) + assert not offenders, f"retired symbols still referenced in: {offenders}" + + +# --------------------------------------------------------------------------- +# Heavy: real cocoindex flow emission + pre-walk divergence (tests 1, 5) +# --------------------------------------------------------------------------- + + +def _cocoindex_bin() -> Path: + return Path(sys.executable).parent / "cocoindex" + + +def _require_cocoindex_runtime_deps() -> None: + try: + import tree_sitter_java # noqa: F401 + except ImportError as exc: + pytest.skip(f"heavy e2e needs project deps: {exc}") + + +pytestmark_heavy = pytest.mark.skipif( + not HEAVY, + reason="set JAVA_CODEBASE_RAG_RUN_HEAVY=1 to run the cocoindex vectors-flow test", +) + + +def _run_cocoindex_update(corpus_root: Path, index_dir: Path) -> subprocess.CompletedProcess: + """Run a real ``cocoindex update --full-reprocess`` and return the result.""" + _require_cocoindex_runtime_deps() + cocoindex_bin = _cocoindex_bin() + if not cocoindex_bin.is_file(): + pytest.skip(f"cocoindex not installed in venv: {cocoindex_bin}") + bundle_dir = Path(__file__).resolve().parent.parent + flow = (bundle_dir / "java_index_flow_lancedb.py").resolve() + start = Path(corpus_root).resolve() + relp = os.path.relpath(str(flow), start=str(start)) + relp = Path(relp).as_posix() + app_spec = f"{relp}:JavaCodeIndexLance" + env = { + **os.environ, + "JAVA_CODEBASE_RAG_INDEX_DIR": 
str(index_dir.resolve()), + "JAVA_CODEBASE_RAG_SOURCE_ROOT": str(Path(corpus_root).resolve()), + } + return subprocess.run( + [str(cocoindex_bin), "update", app_spec, "--full-reprocess", "-f"], + cwd=str(corpus_root), + env=env, + capture_output=True, + text=True, + timeout=900, + ) + + +@pytestmark_heavy +def test_flow_emits_vectors_progress_per_file(corpus_root: Path, tmp_path: Path) -> None: + """A real ``cocoindex update`` emits ``JCIRAG_PROGRESS kind=vectors`` lines + in captured stderr: the one-shot approximate ``total=`` line from app_main + plus per-file ``done=`` ticks (the gating spike, promoted to a regression test).""" + index_dir = tmp_path / ".java-codebase-rag" + index_dir.mkdir(parents=True) + proc = _run_cocoindex_update(corpus_root, index_dir) + assert proc.returncode == 0, f"cocoindex failed: stdout={proc.stdout}\nstderr={proc.stderr}" + lines = [ln for ln in proc.stderr.splitlines() if "JCIRAG_PROGRESS kind=vectors" in ln] + assert lines, f"expected vectors progress lines, got stderr:\n{proc.stderr}" + # The one-shot approximate total line from app_main. + totals = [ln for ln in lines if "total=" in ln and "done=" not in ln] + assert totals, f"expected a one-shot total line; lines: {lines!r}" + # Per-file done ticks (throttled every ~25 files). + ticks = [ln for ln in lines if "done=" in ln] + assert ticks, f"expected per-file done ticks; lines: {lines!r}" + # Done ticks are monotonic non-decreasing. + done_vals = [int(m.group(1)) for ln in ticks if (m := re.search(r"done=(\d+)", ln))] + assert done_vals, f"could not parse done values from: {ticks!r}" + assert done_vals == sorted(done_vals), f"done ticks must be monotonic: {done_vals}" + + +@pytestmark_heavy +def test_pre_walk_total_divergence_bounded(corpus_root: Path, tmp_path: Path) -> None: + """On the fixture, the approximate pre-walk total exactly equals the number of + files actually processed (the spike measured gap == 0). 
+ + The TRUE processed count is read from the LanceDB tables (distinct + ``filename`` values across the three tables) rather than the throttled + ``done=k`` tick stream: ticks fire only every 25th file, so ``max(done)`` + lands on a multiple of 25 and would mask the real divergence. The accepted + over-count on larger trees is the ignored / empty / undecodable file count + (the renderer clamps to total on completion regardless).""" + index_dir = tmp_path / ".java-codebase-rag" + index_dir.mkdir(parents=True) + proc = _run_cocoindex_update(corpus_root, index_dir) + assert proc.returncode == 0, f"cocoindex failed: stdout={proc.stdout}\nstderr={proc.stderr}" + lines = [ln for ln in proc.stderr.splitlines() if "JCIRAG_PROGRESS kind=vectors" in ln] + # The one-shot approximate total. + total_lines = [ln for ln in lines if "total=" in ln and "done=" not in ln] + assert total_lines, f"expected a total line; lines: {lines!r}" + total_match = re.search(r"total=(\d+)", total_lines[0]) + assert total_match, f"could not parse total from: {total_lines[0]!r}" + pre_walk_total = int(total_match.group(1)) + # The tick stream is throttled (every 25th file), so it cannot yield the + # true processed count. Read the ground truth from the LanceDB tables: + # distinct filename values across the three tables the flow populated. 
+ import lancedb + + db = lancedb.connect(str(index_dir.resolve())) + actual_done = 0 + for tname in ("javacodeindex_java_code", "sqlschemaindex_sql_schema", "yamlconfigindex_yaml_config"): + try: + tbl = db.open_table(tname) + except Exception as exc: # pragma: no cover - table missing only on a broken flow + raise AssertionError(f"Lance table {tname} missing after flow: {exc}") from exc + rows = tbl.search().select(["filename"]).limit(1_000_000).to_list() + actual_done += len({r["filename"] for r in rows if r.get("filename") is not None}) + # On this fixture the pre-walk matches exactly (gap == 0): all counted + # files are non-empty / decodable / not-ignored, and the YAML predicate + # was fixed to include application*.yml. The accepted over-count on larger + # trees is the ignored / empty file count (the renderer clamps regardless). + gap = pre_walk_total - actual_done + assert gap >= 0, ( + f"pre-walk total {pre_walk_total} < actual done {actual_done} (under-count: " + f"a matcher predicate is dropping files the flow still processes)" + ) + assert gap == 0, ( + f"pre-walk over-count on fixture: pre_walk_total={pre_walk_total} " + f"actual_done={actual_done} gap={gap} (expected 0; the over-count is the " + f"ignored/empty/undecodable file count — the renderer clamps regardless)" + )