fix: improve error handling in config parsing, callbacks, and async execution#6440
fix: improve error handling in config parsing, callbacks, and async execution#6440axiom-of-choice wants to merge 4 commits into
Conversation
Add SearchApiSearchTool supporting 7 search engines (google, google_news, google_shopping, google_jobs, youtube, bing, baidu) through a single configurable tool class. Key design decisions: - Single tool with engine parameter vs separate classes per engine - Bearer token auth with PrivateAttr to prevent key serialization leaks - Per-query location parameter for runtime flexibility - Comprehensive test suite (19 tests) covering initialization, request construction, multi-engine execution, and error handling
- Add SearchApiSearchTool to __all__ in crewai_tools/__init__.py - Fix MD058: add blank lines before tables in README - Move engine validation to field_validator for assignment-time checks
📝 WalkthroughWalkthroughAdds a new SearchApiSearchTool to crewai-tools, including implementation, exports, tool spec, README, and tests. Separately, fixes error handling in crewai's Crew class and Task callback coroutine execution. ChangesSearchApi Search Tool
Crew and Task Error Handling Fixes
Sequence Diagram(s)sequenceDiagram
participant Agent
participant SearchApiSearchTool
participant SearchApiAPI
Agent->>SearchApiSearchTool: _run(search_query, location)
SearchApiSearchTool->>SearchApiSearchTool: validate engine, build params
SearchApiSearchTool->>SearchApiAPI: GET request with Bearer token
SearchApiAPI-->>SearchApiSearchTool: JSON response
SearchApiSearchTool->>SearchApiSearchTool: remove metadata/pagination
SearchApiSearchTool-->>Agent: filtered results or error message
Suggested reviewers: 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
lib/crewai-tools/src/crewai_tools/tools/searchapi_tool/searchapi_search_tool.py (2)
120-127: 🩺 Stability & Availability | 🔵 Trivial | ⚡ Quick winUnhandled non-dict JSON response.
response.json()is typed asdict[str, Any], but if the API ever returns a non-dict payload (e.g., a list or error string) on a 2xx response,results.pop(...)at Line 127 will raise an uncaughtAttributeError, crashing the tool instead of returning the graceful error string used forRequestException.🛡️ Proposed defensive check
response.raise_for_status() - results: dict[str, Any] = response.json() + results = response.json() + if not isinstance(results, dict): + return results except requests.RequestException as e:🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai-tools/src/crewai_tools/tools/searchapi_tool/searchapi_search_tool.py` around lines 120 - 127, The search response handling in SearchApiSearchTool assumes response.json() always returns a dict, which can crash when the API returns a non-dict payload. Update the parsing path in the SearchApiSearchTool method that assigns results so it validates the JSON type before calling pop on search_metadata, search_parameters, and pagination, and if the payload is not a dict, log an error and return the same graceful error string used in the RequestException branch.
73-81: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low valueConsider
Literaltype forengine.
engineis a plainstrvalidated via a customfield_validator; usingLiteral[...]overSUPPORTED_ENGINESwould give static type checking and self-documenting schema generation on top of the existing runtime check.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai-tools/src/crewai_tools/tools/searchapi_tool/searchapi_search_tool.py` around lines 73 - 81, The SearchApiSearchTool engine field is only typed as a plain string and validated later in validate_engine, so update the engine annotation to use a Literal built from SUPPORTED_ENGINES in SearchApiSearchTool; keep the existing runtime validation if needed, but make the type definition self-documenting and schema-friendly so static typing reflects the allowed engine values.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/task.py`:
- Around line 852-856: The coroutine execution in task callback handling still
fails when a loop is already running because the current
`get_running_loop()`/`run_until_complete()` fallback to `asyncio.run()` raises
again in the same thread. Update the callback execution path in `Task` so it
safely handles already-running event loops without calling `asyncio.run()`
there, and apply the same fix to the `crew.task_callback` logic mentioned below.
Use the existing callback invocation flow around `cb_result` as the place to
route async work through a safe alternate strategy.
---
Nitpick comments:
In
`@lib/crewai-tools/src/crewai_tools/tools/searchapi_tool/searchapi_search_tool.py`:
- Around line 120-127: The search response handling in SearchApiSearchTool
assumes response.json() always returns a dict, which can crash when the API
returns a non-dict payload. Update the parsing path in the SearchApiSearchTool
method that assigns results so it validates the JSON type before calling pop on
search_metadata, search_parameters, and pagination, and if the payload is not a
dict, log an error and return the same graceful error string used in the
RequestException branch.
- Around line 73-81: The SearchApiSearchTool engine field is only typed as a
plain string and validated later in validate_engine, so update the engine
annotation to use a Literal built from SUPPORTED_ENGINES in SearchApiSearchTool;
keep the existing runtime validation if needed, but make the type definition
self-documenting and schema-friendly so static typing reflects the allowed
engine values.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: dfaed6e6-ce1f-406e-aff7-273367253353
📒 Files selected for processing (9)
lib/crewai-tools/src/crewai_tools/__init__.pylib/crewai-tools/src/crewai_tools/tools/__init__.pylib/crewai-tools/src/crewai_tools/tools/searchapi_tool/README.mdlib/crewai-tools/src/crewai_tools/tools/searchapi_tool/__init__.pylib/crewai-tools/src/crewai_tools/tools/searchapi_tool/searchapi_search_tool.pylib/crewai-tools/tests/tools/searchapi_tool_test.pylib/crewai-tools/tool.specs.jsonlib/crewai/src/crewai/crew.pylib/crewai/src/crewai/task.py
…xecution - _create_task: raise ValueError with available roles instead of cryptic StopIteration when task config references unknown agent role - _create_task: avoid mutating input config dict (use filtered copy) - after_kickoff_callbacks: preserve result if callback returns None - task callbacks: run async coroutines in a separate thread when an event loop is already running (Jupyter, kickoff_async) to avoid RuntimeError from asyncio.run() inside a running loop Fixes crewAIInc#6439
af4bb31 to
0bea535
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/task.py`:
- Around line 762-789: The `_run_coroutine_sync` flow in `Task` is catching the
coroutine’s own `RuntimeError` as if it meant “no running loop,” which then
incorrectly calls `asyncio.run` again in the active loop. Refactor the logic to
detect the running loop first, then execute either the threaded
`asyncio.run(coro)` path or the direct `asyncio.run(coro)` path outside the
loop-detection `try/except`, so the re-raised exception from the worker thread
is not swallowed and the already-awaited coroutine is not reused. Use the
`_run_coroutine_sync` method and the `exception_holder`/`thread` branch as the
main locations to update.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 835636f5-b296-4839-9828-039331b48164
📒 Files selected for processing (2)
lib/crewai/src/crewai/crew.pylib/crewai/src/crewai/task.py
🚧 Files skipped from review as they are similar to previous changes (1)
- lib/crewai/src/crewai/crew.py
| @staticmethod | ||
| def _run_coroutine_sync(coro) -> None: | ||
| """Run a coroutine from synchronous code, handling the case where an | ||
| event loop is already running (e.g. Jupyter, kickoff_async).""" | ||
| import threading | ||
|
|
||
| try: | ||
| asyncio.get_running_loop() | ||
| # Loop already running — execute in a separate thread to avoid | ||
| # "cannot be called from a running event loop" errors | ||
| result_holder = [] | ||
| exception_holder = [] | ||
|
|
||
| def _run(): | ||
| try: | ||
| result_holder.append(asyncio.run(coro)) | ||
| except Exception as e: | ||
| exception_holder.append(e) | ||
|
|
||
| thread = threading.Thread(target=_run) | ||
| thread.start() | ||
| thread.join() | ||
| if exception_holder: | ||
| raise exception_holder[0] | ||
| except RuntimeError: | ||
| # No running loop — safe to use asyncio.run directly | ||
| asyncio.run(coro) | ||
|
|
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
RuntimeError from the coroutine is caught and triggers a second asyncio.run in the running loop.
The if exception_holder: raise exception_holder[0] (Line 785) executes inside the try block guarded by except RuntimeError. When a loop is already running and the callback coroutine raises a RuntimeError, that error is captured in the thread, re-raised here, then swallowed by except RuntimeError, which calls asyncio.run(coro) in the main thread. That both hits "asyncio.run() cannot be called from a running event loop" and reuses an already-awaited coroutine, masking the original failure.
Separate loop detection from execution so the re-raise isn't caught:
🔒 Proposed fix
`@staticmethod`
def _run_coroutine_sync(coro) -> None:
"""Run a coroutine from synchronous code, handling the case where an
event loop is already running (e.g. Jupyter, kickoff_async)."""
import threading
try:
asyncio.get_running_loop()
+ loop_running = True
+ except RuntimeError:
+ loop_running = False
+
+ if loop_running:
# Loop already running — execute in a separate thread to avoid
# "cannot be called from a running event loop" errors
result_holder = []
exception_holder = []
def _run():
try:
result_holder.append(asyncio.run(coro))
except Exception as e:
exception_holder.append(e)
thread = threading.Thread(target=_run)
thread.start()
thread.join()
if exception_holder:
raise exception_holder[0]
- except RuntimeError:
+ else:
# No running loop — safe to use asyncio.run directly
asyncio.run(coro)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @staticmethod | |
| def _run_coroutine_sync(coro) -> None: | |
| """Run a coroutine from synchronous code, handling the case where an | |
| event loop is already running (e.g. Jupyter, kickoff_async).""" | |
| import threading | |
| try: | |
| asyncio.get_running_loop() | |
| # Loop already running — execute in a separate thread to avoid | |
| # "cannot be called from a running event loop" errors | |
| result_holder = [] | |
| exception_holder = [] | |
| def _run(): | |
| try: | |
| result_holder.append(asyncio.run(coro)) | |
| except Exception as e: | |
| exception_holder.append(e) | |
| thread = threading.Thread(target=_run) | |
| thread.start() | |
| thread.join() | |
| if exception_holder: | |
| raise exception_holder[0] | |
| except RuntimeError: | |
| # No running loop — safe to use asyncio.run directly | |
| asyncio.run(coro) | |
| `@staticmethod` | |
| def _run_coroutine_sync(coro) -> None: | |
| """Run a coroutine from synchronous code, handling the case where an | |
| event loop is already running (e.g. Jupyter, kickoff_async).""" | |
| import threading | |
| try: | |
| asyncio.get_running_loop() | |
| loop_running = True | |
| except RuntimeError: | |
| loop_running = False | |
| if loop_running: | |
| # Loop already running — execute in a separate thread to avoid | |
| # "cannot be called from a running event loop" errors | |
| result_holder = [] | |
| exception_holder = [] | |
| def _run(): | |
| try: | |
| result_holder.append(asyncio.run(coro)) | |
| except Exception as e: | |
| exception_holder.append(e) | |
| thread = threading.Thread(target=_run) | |
| thread.start() | |
| thread.join() | |
| if exception_holder: | |
| raise exception_holder[0] | |
| else: | |
| # No running loop — safe to use asyncio.run directly | |
| asyncio.run(coro) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/task.py` around lines 762 - 789, The
`_run_coroutine_sync` flow in `Task` is catching the coroutine’s own
`RuntimeError` as if it meant “no running loop,” which then incorrectly calls
`asyncio.run` again in the active loop. Refactor the logic to detect the running
loop first, then execute either the threaded `asyncio.run(coro)` path or the
direct `asyncio.run(coro)` path outside the loop-detection `try/except`, so the
re-raised exception from the worker thread is not swallowed and the
already-awaited coroutine is not reused. Use the `_run_coroutine_sync` method
and the `exception_holder`/`thread` branch as the main locations to update.
Summary
Fixes 4 error handling bugs found during production usage:
_create_taskraisesStopIterationon unknown agent role — replaced with descriptiveValueErrorlisting available roles_create_taskmutates input config dict — now works on a filtered copy instead ofdelafter_kickoff_callbacksswallows result — only replaces result if callback returns non-NoneChanges
lib/crewai/src/crewai/crew.py—_create_taskand callback loop fixeslib/crewai/src/crewai/task.py— async callback execution fixReproduction
See issue #6439 for full reproduction script and output.
Test plan
Crew(config=...)with mismatched agent role now raises clearValueError_create_taskFixes #6439