Summary
At high concurrency, a map or parallel operation can stay in the current invocation while at least one branch is still running. When one iteration keeps it alive and the other branches wait on short timers (a wait, a wait_for_condition poll, or a step retry backoff), those branches resume in-process as their timers come due.
A single timer thread runs those resumes one at a time and holds its lock across a blocking checkpoint that refreshes state. Every resume costs one network round trip under the lock, and every branch trying to register its next wait queues behind it. When many timers come due together, the timer thread cannot keep up with the arrival rate. The operation does not finish before the function timeout, the backend reinvokes, the work replays, and the cycle repeats.
The work still completes correctly, but a map that should finish in seconds takes minutes across several timed-out invocations. This is a throughput limit in the in-process resume path, not a correctness failure.
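A rough back-of-envelope model shows why a synchronized wave overwhelms a single serial timer thread. This is only a sketch: the 50 ms round-trip figure is an assumption chosen for illustration, not a measurement from this report, and the helper names are hypothetical.

```python
def serial_drain_seconds(due_resumes: int, round_trip_seconds: float) -> float:
    """Time for one thread to resume a wave when each resume costs one round trip."""
    return due_resumes * round_trip_seconds


def keeps_up(due_resumes: int, round_trip_seconds: float, interval_seconds: float) -> bool:
    """True if a whole wave is drained before the next wave comes due."""
    return serial_drain_seconds(due_resumes, round_trip_seconds) <= interval_seconds


# ASSUMPTION: 50 ms per checkpoint round trip is illustrative only.
ASSUMED_ROUND_TRIP_SECONDS = 0.05

wave_seconds = serial_drain_seconds(300, ASSUMED_ROUND_TRIP_SECONDS)
print(f"serial drain of 300 resumes: {wave_seconds:.1f}s")
print("keeps up with a 2s interval:", keeps_up(300, ASSUMED_ROUND_TRIP_SECONDS, 2.0))
```

Under that assumption, draining one wave takes several times longer than the 2s poll interval, so each new wave arrives before the previous one has been resumed.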
Environment
aws-durable-execution-sdk-python 1.5.0, runtime python:3.13.DurableFunction.v17, us-west-2.
Reproduction
The repro is a map with one long-running keeper branch, which keeps the operation in-process, plus num_stress branches that poll wait_for_condition on a shared interval, so their timers come due in one synchronized wave. The keeper is a control that makes "stays in-process" deterministic. Using wait_for_condition keeps each branch to a single operation, so the run stays under the per-execution operation limit.
import time
from typing import Any

from aws_durable_execution_sdk_python.config import Duration, MapConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.types import (
    StepContext,
    WaitForConditionCheckContext,
)
from aws_durable_execution_sdk_python.waits import (
    WaitForConditionConfig,
    WaitForConditionDecision,
)

KEEPER_INDEX = 0


@durable_execution
def handler(event: Any, context: DurableContext) -> dict[str, int]:
    payload: dict[str, Any] = event or {}
    num_stress: int = int(payload.get("num_stress", 300))
    polls: int = int(payload.get("polls", 8))
    delay_seconds: int = int(payload.get("delay_seconds", 2))
    keeper_seconds: int = int(payload.get("keeper_seconds", 18))

    def branch(ctx: DurableContext, _item: int, index: int, _inputs: Any) -> int:
        if index == KEEPER_INDEX:
            # Keeps the map in-process so due timers resume in-process.
            def keep(_step_ctx: StepContext) -> int:
                time.sleep(keeper_seconds)
                return keeper_seconds

            return ctx.step(keep, name="keeper")

        def check(state: int, _check_ctx: WaitForConditionCheckContext) -> int:
            return state + 1

        def strategy(state: int, _attempt: int) -> WaitForConditionDecision:
            if state >= polls:
                return WaitForConditionDecision.stop_polling()
            return WaitForConditionDecision.continue_waiting(
                Duration.from_seconds(delay_seconds)
            )

        return ctx.wait_for_condition(
            check=check,
            config=WaitForConditionConfig(wait_strategy=strategy, initial_state=0),
        )

    result = context.map(
        inputs=list(range(num_stress + 1)), func=branch, name="repro_timer_serial"
    )
    return {
        "total": result.total_count,
        "succeeded": result.success_count,
        "failed": result.failure_count,
    }
Deploy as a durable function with a 60s function timeout and invoke with:
{"num_stress": 300, "polls": 8, "delay_seconds": 2, "keeper_seconds": 18}
Observed vs expected
Ideal completion is about 18s: the keeper runs for 18s, and the polling branches need about 16s (8 polls at 2s). All 301 branches complete in both builds.
| Build | invocations | 60s timeouts | wall |
| --- | --- | --- | --- |
| 1.5.0 | 4 | 3 | 222s |
| proposed change | 1 | 0 | 38s |
Proposed change
Drain all due resumes under the lock and keep the pop and the reset-to-pending atomic, then resubmit outside the lock with one shared state refresh per wave instead of one round trip per resume. This is the throughput fix: the timer thread no longer holds its lock across a network call, and a single refresh serves the whole due wave.
Why narrowing the lock scope is safe
Before, one branch was handled entirely under the lock, one branch per loop turn:
with self._lock:
    if self._pending_resumes and self._pending_resumes[0][0] <= current_time:
        _, _, exe_state = heapq.heappop(self._pending_resumes)  # 1. pop one timer off the heap
        if exe_state.can_resume:  # 2. check it
            exe_state.reset_to_pending()  # 3. mark it PENDING
            self.resubmit_callback(exe_state)  # 4. refresh state + submit (network + worker pool)
After, only the heap work stays under the lock, for the whole due wave:
ready = []
with self._lock:
    while self._pending_resumes and self._pending_resumes[0][0] <= current_time:
        _, _, exe_state = heapq.heappop(self._pending_resumes)  # 1. pop
        if exe_state.can_resume:
            exe_state.reset_to_pending()  # 2. mark PENDING (atomic with the pop)
            ready.append(exe_state)
# lock released here
if ready:
    self.resubmit_callback(ready)  # 3. one shared refresh + submit each, off the lock
The lock only protects the pending-resume heap. The resubmit work, meaning the network refresh and the worker-pool submit, never touches the heap, so it does not need the lock. Narrowing the scope leaves one window: a branch that has been popped and marked PENDING but not yet submitted, while the timer thread runs the shared refresh. Nothing in that window is harmful:
- should_execution_suspend reads branch status to decide whether to suspend the whole operation. It sees this branch as PENDING, which means "still has work to do," so it returns do-not-suspend. Keeping the pop and the reset-to-pending atomic is what guarantees the branch is never observed off the heap but still suspended, which would otherwise trigger a spurious parent suspend.
- Only the single timer thread pops from the heap and resubmits, so a branch cannot be resumed twice. It has no future yet, so no done-callback can fire for it mid-window.
- A concurrent schedule_resume only pushes to the heap under the lock and does not touch this branch.
- If the shared refresh fails, which happens only when the checkpoint subsystem has already failed and is terminal, the timer thread records the error, wakes execute(), and returns without submitting. The branch stays PENDING and never runs, but the execution is failing anyway and the backend retries from the last checkpoint. No hang.
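The two loop shapes can be compared in a small self-contained toy model. This is a sketch under stated assumptions, not the SDK code: ToyBranch, ToyTimer, and run are hypothetical names, the status strings are placeholders, and _refresh only counts calls where the real code would make a blocking checkpoint round trip.

```python
import heapq
import threading
from dataclasses import dataclass, field


@dataclass(order=True)
class ToyBranch:
    due: float  # only the due time takes part in heap ordering
    name: str = field(compare=False)
    status: str = field(default="SUSPENDED", compare=False)


class ToyTimer:
    """Hypothetical stand-in for the timer thread's heap; not the SDK class."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._heap: list[ToyBranch] = []
        self.refreshes = 0  # counts simulated network round trips
        self.submitted: list[str] = []

    def schedule(self, branch: ToyBranch) -> None:
        with self._lock:
            heapq.heappush(self._heap, branch)

    def _refresh(self) -> None:
        self.refreshes += 1  # stands in for the blocking state refresh

    def drain_per_resume(self, now: float) -> None:
        """Old shape: one branch per loop turn, refresh while holding the lock."""
        while True:
            with self._lock:
                if not self._heap or self._heap[0].due > now:
                    return
                branch = heapq.heappop(self._heap)
                branch.status = "PENDING"
                self._refresh()
                self.submitted.append(branch.name)

    def drain_wave(self, now: float) -> None:
        """New shape: pop and mark the whole due wave under the lock, refresh once after."""
        ready: list[ToyBranch] = []
        with self._lock:
            while self._heap and self._heap[0].due <= now:
                branch = heapq.heappop(self._heap)
                branch.status = "PENDING"  # atomic with the pop
                ready.append(branch)
        if ready:
            self._refresh()  # one shared refresh, off the lock
            self.submitted.extend(b.name for b in ready)


def run(drain_name: str, count: int = 300) -> ToyTimer:
    timer = ToyTimer()
    for i in range(count):
        timer.schedule(ToyBranch(due=2.0, name=f"b{i}"))
    getattr(timer, drain_name)(now=2.0)
    return timer


old = run("drain_per_resume")
new = run("drain_wave")
print("refreshes per wave:", old.refreshes, "vs", new.refreshes)
```

In this model both shapes submit the same 300 branches exactly once; the per-resume loop performs 300 simulated refreshes under the lock, while the wave drain performs one outside it.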
PR to follow.