Graph network resilience: node retry + resumable failures#5
Conversation
RetryPolicy gains a backoff_sleep flag (off by default) and a sleep_backoff helper. The agent-loop retry loop and RetryMiddleware now sleep for the computed backoff when opted in via with_backoff_sleep, so transient provider/network failures are retried after a real, growing delay instead of back-to-back. Default stays sleep-free to keep tests deterministic. Claude-Session: https://claude.ai/code/session_01NRL2bim9Gz3UN3SRtpeXEX
Add opt-in CompiledGraph::with_node_retry(RetryPolicy): each node handler is re-run from its start on a retryable (model/tool) error, up to the policy cap, emitting GraphEvent::NodeRetryScheduled and sleeping the opt-in backoff. Both the sequential and parallel runners drive handlers through the shared run_node_with_retry helper (which also applies the per-node timeout). When a handler fails after retries (or a non-retryable error hits), the runner now hands back partial progress instead of discarding it: completed branches' updates are folded into committed state and a resumable failure-boundary checkpoint is persisted (failed node scheduled as next_nodes, successful branches as completed_tasks, error + failed node in metadata). The Failed status carries that checkpoint id, so a crashed/exhausted run can be resumed or continued rather than lost. Non-checkpointed runs abort exactly as before. Claude-Session: https://claude.ai/code/session_01NRL2bim9Gz3UN3SRtpeXEX
Add CompiledGraph::retry(thread) — shorthand for resume with an empty command — to re-run a failed run's failure-boundary checkpoint, and document the inspect/update_state/retry feedback loop for continuing on user input. Tests: node retry recovers a transient failure (one NodeRetryScheduled per blip); exhausted retries leave a resumable checkpoint that retry() completes; a failure with no retry policy is still resumable (resumable-abort default); edit-state-then-retry runs against edited state; parallel partial progress is preserved (successful branch folded, only failed branch rescheduled). Plus a paused-time test that backoff sleep fires only when opted in. Claude-Session: https://claude.ai/code/session_01NRL2bim9Gz3UN3SRtpeXEX
Add examples/resilient_graph.rs demonstrating node-level retry absorbing a transient fetch blip and a resumable failure checkpoint that retry() restarts after an outage clears. Rewrite docs/modules/graph/fault-tolerance.md to document the implemented behavior, add a resilience section to the graph module docs and wiki, and list the example in README/Examples. Claude-Session: https://claude.ai/code/session_01NRL2bim9Gz3UN3SRtpeXEX
📝 WalkthroughWalkthroughAdds opt-in node-level retry ( ChangesNode retry and resumable failure checkpoints
Estimated code review effort: 4 (Complex) | ~60 minutes Sequence Diagram(s)sequenceDiagram
participant Caller
participant CompiledGraph
participant NodeHandler
participant Checkpointer
Caller->>CompiledGraph: run(thread)
CompiledGraph->>NodeHandler: run_node_with_retry
NodeHandler-->>CompiledGraph: retryable error
CompiledGraph->>CompiledGraph: emit NodeRetryScheduled
CompiledGraph->>NodeHandler: retry attempt
NodeHandler-->>CompiledGraph: hard failure
CompiledGraph->>Checkpointer: persist_failure_checkpoint(next_nodes)
CompiledGraph->>Caller: Failed status with checkpoint id
Caller->>CompiledGraph: retry(thread)
CompiledGraph->>Checkpointer: load checkpoint
CompiledGraph->>NodeHandler: re-run failed node
NodeHandler-->>CompiledGraph: success
CompiledGraph->>Caller: Completed status
Related Issues: None found. Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches📝 Generate docstrings
Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 17cec870c7
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| completed: Vec<NodeId>, | ||
| /// Nodes to re-run on resume: the failed node first, then the not-yet-folded | ||
| /// members of the step (recorded as the checkpoint's next nodes). | ||
| pending: Vec<NodeId>, |
There was a problem hiding this comment.
Preserve Send payloads in failure checkpoints
When a failed activation came from Command::send, storing only NodeIds in the failure checkpoint drops the activation's send_arg; resume_from later rebuilds pending work with Activation::node, so retrying a failed map/fan-out worker reruns it with ctx.send_arg == None instead of the original payload. This breaks resumable failures for Send-based graphs, including repeated activations of the same node that require distinct payloads.
Useful? React with 👍 / 👎.
| if !self.backoff_sleep { | ||
| return; | ||
| } | ||
| let backoff = self.backoff_for_attempt(attempt); |
There was a problem hiding this comment.
Honor jitter when sleeping for backoff
With RetryPolicy::with_jitter(true).with_backoff_sleep(true), this calls backoff_for_attempt, which delegates to backoff_for_attempt_with(attempt, 0.0); because jitter multiplies by that value, the computed backoff is always zero and the retry loop never sleeps. In production configurations that enable jitter to avoid retry storms, retries will run back-to-back despite opting into real backoff sleep.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/harness/agent_loop/mod.rs (1)
763-765: 📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick winStale docstring contradicts the new opt-in sleep behavior.
The doc comment on
invoke_model_resolvingstill states the backoff duration is "intentionally not slept on," but the code below it (816-819) now callsself.policy.retry.sleep_backoff(attempt).await, which does sleep whenRetryPolicy::with_backoff_sleep(true)is set. This directly contradicts the correctly-updated module-level doc (lines 53-58).📝 Proposed fix
- /// Retries are governed by [`RunPolicy::retry`][crate::harness::runtime::RunPolicy] - /// and apply only to retryable errors (see [`is_retryable`]); each scheduled - /// retry emits [`AgentEvent::RetryScheduled`]. When retries are exhausted - /// (or the error is non-retryable) and a [`crate::harness::retry::FallbackPolicy`] - /// is configured, the next model in the chain is tried. The computed backoff - /// duration is intentionally not slept on (see the module docs). + /// Retries are governed by [`RunPolicy::retry`][crate::harness::runtime::RunPolicy] + /// and apply only to retryable errors (see [`is_retryable`]); each scheduled + /// retry emits [`AgentEvent::RetryScheduled`]. When retries are exhausted + /// (or the error is non-retryable) and a [`crate::harness::retry::FallbackPolicy`] + /// is configured, the next model in the chain is tried. The backoff is slept + /// only when the policy opts in via + /// [`RetryPolicy::with_backoff_sleep`] (see the module docs).Also applies to: 816-819
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/harness/agent_loop/mod.rs` around lines 763 - 765, The doc comment on invoke_model_resolving is stale and contradicts the new opt-in backoff sleep behavior; update that method-level documentation to match the actual retry flow. Specifically, revise the note about the computed backoff duration so it no longer says it is intentionally not slept on, and instead reflects that self.policy.retry.sleep_backoff(attempt).await may sleep when RetryPolicy::with_backoff_sleep(true) is enabled. Keep the wording consistent with the module-level docs and the retry/backoff logic in invoke_model_resolving.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/graph/compiled/mod.rs`:
- Around line 987-1033: In the failure-handling path in compiled/mod.rs around
the `failure` branch, `persist_failure_checkpoint(...)` currently uses `?`,
which can replace the original node error and skip `fail_run` entirely if
checkpoint persistence fails. Update this path so `fail_run` is always invoked
and the original `error` from `StepFailure` is preserved as the terminal
failure, while any checkpoint-write error is handled separately (for example by
logging or attaching it without overriding the root cause). Make the fix in the
`failure` branch that calls `persist_failure_checkpoint`, `fail_run`, and
returns `Err(error)` so the run still records `Failed` status and emits the
failure event even when checkpoint storage is unavailable.
In `@src/harness/retry/mod.rs`:
- Around line 69-95: sleep_backoff currently drops jitter by calling
backoff_for_attempt(), which hardcodes a zero jitter value and makes
with_jitter(true) ineffective. Update RetryPolicy::sleep_backoff to take a
caller-supplied jitter value or RNG and use it when computing the delay, so the
backoff sleep preserves the randomized timing instead of always becoming
deterministic. Keep the behavior gated by backoff_sleep and continue returning
immediately when sleeping is disabled or the computed delay is zero.
---
Outside diff comments:
In `@src/harness/agent_loop/mod.rs`:
- Around line 763-765: The doc comment on invoke_model_resolving is stale and
contradicts the new opt-in backoff sleep behavior; update that method-level
documentation to match the actual retry flow. Specifically, revise the note
about the computed backoff duration so it no longer says it is intentionally not
slept on, and instead reflects that
self.policy.retry.sleep_backoff(attempt).await may sleep when
RetryPolicy::with_backoff_sleep(true) is enabled. Keep the wording consistent
with the module-level docs and the retry/backoff logic in
invoke_model_resolving.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a58c9311-405a-4dac-bb8d-19c3bbd932c8
📒 Files selected for processing (12)
README.mddocs/modules/graph/fault-tolerance.mdexamples/resilient_graph.rssrc/graph/compiled/mod.rssrc/graph/compiled/test.rssrc/graph/compiled/types.rssrc/graph/stream/types.rssrc/harness/agent_loop/mod.rssrc/harness/middleware/library/mod.rssrc/harness/retry/mod.rssrc/harness/retry/test.rssrc/harness/retry/types.rs
| // Node-handler failure (survived any node-retry policy): the updates | ||
| // of the branches that completed before it are already folded into | ||
| // `state` above, so persist a resumable failure-boundary checkpoint | ||
| // scheduling the failed node (and the not-yet-run tail) for a later | ||
| // `resume`/`retry`, record a `Failed` status carrying the error and | ||
| // that checkpoint, and abort. Without a checkpointer/thread the | ||
| // checkpoint is a no-op and the run aborts exactly as before. | ||
| if let Some(fail) = failure { | ||
| let StepFailure { | ||
| failed_node, | ||
| completed, | ||
| pending, | ||
| error, | ||
| } = fail; | ||
| let checkpoint_id = self | ||
| .persist_failure_checkpoint( | ||
| &thread_id, | ||
| &run_id, | ||
| &state, | ||
| &pending, | ||
| &completed, | ||
| parent_checkpoint.clone(), | ||
| steps, | ||
| &failed_node, | ||
| &error, | ||
| &recursion_meta, | ||
| &child_runs_meta, | ||
| ) | ||
| .await?; | ||
| self.fail_run( | ||
| &run_id, | ||
| &thread_id, | ||
| started_at, | ||
| steps, | ||
| &error, | ||
| checkpoint_id, | ||
| ) | ||
| .await; | ||
| return Err(error); | ||
| } | ||
|
|
||
| // Interrupt: persist a checkpoint whose next nodes are the | ||
| // not-yet-completed members of this step (interrupted node first), | ||
| // then return control to the caller. | ||
| if let Some((index, emitted)) = interrupt { | ||
| if let Err(err) = self.require_interrupt_durability(&thread_id) { | ||
| self.fail_run(&run_id, &thread_id, started_at, steps, &err) | ||
| self.fail_run(&run_id, &thread_id, started_at, steps, &err, None) |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Checkpoint-persist failure masks the original node error and skips fail_run.
If persist_failure_checkpoint(...) itself returns Err (e.g. the checkpointer's put fails — a transient storage/network error), the ? at line ~1015 immediately propagates that error, bypassing fail_run entirely. Two consequences:
- The original node
error(the actual root cause) is discarded and replaced by the checkpoint-write error. - No
RunFailedevent is emitted and no terminalFailedstatus is ever recorded viasave_status, leaving any status-store observer stuck showingRunning.
This is especially ironic given the point of this code path is resilience under failure — a checkpoint write hiccup at exactly the moment a node fails will silently drop the failure lifecycle bookkeeping.
🐛 Proposed fix: always call `fail_run` and preserve the original error
- let checkpoint_id = self
- .persist_failure_checkpoint(
- &thread_id,
- &run_id,
- &state,
- &pending,
- &completed,
- parent_checkpoint.clone(),
- steps,
- &failed_node,
- &error,
- &recursion_meta,
- &child_runs_meta,
- )
- .await?;
+ let checkpoint_id = self
+ .persist_failure_checkpoint(
+ &thread_id,
+ &run_id,
+ &state,
+ &pending,
+ &completed,
+ parent_checkpoint.clone(),
+ steps,
+ &failed_node,
+ &error,
+ &recursion_meta,
+ &child_runs_meta,
+ )
+ .await
+ .unwrap_or(None);
self.fail_run(
&run_id,
&thread_id,
started_at,
steps,
&error,
checkpoint_id,
)
.await;
return Err(error);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Node-handler failure (survived any node-retry policy): the updates | |
| // of the branches that completed before it are already folded into | |
| // `state` above, so persist a resumable failure-boundary checkpoint | |
| // scheduling the failed node (and the not-yet-run tail) for a later | |
| // `resume`/`retry`, record a `Failed` status carrying the error and | |
| // that checkpoint, and abort. Without a checkpointer/thread the | |
| // checkpoint is a no-op and the run aborts exactly as before. | |
| if let Some(fail) = failure { | |
| let StepFailure { | |
| failed_node, | |
| completed, | |
| pending, | |
| error, | |
| } = fail; | |
| let checkpoint_id = self | |
| .persist_failure_checkpoint( | |
| &thread_id, | |
| &run_id, | |
| &state, | |
| &pending, | |
| &completed, | |
| parent_checkpoint.clone(), | |
| steps, | |
| &failed_node, | |
| &error, | |
| &recursion_meta, | |
| &child_runs_meta, | |
| ) | |
| .await?; | |
| self.fail_run( | |
| &run_id, | |
| &thread_id, | |
| started_at, | |
| steps, | |
| &error, | |
| checkpoint_id, | |
| ) | |
| .await; | |
| return Err(error); | |
| } | |
| // Interrupt: persist a checkpoint whose next nodes are the | |
| // not-yet-completed members of this step (interrupted node first), | |
| // then return control to the caller. | |
| if let Some((index, emitted)) = interrupt { | |
| if let Err(err) = self.require_interrupt_durability(&thread_id) { | |
| self.fail_run(&run_id, &thread_id, started_at, steps, &err) | |
| self.fail_run(&run_id, &thread_id, started_at, steps, &err, None) | |
| // Node-handler failure (survived any node-retry policy): the updates | |
| // of the branches that completed before it are already folded into | |
| // `state` above, so persist a resumable failure-boundary checkpoint | |
| // scheduling the failed node (and the not-yet-run tail) for a later | |
| // `resume`/`retry`, record a `Failed` status carrying the error and | |
| // that checkpoint, and abort. Without a checkpointer/thread the | |
| // checkpoint is a no-op and the run aborts exactly as before. | |
| if let Some(fail) = failure { | |
| let StepFailure { | |
| failed_node, | |
| completed, | |
| pending, | |
| error, | |
| } = fail; | |
| let checkpoint_id = self | |
| .persist_failure_checkpoint( | |
| &thread_id, | |
| &run_id, | |
| &state, | |
| &pending, | |
| &completed, | |
| parent_checkpoint.clone(), | |
| steps, | |
| &failed_node, | |
| &error, | |
| &recursion_meta, | |
| &child_runs_meta, | |
| ) | |
| .await | |
| .unwrap_or(None); | |
| self.fail_run( | |
| &run_id, | |
| &thread_id, | |
| started_at, | |
| steps, | |
| &error, | |
| checkpoint_id, | |
| ) | |
| .await; | |
| return Err(error); | |
| } | |
| // Interrupt: persist a checkpoint whose next nodes are the | |
| // not-yet-completed members of this step (interrupted node first), | |
| // then return control to the caller. | |
| if let Some((index, emitted)) = interrupt { | |
| if let Err(err) = self.require_interrupt_durability(&thread_id) { | |
| self.fail_run(&run_id, &thread_id, started_at, steps, &err, None) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/graph/compiled/mod.rs` around lines 987 - 1033, In the failure-handling
path in compiled/mod.rs around the `failure` branch,
`persist_failure_checkpoint(...)` currently uses `?`, which can replace the
original node error and skip `fail_run` entirely if checkpoint persistence
fails. Update this path so `fail_run` is always invoked and the original `error`
from `StepFailure` is preserved as the terminal failure, while any
checkpoint-write error is handled separately (for example by logging or
attaching it without overriding the root cause). Make the fix in the `failure`
branch that calls `persist_failure_checkpoint`, `fail_run`, and returns
`Err(error)` so the run still records `Failed` status and emits the failure
event even when checkpoint storage is unavailable.
| /// Enables or disables actually sleeping for the computed backoff between | ||
| /// retries. | ||
| /// | ||
| /// Off by default so tests stay deterministic and fast. Enable it in | ||
| /// production so a transient failure is retried after a real, growing delay | ||
| /// rather than back-to-back. See [`RetryPolicy::backoff_sleep`]. | ||
| pub fn with_backoff_sleep(mut self, sleep: bool) -> Self { | ||
| self.backoff_sleep = sleep; | ||
| self | ||
| } | ||
|
|
||
| /// Sleeps for this policy's backoff before the given retry `attempt`, but | ||
| /// only when [`RetryPolicy::backoff_sleep`] is enabled. | ||
| /// | ||
| /// A single, reusable helper so every retry loop that honors a | ||
| /// [`RetryPolicy`] gets identical, opt-in backoff behavior. A no-op (returns | ||
| /// immediately) when sleeping is disabled or the computed backoff is zero. | ||
| pub async fn sleep_backoff(&self, attempt: usize) { | ||
| if !self.backoff_sleep { | ||
| return; | ||
| } | ||
| let backoff = self.backoff_for_attempt(attempt); | ||
| if backoff > Duration::ZERO { | ||
| tokio::time::sleep(backoff).await; | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check whether a rand-like crate is already a dependency.
rg -n '^rand|"rand"|rand ' Cargo.toml
rg -n 'jitter' -g '*.rs' srcRepository: tinyhumansai/tinyagents
Length of output: 1824
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo '--- Cargo.toml deps ---'
sed -n '1,220p' Cargo.toml | nl -ba | sed -n '1,220p' | rg -n 'rand|tokio|fastrand|rng|uuid|small_rng|thread_rng|getrandom'
echo
echo '--- retry mod implementation ---'
sed -n '1,220p' src/harness/retry/mod.rs | nl -ba | sed -n '1,220p'
echo
echo '--- sleep_backoff usages ---'
rg -n 'sleep_backoff\(' srcRepository: tinyhumansai/tinyagents
Length of output: 221
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo '--- Cargo.toml deps ---'
rg -n '^(rand|fastrand|getrandom|tokio|uuid)\s*=|rand\s*=|fastrand\s*=|getrandom\s*=' Cargo.toml || true
echo
echo '--- src/harness/retry/mod.rs ---'
sed -n '1,220p' src/harness/retry/mod.rs | cat -n
echo
echo '--- src/harness/retry/types.rs ---'
sed -n '1,180p' src/harness/retry/types.rs | cat -n
echo
echo '--- sleep_backoff usages ---'
rg -n 'sleep_backoff\(' srcRepository: tinyhumansai/tinyagents
Length of output: 16137
sleep_backoff must preserve jitter. backoff_for_attempt() always uses 0.0, so with_backoff_sleep(true) + with_jitter(true) turns the delay into 0ms and retries run back-to-back. Pass a caller-supplied [0, 1) value or RNG into sleep_backoff instead of using the deterministic helper here.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/harness/retry/mod.rs` around lines 69 - 95, sleep_backoff currently drops
jitter by calling backoff_for_attempt(), which hardcodes a zero jitter value and
makes with_jitter(true) ineffective. Update RetryPolicy::sleep_backoff to take a
caller-supplied jitter value or RNG and use it when computing the delay, so the
backoff sleep preserves the randomized timing instead of always becoming
deterministic. Keep the behavior gated by backoff_sleep and continue returning
immediately when sleeping is disabled or the computed delay is zero.
Summary
Makes the graph runtime and model calls survive transient network problems and be restartable / continuable on user feedback after a hard failure. The primitives mostly existed (
RetryPolicy,FallbackPolicy, checkpointing, interrupt/resume) — this wires them into the graph's node-execution and failure paths.Scope decisions (confirmed up front): full story, with a resumable-abort default on failure (not an automatic human interrupt).
What changed
1. Model backoff actually sleeps now (opt-in)
RetryPolicygains abackoff_sleepflag +with_backoff_sleep(true)builder + asyncsleep_backoff(attempt)helper. The agent-loop retry loop andRetryMiddlewarepreviously computed backoff and discarded it for test determinism — they now wait a real, growing delay when opted in. Default stays sleep-free so existing deterministic tests are unaffected.2. Graph node-level retry
CompiledGraph::with_node_retry(RetryPolicy): a node whose handler fails with a transient (Model/Tool) error is re-run from its start up to the attempt cap, emittingGraphEvent::NodeRetryScheduled. Both the sequential and parallel runners drive handlers through one sharedrun_node_with_retryhelper (which also applies the per-node timeout).3. Resumable failures (restart / continue on feedback)
next_nodes, successful branches ascompleted_tasks, error + failed node in metadata). TheFailedstatus carries that checkpoint id.CompiledGraph::retry(thread)restarts from the failure checkpoint; editing state viaupdate_statefirst lets a human continue on feedback. Non-checkpointed runs abort exactly as before.Deliberately not done: an unconditional initial (
input) checkpoint — it would break existing checkpoint-count assertions, and the failure checkpoint already makes a caught step-1 failure resumable. Only an uncaught process crash at step 1 stays unrecoverable (flagged as future work).API / behavior changes
RetryPolicy::{backoff_sleep, with_backoff_sleep, sleep_backoff},CompiledGraph::{with_node_retry, retry},GraphEvent::NodeRetryScheduled.Failedstatus carrying its id (previously the run aborted with no checkpoint). Non-checkpointed behavior is unchanged.Commands run locally
cargo fmt --check— cleancargo clippy --all-targets -- -D warnings— cleancargo test— 983 passed, 0 failedcargo run --example resilient_graph— demonstrates both mechanisms end to endDocs / examples
examples/resilient_graph.rsdocs/modules/graph/fault-tolerance.md; added resilience sections to the graph module docs +wiki/Graph-Runtime.md; listed the example inREADME.md/wiki/Examples.mdhttps://claude.ai/code/session_01NRL2bim9Gz3UN3SRtpeXEX
Summary by CodeRabbit
New Features
Documentation
Bug Fixes