diff --git a/docs/Windowing.md b/docs/Windowing.md index bf625d2603a..ea8b1cae428 100644 --- a/docs/Windowing.md +++ b/docs/Windowing.md @@ -188,7 +188,14 @@ public BaseWindowedBolt withLateTupleStream(String streamId) ``` This behaviour can be changed by specifying the above `streamId`. In this case late tuples are going to be emitted on the specified stream and accessible -via the field `WindowedBoltExecutor.LATE_TUPLE_FIELD`. +via the field `WindowedBoltExecutor.LATE_TUPLE_FIELD`. The value of this field is a `org.apache.storm.tuple.DetachedTuple`, a serializable copy of the +original tuple (source component, task, stream, fields and values) detached from the topology context, so that the late tuple stream can also be +consumed by bolts running in different workers. + +> **Behavior change:** previously the value in `LATE_TUPLE_FIELD` was the original input tuple (`TupleImpl`), which could not be serialized and therefore +> only worked when the late tuple stream was consumed in the same worker. It is now a `DetachedTuple`. Read it through the `Tuple` interface as before; +> however the detached tuple is unanchored and carries no topology context, so `getContext()` throws `UnsupportedOperationException` and the value can no +> longer be cast to `TupleImpl`. 
### Watermarks diff --git a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java index eb734aee340..b53f8375f24 100644 --- a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java +++ b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java @@ -40,6 +40,7 @@ import org.apache.storm.serialization.types.HashSetSerializer; import org.apache.storm.transactional.TransactionAttempt; import org.apache.storm.trident.tuple.ConsList; +import org.apache.storm.tuple.DetachedTuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.ListDelegate; import org.apache.storm.utils.ReflectionUtils; @@ -77,6 +78,7 @@ public static Kryo getKryo(Map conf) { k.register(ConsList.class); k.register(BackPressureStatus.class); k.register(NodeInfo.class); + k.register(DetachedTuple.class); synchronized (loader) { for (SerializationRegister sr : loader) { diff --git a/storm-client/src/jvm/org/apache/storm/streams/windowing/SlidingWindows.java b/storm-client/src/jvm/org/apache/storm/streams/windowing/SlidingWindows.java index 428433dd636..254c3e18c8e 100644 --- a/storm-client/src/jvm/org/apache/storm/streams/windowing/SlidingWindows.java +++ b/storm-client/src/jvm/org/apache/storm/streams/windowing/SlidingWindows.java @@ -104,6 +104,9 @@ public SlidingWindows withTimestampField(String fieldName) { * org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. It must be defined on a per-component basis, and in * conjunction with the {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * + *

<p>The late tuple is emitted as a {@link org.apache.storm.tuple.DetachedTuple}, a serializable copy of the original + * tuple detached from the topology context, so it can be consumed by bolts running in other workers. + * * @param streamId the name of the stream used to emit late tuples on */ public SlidingWindows withLateTupleStream(String streamId) { diff --git a/storm-client/src/jvm/org/apache/storm/streams/windowing/TumblingWindows.java b/storm-client/src/jvm/org/apache/storm/streams/windowing/TumblingWindows.java index faec6bf0db0..c06f29d9cc8 100644 --- a/storm-client/src/jvm/org/apache/storm/streams/windowing/TumblingWindows.java +++ b/storm-client/src/jvm/org/apache/storm/streams/windowing/TumblingWindows.java @@ -74,6 +74,9 @@ public TumblingWindows withTimestampField(String fieldName) { * org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. It must be defined on a per-component basis, and in * conjunction with the {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * + *

<p>The late tuple is emitted as a {@link org.apache.storm.tuple.DetachedTuple}, a serializable copy of the original + * tuple detached from the topology context, so it can be consumed by bolts running in other workers. + * * @param streamId the name of the stream used to emit late tuples on */ public TumblingWindows withLateTupleStream(String streamId) { diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java index 4713e95d39b..7f7f2321cd5 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java @@ -33,6 +33,7 @@ import org.apache.storm.task.IOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.DetachedTuple; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; @@ -61,6 +62,10 @@ * An {@link IWindowedBolt} wrapper that does the windowing of tuples. */ public class WindowedBoltExecutor implements IRichBolt { + /** + * Name of the field carrying a late tuple on the late tuple stream. The value is a {@link DetachedTuple}, + * a serializable copy of the original tuple detached from the topology context. 
+ */ public static final String LATE_TUPLE_FIELD = "late_tuple"; private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class); private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s @@ -310,7 +315,8 @@ public void execute(Tuple input) { windowManager.add(input, ts); } else { if (lateTupleStream != null) { - windowedOutputCollector.emit(lateTupleStream, input, new Values(input)); + // emit a detached copy: the original tuple references the topology context and cannot be serialized + windowedOutputCollector.emit(lateTupleStream, input, new Values(new DetachedTuple(input))); } else { LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts); } diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java index f411e87dbdd..318700d5d58 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java +++ b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java @@ -193,6 +193,9 @@ public TimestampExtractor getTimestampExtractor() { * org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. It must be defined on a per-component basis, and in * conjunction with the {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * + *

<p>The late tuple is emitted as a {@link org.apache.storm.tuple.DetachedTuple}, a serializable copy of the original + * tuple detached from the topology context, so it can be consumed by bolts running in other workers. + * * @param streamId the name of the stream used to emit late tuples on */ public BaseWindowedBolt withLateTupleStream(String streamId) { diff --git a/storm-client/src/jvm/org/apache/storm/tuple/DetachedTuple.java b/storm-client/src/jvm/org/apache/storm/tuple/DetachedTuple.java new file mode 100644 index 00000000000..c3635fbc4b3 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/tuple/DetachedTuple.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.tuple; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.task.GeneralTopologyContext; + +/** + * A self-contained, serializable copy of a {@link Tuple} that is detached from the topology context. + * + *

<p>A regular {@link TupleImpl} holds a reference to the {@link GeneralTopologyContext} it was created in, which cannot be + * serialized. A {@code DetachedTuple} instead snapshots the source component, task, stream, output fields and values of the + * original tuple, so it can be emitted as a value inside another tuple and cross worker boundaries (see STORM-4000). + * + *

<p>Detached tuples are unanchored: {@link #getMessageId()} always returns an unanchored message id, and + * {@link #getContext()} throws {@link UnsupportedOperationException} since no topology context is available. + * + *

<p>Unlike {@link TupleImpl}, which uses identity-based equality, two detached tuples are equal if they snapshot + * the same source metadata, fields and values, so a detached tuple keeps comparing equal after a + * serialization round-trip. + */ +public class DetachedTuple implements Tuple, Serializable { + private static final long serialVersionUID = 1L; + + private final String srcComponent; + private final int srcTask; + private final String srcStream; + private final List fieldNames; + private final List values; + private transient Fields fields; + + /** + * Creates a detached copy of the given tuple. The tuple must still be attached to its topology context, since the + * output fields of the source component are resolved through it. + * + * @param tuple the tuple to detach + */ + public DetachedTuple(Tuple tuple) { + this.srcComponent = tuple.getSourceComponent(); + this.srcTask = tuple.getSourceTask(); + this.srcStream = tuple.getSourceStreamId(); + this.fieldNames = new ArrayList<>(tuple.getFields().toList()); + this.values = new ArrayList<>(tuple.getValues()); + } + + /** + * No-arg constructor for serialization frameworks only. 
+ */ + private DetachedTuple() { + this.srcComponent = null; + this.srcTask = 0; + this.srcStream = null; + this.fieldNames = null; + this.values = null; + } + + @Override + public int size() { + return values.size(); + } + + @Override + public boolean contains(String field) { + return getFields().contains(field); + } + + @Override + public Fields getFields() { + if (fields == null) { + fields = new Fields(fieldNames); + } + return fields; + } + + @Override + public int fieldIndex(String field) { + return getFields().fieldIndex(field); + } + + @Override + public List select(Fields selector) { + return getFields().select(selector, values); + } + + @Override + public Object getValue(int i) { + return values.get(i); + } + + @Override + public String getString(int i) { + return (String) values.get(i); + } + + @Override + public Integer getInteger(int i) { + return (Integer) values.get(i); + } + + @Override + public Long getLong(int i) { + return (Long) values.get(i); + } + + @Override + public Boolean getBoolean(int i) { + return (Boolean) values.get(i); + } + + @Override + public Short getShort(int i) { + return (Short) values.get(i); + } + + @Override + public Byte getByte(int i) { + return (Byte) values.get(i); + } + + @Override + public Double getDouble(int i) { + return (Double) values.get(i); + } + + @Override + public Float getFloat(int i) { + return (Float) values.get(i); + } + + @Override + public byte[] getBinary(int i) { + return (byte[]) values.get(i); + } + + @Override + public Object getValueByField(String field) { + return values.get(fieldIndex(field)); + } + + @Override + public String getStringByField(String field) { + return (String) values.get(fieldIndex(field)); + } + + @Override + public Integer getIntegerByField(String field) { + return (Integer) values.get(fieldIndex(field)); + } + + @Override + public Long getLongByField(String field) { + return (Long) values.get(fieldIndex(field)); + } + + @Override + public Boolean getBooleanByField(String 
field) { + return (Boolean) values.get(fieldIndex(field)); + } + + @Override + public Short getShortByField(String field) { + return (Short) values.get(fieldIndex(field)); + } + + @Override + public Byte getByteByField(String field) { + return (Byte) values.get(fieldIndex(field)); + } + + @Override + public Double getDoubleByField(String field) { + return (Double) values.get(fieldIndex(field)); + } + + @Override + public Float getFloatByField(String field) { + return (Float) values.get(fieldIndex(field)); + } + + @Override + public byte[] getBinaryByField(String field) { + return (byte[]) values.get(fieldIndex(field)); + } + + @Override + public List getValues() { + return values; + } + + @Override + public GlobalStreamId getSourceGlobalStreamId() { + return new GlobalStreamId(srcComponent, srcStream); + } + + @Override + public String getSourceComponent() { + return srcComponent; + } + + @Override + public int getSourceTask() { + return srcTask; + } + + @Override + public String getSourceStreamId() { + return srcStream; + } + + @Override + public MessageId getMessageId() { + return MessageId.makeUnanchored(); + } + + @Override + public GeneralTopologyContext getContext() { + throw new UnsupportedOperationException("A detached tuple has no topology context"); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DetachedTuple)) { + return false; + } + DetachedTuple other = (DetachedTuple) o; + return srcTask == other.srcTask + && Objects.equals(srcComponent, other.srcComponent) + && Objects.equals(srcStream, other.srcStream) + && Objects.equals(fieldNames, other.fieldNames) + && Objects.equals(values, other.values); + } + + @Override + public int hashCode() { + return Objects.hash(srcComponent, srcTask, srcStream, fieldNames, values); + } + + @Override + public String toString() { + return "source: " + srcComponent + ":" + srcTask + + ", stream: " + srcStream + + ", fields: " + fieldNames + + ", " + values; 
+ } +} diff --git a/storm-client/test/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutorTest.java b/storm-client/test/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutorTest.java index 18f3b43f857..7343ac98408 100644 --- a/storm-client/test/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutorTest.java +++ b/storm-client/test/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutorTest.java @@ -28,6 +28,8 @@ import org.apache.storm.streams.Pair; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.DetachedTuple; +import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.windowing.Event; @@ -136,6 +138,8 @@ public void testExecuteLatetuple() { Mockito.when(mockWaterMarkEventGenerator.track(Mockito.any(), Mockito.anyLong())).thenReturn(false); Mockito.when(mockTimestampExtractor.extractTimestamp(Mockito.any())).thenReturn(tupleTs); Tuple mockTuple = Mockito.mock(Tuple.class); + Mockito.when(mockTuple.getFields()).thenReturn(new Fields("ts")); + Mockito.when(mockTuple.getValues()).thenReturn(new Values(tupleTs)); executor.initState(null); executor.waterMarkEventGenerator = mockWaterMarkEventGenerator; executor.execute(mockTuple); @@ -147,7 +151,7 @@ public void testExecuteLatetuple() { .emit(stringCaptor.capture(), anchorCaptor.capture(), valuesCaptor.capture()); assertEquals(LATE_STREAM, stringCaptor.getValue()); assertEquals(Collections.singletonList(mockTuple), anchorCaptor.getValue()); - assertEquals(new Values(mockTuple), valuesCaptor.getValue()); + assertEquals(new Values(new DetachedTuple(mockTuple)), valuesCaptor.getValue()); } @Test diff --git a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java index 2f028fc8f46..98c8be8de49 100644 --- 
a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java +++ b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java @@ -22,23 +22,29 @@ import org.apache.storm.Config; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.generated.Grouping; +import org.apache.storm.serialization.KryoValuesDeserializer; +import org.apache.storm.serialization.KryoValuesSerializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.DetachedTuple; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; import org.apache.storm.windowing.TupleWindow; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -215,7 +221,51 @@ public void testExecuteWithLateTupleStream() { } System.out.println(testWindowedBolt.tupleWindows); Tuple tuple = tuples.get(tuples.size() - 1); - Mockito.verify(outputCollector).emit("$late", Collections.singletonList(tuple), new Values(tuple)); + Mockito.verify(outputCollector).emit("$late", Collections.singletonList(tuple), new Values(new DetachedTuple(tuple))); + } + + @Test + public void testLateTupleStreamEmitsSerializableTuple() throws Exception { + 
testWindowedBolt = new TestWindowedBolt(); + testWindowedBolt.withTimestampField("ts"); + executor = new WindowedBoltExecutor(testWindowedBolt); + TopologyContext context = getTopologyContext(); + Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default", "$late"))); + + OutputCollector outputCollector = Mockito.mock(OutputCollector.class); + Map conf = new HashMap<>(); + conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000); + conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20); + conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10); + conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late"); + conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5); + //Trigger manually to avoid timing issues + conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 1_000_000); + executor.prepare(conf, context, outputCollector); + + long[] timestamps = { 603, 605, 607, 618, 626, 636, 600 }; + Tuple lateTuple = null; + for (long ts : timestamps) { + lateTuple = getTuple("s1", new Fields("ts"), new Values(ts), "s1Src"); + executor.execute(lateTuple); + executor.waterMarkEventGenerator.run(); + } + + ArgumentCaptor<List<Object>> valuesCaptor = ArgumentCaptor.forClass(List.class); + Mockito.verify(outputCollector).emit(Mockito.eq("$late"), Mockito.anyCollection(), valuesCaptor.capture()); + Object lateValue = valuesCaptor.getValue().get(0); + assertInstanceOf(DetachedTuple.class, lateValue); + DetachedTuple detached = (DetachedTuple) lateValue; + assertEquals(lateTuple.getValues(), detached.getValues()); + assertEquals(lateTuple.getSourceComponent(), detached.getSourceComponent()); + assertEquals(lateTuple.getSourceStreamId(), detached.getSourceStreamId()); + + // STORM-4000 regression: the late tuple must survive Kryo serialization without java fallback + Map serConf = Utils.readDefaultConfig(); + serConf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false); + byte[] serialized = new 
KryoValuesSerializer(serConf).serialize(new Values(lateValue)); + List roundTripped = new KryoValuesDeserializer(serConf).deserialize(serialized); + assertEquals(detached, roundTripped.get(0)); } @Test diff --git a/storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java b/storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java new file mode 100644 index 00000000000..6d56f143142 --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/tuple/DetachedTupleTest.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ */ + +package org.apache.storm.tuple; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.serialization.KryoValuesDeserializer; +import org.apache.storm.serialization.KryoValuesSerializer; +import org.apache.storm.task.GeneralTopologyContext; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for {@link DetachedTuple}. 
+ */ +public class DetachedTupleTest { + + private Tuple sourceTuple; + private DetachedTuple detached; + + private GeneralTopologyContext getContext(final Fields fields) { + TopologyBuilder builder = new TopologyBuilder(); + return new GeneralTopologyContext( + builder.createTopology(), new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") { + + @Override + public Fields getComponentOutputFields(String componentId, String streamId) { + return fields; + } + + }; + } + + @BeforeEach + public void setUp() { + Fields fields = new Fields("id", "ts"); + sourceTuple = new TupleImpl(getContext(fields), new Values(42, 1000L), "srcComponent", 7, "srcStream"); + detached = new DetachedTuple(sourceTuple); + } + + @Test + public void testSnapshotsSourceMetadata() { + assertEquals("srcComponent", detached.getSourceComponent()); + assertEquals(7, detached.getSourceTask()); + assertEquals("srcStream", detached.getSourceStreamId()); + assertEquals(new GlobalStreamId("srcComponent", "srcStream"), detached.getSourceGlobalStreamId()); + } + + @Test + public void testSnapshotsFieldsAndValues() { + assertEquals(2, detached.size()); + assertEquals(Arrays.asList("id", "ts"), detached.getFields().toList()); + assertEquals(sourceTuple.getValues(), detached.getValues()); + assertTrue(detached.contains("id")); + assertEquals(1, detached.fieldIndex("ts")); + assertEquals(42, detached.getValue(0)); + assertEquals(Integer.valueOf(42), detached.getIntegerByField("id")); + assertEquals(Long.valueOf(1000L), detached.getLongByField("ts")); + assertEquals(Arrays.asList(1000L), detached.select(new Fields("ts"))); + } + + @Test + public void testIsUnanchoredAndDetachedFromContext() { + assertEquals(MessageId.makeUnanchored(), detached.getMessageId()); + assertThrows(UnsupportedOperationException.class, () -> detached.getContext()); + } + + @Test + public void testValueBasedEquality() { + DetachedTuple other = new DetachedTuple(sourceTuple); + assertEquals(detached, other); + 
assertEquals(detached.hashCode(), other.hashCode()); + + Tuple differentTuple = new TupleImpl(getContext(new Fields("id", "ts")), new Values(43, 1000L), "srcComponent", 7, "srcStream"); + assertNotEquals(detached, new DetachedTuple(differentTuple)); + } + + @Test + public void testNotEqualToNullOrOtherType() { + assertFalse(detached.equals(null)); + assertFalse(detached.equals("not a tuple")); + assertFalse(detached.equals(sourceTuple)); + } + + @Test + public void testTypedGetters() { + Fields fields = new Fields("bool", "byte", "short"); + Tuple typed = new TupleImpl(getContext(fields), new Values(true, (byte) 1, (short) 2), "srcComponent", 0, "srcStream"); + DetachedTuple typedDetached = new DetachedTuple(typed); + + assertEquals(Boolean.TRUE, typedDetached.getBoolean(0)); + assertEquals(Byte.valueOf((byte) 1), typedDetached.getByte(1)); + assertEquals(Short.valueOf((short) 2), typedDetached.getShort(2)); + assertEquals(Boolean.TRUE, typedDetached.getBooleanByField("bool")); + assertEquals(Byte.valueOf((byte) 1), typedDetached.getByteByField("byte")); + assertEquals(Short.valueOf((short) 2), typedDetached.getShortByField("short")); + } + + @Test + public void testKryoRoundTripWithNullValue() { + Fields fields = new Fields("id", "ts"); + Tuple withNull = new TupleImpl(getContext(fields), new Values(42, null), "srcComponent", 7, "srcStream"); + DetachedTuple detachedWithNull = new DetachedTuple(withNull); + + Map conf = Utils.readDefaultConfig(); + conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false); + + byte[] serialized = new KryoValuesSerializer(conf).serialize(new Values(detachedWithNull)); + List roundTripped = new KryoValuesDeserializer(conf).deserialize(serialized); + + DetachedTuple deserialized = (DetachedTuple) roundTripped.get(0); + assertEquals(detachedWithNull, deserialized); + assertNull(deserialized.getValue(1)); + assertNull(deserialized.getLongByField("ts")); + } + + /** + * The {@code fields} member is transient and rebuilt lazily. 
Populate it via {@link DetachedTuple#getFields()} + * before serializing so the transient-skip path is actually exercised: the deserialized copy must rebuild its + * fields from scratch rather than carry a serialized {@link Fields} instance. + */ + @Test + public void testKryoRoundTripWithPopulatedFields() { + // force the lazy transient Fields to be built before serialization + detached.getFields(); + + Map conf = Utils.readDefaultConfig(); + conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false); + + byte[] serialized = new KryoValuesSerializer(conf).serialize(new Values(detached)); + List roundTripped = new KryoValuesDeserializer(conf).deserialize(serialized); + + DetachedTuple deserialized = (DetachedTuple) roundTripped.get(0); + assertEquals(detached, deserialized); + assertEquals(Arrays.asList("id", "ts"), deserialized.getFields().toList()); + assertEquals(Long.valueOf(1000L), deserialized.getLongByField("ts")); + } + + /** + * STORM-4000 regression: a tuple emitted on the late tuple stream must survive Kryo serialization. With + * {@link Config#TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION} disabled, serializing a {@link TupleImpl} (with its + * attached topology context) fails, while a {@link DetachedTuple} round-trips cleanly. + */ + @Test + public void testKryoRoundTripWithoutJavaFallback() { + Map conf = Utils.readDefaultConfig(); + conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false); + + byte[] serialized = new KryoValuesSerializer(conf).serialize(new Values(detached)); + List roundTripped = new KryoValuesDeserializer(conf).deserialize(serialized); + + assertInstanceOf(DetachedTuple.class, roundTripped.get(0)); + DetachedTuple deserialized = (DetachedTuple) roundTripped.get(0); + assertEquals(detached, deserialized); + // Fields are rebuilt lazily after deserialization + assertEquals(Arrays.asList("id", "ts"), deserialized.getFields().toList()); + assertEquals(Long.valueOf(1000L), deserialized.getLongByField("ts")); + } +}