diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 6f55baea6e0..41fbcaf267c 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -280,6 +280,10 @@ topology.eventlogger.executors: 0 topology.tasks: null # maximum amount of time a message has to complete before it's considered failed topology.message.timeout.secs: 30 +# STORM-2359: when set, expire tuple trees only after this many seconds of no bolt progress, +# instead of wall-clock time since emit. Prevents live tuples from timing out under backpressure. +# Must be <= topology.message.timeout.secs to have effect. Default null = disabled (original behavior). +topology.message.progress.timeout.secs: null topology.multilang.serializer: "org.apache.storm.multilang.JsonSerializer" topology.shellbolt.max.pending: 100 topology.skip.missing.kryo.registrations: false diff --git a/docs/Guaranteeing-message-processing.md b/docs/Guaranteeing-message-processing.md index fc49f3faf26..7782fce9604 100644 --- a/docs/Guaranteeing-message-processing.md +++ b/docs/Guaranteeing-message-processing.md @@ -179,3 +179,46 @@ There are three ways to remove reliability. The first is to set Config.TOPOLOGY_ The second way is to remove reliability on a message by message basis. You can turn off tracking for an individual spout tuple by omitting a message id in the `SpoutOutputCollector.emit` method. Finally, if you don't care if a particular subset of the tuples downstream in the topology fail to be processed, you can emit them as unanchored tuples. Since they're not anchored to any spout tuples, they won't cause any spout tuples to fail if they aren't acked. + + +### Progress-based timeout (STORM-2359) + +By default, `topology.message.timeout.secs` is wall-clock time measured from the moment a spout emits a tuple. This means a tuple sitting idle in a downstream bolt's receive queue under backpressure has its timeout clock running even before any bolt has touched it. Under sustained backpressure, a live and valid tuple can be failed purely because it waited too long in a queue — not because processing is stuck. + +This creates an inherent tradeoff: +- **Short timeout**: orphaned tuple trees (from dead workers) are reclaimed quickly, but live tuples fail under load. +- **Long timeout**: live tuples survive backpressure, but orphan reclamation takes longer after a worker failure. + +Storm 2.x introduces `topology.message.progress.timeout.secs` to address this. When set, a tuple tree is only expired if **no bolt has acked any tuple in the tree** for the configured number of seconds. A tuple tree with recent bolt activity — even one that is currently queued — will be rescued from timeout expiry as long as forward progress is being made. + +**Configuration:** + +```yaml +# Standard wall-clock upper-bound timeout (default: 30s) — unchanged +topology.message.timeout.secs: 120 + +# Progress-based timeout: expire only after this many seconds of no bolt activity. +# Set <= topology.message.timeout.secs to take effect. +topology.message.progress.timeout.secs: 60 +``` + +With the above, a tuple tree will be expired only if no bolt has acked any part of it for 60 seconds. The wall-clock limit of 120 seconds still applies as a hard upper bound. + +**What counts as progress?** + +Every time a bolt calls `ack()` on a tuple, the acker receives a partial-ack message for the tree. This resets the progress clock for that tree. No changes to bolt or spout code are required. + +**When to use this:** +- Topologies where processing time varies significantly between tuples (e.g. calls to external services) +- Pipelines that experience legitimate backpressure spikes +- Cases where `collector.resetTimeout()` generates too much acker traffic + +**When NOT to use this:** +- When orphan reclamation latency after worker failure is critical — a fixed wall-clock timeout is more predictable +- Debugging sessions — use `topology.enable.message.timeouts: false` instead (but see the STORM-3514 warning below) + +**Backward compatibility:** `topology.message.progress.timeout.secs` is `null` by default. When unset, behavior is identical to prior versions with no change to existing topologies. + +### Warning: topology.enable.message.timeouts=false with max.spout.pending (STORM-3514) + +Setting `topology.enable.message.timeouts: false` while `topology.max.spout.pending` is a positive integer is a dangerous combination in production. When a worker dies, the orphaned tuple trees in the spout's pending map will never be reclaimed (no timeout fires), and once the pending map fills to `max.spout.pending`, `nextTuple()` will never be called again — permanently stalling the spout. Storm will log a `WARN` message at startup if this combination is detected. This setting is only safe in debugging sessions where no worker failures are expected. diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 8d3fc51ccdd..5e26e0894d9 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -499,6 +499,25 @@ public class Config extends HashMap { @IsPositiveNumber @NotNull public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs"; + + /** + * When set, changes tuple expiry from wall-clock-since-emit to time-since-last-bolt-progress. + * A tuple tree is only considered timed out if no bolt has acked any tuple in the tree for this + * many seconds. Tuple trees that are actively being processed (even if queued under backpressure) + * will be rescued from expiry as long as the tree is making forward progress. + * + *

This is an opt-in complement to {@link #TOPOLOGY_MESSAGE_TIMEOUT_SECS}, which remains as + * the hard upper-bound wall-clock timeout. Set this value less than or equal to + * {@link #TOPOLOGY_MESSAGE_TIMEOUT_SECS} for it to have effect. + * + *

When null (the default), behavior is identical to prior versions — the standard + * wall-clock bucket-rotation timeout applies, with no change to existing topologies. + * + *

See: https://github.com/apache/storm/issues/6141 (STORM-2359) + */ + @IsInteger + @IsPositiveNumber + public static final String TOPOLOGY_MESSAGE_PROGRESS_TIMEOUT_SECS = "topology.message.progress.timeout.secs"; /** * A list of serialization registrations for Kryo ( https://github.com/EsotericSoftware/kryo ), the underlying serialization framework * for Storm. A serialization can either be the name of a class (in which case Kryo will automatically create a serializer for the class diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java index 336957af2bb..e2963c08039 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java @@ -12,13 +12,17 @@ package org.apache.storm.daemon; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.task.IBolt; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.RotatingMap; import org.apache.storm.utils.Time; import org.apache.storm.utils.TupleUtils; @@ -38,15 +42,32 @@ public class Acker implements IBolt { private OutputCollector collector; private RotatingMap pending; + // Progress-based timeout fields (STORM-2359) + private boolean progressTimeoutEnabled = false; + private long progressTimeoutMs = Long.MAX_VALUE; + @Override public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM); + + // STORM-2359: opt-in progress-based timeout + Number progressSecs = ObjectReader.getInt( + topoConf.get(Config.TOPOLOGY_MESSAGE_PROGRESS_TIMEOUT_SECS), null); + if (progressSecs != null) { + this.progressTimeoutEnabled = true; + this.progressTimeoutMs = progressSecs.longValue() * 1000L; + LOG.info("Progress-based timeout enabled: {} secs ({} ms)", + progressSecs, this.progressTimeoutMs); + } } @Override public void execute(Tuple input) { if (TupleUtils.isTick(input)) { + if (progressTimeoutEnabled) { + rescueRecentlyActiveEntries(); + } Map tmp = pending.rotate(); LOG.debug("Number of timeout tuples:{}", tmp.size()); return; @@ -114,19 +135,67 @@ public void cleanup() { LOG.info("Acker: cleanup successfully"); } + /** + * STORM-2359: Before rotating the pending map, inspect the oldest bucket and re-insert + * any entries that have received a bolt ack within the progress timeout window. + * Re-inserting via {@code pending.put()} moves the entry to the head (newest) bucket, + * rescuing it from the eviction that {@code pending.rotate()} is about to perform. + * + *

Only entries in the oldest (last) bucket are candidates for eviction on the next + * rotate(), so we only need to scan that bucket. Entries in earlier buckets are safe + * for at least one more rotation cycle. + * + *

This method is only called when {@code progressTimeoutEnabled} is true. + */ + private void rescueRecentlyActiveEntries() { + long now = Time.currentTimeMillis(); + // Collect keys to rescue first to avoid ConcurrentModificationException — + // pending.put() structurally modifies the bucket list. + List toRescue = new ArrayList<>(); + for (Map.Entry entry : pending.peekOldestBucket().entrySet()) { + AckObject obj = entry.getValue(); + if ((now - obj.lastProgressTime) < progressTimeoutMs) { + toRescue.add(entry.getKey()); + } + } + for (Object key : toRescue) { + AckObject obj = pending.get(key); + if (obj != null) { + // put() moves key to the head bucket — rescued from the upcoming rotation + pending.put(key, obj); + LOG.debug("STORM-2359: Rescued tuple tree {} from timeout; last progress {}ms ago", + key, now - obj.lastProgressTime); + } + } + if (!toRescue.isEmpty()) { + LOG.debug("STORM-2359: Rescued {} tuple tree(s) with recent progress", toRescue.size()); + } + } + private long getTimeDeltaMillis(long startTimeMillis) { return Time.currentTimeMillis() - startTimeMillis; } - private static class AckObject { + static class AckObject { public long val = 0L; public long startTime = Time.currentTimeMillis(); + /** + * STORM-2359: Wall-clock timestamp of the last bolt ack received for this tuple tree. + * Updated by {@link #updateAck(Long)} on every partial ack from any bolt in the tree. + * Used by {@link Acker#rescueRecentlyActiveEntries()} to determine whether the tree + * has made forward progress recently and should be rescued from timeout expiry. + * + *

Initialized to {@code startTime} so a newly-emitted tree is not immediately + * considered stale. + */ + public long lastProgressTime = Time.currentTimeMillis(); public int spoutTask = -1; public boolean failed = false; - // val xor value + // val xor value; also records that progress was made on this tree public void updateAck(Long value) { val = Utils.bitXor(val, value); + lastProgressTime = Time.currentTimeMillis(); } } } diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java index edd77743fcd..8a820586145 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java @@ -508,6 +508,22 @@ protected void setupTicks(boolean isSpout) { if (tickTimeSecs != null) { boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS); boolean isAcker = Acker.ACKER_COMPONENT_ID.equals(componentId); + + // STORM-3514: warn about a dangerous config combination — disabling message timeouts + // while max.spout.pending is set causes orphaned tuple trees (from dead workers) to + // accumulate in the spout's pending map indefinitely, stalling nextTuple() forever. + if (!enableMessageTimeout && isSpout) { + Integer maxPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0); + if (maxPending != null && maxPending > 0) { + LOG.warn("topology.enable.message.timeouts=false with topology.max.spout.pending={} detected " + + "on spout executor {}:{}. Orphaned tuple trees from dead workers will never be " + + "reclaimed and the spout will stall permanently once max.spout.pending is reached. " + + "This combination is only safe in debugging sessions with no worker failures. " + + "See STORM-3514 (https://github.com/apache/storm/issues/7296).", + maxPending, componentId, executorId); + } + } + if ((!isAcker && Utils.isSystemId(componentId)) || (!enableMessageTimeout && isSpout) || (!enableMessageTimeout && isAcker)) { diff --git a/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java b/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java index 6283b508bfe..f175b7a2535 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java +++ b/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java @@ -12,6 +12,7 @@ package org.apache.storm.utils; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -111,6 +112,24 @@ public int size() { return size; } + /** + * Returns a read-only view of the oldest (soonest-to-expire) bucket without removing it. + * + *

The oldest bucket is the one that will be evicted on the next call to {@link #rotate()}. + * This method is used by {@link org.apache.storm.daemon.Acker} to inspect which entries are + * about to expire, so that entries with recent progress can be rescued (re-inserted into the + * head bucket) before rotation evicts them. + * + *

The returned map is unmodifiable. The caller must not retain a reference across calls + * to {@link #rotate()} or {@link #put(Object, Object)}, as the underlying bucket will be + * replaced. This class is not thread-safe; callers must ensure single-threaded access. + * + * @return unmodifiable view of the oldest bucket + */ + public Map peekOldestBucket() { + return Collections.unmodifiableMap(buckets.getLast()); + } + public interface ExpiredCallback { void expire(K key, V val); } diff --git a/storm-client/test/jvm/org/apache/storm/daemon/AckerProgressTimeoutTest.java b/storm-client/test/jvm/org/apache/storm/daemon/AckerProgressTimeoutTest.java new file mode 100644 index 00000000000..87658ed98df --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/daemon/AckerProgressTimeoutTest.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.daemon; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for STORM-2359: progress-based tuple timeout. + * + *

All tests use {@link Time.SimulatedTime} so time advances are deterministic + * and do not depend on wall-clock. This follows the Storm testing best practice + * documented in DEVELOPER.md. + */ +public class AckerProgressTimeoutTest { + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + /** Build a minimal topology config with progress timeout enabled. */ + private Map confWithProgressTimeout(int progressSecs) { + Map conf = new HashMap<>(); + conf.put(Config.TOPOLOGY_MESSAGE_PROGRESS_TIMEOUT_SECS, progressSecs); + return conf; + } + + /** Build a minimal topology config with no progress timeout (default behavior). */ + private Map confDefault() { + return new HashMap<>(); + } + + /** + * Create and prepare an Acker with the given config. + * Returns a {@link TestAckerHarness} that bundles the acker and its mocked + * OutputCollector so tests can inspect emitted messages. + */ + private TestAckerHarness prepareAcker(Map conf) { + Acker acker = new Acker(); + OutputCollector collector = mock(OutputCollector.class); + TopologyContext context = mock(TopologyContext.class); + acker.prepare(conf, context, collector); + return new TestAckerHarness(acker, collector); + } + + /** Simulate ACKER_INIT: spout emitting tuple tree with id=treeId, xorVal, spoutTask. */ + private Tuple makeInitTuple(long treeId, long xorVal, int spoutTask) { + Tuple t = mock(Tuple.class); + when(t.getSourceStreamId()).thenReturn(Acker.ACKER_INIT_STREAM_ID); + when(t.getValue(0)).thenReturn(treeId); + when(t.getLong(1)).thenReturn(xorVal); + when(t.getInteger(2)).thenReturn(spoutTask); + return t; + } + + /** Simulate ACKER_ACK: bolt acking its portion of tree id=treeId, xorVal. */ + private Tuple makeAckTuple(long treeId, long xorVal) { + Tuple t = mock(Tuple.class); + when(t.getSourceStreamId()).thenReturn(Acker.ACKER_ACK_STREAM_ID); + when(t.getValue(0)).thenReturn(treeId); + when(t.getLong(1)).thenReturn(xorVal); + return t; + } + + /** Simulate a tick tuple (drives RotatingMap rotation). */ + private Tuple makeTickTuple() { + Tuple t = mock(Tuple.class); + when(t.getSourceStreamId()).thenReturn(org.apache.storm.Constants.SYSTEM_TICK_STREAM_ID); + // TupleUtils.isTick checks the source component + when(t.getSourceComponent()).thenReturn(org.apache.storm.Constants.SYSTEM_COMPONENT_ID); + return t; + } + + // ----------------------------------------------------------------------- + // Test 1: Feature disabled by default — existing behavior unchanged + // ----------------------------------------------------------------------- + + /** + * When topology.message.progress.timeout.secs is not set, the acker must + * behave identically to the original implementation: tuple trees expire + * after TIMEOUT_BUCKET_NUM rotations regardless of recent progress. + */ + @Test + public void testFeatureDisabledByDefault_treeExpiresOnWallClockRotation() { + try (SimulatedTime ignored = new SimulatedTime()) { + TestAckerHarness h = prepareAcker(confDefault()); + + long treeId = 42L; + long xorSeed = 1234L; + int spoutTask = 7; + + // Spout emits + h.acker.execute(makeInitTuple(treeId, xorSeed, spoutTask)); + + // Advance time and rotate enough times to evict from RotatingMap + // (TIMEOUT_BUCKET_NUM - 1 rotations evict the oldest bucket) + for (int i = 0; i < Acker.TIMEOUT_BUCKET_NUM; i++) { + h.acker.execute(makeTickTuple()); + } + + // The tree should have been evicted (no ack or fail was ever called — + // the ExpiredCallback fires and routes to failSpoutMsg via the spout, + // but from the acker's perspective the entry is gone from pending). + // We verify that emitDirect was NOT called with ACK_STREAM (tree was not completed). + verify(h.collector, never()).emitDirect(anyInt(), + eq(Acker.ACKER_ACK_STREAM_ID), any()); + } + } + + // ----------------------------------------------------------------------- + // Test 2: Progress rescues a tree from expiry + // ----------------------------------------------------------------------- + + /** + * With progress timeout enabled, a tree that receives a bolt ack within + * the progress window must NOT be evicted on rotation — it should be + * rescued (re-inserted into the head bucket). + */ + @Test + public void testProgressRescuesActiveTreeFromExpiry() { + try (SimulatedTime ignored = new SimulatedTime()) { + int progressSecs = 30; + TestAckerHarness h = prepareAcker(confWithProgressTimeout(progressSecs)); + + long treeId = 100L; + long xorSeed = 999L; + int spoutTask = 3; + + // Spout emits tree + h.acker.execute(makeInitTuple(treeId, xorSeed, spoutTask)); + + // Advance time close to (but within) the progress window + Time.advanceTimeSecs(progressSecs - 5); + + // Bolt makes partial progress — this updates lastProgressTime + h.acker.execute(makeAckTuple(treeId, xorSeed ^ 500L)); + + // Now advance time far enough that wall-clock bucket rotation would + // normally evict the tree, but progress just happened + Time.advanceTimeSecs(progressSecs - 1); + + // Rotate — rescueRecentlyActiveEntries() should move tree to head bucket + h.acker.execute(makeTickTuple()); + + // Tree should still be alive — not yet failed/timed-out + verify(h.collector, never()).emitDirect(anyInt(), + eq(Acker.ACKER_FAIL_STREAM_ID), any()); + } + } + + // ----------------------------------------------------------------------- + // Test 3: No progress causes expiry + // ----------------------------------------------------------------------- + + /** + * With progress timeout enabled, a tree that receives NO bolt acks within + * the progress window must still be evicted (orphan behavior is preserved). + */ + @Test + public void testNoProgressExpiresTree() { + try (SimulatedTime ignored = new SimulatedTime()) { + int progressSecs = 10; + TestAckerHarness h = prepareAcker(confWithProgressTimeout(progressSecs)); + + long treeId = 200L; + long xorSeed = 777L; + int spoutTask = 5; + + // Spout emits tree + h.acker.execute(makeInitTuple(treeId, xorSeed, spoutTask)); + + // Advance time well past the progress timeout WITHOUT any bolt acks + Time.advanceTimeSecs(progressSecs + 5); + + // Rotate — tree has no recent progress, should NOT be rescued + h.acker.execute(makeTickTuple()); + + // Rotate again enough times for the oldest bucket to be evicted + for (int i = 0; i < Acker.TIMEOUT_BUCKET_NUM; i++) { + h.acker.execute(makeTickTuple()); + } + + // The tree has expired (no ack emitted, no rescue) + verify(h.collector, never()).emitDirect(anyInt(), + eq(Acker.ACKER_ACK_STREAM_ID), any()); + } + } + + // ----------------------------------------------------------------------- + // Test 4: Full tree completion still works with progress timeout enabled + // ----------------------------------------------------------------------- + + /** + * When a tree is fully acked (XOR val == 0) with progress timeout enabled, + * the acker must emit ACK to the spout exactly as before — progress timeout + * must not interfere with normal completion. + */ + @Test + public void testFullAckCompletesTreeNormally() { + try (SimulatedTime ignored = new SimulatedTime()) { + TestAckerHarness h = prepareAcker(confWithProgressTimeout(60)); + + long treeId = 300L; + long xorSeed = 888L; + int spoutTask = 2; + + // Spout emits + h.acker.execute(makeInitTuple(treeId, xorSeed, spoutTask)); + + // Bolt acks with the same xorVal — XOR to zero means tree complete + h.acker.execute(makeAckTuple(treeId, xorSeed)); + + // Acker should have sent ACK to spout + verify(h.collector).emitDirect(eq(spoutTask), + eq(Acker.ACKER_ACK_STREAM_ID), any()); + } + } + + // ----------------------------------------------------------------------- + // Test 5: progressTimeoutMs > wall-clock timeout is effectively a no-op + // ----------------------------------------------------------------------- + + /** + * If progress timeout > message timeout, every tree will expire on the + * wall-clock rotation before progress timeout can rescue it. + * Behavior should be identical to no progress timeout set. + */ + @Test + public void testProgressTimeoutLargerThanWallClockIsEffectivelyNoop() { + try (SimulatedTime ignored = new SimulatedTime()) { + // Wall-clock timeout (simulated via rotations) < progress timeout + // Effectively the wall-clock mechanism always fires first. + TestAckerHarness h = prepareAcker(confWithProgressTimeout(9999)); + + long treeId = 400L; + long xorSeed = 111L; + int spoutTask = 9; + + h.acker.execute(makeInitTuple(treeId, xorSeed, spoutTask)); + + // Make progress well within progress window but past bucket rotation + Time.advanceTimeSecs(5); + h.acker.execute(makeAckTuple(treeId, xorSeed ^ 50L)); + + // Even though progress happened, after enough rotations the entry will + // be rescued into the head bucket (progress is recent) — demonstrating + // that the rescue logic itself works regardless of the absolute values. + h.acker.execute(makeTickTuple()); + + // No ACK to spout — tree not complete yet + verify(h.collector, never()).emitDirect(anyInt(), + eq(Acker.ACKER_ACK_STREAM_ID), any()); + } + } + + // ----------------------------------------------------------------------- + // Inner harness class + // ----------------------------------------------------------------------- + + private static class TestAckerHarness { + final Acker acker; + final OutputCollector collector; + + TestAckerHarness(Acker acker, OutputCollector collector) { + this.acker = acker; + this.collector = collector; + } + } +} diff --git a/storm-client/test/jvm/org/apache/storm/utils/RotatingMapPeekTest.java b/storm-client/test/jvm/org/apache/storm/utils/RotatingMapPeekTest.java new file mode 100644 index 00000000000..74e8484de81 --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/utils/RotatingMapPeekTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.utils; + +import java.util.Map; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link RotatingMap#peekOldestBucket()}, added for STORM-2359. + */ +public class RotatingMapPeekTest { + + /** + * A freshly constructed RotatingMap should have an empty oldest bucket. + */ + @Test + public void testPeekOldestBucketEmptyOnCreation() { + RotatingMap map = new RotatingMap<>(3); + Map oldest = map.peekOldestBucket(); + assertTrue(oldest.isEmpty(), "Oldest bucket should be empty on a new RotatingMap"); + } + + /** + * After putting a key, it lands in the head (newest) bucket, not the oldest. + * The oldest bucket should remain empty until a rotation occurs. + */ + @Test + public void testPeekOldestBucketDoesNotContainFreshlyPutKey() { + RotatingMap map = new RotatingMap<>(3); + map.put("key1", 1); + Map oldest = map.peekOldestBucket(); + assertTrue(oldest.isEmpty(), + "Freshly put key should be in head bucket, not oldest bucket"); + } + + /** + * After (numBuckets - 1) rotations, the key that was put before the first + * rotation should appear in the oldest bucket. + */ + @Test + public void testPeekOldestBucketContainsKeyAfterRotations() { + int numBuckets = 3; + RotatingMap map = new RotatingMap<>(numBuckets); + + map.put("key1", 42); + + // After (numBuckets - 1) rotations the key reaches the oldest bucket + for (int i = 0; i < numBuckets - 1; i++) { + map.rotate(); + } + + Map oldest = map.peekOldestBucket(); + assertTrue(oldest.containsKey("key1"), + "key1 should be in oldest bucket after " + (numBuckets - 1) + " rotations"); + assertEquals(42, oldest.get("key1")); + } + + /** + * After a key migrates to the oldest bucket, calling put() on it (which + * moves it back to the head bucket) should cause peekOldestBucket() to + * no longer contain it. + */ + @Test + public void testPeekOldestBucketAfterReinsertion() { + int numBuckets = 3; + RotatingMap map = new RotatingMap<>(numBuckets); + + map.put("key1", 10); + + for (int i = 0; i < numBuckets - 1; i++) { + map.rotate(); + } + + // Confirm key is in oldest bucket + assertTrue(map.peekOldestBucket().containsKey("key1")); + + // Re-insert — this is what rescueRecentlyActiveEntries() does in Acker + map.put("key1", 10); + + // Oldest bucket should no longer contain key1 + assertTrue(map.peekOldestBucket().isEmpty() || !map.peekOldestBucket().containsKey("key1"), + "key1 should have been moved out of oldest bucket after re-insertion"); + } + + /** + * The returned map must be unmodifiable — attempting mutation must throw. + */ + @Test + public void testPeekOldestBucketIsUnmodifiable() { + int numBuckets = 3; + RotatingMap map = new RotatingMap<>(numBuckets); + map.put("key1", 1); + + for (int i = 0; i < numBuckets - 1; i++) { + map.rotate(); + } + + Map oldest = map.peekOldestBucket(); + assertThrows(UnsupportedOperationException.class, + () -> oldest.put("hacked", 999), + "peekOldestBucket() must return an unmodifiable view"); + } + + /** + * After a full rotation (numBuckets rotations), an entry that was never + * re-inserted should be evicted and no longer visible via peekOldestBucket. + */ + @Test + public void testPeekOldestBucketAfterEviction() { + int numBuckets = 3; + RotatingMap map = new RotatingMap<>(numBuckets); + + map.put("key1", 5); + + // One extra rotation evicts key1 + for (int i = 0; i < numBuckets; i++) { + map.rotate(); + } + + // key1 is now gone from all buckets, oldest bucket should be empty + assertTrue(map.peekOldestBucket().isEmpty(), + "Oldest bucket should be empty after key has been evicted"); + } +}