STORM-2359: Add progress-based tuple timeout (topology.message.progress.timeout.secs) #8787
Gowtham-Gowts wants to merge 7 commits
@GGraziadei, I would appreciate your review here too. Thanks.
Adds topology.message.progress.timeout.secs as a new optional topology configuration. When set, tuple trees will only be expired if no bolt has acked any tuple in the tree for the configured number of seconds, rather than expiring on wall-clock time since emit. Default is null (disabled), preserving existing behavior for all topologies that do not set this config.
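A minimal sketch of how a topology might opt in, assuming the new key is read from the topology configuration map like any other setting. Plain string keys are used instead of the Config constant so the snippet compiles without Storm on the classpath; the class name, helper names, and the values 300 and 30 are illustrative assumptions, not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

class ProgressTimeoutConfExample {
    // Key names as they appear in this PR and in existing Storm configuration.
    static final String MESSAGE_TIMEOUT = "topology.message.timeout.secs";
    static final String PROGRESS_TIMEOUT = "topology.message.progress.timeout.secs";

    /** Builds a topology conf that opts in to the progress-based timeout. */
    static Map<String, Object> buildConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(MESSAGE_TIMEOUT, 300);  // illustrative value
        conf.put(PROGRESS_TIMEOUT, 30);  // illustrative value
        return conf;
    }

    /** Mirrors the described default: a missing or null value leaves the feature off. */
    static boolean progressTimeoutEnabled(Map<String, Object> conf) {
        return conf.get(PROGRESS_TIMEOUT) != null;
    }
}
```

A topology that never sets the key takes the disabled path, which matches the stated default of null.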
Adds a package-private method that returns an unmodifiable view of the oldest (soonest-to-expire) bucket without removing it or triggering the expiry callback. This is used by Acker.rescueRecentlyActiveEntries() to inspect which tuple trees are about to be evicted on the next rotate(), so that trees with recent bolt activity can be re-inserted (rescued) before rotation removes them. The returned map is unmodifiable and callers must not retain references across calls to rotate() or put().
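The peek contract described above can be sketched on a simplified bucketed map. This is a stand-in written for illustration, not Storm's actual RotatingMap: the class name PeekableRotatingMap and its internals are assumptions, and only the peekOldestBucket() name comes from the commit message.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Simplified stand-in for a bucketed rotating map (requires numBuckets >= 1). */
class PeekableRotatingMap<K, V> {
    // Head of the deque is the newest bucket, tail is the oldest.
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();

    PeekableRotatingMap(int numBuckets) {
        for (int i = 0; i < numBuckets; i++) {
            buckets.addLast(new HashMap<>());
        }
    }

    /** Inserts into the newest bucket and removes the key from all older buckets. */
    void put(K key, V value) {
        Iterator<Map<K, V>> it = buckets.iterator();
        it.next().put(key, value);
        while (it.hasNext()) {
            it.next().remove(key);
        }
    }

    /** Drops the oldest bucket, returns its contents, and adds a fresh newest bucket. */
    Map<K, V> rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }

    /** Read-only view of the bucket that the next rotate() would evict. */
    Map<K, V> peekOldestBucket() {
        return Collections.unmodifiableMap(buckets.peekLast());
    }
}
```

Because the returned view wraps the live bucket, it reflects later put() calls and becomes stale after rotate(), which is why callers should not hold on to it.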
Core implementation of STORM-2359.

Changes to Acker:
- AckObject gains a lastProgressTime field (8 bytes per in-flight tree) initialized to startTime and updated on every updateAck() call. This records the wall-clock time of the most recent partial bolt ack for the tree.
- prepare() reads topology.message.progress.timeout.secs from topoConf. When set, progressTimeoutEnabled=true and progressTimeoutMs is set.
- execute() on tick: when progressTimeoutEnabled, calls rescueRecentlyActiveEntries() before pending.rotate().
- rescueRecentlyActiveEntries(): scans the oldest bucket (the one about to be evicted by rotate()), collects entries whose lastProgressTime is within the progress window, and re-inserts them via pending.put(). put() moves the key to the head bucket, rescuing it from eviction. Keys are collected into a list first to avoid ConcurrentModification.

AckObject visibility changed from private static to package-private static to allow direct access from unit tests.

When topology.message.progress.timeout.secs is null (default), behavior is identical to the original implementation: rescueRecentlyActiveEntries is never called and there is no overhead on the tick path.

Memory cost: +8 bytes per in-flight tuple tree (lastProgressTime long). At 1M concurrent trees that is ~8MB, within normal heap budgets.

No Thrift IDL changes. No new stream IDs. No bolt/spout API changes. Fully backward compatible.

Related: apache#6141 (STORM-2359)
Related: apache#7296 (STORM-3514)
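The rescue step can be sketched independently of the acker. The following is an illustration under stated assumptions, not the PR's code: the Tree class stands in for AckObject, the reinsert callback stands in for pending.put(), and "within the progress window" is assumed to mean strictly less than the timeout.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

class ProgressRescueSketch {
    /** Hypothetical stand-in for the acker's per-tree state. */
    static final class Tree {
        long lastProgressTimeMs;

        Tree(long startTimeMs) {
            this.lastProgressTimeMs = startTimeMs;  // initialized to the start time
        }

        /** Called on every partial ack to record forward progress. */
        void recordAck(long nowMs) {
            this.lastProgressTimeMs = nowMs;
        }
    }

    /**
     * Re-inserts every tree in the oldest bucket that made progress recently.
     * Candidates are copied first, so the callback may mutate the bucket
     * without invalidating the iteration. Returns the number rescued.
     */
    static <K> int rescueRecentlyActive(Map<K, Tree> oldestBucket, long nowMs,
                                        long progressTimeoutMs,
                                        BiConsumer<K, Tree> reinsert) {
        Map<K, Tree> rescued = new LinkedHashMap<>();
        for (Map.Entry<K, Tree> e : oldestBucket.entrySet()) {
            // Assumption: strictly inside the window counts as recent progress.
            if (nowMs - e.getValue().lastProgressTimeMs < progressTimeoutMs) {
                rescued.put(e.getKey(), e.getValue());
            }
        }
        rescued.forEach(reinsert);
        return rescued.size();
    }
}
```

In the flow described above, this would run on each tick before the rotation, so that trees without recent progress still expire on the normal schedule.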
Adds a WARN log when topology.enable.message.timeouts=false is combined with a positive topology.max.spout.pending on a spout executor. This combination causes orphaned tuple trees from dead workers to accumulate in the spout pending map forever. Once the map fills to max.spout.pending, nextTuple() is never called again — permanently stalling the spout. This behavior was reported in STORM-3514 (closed Won't Fix) and re-surfaced in the dev@ mailing list thread from 2026. The warning fires at executor setup time so operators see it in the worker logs on topology startup, before the problem manifests. See: apache#7296 (STORM-3514)
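One way the startup check could be expressed is shown below. The config key names come from the commit message; the class name, method name, and the handling of missing or non-numeric values are assumptions, since the actual Executor change is not shown here.

```java
import java.util.Map;

class SpoutStallCheckSketch {
    /**
     * Returns true for the combination described above: message timeouts
     * explicitly disabled while max spout pending is a positive number.
     */
    static boolean shouldWarn(Map<String, Object> topoConf) {
        Object timeouts = topoConf.get("topology.enable.message.timeouts");
        Object maxPending = topoConf.get("topology.max.spout.pending");
        boolean timeoutsDisabled = Boolean.FALSE.equals(timeouts);
        boolean pendingBounded = maxPending instanceof Number
                && ((Number) maxPending).longValue() > 0;
        return timeoutsDisabled && pendingBounded;
    }
}
```

A caller would log at WARN level when this returns true during spout executor setup.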
AckerProgressTimeoutTest (5 tests):
- testFeatureDisabledByDefault_treeExpiresOnWallClockRotation: verifies no behavior change when config is unset
- testProgressRescuesActiveTreeFromExpiry: tree with recent bolt ack must survive rotation
- testNoProgressExpiresTree: tree with no bolt activity must still expire (orphan reclamation preserved)
- testFullAckCompletesTreeNormally: normal completion (XOR==0) must still emit ACK to spout with progress timeout enabled
- testProgressTimeoutLargerThanWallClockIsEffectivelyNoop: setting progress timeout > wall-clock timeout does not break anything

RotatingMapPeekTest (6 tests):
- testPeekOldestBucketEmptyOnCreation
- testPeekOldestBucketDoesNotContainFreshlyPutKey
- testPeekOldestBucketContainsKeyAfterRotations
- testPeekOldestBucketAfterReinsertion: verifies rescue pattern works
- testPeekOldestBucketIsUnmodifiable: returned map must throw on mutation
- testPeekOldestBucketAfterEviction: evicted key not visible

All tests use Time.SimulatedTime for deterministic time-based assertions, following Storm's testing best practice (DEVELOPER.md).
Acker (org.apache.storm.daemon) calls peekOldestBucket() on RotatingMap (org.apache.storm.utils). Package-private visibility does not work across packages — method must be public.
- conf/defaults.yaml: add null default entry with explanatory comment
- docs/Guaranteeing-message-processing.md: add new section explaining the progress-based timeout feature, when to use it, configuration example, and the STORM-3514 dangerous config warning
eb98f66 to 05506e7
Hi @Gowtham-Gowts, I think the design needs one more piece to actually work, and the scope should be stated more narrowly.
So re-inserting an entry into the acker's own map has no effect on whether the spout fails the
A second, structural reason the rescue can't bite today: both maps are ticked at
The acker's extra bucket is already the safety margin that guarantees the acker doesn't forget a tree the spout still tracks, so the rescue extends retention of trees the spout has, by then, already abandoned.
Original STORM-2359 design: https://issues.apache.org/jira/browse/STORM-2359 (there are lots of attachments)
@rzo1, this looks like an architectural block rather than a bug fix. The rescue mechanism can't reach the spout's pending map as currently wired. Please let me know your thoughts on how to align this with the broader design or the previous STORM-2359 findings/discussion.
I suppose there was no bigger discussion, rather a question of whether this can be picked up again. Sometimes it is just better to discuss a PR proposal instead of an issue, so you can actually see the intention of a change. I am open to restarting the discussion regarding architecture (which would most likely be something for a 4.x or similar). I haven't had a look at the PR yet (I've been busy with other stuff recently), but I would trust your judgement here ;-)
Hi. Not to involve myself too much in the current design discussion, but I just wanted to add a bit of context that is missing from the original ticket, so you don't draw the wrong conclusions from the discussion we had there :) As a reminder, here is the original design:
The concern at the time was that adding this tracking of anchor ids would be too expensive. The reason we needed it was that the executor in/out queues did not support looking at the queue contents from the resetter thread, so we needed separate tracking of the anchors, because the resetter thread would be unable to look at the contents of those queues directly.
The reason it was abandoned was partially that I lost interest, and partially that there was a side discussion going on regarding redesigning the acking infrastructure entirely in https://issues.apache.org/jira/browse/STORM-3314, which would have required an alternative approach. That side discussion was abandoned later though.
Support for looking at the executor queues was added to JCTools later, in JCTools/JCTools#229. Ignore the title, the change is actually adding iterator support, not an
Summary
Adds topology.message.progress.timeout.secs, an opt-in config that changes
tuple expiry from wall-clock-since-emit to time-since-last-bolt-progress.
When set, a tuple tree is only expired if no bolt has acked any tuple in the
tree for N seconds. Tuple trees actively being processed (even if queued under
backpressure) are rescued from RotatingMap rotation as long as forward progress
is being made.
topology.message.timeout.secs remains the hard upper bound, unchanged.
Motivation
Raised on dev@ (June 2026):
https://lists.apache.org/thread/tjcso52j1tjk6d344vjfmkd9ngy6k2rb
Users face an inherent tradeoff:
This config dissolves that tradeoff. Discussed with rzo1 on the issue tracker:
#6141
Changes
- Config.java: TOPOLOGY_MESSAGE_PROGRESS_TIMEOUT_SECS constant
- Acker.java: lastProgressTime in AckObject; rescueRecentlyActiveEntries()
- RotatingMap.java: peekOldestBucket() package-private method
- Executor.java
- AckerProgressTimeoutTest.java
- RotatingMapPeekTest.java
- conf/defaults.yaml
- docs/Guaranteeing-message-processing.md
Backward Compatibility
- topology.message.progress.timeout.secs is null by default. Zero behavior change for all existing topologies.
- Memory cost: +8 bytes per in-flight tuple tree (lastProgressTime long field)
Related Issues