Skip to content

STORM-2359: Add progress-based tuple timeout (topology.message.progress.timeout.secs)#8787

Draft
Gowtham-Gowts wants to merge 7 commits into
apache:masterfrom
Gowtham-Gowts:STORM-2359
Draft

STORM-2359: Add progress-based tuple timeout (topology.message.progress.timeout.secs)#8787
Gowtham-Gowts wants to merge 7 commits into
apache:masterfrom
Gowtham-Gowts:STORM-2359

Conversation

@Gowtham-Gowts

Copy link
Copy Markdown

Summary

Adds topology.message.progress.timeout.secs — an opt-in config that changes
tuple expiry from wall-clock-since-emit to time-since-last-bolt-progress.

When set, a tuple tree is only expired if no bolt has acked any tuple in the
tree for N seconds. Tuple trees actively being processed (even if queued under
backpressure) are rescued from RotatingMap rotation as long as forward progress
is being made.

topology.message.timeout.secs remains the hard upper-bound, unchanged.

Motivation

Raised on dev@ (June 2026):
https://lists.apache.org/thread/tjcso52j1tjk6d344vjfmkd9ngy6k2rb

Users face an inherent tradeoff:

  • Short timeout → orphans reclaimed quickly, but live tuples fail under backpressure
  • Long timeout → backpressure-safe, but orphan reclamation is slow after worker failure

This config dissolves that tradeoff. Discussed with rzo1 on the issue tracker:
#6141

Changes

File Change
Config.java New TOPOLOGY_MESSAGE_PROGRESS_TIMEOUT_SECS constant
Acker.java lastProgressTime in AckObject; rescueRecentlyActiveEntries()
RotatingMap.java peekOldestBucket() package-private method
Executor.java WARN log for STORM-3514 dangerous config combo
AckerProgressTimeoutTest.java 5 new unit tests
RotatingMapPeekTest.java 6 new unit tests
conf/defaults.yaml null default entry with comment
docs/Guaranteeing-message-processing.md New section

Backward Compatibility

  • Off by defaulttopology.message.progress.timeout.secs is null.
    Zero behavior change for all existing topologies.
  • No Thrift IDL changes
  • No new stream IDs
  • No bolt or spout API changes
  • Memory cost: +8 bytes per in-flight tuple tree (one long field)

Related Issues

@rzo1

rzo1 commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

@GGraziadei would appreciate your look for a review here too. Thx.

Adds topology.message.progress.timeout.secs as a new optional topology
configuration. When set, tuple trees will only be expired if no bolt
has acked any tuple in the tree for the configured number of seconds,
rather than expiring on wall-clock time since emit.

Default is null (disabled), preserving existing behavior for all
topologies that do not set this config.
Adds a package-private method that returns an unmodifiable view of the
oldest (soonest-to-expire) bucket without removing it or triggering
the expiry callback.

This is used by Acker.rescueRecentlyActiveEntries() to inspect which
tuple trees are about to be evicted on the next rotate(), so that
trees with recent bolt activity can be re-inserted (rescued) before
rotation removes them.

The returned map is unmodifiable and callers must not retain references
across calls to rotate() or put().
Core implementation of STORM-2359.

Changes to Acker:
- AckObject gains a lastProgressTime field (8 bytes per in-flight tree)
  initialized to startTime and updated on every updateAck() call. This
  records the wall-clock time of the most recent partial bolt ack for
  the tree.
- prepare() reads topology.message.progress.timeout.secs from topoConf.
  When set, progressTimeoutEnabled=true and progressTimeoutMs is set.
- execute() on tick: when progressTimeoutEnabled, calls
  rescueRecentlyActiveEntries() before pending.rotate().
- rescueRecentlyActiveEntries(): scans the oldest bucket (the one about
  to be evicted by rotate()), collects entries whose lastProgressTime is
  within the progress window, and re-inserts them via pending.put().
  put() moves the key to the head bucket, rescuing it from eviction.
  Keys are collected into a list first to avoid ConcurrentModification.

AckObject visibility changed from private static to package-private
static to allow direct access from unit tests.

When topology.message.progress.timeout.secs is null (default), behavior
is identical to the original implementation — rescueRecentlyActiveEntries
is never called and there is no overhead on the tick path.

Memory cost: +8 bytes per in-flight tuple tree (lastProgressTime long).
At 1M concurrent trees that is ~8MB, within normal heap budgets.

No Thrift IDL changes. No new stream IDs. No bolt/spout API changes.
Fully backward compatible.

Related: apache#6141 (STORM-2359)
Related: apache#7296 (STORM-3514)
Adds a WARN log when topology.enable.message.timeouts=false is combined
with a positive topology.max.spout.pending on a spout executor.

This combination causes orphaned tuple trees from dead workers to
accumulate in the spout pending map forever. Once the map fills to
max.spout.pending, nextTuple() is never called again — permanently
stalling the spout. This behavior was reported in STORM-3514 (closed
Won't Fix) and re-surfaced in the dev@ mailing list thread from 2026.

The warning fires at executor setup time so operators see it in the
worker logs on topology startup, before the problem manifests.

See: apache#7296 (STORM-3514)
AckerProgressTimeoutTest (5 tests):
- testFeatureDisabledByDefault_treeExpiresOnWallClockRotation: verifies
  no behavior change when config is unset
- testProgressRescuesActiveTreeFromExpiry: tree with recent bolt ack
  must survive rotation
- testNoProgressExpiresTree: tree with no bolt activity must still expire
  (orphan reclamation preserved)
- testFullAckCompletesTreeNormally: normal completion (XOR==0) must still
  emit ACK to spout with progress timeout enabled
- testProgressTimeoutLargerThanWallClockIsEffectivelyNoop: setting
  progress timeout > wall-clock timeout does not break anything

RotatingMapPeekTest (6 tests):
- testPeekOldestBucketEmptyOnCreation
- testPeekOldestBucketDoesNotContainFreshlyPutKey
- testPeekOldestBucketContainsKeyAfterRotations
- testPeekOldestBucketAfterReinsertion: verifies rescue pattern works
- testPeekOldestBucketIsUnmodifiable: returned map must throw on mutation
- testPeekOldestBucketAfterEviction: evicted key not visible

All tests use Time.SimulatedTime for deterministic time-based assertions,
following Storm's testing best practice (DEVELOPER.md).
Acker (org.apache.storm.daemon) calls peekOldestBucket() on RotatingMap
(org.apache.storm.utils). Package-private visibility does not work across
packages — method must be public.
- conf/defaults.yaml: add null default entry with explanatory comment
- docs/Guaranteeing-message-processing.md: add new section explaining
  the progress-based timeout feature, when to use it, configuration
  example, and the STORM-3514 dangerous config warning
@rzo1 rzo1 added this to the 3.0.0 milestone Jun 12, 2026
@GGraziadei

Copy link
Copy Markdown
Contributor

Hi @Gowtham-Gowts,
I went through the change carefully and also exercised it on a LocalCluster. My conclusion is that, as currntly wired, the feature does not change the tuple fate: a tuple making continuous progress is still failed at the wall-clock topology.message.timeout.secs.

I think the design needs one more piece to actually work, and the scope should be stated more narrowly.
Message timeouts are not enforced by the acker's pending map, they are enforced on the spout
side.

  • SpoutExecutor holds its own RotatingMap pending (2 buckets, this foundamental)
  • The acker's pending map is created with no callback; when it evicts a tree it simply drops the XOR state to free memory. It never fails and never signals the spout.

So re-inserting an entry into the acker's own map has no effect on whether the spout fails the
tuple. The spout independently times out at message.timeout.secs and replays. The only mechanism
that extends the spout's timer is ACKER_RESET_TIMEOUT_STREAM_ID.
This PR never emits it.

A second, structural reason the rescue can't bite today: both maps are ticked at
message.timeout.secs (StormCommon.java L267 for the acker, L282 for the spout), but the acker
has 3 buckets, and the spout has 2
: The spout always fails before the acker would ever evict.

The acker's extra bucket is already the safety margin that guarantees the acker doesn't forget a tree, the spout still tracks, so the rescue extends retention of trees the spout has, by then, already abandoned.

Original STORM-2359 design

https://issues.apache.org/jira/browse/STORM-2359 (there are lots of attachments)
This is worth aligning with, since the issue history covers exactly this ground. In the 2017/2019
discussion, @srdo prototyped "reinserting tuples in the acker's pending upon acks" (this PR's
rescue) and reported no throughput regression, so the primitive is sound. But the full design moved
the timeout into the acker and added a periodic spout acker sync (anchor-id diff) so the
spout could still detect loss; the acker-internal reset alone was understood not to reach the spout's
pending. The work was abandoned due to performance (critical-path tracking, memory) and the
sync-protocol complexity, i.e. precisely the parts this PR omits.

@rzo1, this looks like an architectural block rather than a bug fix. The rescue mechanism can't reach the spout's pending map as currently wired. Please let me know your thoughts on how to align this with the broader design or the previous STORM-2359 findings/discussion.

@rzo1

rzo1 commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Please let me know your thoughts on how to align this with the broader design or the previous STORM-2359 findings/discussion.

I would suppose that there was no bigger discussion rather a question if this can be picked up again. Sometimes it is just better to discuss on a PR proposal instead of an issue, so you actually can see the intention of a change. I am open to re-start the discussion regarding architecture (which would be most likely sth related to a 4.x or sth.)

Didn't had a look at the PR yet (since busy with other stuff recently), but I would trust your judgement here ;-)

@rzo1 rzo1 removed this from the 3.0.0 milestone Jun 12, 2026
@rzo1 rzo1 marked this pull request as draft June 12, 2026 14:34
@srdo

srdo commented Jun 13, 2026

Copy link
Copy Markdown
Contributor

Hi. Not to involve myself too much in the current design discussion, but I just wanted to add a bit of context that is missing from the original ticket, so you don't draw the wrong conclusions from the discussion we had there :)

As a reminder, here is the original design:

The idea is to track the anchor ids of each non-system message that enters an executor in/out queue. For the inbound queue, the anchor is no longer in progress when the associated tuple is acked or failed. For the outbound queue (pendingEmits in the Executor), the anchor is no longer in progress when the associated tuple gets flushed from pendingEmits.

Occasionally a thread will check the set of in-progress anchors for the worker and send reset messages for all of them to the relevant ackers. In order to avoid sending too many messages, this thread snapshots the anchor set when it runs, and only sends reset messages for anchors that have been in progress sufficiently long in that worker.

Since there may be more than one tuple per anchor, anchors are tracked as a count in a multiset, rather than just presence in a set.

The concern at the time was that adding this tracking of anchor ids would be too expensive. The reason we needed it was that the executor in/out queues did not support looking at the queue contents from the resetter thread, so we needed separate tracking of the anchors, because the resetter thread would be unable to look at the contents of those queues directly.

The reason it was abandoned was partially that I lost interest, and partially that there was a side discussion going on regarding redesigning the acking infrastructure entirely in https://issues.apache.org/jira/browse/STORM-3314, which would have required an alternative approach.

That side discussion was abandoned later though.

Support for looking at the executor queues was added to JCTools later, in JCTools/JCTools#229. Ignore the title, the change is actually adding iterator support, not an unorderedSnapshot method. With those changes, it's possible the original design could be tractable now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants