Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions java_codebase_rag/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,13 @@ def _run_with_pipeline_progress(
return int(work(None))
from java_codebase_rag.progress import IndexProgressRenderer, ProgressEvent

# PR-2 owns the graph task only; vectors/optimize stay pending (PR-3).
phases = ["graph"]
# PR-3 owns all three tasks in order: Vectors → Optimize → Graph. The vectors
# task is fed by the cocoindex child's per-file ticks + approximate total
# (subprocess transport, parsed by ProgressRelay); the optimize task is fed
# in-process by lance_optimize; the graph task is fed by the build_ast_graph
# child (subprocess transport). A task only becomes visible/running once its
# first event arrives.
phases = ["vectors", "optimize", "graph"]
renderer = IndexProgressRenderer(phases)
progress = PipelineProgress(renderer=renderer)

Expand Down Expand Up @@ -318,6 +323,8 @@ def work(progress: "PipelineProgress | None") -> int:
quiet=bool(args.quiet),
verbose=verbose,
lance_project_root=None if args.quiet else cfg.source_root,
on_progress=progress.on_progress if progress is not None else None,
on_progress_console=progress.console if progress is not None else None,
)
if coco.returncode != 0:
_emit(
Expand Down Expand Up @@ -378,6 +385,8 @@ def work(progress: "PipelineProgress | None") -> int:
quiet=bool(args.quiet),
verbose=bool(args.verbose),
lance_project_root=None if args.quiet else cfg.source_root,
on_progress=progress.on_progress if progress is not None else None,
on_progress_console=progress.console if progress is not None else None,
)
if coco.returncode != 0:
_emit(
Expand Down Expand Up @@ -455,7 +464,11 @@ def work(progress: "PipelineProgress | None") -> int:
graph_only = bool(getattr(args, "graph_only", False))

if vectors_only:
coco = run_cocoindex_update(env, full_reprocess=True, quiet=bool(args.quiet), verbose=verbose)
coco = run_cocoindex_update(
env, full_reprocess=True, quiet=bool(args.quiet), verbose=verbose,
on_progress=progress.on_progress if progress is not None else None,
on_progress_console=progress.console if progress is not None else None,
)
if _is_cocoindex_preflight_blocker(coco):
payload: dict[str, Any] = {
"success": False,
Expand Down Expand Up @@ -530,7 +543,14 @@ def work(progress: "PipelineProgress | None") -> int:

import server # lazy: pulls sentence_transformers/torch/lancedb/kuzu

result = asyncio.run(server.run_refresh_pipeline(quiet=bool(args.quiet), verbose=verbose))
result = asyncio.run(
server.run_refresh_pipeline(
quiet=bool(args.quiet),
verbose=verbose,
on_progress=progress.on_progress if progress is not None else None,
on_progress_console=progress.console if progress is not None else None,
)
)
payload = result.model_dump()
_emit_reprocess_outcome(payload)
return _reprocess_exit_code(payload)
Expand Down
35 changes: 0 additions & 35 deletions java_codebase_rag/cli_format.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
"""TTY-aware ANSI formatting for CLI stderr progress."""
from __future__ import annotations

import itertools
import sys
import threading
import time

_RESET = "\033[0m"
_BOLD = "\033[1m"
Expand All @@ -16,8 +13,6 @@
CHECK = "✓"
CROSS = "✗"

_SPINNER_FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"

_NOISE_CONTAINS: tuple[bytes, ...] = (
b"lance::",
b"FutureWarning",
Expand Down Expand Up @@ -88,33 +83,3 @@ def styled_check() -> str:

def styled_cross() -> str:
    """Return the failure marker, coloured red only when stderr is a TTY."""
    # Plain glyph for non-interactive stderr so no ANSI escapes are emitted.
    if not stderr_is_tty():
        return CROSS
    return red(CROSS)


class Spinner:
    """Animated braille indicator drawn on a single stderr line.

    A daemon thread redraws ``<frame> <label> · <elapsed>s`` every 0.3 s,
    returning the cursor to column 0 each time, until :meth:`stop` is called;
    ``stop`` then erases the line.
    """

    def __init__(self, label: str) -> None:
        self._label = label
        # Set by ``stop`` to ask the worker loop to exit.
        self._stop = threading.Event()
        # Worker thread; stays ``None`` until ``start`` is called.
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        """Launch the background thread that animates the spinner."""
        worker = threading.Thread(target=self._run, name="spinner", daemon=True)
        self._thread = worker
        worker.start()

    def stop(self) -> None:
        """Signal the worker to exit, wait up to 2 s for it, then erase the line."""
        self._stop.set()
        worker = self._thread
        if worker is not None:
            worker.join(timeout=2.0)
        # Carriage return + "erase entire line" wipes the last drawn frame.
        sys.stderr.buffer.write(b"\r\x1b[2K")
        sys.stderr.buffer.flush()

    def _run(self) -> None:
        """Worker loop: draw one frame per 0.3 s tick until the stop event is set."""
        glyphs = itertools.cycle(_SPINNER_FRAMES)
        started_at = time.monotonic()
        while True:
            # ``wait`` doubles as the inter-frame delay and the exit check.
            if self._stop.wait(0.3):
                return
            seconds = time.monotonic() - started_at
            glyph = next(glyphs)
            payload = f"\r{glyph} {self._label} · {seconds:.0f}s".encode()
            sys.stderr.buffer.write(payload)
            sys.stderr.buffer.flush()
20 changes: 1 addition & 19 deletions java_codebase_rag/cli_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,10 @@
import sys
from typing import Callable

from java_codebase_rag.cli_format import bold_cyan, is_noise_line, styled_check, styled_cross
from java_codebase_rag.cli_format import is_noise_line
from java_codebase_rag.progress import ProgressEvent, make_relay


def emit_vectors_start() -> None:
    """Print the "[vectors] running" status line to stderr and flush it."""
    banner = bold_cyan("[vectors]") + " running · cocoindex update"
    print(banner, file=sys.stderr, flush=True)


def emit_vectors_finish(*, elapsed_s: float, exit_code: int) -> None:
    """Print the "[vectors] finished" status line to stderr and flush it.

    Args:
        elapsed_s: duration to report, shown with two decimals.
        exit_code: exit status being reported; ``0`` gets the check marker,
            anything else gets the cross marker plus an ``(exit=N)`` suffix.
    """
    if exit_code == 0:
        marker = styled_check()
        suffix = ""
    else:
        marker = styled_cross()
        suffix = f" (exit={exit_code})"
    headline = f"{marker} {bold_cyan('[vectors]')} finished · {elapsed_s:.2f}s"
    print(headline + suffix, file=sys.stderr, flush=True)


class _AsyncLineFilter:
"""Buffers byte chunks and relays only non-noise lines to stderr (async drain path)."""

Expand Down
176 changes: 125 additions & 51 deletions java_codebase_rag/lance_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@

import asyncio
import sys
import time
from pathlib import Path
from typing import Callable, Literal

# Mirrors ``ProgressStatus`` in ``progress.py``; kept local (rather than imported)
# so this module never pays the ``rich`` cost at import time — see
# ``_make_optimize_event``.
_OptimizeStatus = Literal["running", "done", "failed"]

# Single source of truth for the three Lance table names created by the flow.
# Keep in sync with ``search_lancedb.TABLES`` (the values there mirror these).
Expand All @@ -31,6 +38,33 @@
"yamlconfigindex_yaml_config",
)


def _make_optimize_event(
    *,
    status: _OptimizeStatus,
    elapsed_s: float | None = None,
):
    """Create the ``ProgressEvent`` describing the optimize step's state.

    The event always has ``kind="optimize"`` and carries no phase, pass or
    done/total counters; only ``status`` and ``elapsed_s`` vary.

    ``ProgressEvent`` is imported inside the function on purpose: the flow
    imports ``LANCE_TABLE_NAMES`` from this module at definition time and must
    not pay the ``rich`` import cost. The resulting event is meant for the
    in-process ``on_progress`` callback, not for stderr (printing there would
    corrupt the Live region).

    Args:
        status: one of ``"running"``, ``"done"`` or ``"failed"``.
        elapsed_s: seconds spent so far, or ``None`` when not yet known.

    Returns:
        The populated ``ProgressEvent`` instance.
    """
    from java_codebase_rag.progress import ProgressEvent

    # Everything except status/elapsed is fixed for optimize events.
    fields = {
        "kind": "optimize",
        "phase": None,
        "pass_": None,
        "done": None,
        "total": None,
        "status": status,
        "elapsed_s": elapsed_s,
    }
    return ProgressEvent(**fields)

# Commit conflicts are transient; a handful of exponential-backoff retries is
# enough because, post-flow, there are no concurrent writers — only successive
# optimize/compaction passes within this single serialized call can still
Expand Down Expand Up @@ -60,7 +94,12 @@ async def _list_table_names(db: object) -> set[str]:
return set(await db.table_names())


async def optimize_lance_tables(index_dir: Path, *, quiet: bool = False) -> dict[str, str]:
async def optimize_lance_tables(
index_dir: Path,
*,
quiet: bool = False,
on_progress: Callable | None = None,
) -> dict[str, str]:
"""Optimize all known Lance tables under *index_dir*, serially, with retry.

Runs ``table.optimize()`` for each name in :data:`LANCE_TABLE_NAMES` that
Expand All @@ -73,6 +112,13 @@ async def optimize_lance_tables(index_dir: Path, *, quiet: bool = False) -> dict
index_dir: directory holding the Lance tables (the flow's LanceDB URI).
quiet: when True, suppress the per-table success/skip info lines on
stderr (errors are always logged).
on_progress: optional in-process progress callback (the parent's
renderer ``on_progress``). When given, emits
``ProgressEvent(kind="optimize", status="running")`` on entry and a
terminal ``status="done"``/``"failed"`` event on exit (covers BOTH
call sites: ``pipeline._maybe_run_serialized_optimize`` and
``server.run_refresh_pipeline``). In-process only — NEVER prints to
stderr (that would corrupt the Live region).

Returns:
Mapping of table name → status. Values are ``"ok"``, ``"skipped"``
Expand All @@ -82,67 +128,95 @@ async def optimize_lance_tables(index_dir: Path, *, quiet: bool = False) -> dict
# not pay the lancedb import cost at flow-definition time.
import lancedb

if on_progress is not None:
on_progress(_make_optimize_event(status="running"))
t0 = time.perf_counter()
results: dict[str, str] = {}
db = await lancedb.connect_async(str(index_dir))
failed = False
try:
db = await lancedb.connect_async(str(index_dir))
try:
existing = await _list_table_names(db)
except Exception as exc:
print(
f"java-codebase-rag: optimize: failed to list tables in "
f"{index_dir}: {exc}",
file=sys.stderr,
)
return {name: f"error: list failed: {exc}" for name in LANCE_TABLE_NAMES}

for name in LANCE_TABLE_NAMES:
if name not in existing:
results[name] = "skipped"
if not quiet:
print(
f"java-codebase-rag: optimize: {name} absent, skipped",
file=sys.stderr,
)
continue
try:
table = await db.open_table(name)
existing = await _list_table_names(db)
except Exception as exc:
results[name] = f"error: open failed: {exc}"
print(
f"java-codebase-rag: optimize: {name} open failed: {exc}",
f"java-codebase-rag: optimize: failed to list tables in "
f"{index_dir}: {exc}",
file=sys.stderr,
)
continue

last_exc: BaseException | None = None
for attempt in range(_MAX_ATTEMPTS):
failed = True
return {name: f"error: list failed: {exc}" for name in LANCE_TABLE_NAMES}

for name in LANCE_TABLE_NAMES:
if name not in existing:
results[name] = "skipped"
if not quiet:
print(
f"java-codebase-rag: optimize: {name} absent, skipped",
file=sys.stderr,
)
continue
try:
await table.optimize()
last_exc = None
break
table = await db.open_table(name)
except Exception as exc:
last_exc = exc
if _is_retryable(exc) and attempt < _MAX_ATTEMPTS - 1:
await asyncio.sleep(_BASE_BACKOFF_S * (2**attempt))
continue
# Non-retryable, or retries exhausted: stop the loop and
# surface below — do not swallow silently.
break

if last_exc is None:
results[name] = "ok"
if not quiet:
results[name] = f"error: open failed: {exc}"
failed = True
print(
f"java-codebase-rag: optimize: {name} ok",
f"java-codebase-rag: optimize: {name} open failed: {exc}",
file=sys.stderr,
)
else:
results[name] = f"error: {last_exc}"
print(
f"java-codebase-rag: optimize: {name} failed: {last_exc}",
file=sys.stderr,
)
continue

last_exc: BaseException | None = None
for attempt in range(_MAX_ATTEMPTS):
try:
await table.optimize()
last_exc = None
break
except Exception as exc:
last_exc = exc
if _is_retryable(exc) and attempt < _MAX_ATTEMPTS - 1:
await asyncio.sleep(_BASE_BACKOFF_S * (2**attempt))
continue
# Non-retryable, or retries exhausted: stop the loop and
# surface below — do not swallow silently.
break

if last_exc is None:
results[name] = "ok"
if not quiet:
print(
f"java-codebase-rag: optimize: {name} ok",
file=sys.stderr,
)
else:
results[name] = f"error: {last_exc}"
failed = True
print(
f"java-codebase-rag: optimize: {name} failed: {last_exc}",
file=sys.stderr,
)
finally:
# ``AsyncConnection.close`` is a *sync* method in lancedb 0.30.x.
db.close()
return results
except Exception:
# An unexpected exception (e.g. ``connect_async`` raised, or a table-
# independent failure) must still flip the terminal event to failed so
# the renderer's task doesn't render a green check on a crash. Re-raise
# after marking — the caller (``_maybe_run_serialized_optimize`` /
# ``run_refresh_pipeline``) treats optimize failure as non-fatal and
# logs it, but the renderer must reflect the truth.
failed = True
raise
finally:
# ``AsyncConnection.close`` is a *sync* method in lancedb 0.30.x.
db.close()
return results
# Always emit a terminal optimize event so the renderer's task never
# hangs at "running" — even on exception (the parent treats a failed
# optimize as non-fatal: the index is still searchable un-compacted).
if on_progress is not None:
on_progress(
_make_optimize_event(
status="failed" if failed else "done",
elapsed_s=time.perf_counter() - t0,
)
)
Loading
Loading