feat(server): add aclose() to drain ActiveTask background tasks (#1101) #1105

Open
astrogilda wants to merge 1 commit into a2aproject:main from astrogilda:fix/active-task-registry-aclose-1101

@astrogilda

Summary

Adds a public aclose() teardown to ActiveTask, ActiveTaskRegistry, and
DefaultRequestHandlerV2 that force-drains the SDK-owned producer, consumer,
and dispatcher asyncio.Tasks so none are left pending at event-loop shutdown.

  • ActiveTask.aclose() force-closes both event queues and cancels the producer
    and consumer tasks, then awaits them. It sets _is_finished under _lock, so
    it is mutually exclusive with start() (which refuses to spawn once
    _is_finished is set).
  • ActiveTaskRegistry.aclose() marks the registry closed so get_or_create
    refuses new work, drains every active task, then awaits the in-flight
    _remove_task cleanup tasks. The lock is released before awaiting because
    _remove_task re-acquires it.
  • DefaultRequestHandlerV2.aclose() delegates to the registry drain, for wiring
    into an ASGI lifespan / on_shutdown hook.
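
The ordering described in the bullets above can be sketched roughly as follows. This is a minimal illustration and not the SDK code: SketchTask and SketchRegistry are hypothetical stand-ins for ActiveTask and ActiveTaskRegistry, the background work is a plain sleep, and the event queues are left out entirely.

```python
import asyncio


class SketchTask:
    """Hypothetical stand-in for ActiveTask (not the SDK class)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._is_finished = False
        self._tasks: list[asyncio.Task] = []

    async def start(self) -> bool:
        async with self._lock:
            if self._is_finished:
                return False  # refuse to spawn once aclose() has run
            self._tasks.append(asyncio.create_task(asyncio.sleep(3600)))
            return True

    async def aclose(self) -> None:
        async with self._lock:
            # Set under the lock so it cannot interleave with start().
            self._is_finished = True
            tasks, self._tasks = self._tasks, []
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects CancelledError instead of raising it.
        await asyncio.gather(*tasks, return_exceptions=True)


class SketchRegistry:
    """Hypothetical stand-in for ActiveTaskRegistry (not the SDK class)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._closed = False
        self._active: dict[str, SketchTask] = {}

    async def get_or_create(self, task_id: str) -> SketchTask:
        async with self._lock:
            if self._closed:
                raise RuntimeError("registry is closed")
            return self._active.setdefault(task_id, SketchTask())

    async def aclose(self) -> None:
        async with self._lock:
            self._closed = True
            tasks = list(self._active.values())
            self._active.clear()
        # The lock is released before awaiting, so cleanup code that
        # re-acquires it cannot deadlock against this method.
        await asyncio.gather(*(t.aclose() for t in tasks), return_exceptions=True)


async def demo() -> tuple[bool, bool, bool]:
    registry = SketchRegistry()
    task = await registry.get_or_create("t1")
    started = await task.start()
    await registry.aclose()
    restarted = await task.start()  # refused after teardown
    try:
        await registry.get_or_create("t2")
        rejected = False
    except RuntimeError:
        rejected = True
    return started, restarted, rejected
```

Running demo() exercises the two guards: start() is refused after teardown, and the closed registry rejects new work.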

Why

Fixes #1101. At shutdown the ActiveTask producer can stay pending and surface
as "Task was destroyed but it is pending!". The producer's finally block calls
_event_queue_subscribers.close(immediate=False), which awaits join() on
every subscriber sink; an abandoned subscriber leaves an undrained sink, so
join() never returns and the producer hangs. There is currently no public way
to drain these background tasks. aclose() closes the queues with
immediate=True, which releases the wedged producer, and then reaps the tasks.
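
The hang can be imitated with nothing but the standard library. The sketch below is an analogy under stated assumptions, not the SDK's code path: a plain asyncio.Queue stands in for a subscriber sink, the producer function is hypothetical, and cancellation stands in for the forced close.

```python
import asyncio


async def producer(sink: asyncio.Queue) -> None:
    try:
        await sink.put("event")
    finally:
        # Graceful-close analogue: wait until every queued item has been
        # marked done. Nobody consumes the item, so this never returns.
        await sink.join()


async def demo() -> tuple[bool, bool]:
    # Abandoned subscriber: nothing ever calls get() or task_done().
    sink: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(producer(sink))

    _, pending = await asyncio.wait({task}, timeout=0.1)
    hung = task in pending  # still stuck inside join()

    # Forced-teardown analogue: cancel, then await so the task is reaped
    # instead of being destroyed while pending at loop shutdown.
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return hung, task.cancelled()
```

Without the cancel-and-await step, the task would still be pending when the loop closes, which is the condition that triggers the warning quoted above.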

The teardown always force-closes rather than exposing a graceful
immediate=False option, because the graceful path inherits the documented
close(immediate=False) deadlock, and a shutdown hook must be bounded.
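
One way the handler drain might be wired into an ASGI lifespan is sketched below. Only the aclose() method name comes from this PR; FakeHandler, make_lifespan, and the extra wait_for timeout are hypothetical additions for illustration, and the timeout is a belt-and-braces assumption rather than something the PR requires.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeHandler:
    """Hypothetical stand-in exposing only aclose()."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


def make_lifespan(handler, timeout: float = 5.0):
    """Build a lifespan context manager that drains the handler on shutdown."""

    @asynccontextmanager
    async def lifespan(app):
        yield  # the application serves requests while suspended here
        # Extra upper bound on teardown time (an assumption of this sketch).
        await asyncio.wait_for(handler.aclose(), timeout=timeout)

    return lifespan


async def demo() -> bool:
    handler = FakeHandler()
    lifespan = make_lifespan(handler)
    async with lifespan(None):
        assert not handler.closed  # nothing is drained before shutdown
    return handler.closed
```

Frameworks such as Starlette and FastAPI accept a callable of this shape through their lifespan= argument, so the real handler could be passed in place of FakeHandler.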

Test plan

  • uv run pytest tests/server/agent_execution/ tests/server/request_handlers/ tests/server/events/ — pass
  • uv run pytest --cov=a2a --cov-fail-under=88 — pass
  • ./scripts/lint.sh — ruff, ruff-format, and ty clean

New tests cover the registry drain, idempotency, empty registry, new-work
rejected after close, and an errored task being logged rather than propagated;
ActiveTask reaping a running producer and force-closing past an undrained
subscriber (the #1101 repro); and the handler drain.

Fixes #1101 🦕

…roject#1101)

At shutdown the ActiveTask producer can stay pending and surface as
"Task was destroyed but it is pending!". The producer's finally closes the
subscriber queue with immediate=False, which joins every subscriber sink; an
abandoned subscriber leaves an undrained sink so the join never returns and
the producer hangs. No public API drains these background tasks today.

Add aclose() to ActiveTask, ActiveTaskRegistry, and DefaultRequestHandlerV2.
It force-closes the event queues (immediate=True), which releases the wedged
producer, then cancels and awaits the producer and consumer tasks. ActiveTask
sets _is_finished under _lock so it is mutually exclusive with start(); the
registry marks itself closed so get_or_create refuses new work during
teardown, closing the orphan-task race.

Fixes a2aproject#1101

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces aclose() methods to ActiveTask, ActiveTaskRegistry, and DefaultRequestHandlerV2 to facilitate a bounded, force-closed teardown of background tasks and queues during server shutdown, preventing pending task warnings. It also adds comprehensive unit tests to verify the teardown behavior, idempotency, and error handling. The review feedback suggests explicitly shutting down self._request_queue in ActiveTask.aclose() to ensure proper cleanup of all queues if start() was never called or failed early.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +745 to +746
await self._event_queue_agent.close(immediate=True)
await self._event_queue_subscribers.close(immediate=True)

Contributor

medium

If start() was never called or failed early, the background tasks are never spawned, meaning their finally blocks (which shut down self._request_queue) will not run. Explicitly shutting down self._request_queue in aclose() ensures that all queues are properly cleaned up and any pending operations on the request queue are unblocked.

Suggested change
- await self._event_queue_agent.close(immediate=True)
- await self._event_queue_subscribers.close(immediate=True)
+ await self._event_queue_agent.close(immediate=True)
+ await self._event_queue_subscribers.close(immediate=True)
+ self._request_queue.shutdown(immediate=True)
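
The unblocking effect the reviewer describes can be illustrated with the standard library's asyncio.Queue.shutdown(), available from Python 3.13. This is a sketch under an assumption: the PR does not say whether _request_queue is an asyncio.Queue or another type with a compatible shutdown() method, so the example only shows the stdlib behavior.

```python
import asyncio


async def demo() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    if not hasattr(queue, "shutdown"):
        return "unsupported"  # asyncio.Queue.shutdown() needs Python 3.13+

    async def waiter() -> str:
        try:
            await queue.get()
            return "got item"
        except asyncio.QueueShutDown:
            # Raised in blocked getters once the queue is shut down.
            return "unblocked"

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter block inside get()
    queue.shutdown(immediate=True)
    return await task
```

On interpreters older than 3.13 the function reports "unsupported" rather than failing.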

@github-actions

🧪 Code Coverage (vs main)

| File | Base | PR | Delta |
| --- | --- | --- | --- |
| src/a2a/server/agent_execution/active_task.py | 95.92% | 95.37% | 🔴 -0.55% |
| src/a2a/server/agent_execution/active_task_registry.py | 93.75% | 96.61% | 🟢 +2.86% |
| src/a2a/server/events/event_queue_v2.py | 91.19% | 91.71% | 🟢 +0.52% |
| src/a2a/server/request_handlers/default_request_handler_v2.py | 94.12% | 94.17% | 🟢 +0.05% |
| src/a2a/utils/telemetry.py | 90.70% | 91.47% | 🟢 +0.78% |
| Total | 92.99% | 93.02% | 🟢 +0.03% |

Generated by coverage-comment.yml


Development

Successfully merging this pull request may close these issues.

[Bug]: DefaultRequestHandlerV2 ActiveTask producer can remain pending during EventQueue subscriber cleanup
