Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,70 @@ finally:

</details>

<details>
<summary>Dual-channel: mic + system audio in one session</summary>

For note-taker apps that capture two live sources (microphone **and** system/speaker output) but want them handled as **one** streaming session — while still knowing which source each word came from — wrap the client in a `ChannelStreamer`.

You declare named channels and feed each channel's PCM separately. The SDK runs per-channel energy VAD, mixes the channels into a single mono stream over one websocket, and — for handlers registered on the coordinator — delivers an enriched `DualChannelTurnEvent` whose words/turn carry their originating channel (`turn.channel` and per-word `word.channel`). The base `Word` / `TurnEvent` stay unchanged, so single-stream payloads aren't affected. Attribution is fully client-side and model-agnostic, so it composes with `speaker_labels`, multilingual, and `u3-rt-pro`. It is a **separate dimension from diarization** — `word.channel` (physical source) is independent of `word.speaker` (voice): two people on the same `system` channel get distinct speaker labels, while one person heard on two channels keeps a single speaker label.

Unlike a browser sample, the SDK does not capture audio — you supply 16-bit PCM for each channel (from `sounddevice`, `pyaudio`, a loopback device, files, …).

```python
from assemblyai.streaming.v3 import (
ChannelStreamer, StreamingClient, StreamingClientOptions,
StreamingEvents, StreamingParameters,
)

def on_turn(client, event): # event is a DualChannelTurnEvent
print(f"[{event.channel}] {event.transcript}")
for w in event.words:
print(f" {w.text!r} -> channel={w.channel} speaker={w.speaker}")

client = StreamingClient(StreamingClientOptions(api_key="<YOUR_API_KEY>"))

# Declare the channels and the session sample rate (must be pcm_s16le).
mixer = ChannelStreamer(client, channels=["mic", "system"], sample_rate=16000)
# Register handlers on the mixer: Turn handlers receive the enriched event,
# other events (Begin/Error/…) are forwarded to the client.
mixer.on(StreamingEvents.Turn, on_turn)
client.connect(StreamingParameters(
sample_rate=16000, speech_model="u3-rt-pro", speaker_labels=True,
))

# Feed each source separately — e.g. from two capture callbacks. Send
# continuous PCM for every channel (silence as zeros), at the same rate.
mixer.stream("mic", mic_pcm)
mixer.stream("system", system_pcm)

mixer.flush() # push trailing buffered audio
client.disconnect(terminate=True)
```

`AsyncChannelStreamer` is the asyncio-native equivalent (`await mixer.stream(...)` / `await mixer.close_channel(...)` / `await mixer.flush()`); register handlers the same way with `mixer.on(...)`.

**Sources that end mid-session.** Mixing keeps channels aligned by consuming the shortest buffer, so it assumes every channel keeps delivering PCM (send silence as zeros, don't omit it). When a source genuinely ends (file EOF, screen share stopped, device removed), call `mixer.close_channel(name)` so the session degrades to the surviving channel(s) instead of stalling — the ended channel is then padded with silence.

**Swappable VAD.** The default detector is the built-in energy-based `EnergyVad`. Supply your own (e.g. a DNN VAD such as Silero) via `ChannelAttributionOptions.create_vad`, which is called once per channel with the channel name; subclass `VadDetector` (`process(frame) -> VadResult`, `reset()`). Pass `on_vad=callback` to observe raw per-frame activity (e.g. a live "who's talking" meter). Tune the default with `EnergyVad(threshold_ratio=3.0, noise_floor_alpha=0.05, hangover_frames=10)` — `threshold_ratio` below ~2 is too sensitive, above ~6 misses quiet onsets/offsets.

**Resolving unknown channels.** A word is `"unknown"` when no channel was clearly dominant in its window — silence, or two channels too close to call (the top must beat the runner-up by `dominance_ratio`, default 4). `ChannelAttributionOptions.resolve_unknown_channels_method` back-fills these:

- `"window"` (default) — from the dominant non-`"unknown"` channel among ±`resolution_window_words` neighbor words.
- `"speaker-history"` — from the speaker's session-wide channel evidence (requires `speaker_labels`).
- `"none"` — leave `"unknown"` as-is.

Back-filled words are flagged `word.channel_resolved = True`; confident per-word decisions are never overwritten. The method is validated at construction, so a typo raises immediately rather than silently disabling resolution.

**Caveats.**

- Requires 16-bit PCM (`pcm_s16le`, the default) — linear mixing is invalid for `pcm_mulaw`.
- Capturing the system/speaker output is platform-specific: macOS needs a loopback driver (e.g. BlackHole); Windows uses WASAPI loopback; Linux a PulseAudio/PipeWire monitor source.
- If the mic physically picks up the speakers, that bleed can pull attribution toward `mic`. Apply acoustic echo cancellation at capture (`getUserMedia({ audio: { echoCancellation: true } })` in browser front-ends, or an AEC-capable native path) — the SDK only receives already-captured PCM, so it can't apply AEC itself. Transcription quality is unaffected; only the `channel` field.

See [`examples/streaming_dual_channel.py`](./examples/streaming_dual_channel.py) for a complete runnable demo.

</details>

<details>
<summary>Stream a local file (async)</summary>

Expand Down
2 changes: 1 addition & 1 deletion assemblyai/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.64.20"
__version__ = "0.64.21"
22 changes: 22 additions & 0 deletions assemblyai/streaming/v3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
from .async_client import AsyncStreamingClient
from .client import StreamingClient
from .extras import (
AsyncChannelStreamer,
ChannelAttributionOptions,
ChannelStreamer,
DualChannelTurnEvent,
DualChannelWord,
EnergyVad,
VadDetector,
VadFrame,
VadResult,
attribute_turn,
)
from .models import (
BeginEvent,
Encoding,
Expand Down Expand Up @@ -27,8 +39,14 @@
)

__all__ = [
"AsyncChannelStreamer",
"AsyncStreamingClient",
"BeginEvent",
"ChannelAttributionOptions",
"ChannelStreamer",
"DualChannelTurnEvent",
"DualChannelWord",
"EnergyVad",
"Encoding",
"EventMessage",
"LLMGatewayResponseEvent",
Expand All @@ -50,6 +68,10 @@
"StreamingSessionParameters",
"TerminationEvent",
"TurnEvent",
"VadDetector",
"VadFrame",
"VadResult",
"WarningEvent",
"Word",
"attribute_turn",
]
15 changes: 11 additions & 4 deletions assemblyai/streaming/v3/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
ErrorEvent,
EventMessage,
ForceEndpoint,
KeepAlive,
OperationMessage,
StreamingClientOptions,
StreamingError,
Expand Down Expand Up @@ -80,10 +81,10 @@ class AsyncStreamingClient(_BaseStreamingClient):

Behavioral notes vs. the sync ``StreamingClient``:

- ``stream`` / ``set_params`` / ``force_endpoint`` raise ``RuntimeError``
when called before ``connect()`` — silent drop would diverge from the
sync client (which buffers pre-connect data) in a way that's easy to
miss. After the connection has closed, the same calls are silent
- ``stream`` / ``set_params`` / ``force_endpoint`` / ``keep_alive`` raise
``RuntimeError`` when called before ``connect()`` — silent drop would
diverge from the sync client (which buffers pre-connect data) in a way
that's easy to miss. After the connection has closed, the same calls are silent
no-ops so cleanup paths don't need defensive try/except.
- ``disconnect(terminate=True)`` waits at most 2.0s for the write task to
drain the ``TerminateSession`` frame before forcing teardown. The sync
Expand Down Expand Up @@ -288,6 +289,12 @@ async def force_endpoint(self) -> None:
return
await write_queue.put(ForceEndpoint())

async def keep_alive(self) -> None:
write_queue, stop_event = self._ensure_connected("keep_alive")
if stop_event.is_set():
return
await write_queue.put(KeepAlive())

def _ensure_connected(
self, method: str
) -> "tuple[asyncio.Queue[OperationMessage], asyncio.Event]":
Expand Down
5 changes: 5 additions & 0 deletions assemblyai/streaming/v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ErrorEvent,
EventMessage,
ForceEndpoint,
KeepAlive,
OperationMessage,
StreamingClientOptions,
StreamingError,
Expand Down Expand Up @@ -194,6 +195,10 @@ def force_endpoint(self):
message = ForceEndpoint()
self._write_queue.put(message)

def keep_alive(self):
message = KeepAlive()
self._write_queue.put(message)

def _write_message(self) -> None:
while True:
if not self._websocket:
Expand Down
Loading
Loading