Fix async task failure cleanup #6407

Open
Oxygen56 wants to merge 1 commit into crewAIInc:main from Oxygen56:fix/async-task-failure-propagation

Oxygen56 commented Jul 1, 2026

Summary

  • cancel and drain pending native async tasks when one async task fails
  • cancel unprocessed Future-based async tasks and consume completed failures in the sync path
  • add regression coverage for both kickoff() and akickoff() failure paths
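
The sync-path cleanup described in the second bullet can be sketched in isolation. This is a minimal illustration, not the code in crew.py: `collect_results` is a hypothetical stand-in, and the futures are created by hand rather than by an executor.

```python
from concurrent.futures import Future
from contextlib import suppress


def collect_results(futures):
    """Hypothetical helper: read results in order, clean up on the first failure."""
    results = []
    try:
        for future in futures:
            results.append(future.result())
    except Exception:
        for future in futures:
            if not future.done():
                # Not finished yet: ask for cancellation.
                future.cancel()
            else:
                # Already finished: read the outcome so a failure is not left unconsumed.
                with suppress(Exception):
                    future.result()
        raise  # re-raise the original exception
    return results


failed = Future()
failed.set_exception(ValueError("boom"))
pending = Future()  # never completed: stands in for unprocessed work

try:
    collect_results([failed, pending])
except ValueError as exc:
    error = str(exc)
```

After the call, `error` holds the original message and `pending.cancelled()` is true. Note that `cancel()` only succeeds for a future that has not started running.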

Fixes #6380

Tests

  • uv run pytest lib/crewai/tests/test_crew.py::test_async_task_failure_cancels_pending_futures lib/crewai/tests/test_crew.py::test_async_task_failure_cancels_pending_native_tasks
  • uv run ruff check lib/crewai/src/crewai/crew.py lib/crewai/tests/test_crew.py

coderabbitai Bot commented Jul 1, 2026

📝 Walkthrough

Adds exception handling to two async task processing methods in crew.py (_aprocess_async_tasks and _process_async_tasks): on failure, pending tasks/futures are cancelled and drained before the original exception is re-raised, replacing prior silent-hang behavior. Adds two regression tests validating this cancellation and propagation.

Changes

Async task failure handling

| Layer / File(s) | Summary |
| --- | --- |
| Cancellation and re-raise in async task processors (lib/crewai/src/crewai/crew.py) | Imports suppress from contextlib; _aprocess_async_tasks and _process_async_tasks now wrap result processing in try/except, cancel unfinished tasks/futures on error, drain or suppress cleanup exceptions, and re-raise the original exception instead of hanging silently. |
| Regression tests for failure propagation (lib/crewai/tests/test_crew.py) | Adds asyncio import and two new tests confirming that a failing async task cancels pending futures/native async tasks and prevents subsequent sync task execution during kickoff() and akickoff(). |

Sequence Diagram(s)

sequenceDiagram
  participant Crew
  participant AsyncTaskProcessor as "_aprocess_async_tasks/_process_async_tasks"
  participant PendingTasks as "Pending async tasks/futures"

  Crew->>AsyncTaskProcessor: process async task results
  AsyncTaskProcessor->>PendingTasks: await/collect results
  PendingTasks-->>AsyncTaskProcessor: exception raised
  AsyncTaskProcessor->>PendingTasks: cancel unfinished tasks
  AsyncTaskProcessor->>PendingTasks: drain/suppress cleanup exceptions
  AsyncTaskProcessor-->>Crew: re-raise original exception

Related issues: #6380 (async task LLM failure silently freezes flow) — this PR adds exception propagation and cancellation of pending async work, addressing the silent-hang scenario described.

Suggested labels: bug, async, crew

Suggested reviewers: joaomdmoura

🎨 Finished tuning the async gears — no more silent freezes in sight,
🐰 A rabbit hopped through cancelled tasks, setting errors right.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 75.00%, which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title is concise and accurately reflects the main change: async task failure cleanup. |
| Description check | ✅ Passed | The description clearly matches the code changes and regression tests in this PR. |
| Linked Issues check | ✅ Passed | The PR cancels, drains, and propagates async task failures, which addresses the freeze behavior in #6380. |
| Out of Scope Changes check | ✅ Passed | The changes and tests are focused on async failure cleanup with no clear unrelated additions. |
Oxygen56 marked this pull request as ready for review July 1, 2026 05:13

coderabbitai Bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
lib/crewai/tests/test_crew.py (1)

1294-1323: 🩺 Stability & Availability | 🔵 Trivial | ⚡ Quick win

Add the later-failing-task regression case.

Both tests make the first awaited async unit fail. Add a mirrored case where the first future/task stays pending and a later one fails, so submission-order blocking can’t regress.

Also applies to: 1354-1372
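
To illustrate why the mirrored case matters, here is a minimal sketch of submission-order blocking with native asyncio tasks. `process_in_submission_order` is a hypothetical stand-in for a loop that awaits tasks one by one; it is not the code in crew.py, and the 0.2 second timeout is an arbitrary choice for the demo.

```python
import asyncio


async def process_in_submission_order(tasks):
    """Hypothetical stand-in: await tasks one by one, cancel and drain on failure."""
    try:
        return [await task for task in tasks]
    except Exception:
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise


async def later_task_fails_case():
    never_set = asyncio.Event()

    async def stays_pending():
        await never_set.wait()

    async def fails_later():
        raise RuntimeError("later task failed")

    tasks = [
        asyncio.ensure_future(stays_pending()),
        asyncio.ensure_future(fails_later()),
    ]
    try:
        # The timeout keeps the demo from hanging if the failure is never seen.
        await asyncio.wait_for(process_in_submission_order(tasks), timeout=0.2)
        outcome = "completed"
    except asyncio.TimeoutError:
        outcome = "blocked"
    except RuntimeError:
        outcome = "propagated"
    finally:
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    return outcome


outcome = asyncio.run(later_task_fails_case())
```

With the sequential stand-in, `outcome` is "blocked": the second task has already failed, but the loop is still waiting on the first. A regression test for the mirrored case would assert "propagated" instead.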

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/tests/test_crew.py` around lines 1294 - 1323, Add a regression
test in the Crew kickoff async ordering coverage that mirrors the existing
failure case but with the first async future staying pending and a later async
future raising, so submission-order blocking is verified in the opposite
direction. Reuse the same test pattern around Crew.kickoff, Task.execute_async,
and Task.execute_sync, but set up the first Task future to remain pending and
the second to fail, then assert the later exception is raised and the earlier
pending future is cancelled.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 3b68d9b6-4b41-42de-b7a0-c6ded2d3e88b

📥 Commits

Reviewing files that changed from the base of the PR and between 1452ee2 and e899991.

📒 Files selected for processing (2)
  • lib/crewai/src/crewai/crew.py
  • lib/crewai/tests/test_crew.py

Comment on lines +1405 to +1421
try:
    for future_task, async_task, task_index in pending_tasks:
        task_output = await async_task
        task_outputs.append(task_output)
        self._process_task_result(future_task, task_output)
        self._store_execution_log(
            future_task, task_output, task_index, was_replayed
        )
except Exception:
    for _, async_task, _ in pending_tasks:
        if not async_task.done():
            async_task.cancel()
    await asyncio.gather(
        *(async_task for _, async_task, _ in pending_tasks),
        return_exceptions=True,
    )
    raise

🩺 Stability & Availability | 🔴 Critical | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate the relevant function and surrounding async control flow.
FILE="lib/crewai/src/crewai/crew.py"

# Show a compact outline first.
ast-grep outline "$FILE" --view expanded || true

# Read the relevant section with line numbers.
sed -n '1350,1465p' "$FILE" | cat -n

# Search for the helper methods involved in result processing and cleanup.
rg -n "_process_task_result|_store_execution_log|pending_tasks|asyncio.gather|cancel\(" "$FILE"

Repository: crewAIInc/crewAI

Length of output: 14496


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="lib/crewai/src/crewai/crew.py"

# Inspect the native-async batch processor and the older async-task helper nearby.
sed -n '1295,1425p' "$FILE" | cat -n

echo
echo '---'
sed -n '1880,1915p' "$FILE" | cat -n

echo
echo '---'
# Find where _aprocess_async_tasks is called so we can see its error-handling expectations.
rg -n "_aprocess_async_tasks\\(|_process_async_tasks\\(" "$FILE"

Repository: crewAIInc/crewAI

Length of output: 8287


Await the async batch together, not task-by-task. This loop still blocks on the first pending task, so a later failure won’t trigger cancellation until earlier tasks finish. Use batch waiting (asyncio.gather or equivalent), then cancel/drain the remaining tasks on error.
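
A minimal sketch of the batch-waiting approach, assuming the pending work is a plain list of asyncio tasks. `await_batch` is a hypothetical helper, not the crew.py method, and it omits the per-task result processing and logging.

```python
import asyncio


async def await_batch(tasks):
    """Hypothetical helper: await all tasks together, cancel and drain on failure."""
    try:
        # gather raises the first exception as soon as any task fails,
        # regardless of where that task sits in the list.
        return await asyncio.gather(*tasks)
    except Exception:
        for task in tasks:
            if not task.done():
                task.cancel()
        # Drain so cancelled tasks finish and no exception goes unretrieved.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise


async def demo():
    never_set = asyncio.Event()

    async def stays_pending():
        await never_set.wait()

    async def fails_later():
        raise RuntimeError("later task failed")

    first = asyncio.ensure_future(stays_pending())
    second = asyncio.ensure_future(fails_later())
    try:
        await await_batch([first, second])
    except RuntimeError as exc:
        return str(exc), first.cancelled()


message, first_cancelled = asyncio.run(demo())
```

On success, `asyncio.gather` returns results in the same order as its arguments, so the outputs could still be paired with their tasks and indices for processing afterwards.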

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/crew.py` around lines 1405 - 1421, The async batch
handling in crew.py still awaits each pending task sequentially inside the
pending_tasks loop, so failures are detected too late. Update the logic around
the pending_tasks/async_task handling to await the batch together using
asyncio.gather (or equivalent) instead of task-by-task, then on any exception
cancel any still-running tasks, drain them with gather(return_exceptions=True),
and re-raise so _process_task_result and _store_execution_log only run after
each task completes successfully.

Comment on lines +1895 to +1910
try:
    for future_task, future, task_index in futures:
        task_output = future.result()
        task_outputs.append(task_output)
        self._process_task_result(future_task, task_output)
        self._store_execution_log(
            future_task, task_output, task_index, was_replayed
        )
except Exception:
    for _, future, _ in futures:
        if not future.done():
            future.cancel()
        else:
            with suppress(Exception):
                future.result()
    raise

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the relevant region and surrounding context
FILE="lib/crewai/src/crewai/crew.py"
wc -l "$FILE"
sed -n '1830,1935p' "$FILE"

# Search for futures/wait/FIRST_EXCEPTION usage in this file
rg -n "FIRST_EXCEPTION|concurrent\.futures|future\.result\(|futures =" "$FILE"

Repository: crewAIInc/crewAI

Length of output: 4264


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="lib/crewai/src/crewai/crew.py"

# Find where futures list is created and the async execution entrypoint
rg -n "append\\(|submit\\(|as_completed|wait\\(|_process_async_tasks|ThreadPoolExecutor|ProcessPoolExecutor|executor" "$FILE"

# Inspect the async execution region around task submission
sed -n '2010,2165p' "$FILE"

Repository: crewAIInc/crewAI

Length of output: 7417


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="lib/crewai/src/crewai/crew.py"

# Inspect the async task submission paths and their control flow
sed -n '1505,1625p' "$FILE"

# Show the function signature and neighboring helpers if needed
sed -n '1450,1508p' "$FILE"

Repository: crewAIInc/crewAI

Length of output: 6915


Process async futures in completion order. future.result() walks the list in submission order, so an earlier pending task can delay a later failure and stall the crew unnecessarily. Use wait(..., return_when=FIRST_EXCEPTION) or as_completed, then cancel the remaining futures on the first error.
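
One possible shape for the completion-order approach, sketched with `concurrent.futures.wait`. `collect_until_first_failure` is a hypothetical helper that works on bare futures; the real method also carries task and index metadata, which is left out here.

```python
from concurrent.futures import FIRST_EXCEPTION, Future, wait
from contextlib import suppress


def collect_until_first_failure(futures):
    """Hypothetical helper: return results in submission order, or fail fast."""
    # Returns as soon as any future raises; otherwise waits for all of them.
    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
    for future in futures:  # submission order keeps the raised error deterministic
        if future in done and not future.cancelled() and future.exception() is not None:
            for other in not_done:
                # cancel() only succeeds for work that has not started running.
                other.cancel()
            for other in done:
                with suppress(Exception):
                    other.result()  # consume outcomes so no failure is left unread
            raise future.exception()
    return [future.result() for future in futures]


pending = Future()  # never completed: stands in for a long-running task
failed = Future()
failed.set_exception(RuntimeError("later future failed"))

try:
    collect_until_first_failure([pending, failed])
except RuntimeError as exc:
    error = str(exc)
```

Here the failure in the second future surfaces immediately even though the first is still pending, and `pending.cancelled()` is true afterwards. Work already running in a thread cannot be cancelled this way, so a real implementation would still need to decide whether to wait for it.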

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/crew.py` around lines 1895 - 1910, The async future
handling in crew execution is currently processed in submission order via
future.result(), which can delay surfacing an earlier failure and stall the
crew. Update the loop in the crew execution path to consume futures in
completion order using as_completed or wait(..., return_when=FIRST_EXCEPTION),
then keep the existing cancellation/cleanup behavior for the remaining futures
when one fails. Preserve the existing task processing flow by still calling
_process_task_result and _store_execution_log for each completed
future_task/task_output pair.

Development

Successfully merging this pull request may close these issues.

[Bug Report] Async task LLM failure silently freezes flow — reproduction + 10-line fix

1 participant