From 47e367e7655e22cc355002e8edb3d78aa92619ef Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 02:43:54 +0000 Subject: [PATCH] assembly code: black UI, live streaming, cancel keys, voice in the TUI - TUI: pure-black canvas (all surface fills #0b0e16 -> #000000). - Stream each agent turn step-by-step (stream_mode="values") so tool calls, results, and reply text render live instead of all at the end; the approval/interrupt flow is preserved and request_cancel() can break the loop between steps. - Escape interrupts a running turn; Ctrl-C interrupts a running turn or, when idle, quits only on a confirmed double-press (mirrors deepagents-code's action_interrupt / action_quit_or_interrupt). - Voice now drives the TUI: a spoken turn is transcribed, entered into the prompt, and submitted; TTS reads back a code-stripped summary (spoken_summary) instead of the full reply. --no-tui keeps the voice REPL. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01FUL1Y7QWgAUDTRdQtK2qCJ --- aai_cli/code_agent/session.py | 67 +++++++++++-- aai_cli/code_agent/tui.py | 150 +++++++++++++++++++++++++++--- aai_cli/code_agent/voice.py | 24 +++++ aai_cli/commands/code/_exec.py | 28 ++++-- tests/test_code_agent.py | 43 +++++++++ tests/test_code_command.py | 31 ++++++- tests/test_code_tui.py | 75 +++++++++++++++ tests/test_code_tui_voice.py | 165 +++++++++++++++++++++++++++++++++ tests/test_code_voice.py | 32 ++++++- 9 files changed, 583 insertions(+), 32 deletions(-) create mode 100644 tests/test_code_tui_voice.py diff --git a/aai_cli/code_agent/session.py b/aai_cli/code_agent/session.py index d5d0b5d..64b6c90 100644 --- a/aai_cli/code_agent/session.py +++ b/aai_cli/code_agent/session.py @@ -9,8 +9,10 @@ from __future__ import annotations -from collections.abc import Callable +import threading +from collections.abc import Callable, Iterator, Mapping from dataclasses import dataclass, field +from typing import Protocol, runtime_checkable from
aai_cli.code_agent.agent import CompiledAgent from aai_cli.code_agent.events import ( @@ -32,6 +34,20 @@ _DECLINED = "User declined to run this tool." +@runtime_checkable +class _SupportsStream(Protocol): + """An agent that can stream its run as incremental state snapshots. + + The real compiled graph supports this; the unit-test fakes that only implement + ``invoke`` don't, so :meth:`CodeSession._run` falls back to a single emit for them. + """ + + def stream( + self, graph_input: object, config: Mapping[str, object] | None, *, stream_mode: str + ) -> Iterator[dict[str, object]]: + """Yield the running state (incl. the growing ``messages``) after each super-step.""" + + @dataclass class CodeSession: """One coding conversation: a compiled agent plus the I/O seams that render it.""" @@ -42,27 +58,60 @@ class CodeSession: thread_id: str = "code" auto_approve: bool = False _seen: int = field(default=0, init=False) + _cancel: threading.Event = field( + default_factory=threading.Event, + init=False, # pragma: no mutate + ) def _config(self) -> dict[str, object]: return {"configurable": {"thread_id": self.thread_id}} + def request_cancel(self) -> None: + """Ask the running turn to stop its agent loop at the next step boundary. + + Set from another thread (the TUI's Ctrl-C / Escape); the streaming loop in + :meth:`_run` and the approval loop both check it, so a long tool sequence stops + without having to kill the worker thread mid-step. + """ + self._cancel.set() + def send(self, text: str) -> None: - """Run one user turn to completion, resolving approvals and emitting events. + """Run one user turn, resolving approvals and emitting events as each step lands. - A failure inside the graph (a gateway 5xx, a tool blowing up) is surfaced as an - ``ErrorText`` event rather than propagating — a single bad turn must not crash - the TUI worker or the REPL; the user can just try again. 
+ Events stream out incrementally (responsive UI) and :meth:`request_cancel` can stop + the loop early. A failure inside the graph (a gateway 5xx, a tool blowing up) is + surfaced as an ``ErrorText`` event rather than propagating — a single bad turn must + not crash the TUI worker or the REPL; the user can just try again. """ + self._cancel.clear() config = self._config() try: - result = self.agent.invoke({"messages": [{"role": "user", "content": text}]}, config) - result = self._resolve_interrupts(result, config) + result = self._run({"messages": [{"role": "user", "content": text}]}, config) + self._resolve_interrupts(result, config) except KeyboardInterrupt: raise except Exception as exc: self.sink(ErrorText(f"{type(exc).__name__}: {exc}")) return + + def _run(self, graph_input: object, config: dict[str, object]) -> dict[str, object]: + """Drive one graph segment, emitting events as each step completes; return the end state. + + Streaming (``stream_mode="values"``) renders intermediate tool calls/results live and + lets :meth:`request_cancel` break the loop between steps. A double that only implements + ``invoke`` (the TUI/REPL test fakes) emits once at the end instead. 
+ """ + if isinstance(self.agent, _SupportsStream): + last: dict[str, object] = {} + for chunk in self.agent.stream(graph_input, config, stream_mode="values"): + if self._cancel.is_set(): + break + self._emit_new(chunk) + last = chunk + return last + result = self.agent.invoke(graph_input, config) self._emit_new(result) + return result def _resolve_interrupts( self, result: dict[str, object], config: dict[str, object] @@ -71,13 +120,15 @@ def _resolve_interrupts( from langgraph.types import Command while True: + if self._cancel.is_set(): + return result request = interrupt_request(result) if request is None: return result actions = request.get("action_requests") actions = actions if isinstance(actions, list) else [] decisions = [self._decide(action) for action in actions] - result = self.agent.invoke(Command(resume={"decisions": decisions}), config) + result = self._run(Command(resume={"decisions": decisions}), config) def _decide(self, action: dict[str, object]) -> dict[str, object]: """Ask the approver about one pending tool call and shape the resume decision.""" diff --git a/aai_cli/code_agent/tui.py b/aai_cli/code_agent/tui.py index f87e75a..cb699cf 100644 --- a/aai_cli/code_agent/tui.py +++ b/aai_cli/code_agent/tui.py @@ -13,7 +13,7 @@ import threading import time from pathlib import Path -from typing import TYPE_CHECKING, ClassVar +from typing import TYPE_CHECKING, ClassVar, Protocol from rich.markup import escape from textual.app import App, ComposeResult @@ -27,14 +27,28 @@ from aai_cli.code_agent.ask_tool import AskBridge from aai_cli.code_agent.events import AssistantText, ErrorText, Event, ToolCall, ToolResult from aai_cli.code_agent.session import CodeSession +from aai_cli.code_agent.voice import spoken_summary +from aai_cli.core import errors if TYPE_CHECKING: - from collections.abc import Mapping + from collections.abc import Callable, Mapping from textual.timer import Timer # Glyphs cycled by the working indicator's animation (purely cosmetic). 
_SPIN_FRAMES = "✶✷✸✹✺" # pragma: no mutate +# Seconds the Ctrl-C "press again to quit" hint stays armed (deepagents-code uses 3s too). +_QUIT_HINT_SECONDS = 3 # pragma: no mutate + + +class _VoiceIO(Protocol): + """The speak-to-it / read-back slice the TUI drives; :class:`VoiceSession` satisfies it.""" + + def listen(self) -> str | None: + """Capture one spoken turn and return its transcript (``None`` on no speech).""" + + def speak(self, text: str) -> None: + """Read ``text`` back aloud (a no-op when readback is unavailable).""" def _format_args(args: Mapping[str, object]) -> str: @@ -91,7 +105,7 @@ class ApprovalScreen(ModalScreen[str]): ApprovalScreen { align: center bottom; background: transparent; } ApprovalScreen #approvalbox { dock: bottom; width: 1fr; height: auto; - border: round #f59e0b; background: #0b0e16; padding: 0 1; margin: 0 1 1 1; + border: round #f59e0b; background: #000000; padding: 0 1; margin: 0 1 1 1; } ApprovalScreen #approvalbox Label { height: auto; } ApprovalScreen #approvalbox Horizontal { height: auto; } @@ -139,7 +153,7 @@ class AskScreen(ModalScreen[str]): AskScreen { align: center bottom; background: transparent; } AskScreen #askbox { dock: bottom; width: 1fr; height: auto; - border: round #3a3f55; background: #0b0e16; padding: 0 1; margin: 0 1 1 1; + border: round #3a3f55; background: #000000; padding: 0 1; margin: 0 1 1 1; } """ @@ -159,27 +173,30 @@ def on_input_submitted(self, event: Input.Submitted) -> None: class CodeAgentApp(App[None]): """The coding-agent TUI: conversation transcript + prompt + approval/ask modals.""" - # Flat dark canvas — no panel borders/gray, just the bordered prompt and a status + # Flat pure-black canvas — no panel fills/gray, just the bordered prompt and a status # line, matching the deepagents-code look (wordmark in the AssemblyAI brand blue). 
CSS = f""" - Screen {{ background: #0b0e16; }} + Screen {{ background: #000000; }} #log {{ - height: 1fr; border: none; background: #0b0e16; padding: 1 2; + height: 1fr; border: none; background: #000000; padding: 1 2; scrollbar-size-vertical: 0; }} - #promptbar {{ dock: bottom; height: 3; background: #0b0e16; border: round #3a3f55; margin: 1 1; }} + #promptbar {{ dock: bottom; height: 3; background: #000000; border: round #3a3f55; margin: 1 1; }} #promptmark {{ width: 3; color: {banner.BRAND_HEX}; content-align: center middle; }} - #prompt {{ border: none; background: #0b0e16; padding: 0; }} + #prompt {{ border: none; background: #000000; padding: 0; }} /* In normal flow below the 1fr log, so it sits just above the docked prompt bar. */ - #spinner {{ height: 1; background: #0b0e16; padding: 0 2; + #spinner {{ height: 1; background: #000000; padding: 0 2; color: {banner.BRAND_HEX}; display: none; }} - #status {{ dock: bottom; height: 1; background: #0b0e16; padding: 0 1; }} + #status {{ dock: bottom; height: 1; background: #000000; padding: 0 1; }} """ TITLE = "AssemblyAI Code" # Ctrl-C quits (in addition to Ctrl-Q); the built-in command palette is removed. ENABLE_COMMAND_PALETTE = False + # Interrupt/quit keys follow deepagents-code: Escape interrupts the running turn, and + # Ctrl-C interrupts a running turn or — when idle — quits only on a confirmed double-press. 
BINDINGS: ClassVar = [ - ("ctrl+c", "quit", "Quit"), + ("escape", "interrupt", "Interrupt"), + ("ctrl+c", "quit_or_interrupt", "Interrupt / Quit"), ("ctrl+q", "quit", "Quit"), ("ctrl+y", "copy_last", "Copy last reply"), ] @@ -194,16 +211,20 @@ def __init__( thread_id: str = "default", cwd: Path | None = None, web_note: str | None = None, + voice: _VoiceIO | None = None, ) -> None: super().__init__() self._agent = agent self._ask_bridge = ask_bridge if ask_bridge is not None else AskBridge() self._auto_approve = auto_approve self._initial = initial + self._voice = voice # when set, spoken turns drive the prompt and replies are read back + self._voice_typed = False # flips once the mic is ruled out; then input is typed only self._session_name = thread_id # not _thread_id: that shadows Textual App's int self._cwd = cwd if cwd is not None else Path.cwd() self._web_note = web_note self._last_reply = "" + self._quit_pending = False # armed by a first idle Ctrl-C; a second confirms quit self._spin_frames = itertools.cycle(_SPIN_FRAMES) self._spin_timer: Timer | None = None self._turn_started = 0.0 # pragma: no mutate — always reset by _start_spinner first @@ -248,6 +269,8 @@ def on_mount(self) -> None: self.query_one("#prompt", Input).focus() if self._initial: self._submit(self._initial) + else: + self._begin_listening() # in voice mode, capture the first spoken turn # --- event rendering (always called on the UI thread) --------------------- @@ -323,6 +346,51 @@ def _ask(self, question: str) -> str: """Block the worker on a modal input screen and return the user's answer.""" return self._modal_result(AskScreen(question), default="") + # --- interrupt / quit ----------------------------------------------------- + # Mirrors deepagents-code: Escape interrupts a running turn; Ctrl-C interrupts a running + # turn or, when idle, quits only on a confirmed double-press (so it never drops the + # conversation by accident). Ctrl-Q stays an unconditional one-press quit. 
+ + def _turn_running(self) -> bool: + """Whether an agent turn is in flight (the prompt is disabled while one runs).""" + return self.query_one("#prompt", Input).disabled + + def _cancel_turn(self) -> bool: + """Ask the session to stop its agent loop if a turn is running; True if one was. + + Cooperative: the worker keeps running until the streaming loop sees the flag at + the next step boundary, then finishes and re-enables the prompt — so we never kill + the thread mid-step (which Textual can't do safely anyway). + """ + if not self._turn_running(): + return False + self._session.request_cancel() + self.query_one("#log", RichLog).write("[dim](cancelling…)[/dim]") + return True + + def action_interrupt(self) -> None: + """Escape: interrupt a running agent turn (a no-op when idle, so Esc never quits).""" + self._cancel_turn() + + def action_quit_or_interrupt(self) -> None: + """Ctrl-C: interrupt a running turn, else quit on a confirmed second press.""" + if self._cancel_turn(): + self._quit_pending = False + return + if self._quit_pending: + self.exit() + else: + self._arm_quit_pending() + + def _arm_quit_pending(self) -> None: + """Arm Ctrl-C double-press-to-quit, showing a hint that expires after a few seconds.""" + self._quit_pending = True + self.notify("Press Ctrl-C again to quit", timeout=_QUIT_HINT_SECONDS) + self.set_timer(_QUIT_HINT_SECONDS, self._clear_quit_pending) + + def _clear_quit_pending(self) -> None: + self._quit_pending = False # pragma: no mutate — timer-fired reset; timing-unassertable + # --- input loop ----------------------------------------------------------- def on_input_submitted(self, event: Input.Submitted) -> None: @@ -370,3 +438,61 @@ def on_worker_state_changed(self, event: Worker.StateChanged) -> None: prompt = self.query_one("#prompt", Input) prompt.disabled = False prompt.focus() + self._voice_followup() # read a spoken summary back, then listen for the next turn + + # --- voice (speak-to-it / read-summary-back; the legs run off 
the UI thread) ---- + + def _voice_active(self) -> bool: + """Voice capture is on: a session exists and the mic hasn't been ruled out yet.""" + return self._voice is not None and not self._voice_typed + + def _spawn(self, target: Callable[[], None]) -> None: + """Run ``target`` on a daemon thread — voice legs block, so they stay off the UI thread.""" + threading.Thread(target=target, daemon=True).start() # pragma: no mutate + + def _begin_listening(self) -> None: + """Capture the next spoken turn on a background thread (no-op when voice is off).""" + if not self._voice_active(): + return + self._spawn(self._capture_voice_turn) + + def _voice_followup(self) -> None: + """After a turn finishes: read back a spoken summary, then listen for the next turn.""" + voice = self._voice + if voice is None: + return + self._spawn(lambda: self._speak_then_listen(voice)) + + def _speak_then_listen(self, voice: _VoiceIO) -> None: + """Read a summary of the last reply aloud (no code), then capture the next spoken turn.""" + voice.speak(spoken_summary(self._last_reply)) + self._capture_voice_turn() + + def _capture_voice_turn(self) -> None: + """Listen for one spoken turn; enter it into the prompt, or degrade to typing.""" + voice = self._voice + if voice is None or self._voice_typed: + return + try: + transcript = voice.listen() + except errors.CLIError as exc: + # A capture failure (no mic, STT error) drops voice for the rest of the session + # rather than wedging it — the user just types instead. 
+ self._voice_typed = True + self.call_from_thread(self._notice_voice_off, exc.message) + return + if transcript: + self.call_from_thread(self._enter_and_submit, transcript) + + def _notice_voice_off(self, detail: str) -> None: + """Tell the user voice input stopped and that input is now typed (UI thread).""" + self.query_one("#log", RichLog).write( + f"[dim](voice input off: {escape(detail)}; type your request instead)[/dim]" + ) + + def _enter_and_submit(self, text: str) -> None: + """Show the spoken text in the prompt, then submit it as a turn (UI thread).""" + prompt = self.query_one("#prompt", Input) + prompt.value = text + self._submit(text) + prompt.value = "" diff --git a/aai_cli/code_agent/voice.py b/aai_cli/code_agent/voice.py index 3bff39d..c4eae22 100644 --- a/aai_cli/code_agent/voice.py +++ b/aai_cli/code_agent/voice.py @@ -11,6 +11,7 @@ from __future__ import annotations +import re import threading from collections.abc import Callable, Iterable, Iterator from dataclasses import dataclass @@ -37,6 +38,29 @@ # `assembly stream` and `assembly agent-cascade` use. _SPEECH_MODEL = "u3-rt-pro" +# Reading code aloud over TTS is useless, so the readback speaks only the prose. These +# strip fenced and inline code, and the spoken summary is capped so a long reply stays brief. +_FENCED_CODE = re.compile(r"```.*?```", re.DOTALL) +_INLINE_CODE = re.compile(r"`[^`]+`") +_MAX_SPOKEN_CHARS = 600 # pragma: no mutate — a cosmetic cap on how much prose is read aloud +_ALL_CODE_READBACK = "I've updated the code — see the transcript for the details." + + +def spoken_summary(text: str) -> str: + """Reduce an assistant reply to the prose worth reading aloud. + + Drops fenced and inline code, collapses whitespace, and caps the length. When the reply + was essentially all code (nothing but blocks), returns a short generic note so the + readback still says *something* rather than going silent. 
+ """ + prose = _INLINE_CODE.sub(" ", _FENCED_CODE.sub(" ", text)) + prose = " ".join(prose.split()).strip() + if not prose: + return _ALL_CODE_READBACK + if len(prose) > _MAX_SPOKEN_CHARS: + return prose[:_MAX_SPOKEN_CHARS].rstrip() + "…" + return prose + class Microphone(Protocol): """The microphone slice the listen loop drives: an iterable of PCM at a known rate.""" diff --git a/aai_cli/commands/code/_exec.py b/aai_cli/commands/code/_exec.py index a912053..1628647 100644 --- a/aai_cli/commands/code/_exec.py +++ b/aai_cli/commands/code/_exec.py @@ -32,7 +32,12 @@ from aai_cli.code_agent.session import CodeSession, EventSink, run_repl from aai_cli.code_agent.skills import build_skills_middleware from aai_cli.code_agent.store import build_checkpointer -from aai_cli.code_agent.voice import AUDIO_ERROR_TYPES, VoiceSession, build_voice_session +from aai_cli.code_agent.voice import ( + AUDIO_ERROR_TYPES, + VoiceSession, + build_voice_session, + spoken_summary, +) from aai_cli.code_agent.web_search import TAVILY_API_KEY_ENV, build_web_search_tool from aai_cli.core import env, errors, stdio from aai_cli.ui import output @@ -139,11 +144,18 @@ def _web_note(opts: CodeOptions) -> str | None: return None -def _run_tui(agent: CompiledAgent, opts: CodeOptions, bridge: AskBridge) -> None: +def _run_tui( + agent: CompiledAgent, + opts: CodeOptions, + bridge: AskBridge, + *, + voice: VoiceSession | None = None, +) -> None: from aai_cli.code_agent.tui import CodeAgentApp # mouse=False leaves terminal mouse reporting off, so native text selection (and # copy/paste) works in the transcript and prompt; the UI is fully keyboard-driven. + # ``voice`` (when set) routes spoken turns into the prompt and reads summaries back. 
CodeAgentApp( agent=agent, ask_bridge=bridge, @@ -152,6 +164,7 @@ def _run_tui(agent: CompiledAgent, opts: CodeOptions, bridge: AskBridge) -> None thread_id=opts.session, cwd=opts.root_dir.resolve(), web_note=_web_note(opts), + voice=voice, ).run(mouse=False) @@ -193,12 +206,12 @@ def _announce_voice(renderer: RichRenderer, voice: VoiceSession) -> None: def _voice_sink(renderer: RichRenderer, voice: VoiceSession) -> EventSink: - """Render every event, and read the assistant's natural-language text back aloud.""" + """Render every event, and read a spoken *summary* of each reply back aloud (no code).""" def sink(event: Event) -> None: renderer(event) if isinstance(event, AssistantText): - voice.speak(event.text) + voice.speak(spoken_summary(event.text)) return sink @@ -255,8 +268,11 @@ def run_code(opts: CodeOptions, state: AppState, *, json_mode: bool) -> None: agent = _build_agent(api_key, opts, bridge) interactive = stdio.stdout_is_tty() and stdio.stdin_is_tty() try: - if opts.voice and interactive: - _run_voice(agent, opts, bridge, api_key) + if opts.voice and opts.tui and interactive: + # The default: spoken turns are entered into the TUI prompt; summaries read back. 
+ _run_tui(agent, opts, bridge, voice=build_voice_session(api_key)) + elif opts.voice and interactive: + _run_voice(agent, opts, bridge, api_key) # --no-tui: the plain voice REPL elif opts.tui and interactive: _run_tui(agent, opts, bridge) else: diff --git a/tests/test_code_agent.py b/tests/test_code_agent.py index 8e92047..739285b 100644 --- a/tests/test_code_agent.py +++ b/tests/test_code_agent.py @@ -337,6 +337,49 @@ def invoke(self, *a, **k): assert any(isinstance(e, ErrorText) and "gateway 500" in e.text for e in seen) +class StreamingAgent: + """A double exercising the streaming path: yields scripted state snapshots.""" + + def __init__(self, chunks: list[dict[str, object]]) -> None: + self._chunks = chunks + + def stream(self, graph_input, config=None, *, stream_mode="values"): + del graph_input, config, stream_mode + yield from self._chunks + + def invoke(self, *a, **k): # the streaming branch is taken, so invoke is never used + raise AssertionError("a streaming agent must not be invoked") + + +def test_send_streams_each_step_and_cancel_stops_the_loop() -> None: + from langchain_core.messages import HumanMessage + + # Three successive graph states (messages grow by one each step); a stream_mode="values" + # graph yields exactly these snapshots, so the session must emit incrementally. 
+ chunks: list[dict[str, object]] = [ + {"messages": [HumanMessage("go")]}, + {"messages": [HumanMessage("go"), AIMessage("first")]}, + {"messages": [HumanMessage("go"), AIMessage("first"), AIMessage("second")]}, + ] + seen: list[object] = [] + session = CodeSession( + agent=StreamingAgent(chunks), sink=seen.append, approver=lambda n, a: True + ) + + def sink(event: object) -> None: + seen.append(event) + if isinstance(event, AssistantText) and event.text == "first": + session.request_cancel() # cancel mid-stream, before the "second" chunk is consumed + + session.sink = sink + session.send("go") + + texts = [e.text for e in seen if isinstance(e, AssistantText)] + # "first" streamed out as its step landed; the cancel then broke the loop, so the later + # "second" step was never emitted — proving both incremental rendering and cancellation. + assert texts == ["first"] + + def test_session_propagates_keyboard_interrupt() -> None: class Stop: def invoke(self, *a, **k): diff --git a/tests/test_code_command.py b/tests/test_code_command.py index 4f8e79c..a4384db 100644 --- a/tests/test_code_command.py +++ b/tests/test_code_command.py @@ -44,19 +44,39 @@ def test_command_parses_flags_into_options(monkeypatch): assert opts.session == "s1" and opts.persist is False -def test_run_code_dispatches_to_voice_by_default_when_tty(monkeypatch): +def test_run_code_dispatches_to_tui_with_voice_by_default_when_tty(monkeypatch): + # The default (voice + tui in a TTY) now routes voice *into* the TUI: spoken turns are + # entered into the prompt there, rather than running the separate voice REPL. 
calls = {} monkeypatch.setattr(_exec, "_build_agent", lambda key, opts, bridge: "AGENT") + monkeypatch.setattr(_exec, "build_voice_session", lambda key: f"VOICE:{key}") monkeypatch.setattr( - _exec, "_run_voice", lambda agent, opts, bridge, key: calls.update(voice=(agent, key)) + _exec, "_run_tui", lambda agent, opts, bridge, *, voice: calls.update(tui=(agent, voice)) ) - monkeypatch.setattr(_exec, "_run_tui", lambda *a: calls.update(tui=True)) + monkeypatch.setattr(_exec, "_run_voice", lambda *a: calls.update(voice=True)) monkeypatch.setattr(_exec, "_run_repl", lambda *a: calls.update(repl=True)) monkeypatch.setattr("aai_cli.core.stdio.stdout_is_tty", lambda: True) monkeypatch.setattr("aai_cli.core.stdio.stdin_is_tty", lambda: True) state = SimpleNamespace(resolve_api_key=lambda: "k") _exec.run_code(_opts(), state, json_mode=False) + assert calls == {"tui": ("AGENT", "VOICE:k")} # voice session handed to the TUI + + +def test_run_code_uses_voice_repl_when_tui_off(monkeypatch): + # --no-tui keeps the plain voice REPL (speak, hear the reply) instead of the TUI. 
+ calls = {} + monkeypatch.setattr(_exec, "_build_agent", lambda key, opts, bridge: "AGENT") + monkeypatch.setattr( + _exec, "_run_voice", lambda agent, opts, bridge, key: calls.update(voice=(agent, key)) + ) + monkeypatch.setattr(_exec, "_run_tui", lambda *a, **k: calls.update(tui=True)) + monkeypatch.setattr(_exec, "_run_repl", lambda *a: calls.update(repl=True)) + monkeypatch.setattr("aai_cli.core.stdio.stdout_is_tty", lambda: True) + monkeypatch.setattr("aai_cli.core.stdio.stdin_is_tty", lambda: True) + state = SimpleNamespace(resolve_api_key=lambda: "k") + + _exec.run_code(_opts(tui=False), state, json_mode=False) assert calls == {"voice": ("AGENT", "k")} @@ -110,10 +130,11 @@ def test_run_code_maps_keyboard_interrupt_to_exit_130(monkeypatch): monkeypatch.setattr("aai_cli.core.stdio.stdout_is_tty", lambda: True) monkeypatch.setattr("aai_cli.core.stdio.stdin_is_tty", lambda: True) - def boom(*a): + def boom(*a, **k): raise KeyboardInterrupt - monkeypatch.setattr(_exec, "_run_voice", boom) + monkeypatch.setattr(_exec, "build_voice_session", lambda key: "VOICE") + monkeypatch.setattr(_exec, "_run_tui", boom) # the default front-end in a TTY state = SimpleNamespace(resolve_api_key=lambda: "k") with pytest.raises(typer.Exit) as exc: diff --git a/tests/test_code_tui.py b/tests/test_code_tui.py index c444cf3..df36ed0 100644 --- a/tests/test_code_tui.py +++ b/tests/test_code_tui.py @@ -269,6 +269,81 @@ async def go() -> None: _run(go()) +def test_escape_interrupts_a_running_turn() -> None: + # While a turn is in flight (prompt disabled), Escape signals the session to stop its + # agent loop; it never quits the app. Drives the real "escape" binding end to end. 
+ async def go() -> None: + app = CodeAgentApp(agent=FakeAgent([])) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app.query_one("#prompt", Input).disabled = True # simulate a turn in progress + await pilot.press("escape") + await pilot.pause() + assert app._session._cancel.is_set() # the loop was asked to stop + + _run(go()) + + +def test_escape_is_a_noop_when_idle() -> None: + # Idle (prompt enabled): Escape does nothing — no cancel signal, no quit. + async def go() -> None: + app = CodeAgentApp(agent=FakeAgent([])) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app.action_interrupt() # idle: nothing to interrupt + assert app._session._cancel.is_set() is False + + _run(go()) + + +def test_ctrl_c_interrupts_running_turn_and_does_not_arm_quit( + monkeypatch: pytest.MonkeyPatch, +) -> None: + async def go() -> None: + app = CodeAgentApp(agent=FakeAgent([])) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + exited: list[bool] = [] + monkeypatch.setattr(app, "exit", lambda *a, **k: exited.append(True)) + app.query_one("#prompt", Input).disabled = True # a turn is running + app.action_quit_or_interrupt() + assert app._session._cancel.is_set() # interrupted the turn + assert exited == [] # did NOT quit, because a turn was in flight + assert app._quit_pending is False # interrupting never arms the quit hint + + _run(go()) + + +def test_ctrl_c_needs_a_double_press_to_quit_when_idle(monkeypatch: pytest.MonkeyPatch) -> None: + async def go() -> None: + app = CodeAgentApp(agent=FakeAgent([])) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + exited: list[bool] = [] + monkeypatch.setattr(app, "exit", lambda *a, **k: exited.append(True)) + app.action_quit_or_interrupt() # first idle press: arms, does not quit + assert exited == [] + assert app._quit_pending is True + app.action_quit_or_interrupt() # second press confirms the quit + assert exited == [True] + assert 
app._session._cancel.is_set() is False # nothing was cancelled + + _run(go()) + + +def test_clear_quit_pending_resets_the_flag() -> None: + # The timer-fired reset (covered directly since the timer won't fire within the test). + async def go() -> None: + app = CodeAgentApp(agent=FakeAgent([])) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app._quit_pending = True + app._clear_quit_pending() + assert app._quit_pending is False + + _run(go()) + + def test_spinner_text_formats_frame_and_elapsed() -> None: assert tui._spinner_text(46, "✶") == "✶ Working… (46s)" assert tui._spinner_text(0, "✷") == "✷ Working… (0s)" diff --git a/tests/test_code_tui_voice.py b/tests/test_code_tui_voice.py new file mode 100644 index 0000000..8adbaea --- /dev/null +++ b/tests/test_code_tui_voice.py @@ -0,0 +1,165 @@ +"""Tests for the `assembly code` TUI's voice integration. + +Drives the real Textual app (headless) with a fake agent and a scripted voice double, so +the listen→enter-into-the-prompt→submit cycle and the spoken-summary readback are exercised +without a microphone, speaker, or socket. Split from test_code_tui.py to keep each file under +the 500-line gate. 
+""" + +from __future__ import annotations + +import asyncio + +import pytest +from langchain_core.messages import AIMessage, HumanMessage +from textual.widgets import Input + +from aai_cli.code_agent.tui import CodeAgentApp +from aai_cli.core.errors import CLIError + + +class FakeAgent: + """Replays scripted invoke() results so a turn can complete without a model.""" + + def __init__(self, results: list[dict[str, object]]) -> None: + self._results = results + self.calls = 0 + + def invoke(self, *args, **kwargs): + result = self._results[self.calls] + self.calls += 1 + return result + + +class FakeVoice: + """A scripted voice I/O double: listen() replays transcripts, speak() records text.""" + + def __init__(self, transcripts: list[str] | None = None, *, error: CLIError | None = None): + self._transcripts = list(transcripts or []) + self._error = error + self.spoken: list[str] = [] + self.listens = 0 + + def listen(self) -> str | None: + self.listens += 1 + if self._error is not None: + raise self._error + return self._transcripts.pop(0) if self._transcripts else None + + def speak(self, text: str) -> None: + self.spoken.append(text) + + +def _run(coro) -> None: + asyncio.run(coro) + + +def _wait_until(pilot, predicate): + """Pump the event loop until ``predicate`` holds (lets a voice worker thread land).""" + + async def loop() -> bool: + for _ in range(200): + await pilot.pause(0.01) + if predicate(): + return True + return False + + return loop() + + +def test_voice_active_requires_a_session_and_an_available_mic() -> None: + async def go() -> None: + no_voice = CodeAgentApp(agent=FakeAgent([])) + async with no_voice.run_test(size=(100, 30)) as pilot: + await pilot.pause() + assert no_voice._voice_active() is False # no voice session at all + + app = CodeAgentApp(agent=FakeAgent([]), voice=FakeVoice()) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + assert app._voice_active() is True + app._voice_typed = True + assert 
app._voice_active() is False # mic ruled out -> inactive + + _run(go()) + + +def test_enter_and_submit_fills_prompt_then_clears_and_submits( + monkeypatch: pytest.MonkeyPatch, +) -> None: + async def go() -> None: + app = CodeAgentApp(agent=FakeAgent([]), voice=FakeVoice()) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + submitted: list[str] = [] + monkeypatch.setattr(app, "_submit", submitted.append) + app._enter_and_submit("add a verbose flag") + assert submitted == ["add a verbose flag"] # the spoken turn was submitted + assert app.query_one("#prompt", Input).value == "" # prompt cleared afterwards + + _run(go()) + + +def test_voice_on_mount_listens_and_submits_the_spoken_turn() -> None: + async def go() -> None: + agent = FakeAgent([{"messages": [HumanMessage("do x"), AIMessage("done")]}]) + voice = FakeVoice(transcripts=["do x"]) + app = CodeAgentApp(agent=agent, voice=voice) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + # on_mount (no initial prompt) starts listening; the captured turn drives the agent. 
+ assert await _wait_until(pilot, lambda: agent.calls >= 1) + assert voice.listens >= 1 + + _run(go()) + + +def test_capture_voice_turn_is_a_noop_once_typed() -> None: + async def go() -> None: + voice = FakeVoice(transcripts=["ignored"]) + app = CodeAgentApp(agent=FakeAgent([]), voice=voice) + app._voice_typed = True # set before mount so on_mount never auto-listens + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app._capture_voice_turn() # typed -> returns before listen (safe on the UI thread) + assert voice.listens == 0 + + _run(go()) + + +def test_voice_degrades_to_typed_on_capture_error() -> None: + async def go() -> None: + voice = FakeVoice(error=CLIError("no mic", error_type="mic_missing", exit_code=2)) + app = CodeAgentApp(agent=FakeAgent([]), voice=voice) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + assert await _wait_until(pilot, lambda: app._voice_typed) + assert app._voice_typed is True # a capture failure drops voice for the session + + _run(go()) + + +def test_voice_followup_reads_a_summary_of_the_last_reply() -> None: + async def go() -> None: + voice = FakeVoice() + app = CodeAgentApp(agent=FakeAgent([]), voice=voice) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app._voice_typed = True # isolate the readback: the post-speak listen is a no-op + app._last_reply = "Here is the plan.\n```py\ncode\n```" + app._voice_followup() + assert await _wait_until(pilot, lambda: bool(voice.spoken)) + assert voice.spoken == ["Here is the plan."] # summary only — the code is stripped + + _run(go()) + + +def test_voice_followup_is_a_noop_without_voice() -> None: + async def go() -> None: + app = CodeAgentApp(agent=FakeAgent([])) # no voice session + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app._voice_followup() # returns immediately without speaking or listening + assert app._voice is None + + _run(go()) diff --git a/tests/test_code_voice.py 
b/tests/test_code_voice.py index 7fbb801..517b0f4 100644 --- a/tests/test_code_voice.py +++ b/tests/test_code_voice.py @@ -10,7 +10,7 @@ from types import SimpleNamespace from aai_cli.code_agent import voice as voicemod -from aai_cli.code_agent.voice import VoiceSession, build_voice_session +from aai_cli.code_agent.voice import VoiceSession, build_voice_session, spoken_summary class FakeMic: @@ -110,6 +110,36 @@ def boom(*a, **k): blank.speak(" ") # blank text -> no synthesis +def test_spoken_summary_strips_code_and_keeps_prose(): + text = ( + "Here's the fix.\n\n```python\ndef f():\n return 1\n```\n\n" + "Call it with `f()` when ready." + ) + summary = spoken_summary(text) + # The fenced block and the inline `f()` are gone; only the prose is read aloud. + assert "def f" not in summary and "return 1" not in summary + assert "`" not in summary + assert summary == "Here's the fix. Call it with when ready." + + +def test_spoken_summary_falls_back_when_reply_is_all_code(): + # A reply that is nothing but a code block leaves no prose -> a generic spoken note, + # never an empty utterance. + assert spoken_summary("```\nprint('hi')\n```") == voicemod._ALL_CODE_READBACK + + +def test_spoken_summary_truncates_long_prose(): + long_prose = "word " * 400 # far over the cap + summary = spoken_summary(long_prose) + assert summary.endswith("…") + assert len(summary) <= voicemod._MAX_SPOKEN_CHARS + 1 # capped prose plus the ellipsis + + +def test_spoken_summary_leaves_short_prose_unchanged(): + # Below the cap: returned verbatim, with no truncation ellipsis appended. + assert spoken_summary("Done — added the flag.") == "Done — added the flag." + + def test_build_voice_session_readback_tracks_tts_availability(monkeypatch): monkeypatch.setattr(voicemod.tts_session, "is_available", lambda: True) assert build_voice_session("k").readback is True