From 3a39aca87a775f6a8eb15c0e5218ac1d16d6f3ce Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Fri, 12 Jun 2026 23:30:10 +0200 Subject: [PATCH 1/5] feat: add DetachedTuple, a serializable Tuple snapshot A TupleImpl references its topology context and cannot be serialized, so a tuple emitted as a value inside another tuple breaks as soon as it crosses a worker boundary. DetachedTuple snapshots the source component, task, stream, output fields and values of a tuple into a self-contained, Kryo-registered representation that survives serialization. --- .../serialization/SerializationFactory.java | 2 + .../org/apache/storm/tuple/DetachedTuple.java | 262 ++++++++++++++++++ .../apache/storm/tuple/DetachedTupleTest.java | 120 ++++++++ 3 files changed, 384 insertions(+) create mode 100644 storm-client/src/jvm/org/apache/storm/tuple/DetachedTuple.java create mode 100644 storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java diff --git a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java index eb734aee340..b53f8375f24 100644 --- a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java +++ b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java @@ -40,6 +40,7 @@ import org.apache.storm.serialization.types.HashSetSerializer; import org.apache.storm.transactional.TransactionAttempt; import org.apache.storm.trident.tuple.ConsList; +import org.apache.storm.tuple.DetachedTuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.ListDelegate; import org.apache.storm.utils.ReflectionUtils; @@ -77,6 +78,7 @@ public static Kryo getKryo(Map conf) { k.register(ConsList.class); k.register(BackPressureStatus.class); k.register(NodeInfo.class); + k.register(DetachedTuple.class); synchronized (loader) { for (SerializationRegister sr : loader) { diff --git a/storm-client/src/jvm/org/apache/storm/tuple/DetachedTuple.java b/storm-client/src/jvm/org/apache/storm/tuple/DetachedTuple.java new file mode 100644 index 00000000000..c3635fbc4b3 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/tuple/DetachedTuple.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.tuple; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.task.GeneralTopologyContext; + +/** + * A self-contained, serializable copy of a {@link Tuple} that is detached from the topology context. + * + *

A regular {@link TupleImpl} holds a reference to the {@link GeneralTopologyContext} it was created in, which cannot be + * serialized. A {@code DetachedTuple} instead snapshots the source component, task, stream, output fields and values of the + * original tuple, so it can be emitted as a value inside another tuple and cross worker boundaries (see STORM-4000). + * + *

Detached tuples are unanchored: {@link #getMessageId()} always returns an unanchored message id, and + * {@link #getContext()} throws {@link UnsupportedOperationException} since no topology context is available. + * + *

Unlike {@link TupleImpl}, which uses identity-based equality, two detached tuples are equal if they snapshot + * the same source metadata, fields and values, so a detached tuple keeps comparing equal after a + * serialization round-trip. + */ +public class DetachedTuple implements Tuple, Serializable { + private static final long serialVersionUID = 1L; + + private final String srcComponent; + private final int srcTask; + private final String srcStream; + private final List fieldNames; + private final List values; + private transient Fields fields; + + /** + * Creates a detached copy of the given tuple. The tuple must still be attached to its topology context, since the + * output fields of the source component are resolved through it. + * + * @param tuple the tuple to detach + */ + public DetachedTuple(Tuple tuple) { + this.srcComponent = tuple.getSourceComponent(); + this.srcTask = tuple.getSourceTask(); + this.srcStream = tuple.getSourceStreamId(); + this.fieldNames = new ArrayList<>(tuple.getFields().toList()); + this.values = new ArrayList<>(tuple.getValues()); + } + + /** + * No-arg constructor for serialization frameworks only. + */ + private DetachedTuple() { + this.srcComponent = null; + this.srcTask = 0; + this.srcStream = null; + this.fieldNames = null; + this.values = null; + } + + @Override + public int size() { + return values.size(); + } + + @Override + public boolean contains(String field) { + return getFields().contains(field); + } + + @Override + public Fields getFields() { + if (fields == null) { + fields = new Fields(fieldNames); + } + return fields; + } + + @Override + public int fieldIndex(String field) { + return getFields().fieldIndex(field); + } + + @Override + public List select(Fields selector) { + return getFields().select(selector, values); + } + + @Override + public Object getValue(int i) { + return values.get(i); + } + + @Override + public String getString(int i) { + return (String) values.get(i); + } + + @Override + public Integer getInteger(int i) { + return (Integer) values.get(i); + } + + @Override + public Long getLong(int i) { + return (Long) values.get(i); + } + + @Override + public Boolean getBoolean(int i) { + return (Boolean) values.get(i); + } + + @Override + public Short getShort(int i) { + return (Short) values.get(i); + } + + @Override + public Byte getByte(int i) { + return (Byte) values.get(i); + } + + @Override + public Double getDouble(int i) { + return (Double) values.get(i); + } + + @Override + public Float getFloat(int i) { + return (Float) values.get(i); + } + + @Override + public byte[] getBinary(int i) { + return (byte[]) values.get(i); + } + + @Override + public Object getValueByField(String field) { + return values.get(fieldIndex(field)); + } + + @Override + public String getStringByField(String field) { + return (String) values.get(fieldIndex(field)); + } + + @Override + public Integer getIntegerByField(String field) { + return (Integer) values.get(fieldIndex(field)); + } + + @Override + public Long getLongByField(String field) { + return (Long) values.get(fieldIndex(field)); + } + + @Override + public Boolean getBooleanByField(String field) { + return (Boolean) values.get(fieldIndex(field)); + } + + @Override + public Short getShortByField(String field) { + return (Short) values.get(fieldIndex(field)); + } + + @Override + public Byte getByteByField(String field) { + return (Byte) values.get(fieldIndex(field)); + } + + @Override + public Double getDoubleByField(String field) { + return (Double) values.get(fieldIndex(field)); + } + + @Override + public Float getFloatByField(String field) { + return (Float) values.get(fieldIndex(field)); + } + + @Override + public byte[] getBinaryByField(String field) { + return (byte[]) values.get(fieldIndex(field)); + } + + @Override + public List getValues() { + return values; + } + + @Override + public GlobalStreamId getSourceGlobalStreamId() { + return new GlobalStreamId(srcComponent, srcStream); + } + + @Override + public String getSourceComponent() { + return srcComponent; + } + + @Override + public int getSourceTask() { + return srcTask; + } + + @Override + public String getSourceStreamId() { + return srcStream; + } + + @Override + public MessageId getMessageId() { + return MessageId.makeUnanchored(); + } + + @Override + public GeneralTopologyContext getContext() { + throw new UnsupportedOperationException("A detached tuple has no topology context"); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DetachedTuple)) { + return false; + } + DetachedTuple other = (DetachedTuple) o; + return srcTask == other.srcTask + && Objects.equals(srcComponent, other.srcComponent) + && Objects.equals(srcStream, other.srcStream) + && Objects.equals(fieldNames, other.fieldNames) + && Objects.equals(values, other.values); + } + + @Override + public int hashCode() { + return Objects.hash(srcComponent, srcTask, srcStream, fieldNames, values); + } + + @Override + public String toString() { + return "source: " + srcComponent + ":" + srcTask + + ", stream: " + srcStream + + ", fields: " + fieldNames + + ", " + values; + } +} diff --git a/storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java b/storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java new file mode 100644 index 00000000000..9a6914d9fd9 --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.tuple; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.serialization.KryoValuesDeserializer; +import org.apache.storm.serialization.KryoValuesSerializer; +import org.apache.storm.task.GeneralTopologyContext; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for {@link DetachedTuple}. + */ +public class DetachedTupleTest { + + private Tuple sourceTuple; + private DetachedTuple detached; + + private GeneralTopologyContext getContext(final Fields fields) { + TopologyBuilder builder = new TopologyBuilder(); + return new GeneralTopologyContext( + builder.createTopology(), new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") { + + @Override + public Fields getComponentOutputFields(String componentId, String streamId) { + return fields; + } + + }; + } + + @BeforeEach + public void setUp() { + Fields fields = new Fields("id", "ts"); + sourceTuple = new TupleImpl(getContext(fields), new Values(42, 1000L), "srcComponent", 7, "srcStream"); + detached = new DetachedTuple(sourceTuple); + } + + @Test + public void testSnapshotsSourceMetadata() { + assertEquals("srcComponent", detached.getSourceComponent()); + assertEquals(7, detached.getSourceTask()); + assertEquals("srcStream", detached.getSourceStreamId()); + assertEquals(new GlobalStreamId("srcComponent", "srcStream"), detached.getSourceGlobalStreamId()); + } + + @Test + public void testSnapshotsFieldsAndValues() { + assertEquals(2, detached.size()); + assertEquals(Arrays.asList("id", "ts"), detached.getFields().toList()); + assertEquals(sourceTuple.getValues(), detached.getValues()); + assertTrue(detached.contains("id")); + assertEquals(1, detached.fieldIndex("ts")); + assertEquals(42, detached.getValue(0)); + assertEquals(Integer.valueOf(42), detached.getIntegerByField("id")); + assertEquals(Long.valueOf(1000L), detached.getLongByField("ts")); + assertEquals(Arrays.asList(1000L), detached.select(new Fields("ts"))); + } + + @Test + public void testIsUnanchoredAndDetachedFromContext() { + assertEquals(MessageId.makeUnanchored(), detached.getMessageId()); + assertThrows(UnsupportedOperationException.class, () -> detached.getContext()); + } + + @Test + public void testValueBasedEquality() { + DetachedTuple other = new DetachedTuple(sourceTuple); + assertEquals(detached, other); + assertEquals(detached.hashCode(), other.hashCode()); + + Tuple differentTuple = new TupleImpl(getContext(new Fields("id", "ts")), new Values(43, 1000L), "srcComponent", 7, "srcStream"); + assertNotEquals(detached, new DetachedTuple(differentTuple)); + } + + /** + * STORM-4000 regression: a tuple emitted on the late tuple stream must survive Kryo serialization. With + * {@link Config#TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION} disabled, serializing a {@link TupleImpl} (with its + * attached topology context) fails, while a {@link DetachedTuple} round-trips cleanly. + */ + @Test + public void testKryoRoundTripWithoutJavaFallback() { + Map conf = Utils.readDefaultConfig(); + conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false); + + byte[] serialized = new KryoValuesSerializer(conf).serialize(new Values(detached)); + List roundTripped = new KryoValuesDeserializer(conf).deserialize(serialized); + + assertInstanceOf(DetachedTuple.class, roundTripped.get(0)); + DetachedTuple deserialized = (DetachedTuple) roundTripped.get(0); + assertEquals(detached, deserialized); + // Fields are rebuilt lazily after deserialization + assertEquals(Arrays.asList("id", "ts"), deserialized.getFields().toList()); + assertEquals(Long.valueOf(1000L), deserialized.getLongByField("ts")); + } +} From 4fdd1dc35796c0f6b439cb693947647e826b9f06 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Fri, 12 Jun 2026 23:47:04 +0200 Subject: [PATCH 2/5] fix: emit late tuples as DetachedTuple in WindowedBoltExecutor The late tuple stream emitted the original TupleImpl as a value of the outgoing tuple. TupleImpl references the topology context, which cannot be serialized, so any consumer of the late tuple stream running in a different worker failed with a KryoException. Emit a DetachedTuple copy instead, which survives serialization while still exposing the Tuple interface to consumers. --- .../storm/topology/WindowedBoltExecutor.java | 8 ++- .../PersistentWindowedBoltExecutorTest.java | 6 ++- .../topology/WindowedBoltExecutorTest.java | 52 ++++++++++++++++++- 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java index 4713e95d39b..7f7f2321cd5 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java @@ -33,6 +33,7 @@ import org.apache.storm.task.IOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.DetachedTuple; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; @@ -61,6 +62,10 @@ * An {@link IWindowedBolt} wrapper that does the windowing of tuples. */ public class WindowedBoltExecutor implements IRichBolt { + /** + * Name of the field carrying a late tuple on the late tuple stream. The value is a {@link DetachedTuple}, + * a serializable copy of the original tuple detached from the topology context. + */ public static final String LATE_TUPLE_FIELD = "late_tuple"; private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class); private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s @@ -310,7 +315,8 @@ public void execute(Tuple input) { windowManager.add(input, ts); } else { if (lateTupleStream != null) { - windowedOutputCollector.emit(lateTupleStream, input, new Values(input)); + // emit a detached copy: the original tuple references the topology context and cannot be serialized + windowedOutputCollector.emit(lateTupleStream, input, new Values(new DetachedTuple(input))); } else { LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts); } diff --git a/storm-client/test/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutorTest.java b/storm-client/test/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutorTest.java index 18f3b43f857..7343ac98408 100644 --- a/storm-client/test/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutorTest.java +++ b/storm-client/test/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutorTest.java @@ -28,6 +28,8 @@ import org.apache.storm.streams.Pair; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.DetachedTuple; +import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.windowing.Event; @@ -136,6 +138,8 @@ public void testExecuteLatetuple() { Mockito.when(mockWaterMarkEventGenerator.track(Mockito.any(), Mockito.anyLong())).thenReturn(false); Mockito.when(mockTimestampExtractor.extractTimestamp(Mockito.any())).thenReturn(tupleTs); Tuple mockTuple = Mockito.mock(Tuple.class); + Mockito.when(mockTuple.getFields()).thenReturn(new Fields("ts")); + Mockito.when(mockTuple.getValues()).thenReturn(new Values(tupleTs)); executor.initState(null); executor.waterMarkEventGenerator = mockWaterMarkEventGenerator; executor.execute(mockTuple); @@ -147,7 +151,7 @@ public void testExecuteLatetuple() { .emit(stringCaptor.capture(), anchorCaptor.capture(), valuesCaptor.capture()); assertEquals(LATE_STREAM, stringCaptor.getValue()); assertEquals(Collections.singletonList(mockTuple), anchorCaptor.getValue()); - assertEquals(new Values(mockTuple), valuesCaptor.getValue()); + assertEquals(new Values(new DetachedTuple(mockTuple)), valuesCaptor.getValue()); } @Test diff --git a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java index 2f028fc8f46..98c8be8de49 100644 --- a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java +++ b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java @@ -22,23 +22,29 @@ import org.apache.storm.Config; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.generated.Grouping; +import org.apache.storm.serialization.KryoValuesDeserializer; +import org.apache.storm.serialization.KryoValuesSerializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.DetachedTuple; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; import org.apache.storm.windowing.TupleWindow; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -215,7 +221,51 @@ public void testExecuteWithLateTupleStream() { } System.out.println(testWindowedBolt.tupleWindows); Tuple tuple = tuples.get(tuples.size() - 1); - Mockito.verify(outputCollector).emit("$late", Collections.singletonList(tuple), new Values(tuple)); + Mockito.verify(outputCollector).emit("$late", Collections.singletonList(tuple), new Values(new DetachedTuple(tuple))); + } + + @Test + public void testLateTupleStreamEmitsSerializableTuple() throws Exception { + testWindowedBolt = new TestWindowedBolt(); + testWindowedBolt.withTimestampField("ts"); + executor = new WindowedBoltExecutor(testWindowedBolt); + TopologyContext context = getTopologyContext(); + Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default", "$late"))); + + OutputCollector outputCollector = Mockito.mock(OutputCollector.class); + Map conf = new HashMap<>(); + conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000); + conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20); + conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10); + conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late"); + conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5); + //Trigger manually to avoid timing issues + conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 1_000_000); + executor.prepare(conf, context, outputCollector); + + long[] timestamps = { 603, 605, 607, 618, 626, 636, 600 }; + Tuple lateTuple = null; + for (long ts : timestamps) { + lateTuple = getTuple("s1", new Fields("ts"), new Values(ts), "s1Src"); + executor.execute(lateTuple); + executor.waterMarkEventGenerator.run(); + } + + ArgumentCaptor> valuesCaptor = ArgumentCaptor.forClass(List.class); + Mockito.verify(outputCollector).emit(Mockito.eq("$late"), Mockito.anyCollection(), valuesCaptor.capture()); + Object lateValue = valuesCaptor.getValue().get(0); + assertInstanceOf(DetachedTuple.class, lateValue); + DetachedTuple detached = (DetachedTuple) lateValue; + assertEquals(lateTuple.getValues(), detached.getValues()); + assertEquals(lateTuple.getSourceComponent(), detached.getSourceComponent()); + assertEquals(lateTuple.getSourceStreamId(), detached.getSourceStreamId()); + + // STORM-4000 regression: the late tuple must survive Kryo serialization without java fallback + Map serConf = Utils.readDefaultConfig(); + serConf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false); + byte[] serialized = new KryoValuesSerializer(serConf).serialize(new Values(lateValue)); + List roundTripped = new KryoValuesDeserializer(serConf).deserialize(serialized); + assertEquals(detached, roundTripped.get(0)); } @Test From 50becd9e521c1eaa777f6d40b019457bba6378a2 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Fri, 12 Jun 2026 23:47:51 +0200 Subject: [PATCH 3/5] docs: document late tuple stream serialization contract Document that the value of the late tuple field is a DetachedTuple, a serializable copy of the original tuple detached from the topology context, in the windowing guide and in the withLateTupleStream javadoc of BaseWindowedBolt, TumblingWindows and SlidingWindows. --- docs/Windowing.md | 4 +++- .../org/apache/storm/streams/windowing/SlidingWindows.java | 3 +++ .../org/apache/storm/streams/windowing/TumblingWindows.java | 3 +++ .../jvm/org/apache/storm/topology/base/BaseWindowedBolt.java | 3 +++ 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/docs/Windowing.md b/docs/Windowing.md index bf625d2603a..5ed190d1f91 100644 --- a/docs/Windowing.md +++ b/docs/Windowing.md @@ -188,7 +188,9 @@ public BaseWindowedBolt withLateTupleStream(String streamId) ``` This behaviour can be changed by specifying the above `streamId`. In this case late tuples are going to be emitted on the specified stream and accessible -via the field `WindowedBoltExecutor.LATE_TUPLE_FIELD`. +via the field `WindowedBoltExecutor.LATE_TUPLE_FIELD`. The value of this field is a `org.apache.storm.tuple.DetachedTuple`, a serializable copy of the +original tuple (source component, task, stream, fields and values) detached from the topology context, so that the late tuple stream can also be +consumed by bolts running in different workers. ### Watermarks diff --git a/storm-client/src/jvm/org/apache/storm/streams/windowing/SlidingWindows.java b/storm-client/src/jvm/org/apache/storm/streams/windowing/SlidingWindows.java index 428433dd636..254c3e18c8e 100644 --- a/storm-client/src/jvm/org/apache/storm/streams/windowing/SlidingWindows.java +++ b/storm-client/src/jvm/org/apache/storm/streams/windowing/SlidingWindows.java @@ -104,6 +104,9 @@ public SlidingWindows withTimestampField(String fieldName) { * org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. It must be defined on a per-component basis, and in * conjunction with the {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * + *

The late tuple is emitted as a {@link org.apache.storm.tuple.DetachedTuple}, a serializable copy of the original + * tuple detached from the topology context, so it can be consumed by bolts running in other workers. + * * @param streamId the name of the stream used to emit late tuples on */ public SlidingWindows withLateTupleStream(String streamId) { diff --git a/storm-client/src/jvm/org/apache/storm/streams/windowing/TumblingWindows.java b/storm-client/src/jvm/org/apache/storm/streams/windowing/TumblingWindows.java index faec6bf0db0..c06f29d9cc8 100644 --- a/storm-client/src/jvm/org/apache/storm/streams/windowing/TumblingWindows.java +++ b/storm-client/src/jvm/org/apache/storm/streams/windowing/TumblingWindows.java @@ -74,6 +74,9 @@ public TumblingWindows withTimestampField(String fieldName) { * org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. It must be defined on a per-component basis, and in * conjunction with the {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * + *

The late tuple is emitted as a {@link org.apache.storm.tuple.DetachedTuple}, a serializable copy of the original + * tuple detached from the topology context, so it can be consumed by bolts running in other workers. + * * @param streamId the name of the stream used to emit late tuples on */ public TumblingWindows withLateTupleStream(String streamId) { diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java index f411e87dbdd..318700d5d58 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java +++ b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java @@ -193,6 +193,9 @@ public TimestampExtractor getTimestampExtractor() { * org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. It must be defined on a per-component basis, and in * conjunction with the {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * + *

The late tuple is emitted as a {@link org.apache.storm.tuple.DetachedTuple}, a serializable copy of the original + * tuple detached from the topology context, so it can be consumed by bolts running in other workers. + * * @param streamId the name of the stream used to emit late tuples on */ public BaseWindowedBolt withLateTupleStream(String streamId) { From 5baee16ea4b996b7edf3f562206a9ac25569af9e Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Tue, 16 Jun 2026 10:34:32 +0200 Subject: [PATCH 4/5] test: cover null values, typed getters, equals and transient fields in DetachedTuple Address review feedback on PR #8790: - serialize/deserialize a DetachedTuple holding a null value - exercise getShort, getByte and getBoolean accessors - assert equality against null and non-DetachedTuple instances - round-trip a DetachedTuple whose lazy transient Fields was already built, locking in that the transient field is skipped and rebuilt --- .../apache/storm/tuple/DetachedTupleTest.java | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java b/storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java index 9a6914d9fd9..6d56f143142 100644 --- a/storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java +++ b/storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java @@ -27,8 +27,10 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -97,6 +99,67 @@ public void testValueBasedEquality() { assertNotEquals(detached, new DetachedTuple(differentTuple)); } + @Test + public void testNotEqualToNullOrOtherType() { + assertFalse(detached.equals(null)); + assertFalse(detached.equals("not a tuple")); + assertFalse(detached.equals(sourceTuple)); + } + + @Test + public void testTypedGetters() { + Fields fields = new Fields("bool", "byte", "short"); + Tuple typed = new TupleImpl(getContext(fields), new Values(true, (byte) 1, (short) 2), "srcComponent", 0, "srcStream"); + DetachedTuple typedDetached = new DetachedTuple(typed); + + assertEquals(Boolean.TRUE, typedDetached.getBoolean(0)); + assertEquals(Byte.valueOf((byte) 1), typedDetached.getByte(1)); + assertEquals(Short.valueOf((short) 2), typedDetached.getShort(2)); + assertEquals(Boolean.TRUE, typedDetached.getBooleanByField("bool")); + assertEquals(Byte.valueOf((byte) 1), typedDetached.getByteByField("byte")); + assertEquals(Short.valueOf((short) 2), typedDetached.getShortByField("short")); + } + + @Test + public void testKryoRoundTripWithNullValue() { + Fields fields = new Fields("id", "ts"); + Tuple withNull = new TupleImpl(getContext(fields), new Values(42, null), "srcComponent", 7, "srcStream"); + DetachedTuple detachedWithNull = new DetachedTuple(withNull); + + Map conf = Utils.readDefaultConfig(); + conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false); + + byte[] serialized = new KryoValuesSerializer(conf).serialize(new Values(detachedWithNull)); + List roundTripped = new KryoValuesDeserializer(conf).deserialize(serialized); + + DetachedTuple deserialized = (DetachedTuple) roundTripped.get(0); + assertEquals(detachedWithNull, deserialized); + assertNull(deserialized.getValue(1)); + assertNull(deserialized.getLongByField("ts")); + } + + /** + * The {@code fields} member is transient and rebuilt lazily. Populate it via {@link DetachedTuple#getFields()} + * before serializing so the transient-skip path is actually exercised: the deserialized copy must rebuild its + * fields from scratch rather than carry a serialized {@link Fields} instance. + */ + @Test + public void testKryoRoundTripWithPopulatedFields() { + // force the lazy transient Fields to be built before serialization + detached.getFields(); + + Map conf = Utils.readDefaultConfig(); + conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false); + + byte[] serialized = new KryoValuesSerializer(conf).serialize(new Values(detached)); + List roundTripped = new KryoValuesDeserializer(conf).deserialize(serialized); + + DetachedTuple deserialized = (DetachedTuple) roundTripped.get(0); + assertEquals(detached, deserialized); + assertEquals(Arrays.asList("id", "ts"), deserialized.getFields().toList()); + assertEquals(Long.valueOf(1000L), deserialized.getLongByField("ts")); + } + /** * STORM-4000 regression: a tuple emitted on the late tuple stream must survive Kryo serialization. With * {@link Config#TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION} disabled, serializing a {@link TupleImpl} (with its From c55a572bee9c1f64e501a69adf5fbd5f5c7bb4d9 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Tue, 16 Jun 2026 10:54:37 +0200 Subject: [PATCH 5/5] docs: document late tuple value behavior change The value emitted on the late tuple stream changed from the raw input TupleImpl to a DetachedTuple. Call this out in the windowing guide so users consuming the late stream know it is unanchored, has no topology context (getContext() throws) and can no longer be cast to TupleImpl. --- docs/Windowing.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/Windowing.md b/docs/Windowing.md index 5ed190d1f91..ea8b1cae428 100644 --- a/docs/Windowing.md +++ b/docs/Windowing.md @@ -192,6 +192,11 @@ via the field `WindowedBoltExecutor.LATE_TUPLE_FIELD`. The value of this field i original tuple (source component, task, stream, fields and values) detached from the topology context, so that the late tuple stream can also be consumed by bolts running in different workers. +> **Behavior change:** previously the value in `LATE_TUPLE_FIELD` was the original input tuple (`TupleImpl`), which could not be serialized and therefore +> only worked when the late tuple stream was consumed in the same worker. It is now a `DetachedTuple`. Read it through the `Tuple` interface as before; +> however the detached tuple is unanchored and carries no topology context, so `getContext()` throws `UnsupportedOperationException` and the value can no +> longer be cast to `TupleImpl`. + ### Watermarks For processing tuples with timestamp field, storm internally computes watermarks based on the incoming tuple timestamp. Watermark is