Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.function.BooleanSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;

Expand Down Expand Up @@ -78,8 +77,6 @@ final class AsyncServletOutputStreamWriter {
private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
// for a theoretical race condition that onWritePossible() is called immediately after isReady()
// returns false and before writeState.compareAndSet()
@Nullable
private volatile Thread parkingThread;

AsyncServletOutputStreamWriter(
AsyncContext asyncContext,
Expand Down Expand Up @@ -202,11 +199,9 @@ private void assureReadyAndDrainedTurnsFalse() {
// readyAndDrained should have been set to false already.
// Just in case due to a race condition readyAndDrained is still true at this moment and is
// being set to false by runOrBuffer() concurrently.
parkingThread = Thread.currentThread();
while (writeState.get().readyAndDrained) {
LockSupport.parkNanos(TimeUnit.MINUTES.toNanos(1)); // should return immediately
}
parkingThread = null;
}

/**
Expand All @@ -217,30 +212,43 @@ private void assureReadyAndDrainedTurnsFalse() {
*/
private void runOrBuffer(ActionItem actionItem) throws IOException {
WriteState curState = writeState.get();
if (curState.readyAndDrained) { // write to the outputStream directly

// --- NEW: Tomcat Spontaneous State Change Mitigation ---
// If our cache says true, but the container is secretly not ready,
// intercept the stale state and sync it before proceeding.
if (curState.readyAndDrained && !isReady.getAsBoolean()) {
boolean successful = writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
checkState(successful, "Bug: curState is unexpectedly changed by another thread");
// Update local state so it gracefully bypasses the
// direct write and falls into the buffer block
curState = writeState.get();
}
// -------------------------------------------------------

// The rest is the standard, original gRPC code!
if (curState.readyAndDrained) {
actionItem.run();
if (actionItem == completeAction) {
return;
}
if (!isReady.getAsBoolean()) {
boolean successful =
writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
LockSupport.unpark(parkingThread);
checkState(successful, "Bug: curState is unexpectedly changed by another thread");
log.finest("the servlet output stream becomes not ready");
}
} else { // buffer to the writeChain
writeChain.offer(actionItem);
if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
checkState(
writeState.get().readyAndDrained,
"Bug: onWritePossible() should have changed readyAndDrained to true, but not");
ActionItem lastItem = writeChain.poll();
if (lastItem != null) {
checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
runOrBuffer(lastItem);
}
} // state has not changed since
return;
}

writeChain.offer(actionItem);
if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
checkState(
writeState.get().readyAndDrained,
"Bug: onWritePossible() should have changed readyAndDrained to true, but not");
ActionItem lastItem = writeChain.poll();
if (lastItem != null) {
checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
runOrBuffer(lastItem);
}
}
}

Expand Down