4 changes: 4 additions & 0 deletions conf/defaults.yaml
@@ -280,6 +280,10 @@ topology.eventlogger.executors: 0
topology.tasks: null
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs: 30
# STORM-2359: when set, expire tuple trees only after this many seconds of no bolt progress,
# instead of wall-clock time since emit. Prevents live tuples from timing out under backpressure.
# Must be <= topology.message.timeout.secs to have effect. Default null = disabled (original behavior).
topology.message.progress.timeout.secs: null
topology.multilang.serializer: "org.apache.storm.multilang.JsonSerializer"
topology.shellbolt.max.pending: 100
topology.skip.missing.kryo.registrations: false
43 changes: 43 additions & 0 deletions docs/Guaranteeing-message-processing.md
@@ -179,3 +179,46 @@ There are three ways to remove reliability. The first is to set Config.TOPOLOGY_
The second way is to remove reliability on a message by message basis. You can turn off tracking for an individual spout tuple by omitting a message id in the `SpoutOutputCollector.emit` method.

Finally, if you don't care if a particular subset of the tuples downstream in the topology fail to be processed, you can emit them as unanchored tuples. Since they're not anchored to any spout tuples, they won't cause any spout tuples to fail if they aren't acked.


### Progress-based timeout (STORM-2359)

By default, `topology.message.timeout.secs` is wall-clock time measured from the moment a spout emits a tuple. This means a tuple sitting idle in a downstream bolt's receive queue under backpressure has its timeout clock running even before any bolt has touched it. Under sustained backpressure, a live and valid tuple can be failed purely because it waited too long in a queue — not because processing is stuck.

This creates an inherent tradeoff:
- **Short timeout**: orphaned tuple trees (from dead workers) are reclaimed quickly, but live tuples fail under load.
- **Long timeout**: live tuples survive backpressure, but orphan reclamation takes longer after a worker failure.

The `topology.message.progress.timeout.secs` setting addresses this. When set, a tuple tree is expired only if **no bolt has acked any tuple in the tree** for the configured number of seconds. A tuple tree with recent bolt activity, even one that is currently queued, is rescued from timeout expiry as long as it keeps making forward progress.

**Configuration:**

```yaml
# Standard wall-clock upper-bound timeout (default: 30s) — unchanged
topology.message.timeout.secs: 120

# Progress-based timeout: expire only after this many seconds of no bolt activity.
# Set <= topology.message.timeout.secs to take effect.
topology.message.progress.timeout.secs: 60
```

With the above, a tuple tree will be expired only if no bolt has acked any part of it for 60 seconds. The wall-clock limit of 120 seconds still applies as a hard upper bound.
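
The same settings can be applied and sanity-checked in code. The following is a minimal sketch, assuming a plain `Map<String, Object>` stands in for the topology configuration (Storm's `Config` extends `HashMap<String, Object>`). The class `ProgressTimeoutConfigSketch` and its helper `progressTimeoutTakesEffect` are hypothetical names used only for illustration; they are not part of Storm.

```java
import java.util.HashMap;
import java.util.Map;

final class ProgressTimeoutConfigSketch {
    static final String MESSAGE_TIMEOUT = "topology.message.timeout.secs";
    static final String PROGRESS_TIMEOUT = "topology.message.progress.timeout.secs";

    /** Builds a configuration map equivalent to the YAML example above. */
    static Map<String, Object> exampleConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(MESSAGE_TIMEOUT, 120);
        conf.put(PROGRESS_TIMEOUT, 60);
        return conf;
    }

    /**
     * Hypothetical check: true when the progress timeout is set (non-null)
     * and does not exceed the wall-clock timeout.
     */
    static boolean progressTimeoutTakesEffect(Map<String, Object> conf) {
        Object progress = conf.get(PROGRESS_TIMEOUT);
        Object wallClock = conf.get(MESSAGE_TIMEOUT);
        if (!(progress instanceof Number) || !(wallClock instanceof Number)) {
            return false; // an unset (null) value means the feature is disabled
        }
        return ((Number) progress).intValue() <= ((Number) wallClock).intValue();
    }

    public static void main(String[] args) {
        System.out.println(progressTimeoutTakesEffect(exampleConf())); // prints true
    }
}
```

A check like this could run before topology submission to catch a progress timeout that is larger than the wall-clock timeout and would therefore have no effect.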

**What counts as progress?**

Every time a bolt calls `ack()` on a tuple, the acker receives a partial-ack message for the tree. This resets the progress clock for that tree. No changes to bolt or spout code are required.
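
The rescue mechanism can be modelled in a few lines. This is a simplified sketch, not the acker's actual code: `ProgressRescueSketch`, `Tree`, and `tick` are hypothetical stand-ins for the rotating pending map, the per-tree ack state, and the tick handler, and time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

final class ProgressRescueSketch {
    /** Simplified stand-in for the acker's per-tree state. */
    static final class Tree {
        long lastProgressMs;
        Tree(long nowMs) { lastProgressMs = nowMs; }
        void ack(long nowMs) { lastProgressMs = nowMs; } // a bolt ack counts as progress
    }

    private final LinkedList<Map<String, Tree>> buckets = new LinkedList<>();
    private final long progressTimeoutMs;

    ProgressRescueSketch(int numBuckets, long progressTimeoutMs) {
        for (int i = 0; i < numBuckets; i++) {
            buckets.add(new HashMap<>());
        }
        this.progressTimeoutMs = progressTimeoutMs;
    }

    /** Inserts into the newest bucket and removes the key from every other bucket. */
    void put(String id, Tree tree) {
        for (Map<String, Tree> bucket : buckets) {
            bucket.remove(id);
        }
        buckets.getFirst().put(id, tree);
    }

    Tree get(String id) {
        for (Map<String, Tree> bucket : buckets) {
            Tree tree = bucket.get(id);
            if (tree != null) {
                return tree;
            }
        }
        return null;
    }

    /** One tick: rescue recently active trees, then evict the oldest bucket. */
    Map<String, Tree> tick(long nowMs) {
        // Collect keys first; put() would otherwise modify the bucket being iterated.
        List<String> toRescue = new ArrayList<>();
        for (Map.Entry<String, Tree> e : buckets.getLast().entrySet()) {
            if (nowMs - e.getValue().lastProgressMs < progressTimeoutMs) {
                toRescue.add(e.getKey());
            }
        }
        for (String id : toRescue) {
            put(id, get(id));
        }
        Map<String, Tree> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }

    public static void main(String[] args) {
        ProgressRescueSketch pending = new ProgressRescueSketch(3, 60_000L);
        Tree active = new Tree(0L);
        pending.put("active", active);
        pending.put("stalled", new Tree(0L));
        pending.tick(30_000L);
        active.ack(50_000L);
        pending.tick(60_000L);
        active.ack(80_000L);
        System.out.println(pending.tick(90_000L).keySet()); // prints [stalled]
    }
}
```

In this model, a rescued tree moves back to the newest bucket, so it passes through a full rotation cycle again before it next becomes a candidate for expiry.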

**When to use this:**
- Topologies where processing time varies significantly between tuples (e.g. calls to external services)
- Pipelines that experience legitimate backpressure spikes
- Cases where calling `collector.resetTimeout(tuple)` for every in-flight tuple generates too much acker traffic

**When NOT to use this:**
- When orphan reclamation latency after worker failure is critical — a fixed wall-clock timeout is more predictable
- Debugging sessions — use `topology.enable.message.timeouts: false` instead (but see the STORM-3514 warning below)

**Backward compatibility:** `topology.message.progress.timeout.secs` is `null` by default. When unset, behavior is identical to prior versions with no change to existing topologies.

### Warning: topology.enable.message.timeouts=false with max.spout.pending (STORM-3514)

Setting `topology.enable.message.timeouts: false` while `topology.max.spout.pending` is a positive integer is a dangerous combination in production. When a worker dies, the orphaned tuple trees in the spout's pending map will never be reclaimed (no timeout fires), and once the pending map fills to `max.spout.pending`, `nextTuple()` will never be called again — permanently stalling the spout. Storm will log a `WARN` message at startup if this combination is detected. This setting is only safe in debugging sessions where no worker failures are expected.
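
The condition behind this warning can be expressed as a small predicate. This is a minimal sketch, assuming the topology configuration is available as a plain `Map<String, Object>`. The class `TimeoutConfigCheckSketch` and the method `isStallProne` are hypothetical names used for illustration; they are not part of Storm.

```java
import java.util.HashMap;
import java.util.Map;

final class TimeoutConfigCheckSketch {
    /**
     * Returns true when message timeouts are explicitly disabled while
     * topology.max.spout.pending is a positive number.
     */
    static boolean isStallProne(Map<String, Object> conf) {
        Object enabled = conf.get("topology.enable.message.timeouts");
        Object maxPending = conf.get("topology.max.spout.pending");
        boolean timeoutsDisabled = Boolean.FALSE.equals(enabled);
        boolean pendingCapped = maxPending instanceof Number
            && ((Number) maxPending).intValue() > 0;
        return timeoutsDisabled && pendingCapped;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.enable.message.timeouts", false);
        conf.put("topology.max.spout.pending", 1000);
        System.out.println(isStallProne(conf)); // prints true
    }
}
```

Running such a check in a deployment script gives an earlier signal than the worker-side `WARN` log line.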
19 changes: 19 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
@@ -499,6 +499,25 @@ public class Config extends HashMap<String, Object> {
@IsPositiveNumber
@NotNull
public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";

/**
* When set, changes tuple expiry from wall-clock-since-emit to time-since-last-bolt-progress.
* A tuple tree is only considered timed out if no bolt has acked any tuple in the tree for this
* many seconds. Tuple trees that are actively being processed (even if queued under backpressure)
* will be rescued from expiry as long as the tree is making forward progress.
*
* <p>This is an opt-in complement to {@link #TOPOLOGY_MESSAGE_TIMEOUT_SECS}, which remains as
* the hard upper-bound wall-clock timeout. Set this value less than or equal to
* {@link #TOPOLOGY_MESSAGE_TIMEOUT_SECS} for it to have effect.
*
* <p>When null (the default), behavior is identical to prior versions — the standard
* wall-clock bucket-rotation timeout applies, with no change to existing topologies.
*
* <p>See: https://github.com/apache/storm/issues/6141 (STORM-2359)
*/
@IsInteger
@IsPositiveNumber
public static final String TOPOLOGY_MESSAGE_PROGRESS_TIMEOUT_SECS = "topology.message.progress.timeout.secs";
/**
* A list of serialization registrations for Kryo ( https://github.com/EsotericSoftware/kryo ), the underlying serialization framework
* for Storm. A serialization can either be the name of a class (in which case Kryo will automatically create a serializer for the class
73 changes: 71 additions & 2 deletions storm-client/src/jvm/org/apache/storm/daemon/Acker.java
@@ -12,13 +12,17 @@

package org.apache.storm.daemon;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.TupleUtils;
@@ -38,15 +42,32 @@ public class Acker implements IBolt {
private OutputCollector collector;
private RotatingMap<Object, AckObject> pending;

// Progress-based timeout fields (STORM-2359)
private boolean progressTimeoutEnabled = false;
private long progressTimeoutMs = Long.MAX_VALUE;

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM);

// STORM-2359: opt-in progress-based timeout
Number progressSecs = ObjectReader.getInt(
topoConf.get(Config.TOPOLOGY_MESSAGE_PROGRESS_TIMEOUT_SECS), null);
if (progressSecs != null) {
this.progressTimeoutEnabled = true;
this.progressTimeoutMs = progressSecs.longValue() * 1000L;
LOG.info("Progress-based timeout enabled: {} secs ({} ms)",
progressSecs, this.progressTimeoutMs);
}
}

@Override
public void execute(Tuple input) {
if (TupleUtils.isTick(input)) {
if (progressTimeoutEnabled) {
rescueRecentlyActiveEntries();
}
Map<Object, AckObject> tmp = pending.rotate();
LOG.debug("Number of timeout tuples:{}", tmp.size());
return;
@@ -114,19 +135,67 @@ public void cleanup() {
LOG.info("Acker: cleanup successfully");
}

/**
* STORM-2359: Before rotating the pending map, inspect the oldest bucket and re-insert
* any entries that have received a bolt ack within the progress timeout window.
* Re-inserting via {@code pending.put()} moves the entry to the head (newest) bucket,
* rescuing it from the eviction that {@code pending.rotate()} is about to perform.
*
* <p>Only entries in the oldest (last) bucket are candidates for eviction on the next
* rotate(), so we only need to scan that bucket. Entries in earlier buckets are safe
* for at least one more rotation cycle.
*
* <p>This method is only called when {@code progressTimeoutEnabled} is true.
*/
private void rescueRecentlyActiveEntries() {
long now = Time.currentTimeMillis();
// Collect keys to rescue first to avoid a ConcurrentModificationException:
// pending.put() removes the key from older buckets, including the one being iterated.
List<Object> toRescue = new ArrayList<>();
for (Map.Entry<Object, AckObject> entry : pending.peekOldestBucket().entrySet()) {
AckObject obj = entry.getValue();
if ((now - obj.lastProgressTime) < progressTimeoutMs) {
toRescue.add(entry.getKey());
}
}
for (Object key : toRescue) {
AckObject obj = pending.get(key);
if (obj != null) {
// put() moves key to the head bucket — rescued from the upcoming rotation
pending.put(key, obj);
LOG.debug("STORM-2359: Rescued tuple tree {} from timeout; last progress {}ms ago",
key, now - obj.lastProgressTime);
}
}
if (!toRescue.isEmpty()) {
LOG.debug("STORM-2359: Rescued {} tuple tree(s) with recent progress", toRescue.size());
}
}

private long getTimeDeltaMillis(long startTimeMillis) {
return Time.currentTimeMillis() - startTimeMillis;
}

static class AckObject {
public long val = 0L;
public long startTime = Time.currentTimeMillis();
/**
* STORM-2359: Wall-clock timestamp of the last bolt ack received for this tuple tree.
* Updated by {@link #updateAck(Long)} on every partial ack from any bolt in the tree.
* Used by {@link Acker#rescueRecentlyActiveEntries()} to determine whether the tree
* has made forward progress recently and should be rescued from timeout expiry.
*
* <p>Initialized to {@code startTime} so a newly-emitted tree is not immediately
* considered stale.
*/
public long lastProgressTime = startTime;
public int spoutTask = -1;
public boolean failed = false;

// val xor value; also records that progress was made on this tree
public void updateAck(Long value) {
val = Utils.bitXor(val, value);
lastProgressTime = Time.currentTimeMillis();
}
}
}
16 changes: 16 additions & 0 deletions storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -508,6 +508,22 @@ protected void setupTicks(boolean isSpout) {
if (tickTimeSecs != null) {
boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
boolean isAcker = Acker.ACKER_COMPONENT_ID.equals(componentId);

// STORM-3514: warn about a dangerous config combination — disabling message timeouts
// while max.spout.pending is set causes orphaned tuple trees (from dead workers) to
// accumulate in the spout's pending map indefinitely, stalling nextTuple() forever.
if (!enableMessageTimeout && isSpout) {
// getInt returns the supplied default (0) when the setting is null, so no null check is needed
int maxPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0);
if (maxPending > 0) {
LOG.warn("topology.enable.message.timeouts=false with topology.max.spout.pending={} detected "
+ "on spout executor {}:{}. Orphaned tuple trees from dead workers will never be "
+ "reclaimed and the spout will stall permanently once max.spout.pending is reached. "
+ "This combination is only safe in debugging sessions with no worker failures. "
+ "See STORM-3514 (https://github.com/apache/storm/issues/7296).",
maxPending, componentId, executorId);
}
}

if ((!isAcker && Utils.isSystemId(componentId))
|| (!enableMessageTimeout && isSpout)
|| (!enableMessageTimeout && isAcker)) {
19 changes: 19 additions & 0 deletions storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java
@@ -12,6 +12,7 @@

package org.apache.storm.utils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -111,6 +112,24 @@ public int size() {
return size;
}

/**
* Returns a read-only view of the oldest (soonest-to-expire) bucket without removing it.
*
* <p>The oldest bucket is the one that will be evicted on the next call to {@link #rotate()}.
* This method is used by {@link org.apache.storm.daemon.Acker} to inspect which entries are
* about to expire, so that entries with recent progress can be rescued (re-inserted into the
* head bucket) before rotation evicts them.
*
* <p>The returned map is unmodifiable. The caller must not retain a reference across calls
* to {@link #rotate()} or {@link #put(Object, Object)}, as the underlying bucket will be
* replaced. This class is not thread-safe; callers must ensure single-threaded access.
*
* @return unmodifiable view of the oldest bucket
*/
public Map<K, V> peekOldestBucket() {
return Collections.unmodifiableMap(buckets.getLast());
}

public interface ExpiredCallback<K, V> {
void expire(K key, V val);
}