Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions api/src/main/java/io/grpc/ClientStreamTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,32 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat
public void createPendingStream() {
}

/**
* A delay segment started with a canonical root cause.
*
* @param delayType the canonical root cause label (e.g., "connecting", "client_channel_init")
* @since 1.82.0
*/
public void delayTypeStarted(String delayType) {
}

/**
* High-cardinality diagnostic context attached to the active delay span.
*
* @param delayReason verbose diagnostic description of the delay
* @since 1.82.0
*/
public void delayReasonAttached(String delayReason) {
}

/**
* The current delay segment ended.
*
* @since 1.82.0
*/
public void delayEnded() {
}

/**
* Headers has been sent to the socket.
*/
Expand Down
48 changes: 41 additions & 7 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -547,25 +547,32 @@ public static final class PickResult {
// True if the result is created by withDrop()
private final boolean drop;
@Nullable private final String authorityOverride;
@Nullable private final String delayType;
@Nullable private final String delayReason;

private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status, boolean drop) {
this.subchannel = subchannel;
this.streamTracerFactory = streamTracerFactory;
this.status = checkNotNull(status, "status");
this.drop = drop;
this.authorityOverride = null;
this(subchannel, streamTracerFactory, status, drop, null, null, null);
}

private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status, boolean drop, @Nullable String authorityOverride) {
this(subchannel, streamTracerFactory, status, drop, authorityOverride, null, null);
}

private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status, boolean drop, @Nullable String authorityOverride,
@Nullable String delayType, @Nullable String delayReason) {
this.subchannel = subchannel;
this.streamTracerFactory = streamTracerFactory;
this.status = checkNotNull(status, "status");
this.drop = drop;
this.authorityOverride = authorityOverride;
this.delayType = delayType;
this.delayReason = delayReason;
}

/**
Expand Down Expand Up @@ -677,7 +684,7 @@ public static PickResult withSubchannel(Subchannel subchannel) {
*/
public PickResult copyWithSubchannel(Subchannel subchannel) {
return new PickResult(checkNotNull(subchannel, "subchannel"), streamTracerFactory,
status, drop, authorityOverride);
status, drop, authorityOverride, delayType, delayReason);
}

/**
Expand All @@ -688,7 +695,9 @@ public PickResult copyWithSubchannel(Subchannel subchannel) {
*/
public PickResult copyWithStreamTracerFactory(
@Nullable ClientStreamTracer.Factory streamTracerFactory) {
return new PickResult(subchannel, streamTracerFactory, status, drop, authorityOverride);
return new PickResult(
subchannel, streamTracerFactory, status, drop, authorityOverride, delayType,
delayReason);
}

/**
Expand Down Expand Up @@ -725,6 +734,31 @@ public static PickResult withNoResult() {
return NO_RESULT;
}

/**
* No decision could be made. The RPC will stay buffered with a specific delay type and reason.
*
* @param delayType low-cardinality root cause label (e.g., "connecting")
* @param delayReason high-cardinality diagnostic string for trace events
* @since 1.82.0
*/
public static PickResult withNoResult(String delayType, String delayReason) {
Preconditions.checkNotNull(delayType, "delayType");
Preconditions.checkNotNull(delayReason, "delayReason");
return new PickResult(null, null, Status.OK, false, null, delayType, delayReason);
}

/** Returns the delay type label if any. */
@Nullable
public String getDelayType() {
return delayType;
}

/** Returns the diagnostic delay reason if any. */
@Nullable
public String getDelayReason() {
return delayReason;
}

/** Returns the authority override if any. */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656")
@Nullable
Expand Down
108 changes: 101 additions & 7 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -157,7 +158,9 @@ public final ClientStream newStream(
synchronized (lock) {
PickerState newerState = pickerState;
if (state == newerState) {
return createPendingStream(args, tracers, pickResult);
String delayType = determineQueuingDelayType(pickResult);
String delayReason = determineQueuingDelayReason(pickResult);
return createPendingStream(args, tracers, pickResult, delayType, delayReason);
}
state = newerState;
}
Expand All @@ -173,8 +176,8 @@ public final ClientStream newStream(
*/
@GuardedBy("lock")
private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
PickResult pickResult) {
PendingStream pendingStream = new PendingStream(args, tracers);
PickResult pickResult, @Nullable String delayType, @Nullable String delayReason) {
PendingStream pendingStream = new PendingStream(args, tracers, delayType, delayReason);
if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) {
pendingStream.lastPickStatus = pickResult.getStatus();
}
Expand Down Expand Up @@ -245,7 +248,7 @@ public final void shutdownNow(Status status) {
}
if (savedReportTransportTerminated != null) {
for (PendingStream stream : savedPendingStreams) {
Runnable runnable = stream.setStream(
Runnable runnable = stream.setStreamAndEndDelay(
new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
if (runnable != null) {
// Drain in-line instead of using an executor as failing stream just throws everything
Expand Down Expand Up @@ -303,6 +306,7 @@ final void reprocess(@Nullable SubchannelPicker picker) {
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
stream.endDelay();
Executor executor = defaultAppExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
Expand All @@ -315,7 +319,11 @@ final void reprocess(@Nullable SubchannelPicker picker) {
executor.execute(runnable);
}
toRemove.add(stream);
} // else: stay pending
} else { // stay pending
String delayType = determineQueuingDelayType(pickResult);
String delayReason = determineQueuingDelayReason(pickResult);
stream.updateDelay(delayType, delayReason);
}
}

synchronized (lock) {
Expand Down Expand Up @@ -356,16 +364,101 @@ public InternalLogId getLogId() {
return logId;
}

private static String determineQueuingDelayType(@Nullable PickResult pickResult) {
if (pickResult == null) {
return "client_channel_init";
}
if (pickResult.getSubchannel() != null) {
return "subchannel_state_mismatch";
}
if (!pickResult.getStatus().isOk()) {
return "wait_for_ready_failed";
}
if (pickResult.getDelayType() != null) {
return pickResult.getDelayType();
}
return "client_channel_init";
}

private static String determineQueuingDelayReason(@Nullable PickResult pickResult) {
if (pickResult == null) {
return "client channel: created LB policy.";
}
if (pickResult.getSubchannel() != null) {
return "subchannel returned by LB picker has no connected subchannel";
}
if (!pickResult.getStatus().isOk()) {
return "wait_for_ready RPC failed with status: " + pickResult.getStatus();
}
if (pickResult.getDelayReason() != null) {
return pickResult.getDelayReason();
}
return "client channel: waiting for picker";
}

private class PendingStream extends DelayedStream {
private final PickSubchannelArgs args;
private final Context context = Context.current();
private final ClientStreamTracer[] tracers;
private volatile Status lastPickStatus;
@Nullable private String activeDelayType;
@Nullable private String activeDelayReason;

private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
@Nullable String initialType, @Nullable String initialReason) {
super("connecting_and_lb");
this.args = args;
this.tracers = tracers;
this.activeDelayType = initialType;
this.activeDelayReason = initialReason;
if (initialType != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayTypeStarted(initialType);
}
}
if (initialReason != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayReasonAttached(initialReason);
}
}
}

void updateDelay(@Nullable String newType, @Nullable String newReason) {
if (!Objects.equals(activeDelayType, newType)) {
if (activeDelayType != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayEnded();
}
}
activeDelayType = newType;
activeDelayReason = null;
if (newType != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayTypeStarted(newType);
}
}
}
if (newType != null && newReason != null && !Objects.equals(activeDelayReason, newReason)) {
activeDelayReason = newReason;
for (ClientStreamTracer tracer : tracers) {
tracer.delayReasonAttached(newReason);
}
}
}

void endDelay() {
if (activeDelayType != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayEnded();
}
activeDelayType = null;
activeDelayReason = null;
}
}

Runnable setStreamAndEndDelay(ClientStream stream) {
endDelay();
return setStream(stream);
}

/** Runnable may be null. */
Expand All @@ -386,11 +479,12 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve
// been called on the delayed stream.
realStream.setAuthority(authorityOverride);
}
return setStream(realStream);
return setStreamAndEndDelay(realStream);
}

@Override
public void cancel(Status reason) {
endDelay();
super.cancel(reason);
synchronized (lock) {
if (reportTransportTerminated != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ public void createPendingStream() {
delegate().createPendingStream();
}

@Override
public void delayTypeStarted(String delayType) {
delegate().delayTypeStarted(delayType);
}

@Override
public void delayReasonAttached(String delayReason) {
delegate().delayReasonAttached(delayReason);
}

@Override
public void delayEnded() {
delegate().delayEnded();
}

@Override
public void outboundHeaders() {
delegate().outboundHeaders();
Expand Down
15 changes: 11 additions & 4 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (noOldAddrs) {
// Make tests happy; they don't properly assume starting in CONNECTING
rawConnectivityState = CONNECTING;
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
updateBalancingState(
CONNECTING,
new FixedResultPicker(
PickResult.withNoResult("connecting", "pick_first: address list updated")));
}

if (rawConnectivityState == READY) {
Expand Down Expand Up @@ -340,10 +343,13 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
// the current address of a valid index exists.
if ((!enableHappyEyeballs && !addressIndex.isValid())
|| (addressIndex.isValid() && !subchannels.containsKey(
addressIndex.getCurrentAddress()))) {
addressIndex.getCurrentAddress()))) {
addressIndex.seekTo(getAddress(subchannelData.subchannel));
}
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
updateBalancingState(
CONNECTING,
new FixedResultPicker(
PickResult.withNoResult("connecting", "pick_first: attempting to connect")));
break;

case READY:
Expand Down Expand Up @@ -668,7 +674,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (connectionRequested.compareAndSet(false, true)) {
helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection);
}
return PickResult.withNoResult();
return PickResult.withNoResult(
"connecting", "pick_first: requesting connection");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
* list and sticking to the first that works.
*/
final class PickFirstLoadBalancer extends LoadBalancer {
private static final PickResult CONNECTING_RESULT =
PickResult.withNoResult("connecting", "pick_first: attempting to connect");
private final Helper helper;
private Subchannel subchannel;
private ConnectivityState currentState = IDLE;
Expand Down Expand Up @@ -83,7 +85,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {

// The channel state does not get updated when doing name resolving today, so for the moment
// let LB report CONNECTION and call subchannel.requestConnection() immediately.
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT));
subchannel.requestConnection();
} else {
subchannel.updateAddresses(servers);
Expand Down Expand Up @@ -135,7 +137,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
case CONNECTING:
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
// the current picker in-place. But ignoring the potential optimization is simpler.
picker = new FixedResultPicker(PickResult.withNoResult());
picker = new FixedResultPicker(CONNECTING_RESULT);
break;
case READY:
picker = new FixedResultPicker(PickResult.withSubchannel(subchannel));
Expand Down Expand Up @@ -178,7 +180,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (connectionRequested.compareAndSet(false, true)) {
helper.getSynchronizationContext().execute(PickFirstLoadBalancer.this::requestConnection);
}
return PickResult.withNoResult();
return PickResult.withNoResult(
"connecting", "pick_first: requesting connection");
}
}

Expand Down
Loading