Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/Windowing.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,14 @@ public BaseWindowedBolt withLateTupleStream(String streamId)

```
This behaviour can be changed by specifying the above `streamId`. In this case late tuples are going to be emitted on the specified stream and accessible
via the field `WindowedBoltExecutor.LATE_TUPLE_FIELD`.
via the field `WindowedBoltExecutor.LATE_TUPLE_FIELD`. The value of this field is a `org.apache.storm.tuple.DetachedTuple`, a serializable copy of the
original tuple (source component, task, stream, fields and values) detached from the topology context, so that the late tuple stream can also be
consumed by bolts running in different workers.

> **Behavior change:** previously the value in `LATE_TUPLE_FIELD` was the original input tuple (`TupleImpl`), which could not be serialized and therefore
> only worked when the late tuple stream was consumed in the same worker. It is now a `DetachedTuple`. Read it through the `Tuple` interface as before;
> however the detached tuple is unanchored and carries no topology context, so `getContext()` throws `UnsupportedOperationException` and the value can no
> longer be cast to `TupleImpl`.


### Watermarks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.storm.serialization.types.HashSetSerializer;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.trident.tuple.ConsList;
import org.apache.storm.tuple.DetachedTuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.ListDelegate;
import org.apache.storm.utils.ReflectionUtils;
Expand Down Expand Up @@ -77,6 +78,7 @@ public static Kryo getKryo(Map<String, Object> conf) {
k.register(ConsList.class);
k.register(BackPressureStatus.class);
k.register(NodeInfo.class);
k.register(DetachedTuple.class);

synchronized (loader) {
for (SerializationRegister sr : loader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public SlidingWindows<L, I> withTimestampField(String fieldName) {
* org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. It must be defined on a per-component basis, and in
* conjunction with the {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
*
* <p>The late tuple is emitted as a {@link org.apache.storm.tuple.DetachedTuple}, a serializable copy of the original
* tuple detached from the topology context, so it can be consumed by bolts running in other workers.
*
* @param streamId the name of the stream used to emit late tuples on
*/
public SlidingWindows<L, I> withLateTupleStream(String streamId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public TumblingWindows<L> withTimestampField(String fieldName) {
* org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. It must be defined on a per-component basis, and in
* conjunction with the {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
*
* <p>The late tuple is emitted as a {@link org.apache.storm.tuple.DetachedTuple}, a serializable copy of the original
* tuple detached from the topology context, so it can be consumed by bolts running in other workers.
*
* @param streamId the name of the stream used to emit late tuples on
*/
public TumblingWindows<L> withLateTupleStream(String streamId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.storm.task.IOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.DetachedTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
Expand Down Expand Up @@ -61,6 +62,10 @@
* An {@link IWindowedBolt} wrapper that does the windowing of tuples.
*/
public class WindowedBoltExecutor implements IRichBolt {
/**
* Name of the field carrying a late tuple on the late tuple stream. The value is a {@link DetachedTuple},
* a serializable copy of the original tuple detached from the topology context.
*/
public static final String LATE_TUPLE_FIELD = "late_tuple";
private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
Expand Down Expand Up @@ -310,7 +315,8 @@ public void execute(Tuple input) {
windowManager.add(input, ts);
} else {
if (lateTupleStream != null) {
windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
// emit a detached copy: the original tuple references the topology context and cannot be serialized
windowedOutputCollector.emit(lateTupleStream, input, new Values(new DetachedTuple(input)));
} else {
LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ public TimestampExtractor getTimestampExtractor() {
* org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. It must be defined on a per-component basis, and in
* conjunction with the {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
*
* <p>The late tuple is emitted as a {@link org.apache.storm.tuple.DetachedTuple}, a serializable copy of the original
* tuple detached from the topology context, so it can be consumed by bolts running in other workers.
*
* @param streamId the name of the stream used to emit late tuples on
*/
public BaseWindowedBolt withLateTupleStream(String streamId) {
Expand Down
262 changes: 262 additions & 0 deletions storm-client/src/jvm/org/apache/storm/tuple/DetachedTuple.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package org.apache.storm.tuple;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.GeneralTopologyContext;

/**
* A self-contained, serializable copy of a {@link Tuple} that is detached from the topology context.
*
* <p>A regular {@link TupleImpl} holds a reference to the {@link GeneralTopologyContext} it was created in, which cannot be
* serialized. A {@code DetachedTuple} instead snapshots the source component, task, stream, output fields and values of the
* original tuple, so it can be emitted as a value inside another tuple and cross worker boundaries (see STORM-4000).
*
* <p>Detached tuples are unanchored: {@link #getMessageId()} always returns an unanchored message id, and
* {@link #getContext()} throws {@link UnsupportedOperationException} since no topology context is available.
*
* <p>Unlike {@link TupleImpl}, which uses identity-based equality, two detached tuples are equal if they snapshot
* the same source metadata, fields and values, so a detached tuple keeps comparing equal after a
* serialization round-trip.
*/
public class DetachedTuple implements Tuple, Serializable {
private static final long serialVersionUID = 1L;

private final String srcComponent;
private final int srcTask;
private final String srcStream;
private final List<String> fieldNames;
private final List<Object> values;
private transient Fields fields;

/**
* Creates a detached copy of the given tuple. The tuple must still be attached to its topology context, since the
* output fields of the source component are resolved through it.
*
* @param tuple the tuple to detach
*/
public DetachedTuple(Tuple tuple) {
this.srcComponent = tuple.getSourceComponent();
this.srcTask = tuple.getSourceTask();
this.srcStream = tuple.getSourceStreamId();
this.fieldNames = new ArrayList<>(tuple.getFields().toList());
this.values = new ArrayList<>(tuple.getValues());
}

/**
* No-arg constructor for serialization frameworks only.
*/
private DetachedTuple() {
this.srcComponent = null;
this.srcTask = 0;
this.srcStream = null;
this.fieldNames = null;
this.values = null;
}

@Override
public int size() {
return values.size();
}

@Override
public boolean contains(String field) {
return getFields().contains(field);
}

@Override
public Fields getFields() {
if (fields == null) {
fields = new Fields(fieldNames);
}
return fields;
}

@Override
public int fieldIndex(String field) {
return getFields().fieldIndex(field);
}

@Override
public List<Object> select(Fields selector) {
return getFields().select(selector, values);
}

@Override
public Object getValue(int i) {
return values.get(i);
}

@Override
public String getString(int i) {
return (String) values.get(i);
}

@Override
public Integer getInteger(int i) {
return (Integer) values.get(i);
}

@Override
public Long getLong(int i) {
return (Long) values.get(i);
}

@Override
public Boolean getBoolean(int i) {
return (Boolean) values.get(i);
}

@Override
public Short getShort(int i) {
return (Short) values.get(i);
}

@Override
public Byte getByte(int i) {
return (Byte) values.get(i);
}

@Override
public Double getDouble(int i) {
return (Double) values.get(i);
}

@Override
public Float getFloat(int i) {
return (Float) values.get(i);
}

@Override
public byte[] getBinary(int i) {
return (byte[]) values.get(i);
}

@Override
public Object getValueByField(String field) {
return values.get(fieldIndex(field));
}

@Override
public String getStringByField(String field) {
return (String) values.get(fieldIndex(field));
}

@Override
public Integer getIntegerByField(String field) {
return (Integer) values.get(fieldIndex(field));
}

@Override
public Long getLongByField(String field) {
return (Long) values.get(fieldIndex(field));
}

@Override
public Boolean getBooleanByField(String field) {
return (Boolean) values.get(fieldIndex(field));
}

@Override
public Short getShortByField(String field) {
return (Short) values.get(fieldIndex(field));
}

@Override
public Byte getByteByField(String field) {
return (Byte) values.get(fieldIndex(field));
}

@Override
public Double getDoubleByField(String field) {
return (Double) values.get(fieldIndex(field));
}

@Override
public Float getFloatByField(String field) {
return (Float) values.get(fieldIndex(field));
}

@Override
public byte[] getBinaryByField(String field) {
return (byte[]) values.get(fieldIndex(field));
}

@Override
public List<Object> getValues() {
return values;
}

@Override
public GlobalStreamId getSourceGlobalStreamId() {
return new GlobalStreamId(srcComponent, srcStream);
}

@Override
public String getSourceComponent() {
return srcComponent;
}

@Override
public int getSourceTask() {
return srcTask;
}

@Override
public String getSourceStreamId() {
return srcStream;
}

@Override
public MessageId getMessageId() {
return MessageId.makeUnanchored();
}

@Override
public GeneralTopologyContext getContext() {
throw new UnsupportedOperationException("A detached tuple has no topology context");
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DetachedTuple)) {
return false;
}
DetachedTuple other = (DetachedTuple) o;
return srcTask == other.srcTask
&& Objects.equals(srcComponent, other.srcComponent)
&& Objects.equals(srcStream, other.srcStream)
&& Objects.equals(fieldNames, other.fieldNames)
&& Objects.equals(values, other.values);
}

@Override
public int hashCode() {
return Objects.hash(srcComponent, srcTask, srcStream, fieldNames, values);
}

@Override
public String toString() {
return "source: " + srcComponent + ":" + srcTask
+ ", stream: " + srcStream
+ ", fields: " + fieldNames
+ ", " + values;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.storm.streams.Pair;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.DetachedTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.Event;
Expand Down Expand Up @@ -136,6 +138,8 @@ public void testExecuteLatetuple() {
Mockito.when(mockWaterMarkEventGenerator.track(Mockito.any(), Mockito.anyLong())).thenReturn(false);
Mockito.when(mockTimestampExtractor.extractTimestamp(Mockito.any())).thenReturn(tupleTs);
Tuple mockTuple = Mockito.mock(Tuple.class);
Mockito.when(mockTuple.getFields()).thenReturn(new Fields("ts"));
Mockito.when(mockTuple.getValues()).thenReturn(new Values(tupleTs));
executor.initState(null);
executor.waterMarkEventGenerator = mockWaterMarkEventGenerator;
executor.execute(mockTuple);
Expand All @@ -147,7 +151,7 @@ public void testExecuteLatetuple() {
.emit(stringCaptor.capture(), anchorCaptor.capture(), valuesCaptor.capture());
assertEquals(LATE_STREAM, stringCaptor.getValue());
assertEquals(Collections.singletonList(mockTuple), anchorCaptor.getValue());
assertEquals(new Values(mockTuple), valuesCaptor.getValue());
assertEquals(new Values(new DetachedTuple(mockTuple)), valuesCaptor.getValue());
}

@Test
Expand Down
Loading
Loading