Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 59 additions & 8 deletions aai_cli/code_agent/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

from __future__ import annotations

from collections.abc import Callable
import threading
from collections.abc import Callable, Iterator, Mapping
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable

from aai_cli.code_agent.agent import CompiledAgent
from aai_cli.code_agent.events import (
Expand All @@ -32,6 +34,20 @@
_DECLINED = "User declined to run this tool."


@runtime_checkable
class _SupportsStream(Protocol):
"""An agent that can stream its run as incremental state snapshots.

The real compiled graph supports this; the unit-test fakes that only implement
``invoke`` don't, so :meth:`CodeSession._run` falls back to a single emit for them.
"""

def stream(
self, graph_input: object, config: Mapping[str, object] | None, *, stream_mode: str
) -> Iterator[dict[str, object]]:
"""Yield the running state (incl. the growing ``messages``) after each super-step."""


@dataclass
class CodeSession:
"""One coding conversation: a compiled agent plus the I/O seams that render it."""
Expand All @@ -42,27 +58,60 @@ class CodeSession:
thread_id: str = "code"
auto_approve: bool = False
_seen: int = field(default=0, init=False)
_cancel: threading.Event = field(
default_factory=threading.Event,
init=False, # pragma: no mutate
)

def _config(self) -> dict[str, object]:
return {"configurable": {"thread_id": self.thread_id}}

def request_cancel(self) -> None:
"""Ask the running turn to stop its agent loop at the next step boundary.

Set from another thread (the TUI's Ctrl-C / Escape); the streaming loop in
:meth:`_run` and the approval loop both check it, so a long tool sequence stops
without having to kill the worker thread mid-step.
"""
self._cancel.set()

def send(self, text: str) -> None:
"""Run one user turn to completion, resolving approvals and emitting events.
"""Run one user turn, resolving approvals and emitting events as each step lands.

A failure inside the graph (a gateway 5xx, a tool blowing up) is surfaced as an
``ErrorText`` event rather than propagating — a single bad turn must not crash
the TUI worker or the REPL; the user can just try again.
Events stream out incrementally (responsive UI) and :meth:`request_cancel` can stop
the loop early. A failure inside the graph (a gateway 5xx, a tool blowing up) is
surfaced as an ``ErrorText`` event rather than propagating — a single bad turn must
not crash the TUI worker or the REPL; the user can just try again.
"""
self._cancel.clear()
config = self._config()
try:
result = self.agent.invoke({"messages": [{"role": "user", "content": text}]}, config)
result = self._resolve_interrupts(result, config)
result = self._run({"messages": [{"role": "user", "content": text}]}, config)
self._resolve_interrupts(result, config)
except KeyboardInterrupt:
raise
except Exception as exc:
self.sink(ErrorText(f"{type(exc).__name__}: {exc}"))
return

def _run(self, graph_input: object, config: dict[str, object]) -> dict[str, object]:
"""Drive one graph segment, emitting events as each step completes; return the end state.

Streaming (``stream_mode="values"``) renders intermediate tool calls/results live and
lets :meth:`request_cancel` break the loop between steps. A double that only implements
``invoke`` (the TUI/REPL test fakes) emits once at the end instead.
"""
if isinstance(self.agent, _SupportsStream):
last: dict[str, object] = {}
for chunk in self.agent.stream(graph_input, config, stream_mode="values"):
if self._cancel.is_set():
break
self._emit_new(chunk)
last = chunk
return last
result = self.agent.invoke(graph_input, config)
self._emit_new(result)
return result

def _resolve_interrupts(
self, result: dict[str, object], config: dict[str, object]
Expand All @@ -71,13 +120,15 @@ def _resolve_interrupts(
from langgraph.types import Command

while True:
if self._cancel.is_set():
return result
request = interrupt_request(result)
if request is None:
return result
actions = request.get("action_requests")
actions = actions if isinstance(actions, list) else []
decisions = [self._decide(action) for action in actions]
result = self.agent.invoke(Command(resume={"decisions": decisions}), config)
result = self._run(Command(resume={"decisions": decisions}), config)

def _decide(self, action: dict[str, object]) -> dict[str, object]:
"""Ask the approver about one pending tool call and shape the resume decision."""
Expand Down
150 changes: 138 additions & 12 deletions aai_cli/code_agent/tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import threading
import time
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, ClassVar, Protocol

from rich.markup import escape
from textual.app import App, ComposeResult
Expand All @@ -27,14 +27,28 @@
from aai_cli.code_agent.ask_tool import AskBridge
from aai_cli.code_agent.events import AssistantText, ErrorText, Event, ToolCall, ToolResult
from aai_cli.code_agent.session import CodeSession
from aai_cli.code_agent.voice import spoken_summary
from aai_cli.core import errors

if TYPE_CHECKING:
from collections.abc import Mapping
from collections.abc import Callable, Mapping

from textual.timer import Timer

# Glyphs cycled by the working indicator's animation (purely cosmetic).
_SPIN_FRAMES = "✶✷✸✹✺" # pragma: no mutate
# Seconds the Ctrl-C "press again to quit" hint stays armed (deepagents-code uses 3s too).
_QUIT_HINT_SECONDS = 3 # pragma: no mutate


class _VoiceIO(Protocol):
"""The speak-to-it / read-back slice the TUI drives; :class:`VoiceSession` satisfies it."""

def listen(self) -> str | None:
"""Capture one spoken turn and return its transcript (``None`` on no speech)."""

def speak(self, text: str) -> None:
"""Read ``text`` back aloud (a no-op when readback is unavailable)."""


def _format_args(args: Mapping[str, object]) -> str:
Expand Down Expand Up @@ -91,7 +105,7 @@ class ApprovalScreen(ModalScreen[str]):
ApprovalScreen { align: center bottom; background: transparent; }
ApprovalScreen #approvalbox {
dock: bottom; width: 1fr; height: auto;
border: round #f59e0b; background: #0b0e16; padding: 0 1; margin: 0 1 1 1;
border: round #f59e0b; background: #000000; padding: 0 1; margin: 0 1 1 1;
}
ApprovalScreen #approvalbox Label { height: auto; }
ApprovalScreen #approvalbox Horizontal { height: auto; }
Expand Down Expand Up @@ -139,7 +153,7 @@ class AskScreen(ModalScreen[str]):
AskScreen { align: center bottom; background: transparent; }
AskScreen #askbox {
dock: bottom; width: 1fr; height: auto;
border: round #3a3f55; background: #0b0e16; padding: 0 1; margin: 0 1 1 1;
border: round #3a3f55; background: #000000; padding: 0 1; margin: 0 1 1 1;
}
"""

Expand All @@ -159,27 +173,30 @@ def on_input_submitted(self, event: Input.Submitted) -> None:
class CodeAgentApp(App[None]):
"""The coding-agent TUI: conversation transcript + prompt + approval/ask modals."""

# Flat dark canvas — no panel borders/gray, just the bordered prompt and a status
# Flat pure-black canvas — no panel fills/gray, just the bordered prompt and a status
# line, matching the deepagents-code look (wordmark in the AssemblyAI brand blue).
CSS = f"""
Screen {{ background: #0b0e16; }}
Screen {{ background: #000000; }}
#log {{
height: 1fr; border: none; background: #0b0e16; padding: 1 2;
height: 1fr; border: none; background: #000000; padding: 1 2;
scrollbar-size-vertical: 0;
}}
#promptbar {{ dock: bottom; height: 3; background: #0b0e16; border: round #3a3f55; margin: 1 1; }}
#promptbar {{ dock: bottom; height: 3; background: #000000; border: round #3a3f55; margin: 1 1; }}
#promptmark {{ width: 3; color: {banner.BRAND_HEX}; content-align: center middle; }}
#prompt {{ border: none; background: #0b0e16; padding: 0; }}
#prompt {{ border: none; background: #000000; padding: 0; }}
/* In normal flow below the 1fr log, so it sits just above the docked prompt bar. */
#spinner {{ height: 1; background: #0b0e16; padding: 0 2;
#spinner {{ height: 1; background: #000000; padding: 0 2;
color: {banner.BRAND_HEX}; display: none; }}
#status {{ dock: bottom; height: 1; background: #0b0e16; padding: 0 1; }}
#status {{ dock: bottom; height: 1; background: #000000; padding: 0 1; }}
"""
TITLE = "AssemblyAI Code"
# Ctrl-C quits (in addition to Ctrl-Q); the built-in command palette is removed.
ENABLE_COMMAND_PALETTE = False
# Interrupt/quit keys follow deepagents-code: Escape interrupts the running turn, and
# Ctrl-C interrupts a running turn or — when idle — quits only on a confirmed double-press.
BINDINGS: ClassVar = [
("ctrl+c", "quit", "Quit"),
("escape", "interrupt", "Interrupt"),
("ctrl+c", "quit_or_interrupt", "Interrupt / Quit"),
("ctrl+q", "quit", "Quit"),
("ctrl+y", "copy_last", "Copy last reply"),
]
Expand All @@ -194,16 +211,20 @@ def __init__(
thread_id: str = "default",
cwd: Path | None = None,
web_note: str | None = None,
voice: _VoiceIO | None = None,
) -> None:
super().__init__()
self._agent = agent
self._ask_bridge = ask_bridge if ask_bridge is not None else AskBridge()
self._auto_approve = auto_approve
self._initial = initial
self._voice = voice # when set, spoken turns drive the prompt and replies are read back
self._voice_typed = False # flips once the mic is ruled out; then input is typed only
self._session_name = thread_id # not _thread_id: that shadows Textual App's int
self._cwd = cwd if cwd is not None else Path.cwd()
self._web_note = web_note
self._last_reply = ""
self._quit_pending = False # armed by a first idle Ctrl-C; a second confirms quit
self._spin_frames = itertools.cycle(_SPIN_FRAMES)
self._spin_timer: Timer | None = None
self._turn_started = 0.0 # pragma: no mutate — always reset by _start_spinner first
Expand Down Expand Up @@ -248,6 +269,8 @@ def on_mount(self) -> None:
self.query_one("#prompt", Input).focus()
if self._initial:
self._submit(self._initial)
else:
self._begin_listening() # in voice mode, capture the first spoken turn

# --- event rendering (always called on the UI thread) ---------------------

Expand Down Expand Up @@ -323,6 +346,51 @@ def _ask(self, question: str) -> str:
"""Block the worker on a modal input screen and return the user's answer."""
return self._modal_result(AskScreen(question), default="")

# --- interrupt / quit -----------------------------------------------------
# Mirrors deepagents-code: Escape interrupts a running turn; Ctrl-C interrupts a running
# turn or, when idle, quits only on a confirmed double-press (so it never drops the
# conversation by accident). Ctrl-Q stays an unconditional one-press quit.

def _turn_running(self) -> bool:
"""Whether an agent turn is in flight (the prompt is disabled while one runs)."""
return self.query_one("#prompt", Input).disabled

def _cancel_turn(self) -> bool:
"""Ask the session to stop its agent loop if a turn is running; True if one was.

Cooperative: the worker keeps running until the streaming loop sees the flag at
the next step boundary, then finishes and re-enables the prompt — so we never kill
the thread mid-step (which Textual can't do safely anyway).
"""
if not self._turn_running():
return False
self._session.request_cancel()
self.query_one("#log", RichLog).write("[dim](cancelling…)[/dim]")
return True

def action_interrupt(self) -> None:
"""Escape: interrupt a running agent turn (a no-op when idle, so Esc never quits)."""
self._cancel_turn()

def action_quit_or_interrupt(self) -> None:
"""Ctrl-C: interrupt a running turn, else quit on a confirmed second press."""
if self._cancel_turn():
self._quit_pending = False
return
if self._quit_pending:
self.exit()
else:
self._arm_quit_pending()

def _arm_quit_pending(self) -> None:
"""Arm Ctrl-C double-press-to-quit, showing a hint that expires after a few seconds."""
self._quit_pending = True
self.notify("Press Ctrl-C again to quit", timeout=_QUIT_HINT_SECONDS)
self.set_timer(_QUIT_HINT_SECONDS, self._clear_quit_pending)

def _clear_quit_pending(self) -> None:
self._quit_pending = False # pragma: no mutate — timer-fired reset; timing-unassertable

# --- input loop -----------------------------------------------------------

def on_input_submitted(self, event: Input.Submitted) -> None:
Expand Down Expand Up @@ -370,3 +438,61 @@ def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
prompt = self.query_one("#prompt", Input)
prompt.disabled = False
prompt.focus()
self._voice_followup() # read a spoken summary back, then listen for the next turn

# --- voice (speak-to-it / read-summary-back; the legs run off the UI thread) ----

def _voice_active(self) -> bool:
"""Voice capture is on: a session exists and the mic hasn't been ruled out yet."""
return self._voice is not None and not self._voice_typed

def _spawn(self, target: Callable[[], None]) -> None:
"""Run ``target`` on a daemon thread — voice legs block, so they stay off the UI thread."""
threading.Thread(target=target, daemon=True).start() # pragma: no mutate

def _begin_listening(self) -> None:
"""Capture the next spoken turn on a background thread (no-op when voice is off)."""
if not self._voice_active():
return
self._spawn(self._capture_voice_turn)

def _voice_followup(self) -> None:
"""After a turn finishes: read back a spoken summary, then listen for the next turn."""
voice = self._voice
if voice is None:
return
self._spawn(lambda: self._speak_then_listen(voice))

def _speak_then_listen(self, voice: _VoiceIO) -> None:
"""Read a summary of the last reply aloud (no code), then capture the next spoken turn."""
voice.speak(spoken_summary(self._last_reply))
self._capture_voice_turn()

def _capture_voice_turn(self) -> None:
"""Listen for one spoken turn; enter it into the prompt, or degrade to typing."""
voice = self._voice
if voice is None or self._voice_typed:
return
try:
transcript = voice.listen()
except errors.CLIError as exc:
# A capture failure (no mic, STT error) drops voice for the rest of the session
# rather than wedging it — the user just types instead.
self._voice_typed = True
self.call_from_thread(self._notice_voice_off, exc.message)
return
if transcript:
self.call_from_thread(self._enter_and_submit, transcript)

def _notice_voice_off(self, detail: str) -> None:
"""Tell the user voice input stopped and that input is now typed (UI thread)."""
self.query_one("#log", RichLog).write(
f"[dim](voice input off: {escape(detail)}; type your request instead)[/dim]"
)

def _enter_and_submit(self, text: str) -> None:
"""Show the spoken text in the prompt, then submit it as a turn (UI thread)."""
prompt = self.query_one("#prompt", Input)
prompt.value = text
self._submit(text)
prompt.value = ""
24 changes: 24 additions & 0 deletions aai_cli/code_agent/voice.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from __future__ import annotations

import re
import threading
from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass
Expand All @@ -37,6 +38,29 @@
# `assembly stream` and `assembly agent-cascade` use.
_SPEECH_MODEL = "u3-rt-pro"

# Reading code aloud over TTS is useless, so the readback speaks only the prose. These
# strip fenced and inline code, and the spoken summary is capped so a long reply stays brief.
_FENCED_CODE = re.compile(r"```.*?```", re.DOTALL)
_INLINE_CODE = re.compile(r"`[^`]+`")
_MAX_SPOKEN_CHARS = 600 # pragma: no mutate — a cosmetic cap on how much prose is read aloud
_ALL_CODE_READBACK = "I've updated the code — see the transcript for the details."


def spoken_summary(text: str) -> str:
"""Reduce an assistant reply to the prose worth reading aloud.

Drops fenced and inline code, collapses whitespace, and caps the length. When the reply
was essentially all code (nothing but blocks), returns a short generic note so the
readback still says *something* rather than going silent.
"""
prose = _INLINE_CODE.sub(" ", _FENCED_CODE.sub(" ", text))
prose = " ".join(prose.split()).strip()
if not prose:
return _ALL_CODE_READBACK
if len(prose) > _MAX_SPOKEN_CHARS:
return prose[:_MAX_SPOKEN_CHARS].rstrip() + "…"
return prose


class Microphone(Protocol):
"""The microphone slice the listen loop drives: an iterable of PCM at a known rate."""
Expand Down
Loading
Loading