Fix async task failure cleanup#6407
Conversation
📝 WalkthroughWalkthroughAdds exception handling to two async task processing methods in crew.py (_aprocess_async_tasks and _process_async_tasks): on failure, pending tasks/futures are cancelled and drained before the original exception is re-raised, replacing prior silent-hang behavior. Adds two regression tests validating this cancellation and propagation. ChangesAsync task failure handling
Sequence Diagram(s)sequenceDiagram
participant Crew
participant AsyncTaskProcessor as "_aprocess_async_tasks/_process_async_tasks"
participant PendingTasks as "Pending async tasks/futures"
Crew->>AsyncTaskProcessor: process async task results
AsyncTaskProcessor->>PendingTasks: await/collect results
PendingTasks-->>AsyncTaskProcessor: exception raised
AsyncTaskProcessor->>PendingTasks: cancel unfinished tasks
AsyncTaskProcessor->>PendingTasks: drain/suppress cleanup exceptions
AsyncTaskProcessor-->>Crew: re-raise original exception
Related issues: Suggested labels: bug, async, crew Suggested reviewers: joaomdmoura 🎨 Finished tuning the async gears — no more silent freezes in sight, 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
lib/crewai/tests/test_crew.py (1)
1294-1323: 🩺 Stability & Availability | 🔵 Trivial | ⚡ Quick winAdd the later-failing-task regression case.
Both tests make the first awaited async unit fail. Add a mirrored case where the first future/task stays pending and a later one fails, so submission-order blocking can’t regress.
Also applies to: 1354-1372
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/tests/test_crew.py` around lines 1294 - 1323, Add a regression test in the Crew kickoff async ordering coverage that mirrors the existing failure case but with the first async future staying pending and a later async future raising, so submission-order blocking is verified in the opposite direction. Reuse the same test pattern around Crew.kickoff, Task.execute_async, and Task.execute_sync, but set up the first Task future to remain pending and the second to fail, then assert the later exception is raised and the earlier pending future is cancelled.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/crew.py`:
- Around line 1405-1421: The async batch handling in crew.py still awaits each
pending task sequentially inside the pending_tasks loop, so failures are
detected too late. Update the logic around the pending_tasks/async_task handling
to await the batch together using asyncio.gather (or equivalent) instead of
task-by-task, then on any exception cancel any still-running tasks, drain them
with gather(return_exceptions=True), and re-raise so _process_task_result and
_store_execution_log only run after each task completes successfully.
- Around line 1895-1910: The async future handling in crew execution is
currently processed in submission order via future.result(), which can delay
surfacing an earlier failure and stall the crew. Update the loop in the crew
execution path to consume futures in completion order using as_completed or
wait(..., return_when=FIRST_EXCEPTION), then keep the existing
cancellation/cleanup behavior for the remaining futures when one fails. Preserve
the existing task processing flow by still calling _process_task_result and
_store_execution_log for each completed future_task/task_output pair.
---
Nitpick comments:
In `@lib/crewai/tests/test_crew.py`:
- Around line 1294-1323: Add a regression test in the Crew kickoff async
ordering coverage that mirrors the existing failure case but with the first
async future staying pending and a later async future raising, so
submission-order blocking is verified in the opposite direction. Reuse the same
test pattern around Crew.kickoff, Task.execute_async, and Task.execute_sync, but
set up the first Task future to remain pending and the second to fail, then
assert the later exception is raised and the earlier pending future is
cancelled.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 3b68d9b6-4b41-42de-b7a0-c6ded2d3e88b
📒 Files selected for processing (2)
lib/crewai/src/crewai/crew.pylib/crewai/tests/test_crew.py
| try: | ||
| for future_task, async_task, task_index in pending_tasks: | ||
| task_output = await async_task | ||
| task_outputs.append(task_output) | ||
| self._process_task_result(future_task, task_output) | ||
| self._store_execution_log( | ||
| future_task, task_output, task_index, was_replayed | ||
| ) | ||
| except Exception: | ||
| for _, async_task, _ in pending_tasks: | ||
| if not async_task.done(): | ||
| async_task.cancel() | ||
| await asyncio.gather( | ||
| *(async_task for _, async_task, _ in pending_tasks), | ||
| return_exceptions=True, | ||
| ) | ||
| raise |
There was a problem hiding this comment.
🩺 Stability & Availability | 🔴 Critical | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate the relevant function and surrounding async control flow.
FILE="lib/crewai/src/crewai/crew.py"
# Show a compact outline first.
ast-grep outline "$FILE" --view expanded || true
# Read the relevant section with line numbers.
sed -n '1350,1465p' "$FILE" | cat -n
# Search for the helper methods involved in result processing and cleanup.
rg -n "_process_task_result|_store_execution_log|pending_tasks|asyncio.gather|cancel\(" "$FILE"Repository: crewAIInc/crewAI
Length of output: 14496
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="lib/crewai/src/crewai/crew.py"
# Inspect the native-async batch processor and the older async-task helper nearby.
sed -n '1295,1425p' "$FILE" | cat -n
echo
echo '---'
sed -n '1880,1915p' "$FILE" | cat -n
echo
echo '---'
# Find where _aprocess_async_tasks is called so we can see its error-handling expectations.
rg -n "_aprocess_async_tasks\\(|_process_async_tasks\\(" "$FILE"Repository: crewAIInc/crewAI
Length of output: 8287
Await the async batch together, not task-by-task. This loop still blocks on the first pending task, so a later failure won’t trigger cancellation until earlier tasks finish. Use batch waiting (asyncio.gather or equivalent), then cancel/drain the बाकी tasks on error.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/crew.py` around lines 1405 - 1421, The async batch
handling in crew.py still awaits each pending task sequentially inside the
pending_tasks loop, so failures are detected too late. Update the logic around
the pending_tasks/async_task handling to await the batch together using
asyncio.gather (or equivalent) instead of task-by-task, then on any exception
cancel any still-running tasks, drain them with gather(return_exceptions=True),
and re-raise so _process_task_result and _store_execution_log only run after
each task completes successfully.
| try: | ||
| for future_task, future, task_index in futures: | ||
| task_output = future.result() | ||
| task_outputs.append(task_output) | ||
| self._process_task_result(future_task, task_output) | ||
| self._store_execution_log( | ||
| future_task, task_output, task_index, was_replayed | ||
| ) | ||
| except Exception: | ||
| for _, future, _ in futures: | ||
| if not future.done(): | ||
| future.cancel() | ||
| else: | ||
| with suppress(Exception): | ||
| future.result() | ||
| raise |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the relevant region and surrounding context
FILE="lib/crewai/src/crewai/crew.py"
wc -l "$FILE"
sed -n '1830,1935p' "$FILE"
# Search for futures/wait/FIRST_EXCEPTION usage in this file
rg -n "FIRST_EXCEPTION|concurrent\.futures|future\.result\(|futures =" "$FILE"Repository: crewAIInc/crewAI
Length of output: 4264
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="lib/crewai/src/crewai/crew.py"
# Find where futures list is created and the async execution entrypoint
rg -n "append\\(|submit\\(|as_completed|wait\\(|_process_async_tasks|ThreadPoolExecutor|ProcessPoolExecutor|executor" "$FILE"
# Inspect the async execution region around task submission
sed -n '2010,2165p' "$FILE"Repository: crewAIInc/crewAI
Length of output: 7417
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="lib/crewai/src/crewai/crew.py"
# Inspect the async task submission paths and their control flow
sed -n '1505,1625p' "$FILE"
# Show the function signature and neighboring helpers if needed
sed -n '1450,1508p' "$FILE"Repository: crewAIInc/crewAI
Length of output: 6915
Process async futures in completion order. future.result() walks the list in submission order, so an earlier pending task can delay a later failure and stall the crew unnecessarily. Use wait(..., return_when=FIRST_EXCEPTION) or as_completed, then cancel the remaining futures on the first error.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/crew.py` around lines 1895 - 1910, The async future
handling in crew execution is currently processed in submission order via
future.result(), which can delay surfacing an earlier failure and stall the
crew. Update the loop in the crew execution path to consume futures in
completion order using as_completed or wait(..., return_when=FIRST_EXCEPTION),
then keep the existing cancellation/cleanup behavior for the remaining futures
when one fails. Preserve the existing task processing flow by still calling
_process_task_result and _store_execution_log for each completed
future_task/task_output pair.
Summary
kickoff()andakickoff()failure pathsFixes #6380
Tests
uv run pytest lib/crewai/tests/test_crew.py::test_async_task_failure_cancels_pending_futures lib/crewai/tests/test_crew.py::test_async_task_failure_cancels_pending_native_tasksuv run ruff check lib/crewai/src/crewai/crew.py lib/crewai/tests/test_crew.py