feat(crew): add stream_frames opt-in for full event frame streaming#6451
feat(crew): add stream_frames opt-in for full event frame streaming#6451C0d3N1nja97342 wants to merge 3 commits into
Conversation
When stream=True and stream_frames=True, kickoff/kickoff_async now stream full StreamFrame events (LLM tokens, tool calls, agent/task lifecycle) via the existing frame pipeline, instead of the default StreamChunk token-only stream. Returns a StreamSession / AsyncStreamSession whose .subscribe(channels=...) allows filtering by channel (llm, tools, messages, flow, lifecycle). Reuses the existing frame infrastructure (create_frame_streaming_state / create_frame_generator / stream_frame_from_event) already used by BaseLLM.stream_events() and Flow.stream_events(); no new event emission points and no new output types. The default chunk path (stream=True, stream_frames=False) is unchanged, so the change is fully backward compatible.
Add TestCrewFrameStreaming covering: mixed-event streaming (asserts non-LLM 'tools' channel frames pass through, which the chunk path drops), result accessibility after exhaustion, default stream_frames=False compatibility, and the async _akickoff_frame_stream path.
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (2)
📝 WalkthroughWalkthroughAdds a ChangesFrame Streaming Feature
Sequence Diagram(s)sequenceDiagram
participant Caller
participant Crew
participant FrameGenerator
participant StreamSession
Caller->>Crew: kickoff(stream_frames=True)
Crew->>Crew: _kickoff_frame_stream()
Crew->>Crew: create_frame_streaming_state()
Crew->>Crew: run kickoff() with stream disabled
Crew->>FrameGenerator: create_frame_generator(state)
Crew->>StreamSession: wrap generator + captured CrewOutput
StreamSession-->>Caller: yield frames (llm, tools)
Caller->>StreamSession: access .result after exhaustion
StreamSession-->>Caller: CrewOutput
Suggested reviewers: 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/crew.py`:
- Around line 303-312: The stream_frames setting currently implies streaming but
is silently ignored when stream is false, so fix the Crew model to enforce that
contract. Add a model_validator on the Crew class (or equivalent validation near
stream_frames) to either raise when stream_frames=True and stream=False, or
automatically enable stream before kickoff; ensure kickoff() and kickoff_async()
stay consistent with the validated behavior.
In `@lib/crewai/tests/test_streaming.py`:
- Around line 1084-1088: The assertion in test_streaming’s tools-channel check
is only attaching the first string to the assert, leaving the trailing strings
as no-op expressions. Update the assert expression so the full failure message
is part of the same assertion, using the existing channels check in
test_streaming and keeping the message strings grouped together for implicit
concatenation.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: c58f516b-0cbc-47fb-aaaf-da1845729769
📒 Files selected for processing (2)
lib/crewai/src/crewai/crew.pylib/crewai/tests/test_streaming.py
…sage Address CodeRabbit review: (1) add a model_validator so stream_frames=True auto-enables stream=True instead of being silently ignored when stream=False, and update the field docstring; (2) fix the split assertion in test_frame_stream_yields_mixed_events so the full failure message is part of the assert (the trailing strings were no-op expressions). Add a test covering the stream_frames->stream implication.
Problem
Crew.kickoff(stream=True)streams onlyLLMStreamChunkEventtoken chunks — the stream handler (utilities/streaming.py:_create_stream_handler) filters out every other event type. So consumers iterating a streaming crew cannot observe tool calls, agent/task lifecycle, or reasoning steps as they happen, which limits real-time observability for multi-agent runs.Solution
Add an opt-in
Crew.stream_framesflag (defaultFalse). Whenstream=Trueandstream_frames=True,kickoff/kickoff_asyncroute through the existing frame streaming pipeline (create_frame_streaming_state+create_frame_generator/create_async_frame_generator), which already converts all bus events intoStreamFrameobjects viastream_frame_from_event. The crew returns aStreamSession/AsyncStreamSession(the same public types already used byBaseLLM.stream_events()andFlow.stream_events()), whose.subscribe(channels=...)lets consumers filter by channel (llm,tools,messages,flow,lifecycle).This reuses the existing frame infrastructure — no new event emission points and no new output types. The default chunk path (
stream=True,stream_frames=False) is unchanged, so the change is fully backward compatible.Changes
lib/crewai/src/crewai/crew.py: newstream_framesField;kickoff/kickoff_asyncadd an early-return branch to_kickoff_frame_stream/_akickoff_frame_streamwhenstream_framesis set; return-type union extended withStreamSession/AsyncStreamSession.lib/crewai/tests/test_streaming.py:TestCrewFrameStreamingcovering mixed-event streaming (non-LLMtoolschannel frames pass through), result accessibility after exhaustion, default-Falsecompatibility, and the async path.Testing
uv run pytest lib/crewai/tests/test_streaming.py::TestCrewFrameStreaming→ 4 passed (Python 3.12).ruff checkandruff format --checkclean on the changed files.Notes for Reviewer
.github/CONTRIBUTING.md, thellm-generatedlabel is requested (applied if permissions allow).StreamSession/AsyncStreamSessionpublic types rather than introducing a newCrewFrameStreamingOutput, and reused the existing frame pipeline rather than widening the chunk handler — keeps the change additive and backward compatible.kickoff_for_each*is intentionally out of scope for this PR.