diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 8e11e781e7c..92b2f052bd7 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -57,6 +57,32 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat public void createPendingStream() { } + /** + * A delay segment started with a canonical root cause. + * + * @param delayType the canonical root cause label (e.g., "connecting", "client_channel_init") + * @since 1.82.0 + */ + public void delayTypeStarted(String delayType) { + } + + /** + * High-cardinality diagnostic context attached to the active delay span. + * + * @param delayReason verbose diagnostic description of the delay + * @since 1.82.0 + */ + public void delayReasonAttached(String delayReason) { + } + + /** + * The current delay segment ended. + * + * @since 1.82.0 + */ + public void delayEnded() { + } + /** * Headers has been sent to the socket. */ diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 5dd44a492ee..e5c3d053ee7 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -547,25 +547,32 @@ public static final class PickResult { // True if the result is created by withDrop() private final boolean drop; @Nullable private final String authorityOverride; + @Nullable private final String delayType; + @Nullable private final String delayReason; private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop) { - this.subchannel = subchannel; - this.streamTracerFactory = streamTracerFactory; - this.status = checkNotNull(status, "status"); - this.drop = drop; - this.authorityOverride = null; + this(subchannel, streamTracerFactory, status, drop, null, null, null); } private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop, @Nullable String authorityOverride) { + this(subchannel, streamTracerFactory, status, drop, authorityOverride, null, null); + } + + private PickResult( + @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, + Status status, boolean drop, @Nullable String authorityOverride, + @Nullable String delayType, @Nullable String delayReason) { this.subchannel = subchannel; this.streamTracerFactory = streamTracerFactory; this.status = checkNotNull(status, "status"); this.drop = drop; this.authorityOverride = authorityOverride; + this.delayType = delayType; + this.delayReason = delayReason; } /** @@ -677,7 +684,7 @@ public static PickResult withSubchannel(Subchannel subchannel) { */ public PickResult copyWithSubchannel(Subchannel subchannel) { return new PickResult(checkNotNull(subchannel, "subchannel"), streamTracerFactory, - status, drop, authorityOverride); + status, drop, authorityOverride, delayType, delayReason); } /** @@ -688,7 +695,9 @@ public PickResult copyWithSubchannel(Subchannel subchannel) { */ public PickResult copyWithStreamTracerFactory( @Nullable ClientStreamTracer.Factory streamTracerFactory) { - return new PickResult(subchannel, streamTracerFactory, status, drop, authorityOverride); + return new PickResult( + subchannel, streamTracerFactory, status, drop, authorityOverride, delayType, + delayReason); } /** @@ -725,6 +734,31 @@ public static PickResult withNoResult() { return NO_RESULT; } + /** + * No decision could be made. The RPC will stay buffered with a specific delay type and reason. + * + * @param delayType low-cardinality root cause label (e.g., "connecting") + * @param delayReason high-cardinality diagnostic string for trace events + * @since 1.82.0 + */ + public static PickResult withNoResult(String delayType, String delayReason) { + Preconditions.checkNotNull(delayType, "delayType"); + Preconditions.checkNotNull(delayReason, "delayReason"); + return new PickResult(null, null, Status.OK, false, null, delayType, delayReason); + } + + /** Returns the delay type label if any. */ + @Nullable + public String getDelayType() { + return delayType; + } + + /** Returns the diagnostic delay reason if any. */ + @Nullable + public String getDelayReason() { + return delayReason; + } + /** Returns the authority override if any. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656") @Nullable diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 5569e1eecf8..e5ad2f0e912 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -37,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.Objects; import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -157,7 +158,9 @@ public final ClientStream newStream( synchronized (lock) { PickerState newerState = pickerState; if (state == newerState) { - return createPendingStream(args, tracers, pickResult); + String delayType = determineQueuingDelayType(pickResult); + String delayReason = determineQueuingDelayReason(pickResult); + return createPendingStream(args, tracers, pickResult, delayType, delayReason); } state = newerState; } @@ -173,8 +176,8 @@ public final ClientStream newStream( */ @GuardedBy("lock") private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, - PickResult pickResult) { - PendingStream pendingStream = new PendingStream(args, tracers); + PickResult pickResult, @Nullable String delayType, @Nullable String delayReason) { + PendingStream pendingStream = new PendingStream(args, tracers, delayType, delayReason); if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { pendingStream.lastPickStatus = pickResult.getStatus(); } @@ -245,7 +248,7 @@ public final void shutdownNow(Status status) { } if (savedReportTransportTerminated != null) { for (PendingStream stream : savedPendingStreams) { - Runnable runnable = stream.setStream( + Runnable runnable = stream.setStreamAndEndDelay( new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers)); if (runnable != null) { // Drain in-line instead of using an executor as failing stream just throws everything @@ -303,6 +306,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { + stream.endDelay(); Executor executor = defaultAppExecutor; // createRealStream may be expensive. It will start real streams on the transport. If // there are pending requests, they will be serialized too, which may be expensive. Since @@ -315,7 +319,11 @@ final void reprocess(@Nullable SubchannelPicker picker) { executor.execute(runnable); } toRemove.add(stream); - } // else: stay pending + } else { // stay pending + String delayType = determineQueuingDelayType(pickResult); + String delayReason = determineQueuingDelayReason(pickResult); + stream.updateDelay(delayType, delayReason); + } } synchronized (lock) { @@ -356,16 +364,101 @@ public InternalLogId getLogId() { return logId; } + private static String determineQueuingDelayType(@Nullable PickResult pickResult) { + if (pickResult == null) { + return "client_channel_init"; + } + if (pickResult.getSubchannel() != null) { + return "subchannel_state_mismatch"; + } + if (!pickResult.getStatus().isOk()) { + return "wait_for_ready_failed"; + } + if (pickResult.getDelayType() != null) { + return pickResult.getDelayType(); + } + return "client_channel_init"; + } + + private static String determineQueuingDelayReason(@Nullable PickResult pickResult) { + if (pickResult == null) { + return "client channel: created LB policy."; + } + if (pickResult.getSubchannel() != null) { + return "subchannel returned by LB picker has no connected subchannel"; + } + if (!pickResult.getStatus().isOk()) { + return "wait_for_ready RPC failed with status: " + pickResult.getStatus(); + } + if (pickResult.getDelayReason() != null) { + return pickResult.getDelayReason(); + } + return "client channel: waiting for picker"; + } + private class PendingStream extends DelayedStream { private final PickSubchannelArgs args; private final Context context = Context.current(); private final ClientStreamTracer[] tracers; private volatile Status lastPickStatus; + @Nullable private String activeDelayType; + @Nullable private String activeDelayReason; - private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { + private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, + @Nullable String initialType, @Nullable String initialReason) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; + this.activeDelayType = initialType; + this.activeDelayReason = initialReason; + if (initialType != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayTypeStarted(initialType); + } + } + if (initialReason != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayReasonAttached(initialReason); + } + } + } + + void updateDelay(@Nullable String newType, @Nullable String newReason) { + if (!Objects.equals(activeDelayType, newType)) { + if (activeDelayType != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + } + activeDelayType = newType; + activeDelayReason = null; + if (newType != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayTypeStarted(newType); + } + } + } + if (newType != null && newReason != null && !Objects.equals(activeDelayReason, newReason)) { + activeDelayReason = newReason; + for (ClientStreamTracer tracer : tracers) { + tracer.delayReasonAttached(newReason); + } + } + } + + void endDelay() { + if (activeDelayType != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + activeDelayType = null; + activeDelayReason = null; + } + } + + Runnable setStreamAndEndDelay(ClientStream stream) { + endDelay(); + return setStream(stream); } /** Runnable may be null. */ @@ -386,11 +479,12 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve // been called on the delayed stream. realStream.setAuthority(authorityOverride); } - return setStream(realStream); + return setStreamAndEndDelay(realStream); } @Override public void cancel(Status reason) { + endDelay(); super.cancel(reason); synchronized (lock) { if (reportTransportTerminated != null) { diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java index e7679ea14cc..2553f1ea6ec 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java @@ -39,6 +39,21 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayTypeStarted(String delayType) { + delegate().delayTypeStarted(delayType); + } + + @Override + public void delayReasonAttached(String delayReason) { + delegate().delayReasonAttached(delayReason); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index ab60a024e1f..6c880aeb353 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -167,7 +167,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (noOldAddrs) { // Make tests happy; they don't properly assume starting in CONNECTING rawConnectivityState = CONNECTING; - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "pick_first: address list updated"))); } if (rawConnectivityState == READY) { @@ -340,10 +343,13 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo // the current address of a valid index exists. if ((!enableHappyEyeballs && !addressIndex.isValid()) || (addressIndex.isValid() && !subchannels.containsKey( - addressIndex.getCurrentAddress()))) { + addressIndex.getCurrentAddress()))) { addressIndex.seekTo(getAddress(subchannelData.subchannel)); } - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "pick_first: attempting to connect"))); break; case READY: @@ -668,7 +674,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (connectionRequested.compareAndSet(false, true)) { helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection); } - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "pick_first: requesting connection"); } } diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index cf4b4c94e04..562debfd862 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,6 +38,8 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("connecting", "pick_first: attempting to connect"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; @@ -83,7 +85,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -135,7 +137,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo case CONNECTING: // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave // the current picker in-place. But ignoring the potential optimization is simpler. - picker = new FixedResultPicker(PickResult.withNoResult()); + picker = new FixedResultPicker(CONNECTING_RESULT); break; case READY: picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); @@ -178,7 +180,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (connectionRequested.compareAndSet(false, true)) { helper.getSynchronizationContext().execute(PickFirstLoadBalancer.this::requestConnection); } - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "pick_first: requesting connection"); } } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index d7e1d4ca4f6..66a0c090be0 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -772,6 +772,157 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure( + " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]"); } + @Test + public void streamDelayMetrics() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("connecting", "pick_first: attempting to connect")); + + delayedTransport.reprocess(connectingPicker); + delayedTransport.newStream(method, headers, callOptions, customTracers); + + InOrder inOrder = inOrder(mockTracer); + inOrder.verify(mockTracer).delayTypeStarted("connecting"); + inOrder.verify(mockTracer).delayReasonAttached("pick_first: attempting to connect"); + + SubchannelPicker customDelayPicker = mock(SubchannelPicker.class); + when(customDelayPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("rls_lookup_pending", "RLS request pending.")); + + delayedTransport.reprocess(customDelayPicker); + + inOrder.verify(mockTracer).delayEnded(); + inOrder.verify(mockTracer).delayTypeStarted("rls_lookup_pending"); + inOrder.verify(mockTracer).delayReasonAttached("RLS request pending."); + + delayedTransport.reprocess(mockPicker); + + inOrder.verify(mockTracer).delayEnded(); + } + + @Test + public void streamDelayMetrics_cancelled() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("connecting", "pick_first: attempting to connect")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + stream.start(streamListener); + + verify(mockTracer).delayTypeStarted("connecting"); + verify(mockTracer).delayReasonAttached("pick_first: attempting to connect"); + + stream.cancel(Status.CANCELLED); + + verify(mockTracer).delayEnded(); + } + + @Test + public void streamDelayMetrics_shutdownNow() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("connecting", "pick_first: attempting to connect")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + stream.start(streamListener); + + verify(mockTracer).delayTypeStarted("connecting"); + verify(mockTracer).delayReasonAttached("pick_first: attempting to connect"); + + delayedTransport.shutdownNow(Status.UNAVAILABLE); + + verify(mockTracer).delayEnded(); + } + + @Test + public void streamDelayMetrics_cadenceReasonUpdate_doesNotStartNewTypeSegment() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker picker1 = mock(SubchannelPicker.class); + when(picker1.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("connecting", "attempt 1")); + + delayedTransport.reprocess(picker1); + delayedTransport.newStream(method, headers, callOptions, customTracers); + + verify(mockTracer, times(1)).delayTypeStarted("connecting"); + verify(mockTracer).delayReasonAttached("attempt 1"); + + SubchannelPicker picker2 = mock(SubchannelPicker.class); + when(picker2.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("connecting", "attempt 2")); + + delayedTransport.reprocess(picker2); + + verify(mockTracer, times(1)).delayTypeStarted("connecting"); + verify(mockTracer).delayReasonAttached("attempt 2"); + verify(mockTracer, never()).delayEnded(); + } + + @Test + public void streamDelayMetrics_channelFallback_clientChannelInit() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + // No picker reprocessed yet (lastPicker == null) + delayedTransport.newStream(method, headers, callOptions, customTracers); + + verify(mockTracer).delayTypeStarted("client_channel_init"); + verify(mockTracer).delayReasonAttached("client channel: created LB policy."); + } + + @Test + public void streamDelayMetrics_channelFallback_subchannelStateMismatch() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + io.grpc.LoadBalancer.Subchannel disconnectedSubchannel = + mock(io.grpc.LoadBalancer.Subchannel.class); + when(disconnectedSubchannel.getInternalSubchannel()) + .thenReturn(newTransportProvider(null)); + + SubchannelPicker stalePicker = mock(SubchannelPicker.class); + when(stalePicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withSubchannel(disconnectedSubchannel)); + + delayedTransport.reprocess(stalePicker); + delayedTransport.newStream(method, headers, callOptions, customTracers); + + verify(mockTracer).delayTypeStarted("subchannel_state_mismatch"); + verify(mockTracer).delayReasonAttached( + "subchannel returned by LB picker has no connected subchannel"); + } + + @Test + public void streamDelayMetrics_channelFallback_waitForReadyFailed() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker failPicker = mock(SubchannelPicker.class); + when(failPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withError(Status.UNAVAILABLE)); + + delayedTransport.reprocess(failPicker); + CallOptions wfrOptions = callOptions.withWaitForReady(); + delayedTransport.newStream(method, headers, wfrOptions, customTracers); + + verify(mockTracer).delayTypeStarted("wait_for_ready_failed"); + verify(mockTracer).delayReasonAttached( + "wait_for_ready RPC failed with status: " + Status.UNAVAILABLE); + } + private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 1e130423a45..1c3182237d3 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -147,8 +147,10 @@ public void pickAfterResolved() throws Exception { verify(mockSubchannel).requestConnection(); // Calling pickSubchannel() twice gave the same result - assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), - pickerCaptor.getValue().pickSubchannel(mockArgs)); + PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("pick_first: attempting to connect"); + assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs)); verifyNoMoreInteractions(mockHelper); } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 5ed84ade2f8..024ea84f60e 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -124,7 +124,8 @@ final class GrpclbState { static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { @Override public PickResult picked(PickSubchannelArgs args) { - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "grpclb: waiting for backend server list"); } @Override diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 87ad61c9f27..b4ac28eebf3 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -232,6 +232,16 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter, .build()); } + if (isMetricEnabled("grpc.client.attempt.delay", enableMetrics, disableDefault)) { + builder.clientAttemptDelayCounter( + meter.histogramBuilder( + "grpc.client.attempt.delay") + .setUnit("s") + .setDescription("Time taken to complete a client call attempt delay") + .setExplicitBucketBoundariesAdvice(LATENCY_BUCKETS) + .build()); + } + if (isMetricEnabled("grpc.client.attempt.sent_total_compressed_message_size", enableMetrics, disableDefault)) { builder.clientTotalSentCompressedMessageSizeCounter( diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index f783b9495dd..14be5b917c9 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -203,6 +203,8 @@ private static final class ClientTracer extends ClientStreamTracer { volatile String backendService; long attemptNanos; Code statusCode; + @Nullable private volatile Stopwatch activeDelayStopwatch; + @Nullable private volatile String activeDelayType; ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module, StreamInfo info, String target, String fullMethodName, @@ -216,6 +218,59 @@ private static final class ClientTracer extends ClientStreamTracer { this.stopwatch = module.stopwatchSupplier.get().start(); } + @Override + public void streamCreated(io.grpc.Attributes transportAtts, Metadata headers) { + delayEnded(); + } + + @Override + public void delayTypeStarted(String delayType) { + delayEnded(); + activeDelayType = delayType; + activeDelayStopwatch = module.stopwatchSupplier.get().start(); + } + + @Override + public void delayEnded() { + Stopwatch delayStopwatch = activeDelayStopwatch; + String delayType = activeDelayType; + if (delayStopwatch != null && delayType != null) { + delayStopwatch.stop(); + long delayNanos = delayStopwatch.elapsed(TimeUnit.NANOSECONDS); + activeDelayStopwatch = null; + activeDelayType = null; + if (module.resource.clientAttemptDelayCounter() != null) { + AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, fullMethodName) + .put(TARGET_KEY, target) + .put("grpc.delay_type", delayType); + if (module.localityEnabled) { + String savedLocality = locality; + if (savedLocality == null) { + savedLocality = ""; + } + builder.put(LOCALITY_KEY, savedLocality); + } + if (module.backendServiceEnabled) { + String savedBackendService = backendService; + if (savedBackendService == null) { + savedBackendService = ""; + } + builder.put(BACKEND_SERVICE_KEY, savedBackendService); + } + if (module.customLabelEnabled) { + builder.put( + CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL)); + } + for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) { + plugin.addLabels(builder); + } + module.resource.clientAttemptDelayCounter() + .record(delayNanos * SECONDS_PER_NANO, builder.build(), attemptsState.otelContext); + } + } + } + @Override public void inboundHeaders(Metadata headers) { for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) { @@ -262,6 +317,7 @@ public void inboundTrailers(Metadata trailers) { @Override public void streamClosed(Status status) { + delayEnded(); stopwatch.stop(); attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); Deadline deadline = info.getCallOptions().getDeadline(); diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java index d32ae1e67f5..085498d746e 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java @@ -35,6 +35,9 @@ abstract class OpenTelemetryMetricsResource { @Nullable abstract DoubleHistogram clientAttemptDurationCounter(); + @Nullable + abstract DoubleHistogram clientAttemptDelayCounter(); + @Nullable abstract LongHistogram clientTotalSentCompressedMessageSizeCounter(); @@ -79,6 +82,8 @@ abstract static class Builder { abstract Builder clientAttemptDurationCounter(DoubleHistogram counter); + abstract Builder clientAttemptDelayCounter(DoubleHistogram counter); + abstract Builder clientTotalSentCompressedMessageSizeCounter(LongHistogram counter); abstract Builder clientTotalReceivedCompressedMessageSizeCounter( diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java index d214e99bd75..3fc9e3767f5 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -192,6 +192,7 @@ private final class ClientTracer extends ClientStreamTracer { private final Span parentSpan; volatile int seqNo; boolean isPendingStream; + @Nullable private volatile Span activeDelaySpan; ClientTracer(Span span, Span parentSpan) { this.span = checkNotNull(span, "span"); @@ -200,6 +201,7 @@ private final class ClientTracer extends ClientStreamTracer { @Override public void streamCreated(Attributes transportAtts, Metadata headers) { + delayEnded(); contextPropagators.getTextMapPropagator().inject(Context.current().with(span), headers, metadataSetter); if (isPendingStream) { @@ -212,6 +214,36 @@ public void createPendingStream() { isPendingStream = true; } + @Override + public void delayTypeStarted(String delayType) { + if (activeDelaySpan != null) { + activeDelaySpan.end(); + } + activeDelaySpan = otelTracer.spanBuilder("Attempt Delay: " + delayType) + .setParent(Context.current().with(span)) + .setAttribute("grpc.delay_type", delayType) + .startSpan(); + } + + @Override + public void delayReasonAttached(String delayReason) { + Span delaySpan = activeDelaySpan; + if (delaySpan != null) { + delaySpan.addEvent(delayReason); + } else { + span.addEvent("delay_reason: " + delayReason); + } + } + + @Override + public void delayEnded() { + Span delaySpan = activeDelaySpan; + if (delaySpan != null) { + delaySpan.end(); + activeDelaySpan = null; + } + } + @Override public void outboundMessageSent( int seqNo, long optionalWireSize, long optionalUncompressedSize) { @@ -238,6 +270,7 @@ public void inboundUncompressedSize(long bytes) { @Override public void streamClosed(io.grpc.Status status) { + delayEnded(); endSpanWithStatus(span, status); } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java index e6759aadb1e..dcbbe760ff2 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java @@ -274,6 +274,31 @@ public void clientBasicTracingMocking() { inOrder.verifyNoMoreInteractions(); } + @Test + public void clientDelayTracingMocking() { + Span mockDelaySpan = mock(Span.class); + when(mockSpanBuilder.setAttribute( + org.mockito.ArgumentMatchers.anyString(), + org.mockito.ArgumentMatchers.anyString())) + .thenReturn(mockSpanBuilder); + when(mockSpanBuilder.startSpan()).thenReturn(mockAttemptSpan, mockDelaySpan); + + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(mockClientSpan, method); + ClientStreamTracer clientStreamTracer = + callTracer.newClientStreamTracer(STREAM_INFO, new Metadata()); + + clientStreamTracer.delayTypeStarted("connecting"); + clientStreamTracer.delayReasonAttached("pick_first: attempting to connect"); + clientStreamTracer.delayEnded(); + + verify(mockTracer).spanBuilder(eq("Attempt Delay: connecting")); + verify(mockSpanBuilder).setAttribute(eq("grpc.delay_type"), eq("connecting")); + verify(mockDelaySpan).addEvent(eq("pick_first: attempting to connect")); + verify(mockDelaySpan).end(); + } + @Test public void clientBasicTracingRule() { OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( @@ -381,6 +406,34 @@ public void clientBasicTracingRule() { assertEquals(attemptSpanData.hasEnded(), true); } + @Test + public void clientDelayTracingRule() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + Span clientSpan = tracerRule.spanBuilder("test-client-span").startSpan(); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(clientSpan, method); + ClientStreamTracer clientStreamTracer = + callTracer.newClientStreamTracer(STREAM_INFO, new Metadata()); + + clientStreamTracer.delayTypeStarted("connecting"); + clientStreamTracer.delayReasonAttached("pick_first: attempting to connect"); + clientStreamTracer.delayEnded(); + clientStreamTracer.streamClosed(Status.OK); + callTracer.callEnded(Status.OK); + clientSpan.end(); + + List spans = openTelemetryRule.getSpans(); + assertEquals(3, spans.size()); + SpanData delaySpanData = spans.get(0); + + assertEquals("Attempt Delay: connecting", delaySpanData.getName()); + assertEquals("connecting", delaySpanData.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("grpc.delay_type"))); + assertEquals(1, delaySpanData.getEvents().size()); + assertEquals("pick_first: attempting to connect", delaySpanData.getEvents().get(0).getName()); + } + @Test public void clientInterceptor() { testClientInterceptors(false); diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index a2846fd04c8..92bf052ba51 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -1050,7 +1050,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); } else { - return PickResult.withNoResult(); + return PickResult.withNoResult("rls_lookup_pending", "RLS request pending."); } } diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index a52390743a6..e7925ff8986 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -262,6 +262,8 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayType()).isEqualTo("rls_lookup_pending"); + assertThat(res.getDelayReason()).isEqualTo("RLS request pending."); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); @@ -493,6 +495,8 @@ public void lb_working_withoutDefaultTarget() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayType()).isEqualTo("rls_lookup_pending"); + assertThat(res.getDelayReason()).isEqualTo("RLS request pending."); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); diff --git a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java index 9c9998571e5..5f58b4321ae 100644 --- a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java +++ b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java @@ -38,6 +38,21 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayTypeStarted(String delayType) { + delegate().delayTypeStarted(delayType); + } + + @Override + public void delayReasonAttached(String delayReason) { + delegate().delayReasonAttached(delayReason); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 22940e875ac..8adce59407e 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,8 +41,10 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("connecting", "round_robin: attempting to connect"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT); public RoundRobinLoadBalancer(Helper helper) { super(helper); @@ -68,7 +70,7 @@ protected void updateOverallBalancingState() { } if (isConnecting) { - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); } else { updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates())); } diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 18854ca1bb6..d9b8b4a85f2 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -86,7 +86,8 @@ public class RoundRobinLoadBalancerTest { private static final Attributes.Key MAJOR_KEY = Attributes.Key.create("major-key"); private static final SubchannelPicker EMPTY_PICKER = - new FixedResultPicker(PickResult.withNoResult()); + new FixedResultPicker( + PickResult.withNoResult("connecting", "round_robin: attempting to connect")); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); @@ -576,6 +577,16 @@ private void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo testHelperInst.deliverSubchannelState(subchannel, newState); } + @Test + public void roundRobin_delayAttributes() { + acceptAddresses(servers, affinity); + verify(mockHelper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + PickResult res = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(res.getDelayType()).isEqualTo("connecting"); + assertThat(res.getDelayReason()).contains("attempting to connect"); + } + private static class FakeSocketAddress extends SocketAddress { final String name; diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 29b18fb6aa7..3e81cfdf46b 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; @@ -118,6 +119,11 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { errorPrefix() + "Unable to find non-dynamic cluster")); } // The dynamic cluster must not have loaded yet + helper.updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult( + "cds_dynamic_discovery", "cds: fetching xDS cluster metadata"))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 64105144240..3a0eaf2638e 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -200,7 +200,8 @@ public void shutdown() { private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper { private final AtomicLong inFlights; private ConnectivityState currentState = ConnectivityState.IDLE; - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "cluster_impl: initializing")); private List dropPolicies = Collections.emptyList(); private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; @Nullable diff --git a/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java b/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java index b5f09c4ea93..8dbf021775b 100644 --- a/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java @@ -107,7 +107,8 @@ private final class LazyPicker extends SubchannelPicker { public PickResult pickSubchannel(PickSubchannelArgs args) { // activate() is a no-op after shutdown() helper.getSynchronizationContext().execute(LazyDelegate.this::activate); - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "lazy: waiting for connection"); } } } diff --git a/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java b/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java index ddaeb4f4be5..f638b23b565 100644 --- a/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java @@ -54,7 +54,8 @@ final class LeastRequestLoadBalancer extends MultiChildLoadBalancer { private final ThreadSafeRandom random; - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "least_request: initializing")); private int choiceCount = DEFAULT_CHOICE_COUNT; LeastRequestLoadBalancer(Helper helper) { @@ -113,7 +114,10 @@ protected void updateOverallBalancingState() { } } if (isConnecting) { - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "least_request: connecting"))); } else { // Give it all the failing children and let it randomly pick among them updateBalancingState(TRANSIENT_FAILURE, @@ -246,7 +250,8 @@ public boolean equals(Object o) { static final class EmptyPicker extends SubchannelPicker { @Override public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "least_request: waiting for subchannel"); } @Override diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index ca142af0af3..031c8b23ac1 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -330,7 +330,11 @@ public void updateBalancingState(final ConnectivityState newState, } ConnectivityState oldState = connectivityState; connectivityState = newState; - picker = newPicker; + if (newState == CONNECTING || newState == IDLE) { + picker = new PriorityPicker(newPicker, priority); + } else { + picker = newPicker; + } if (deletionTimer != null && deletionTimer.isPending()) { return; @@ -365,4 +369,42 @@ protected Helper delegate() { } } } + + private static final class PriorityPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final String priority; + + PriorityPicker(SubchannelPicker delegate, String priority) { + this.delegate = checkNotNull(delegate, "delegate"); + this.priority = checkNotNull(priority, "priority"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult childResult = delegate.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayType() != null) { + String childReason = childResult.getDelayReason(); + String reason = "priority_" + priority + ":" + (childReason != null ? childReason : ""); + return PickResult.withNoResult(childResult.getDelayType(), reason); + } + return childResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PriorityPicker that = (PriorityPicker) o; + return delegate.equals(that.delegate) && priority.equals(that.priority); + } + + @Override + public int hashCode() { + return Objects.hash(delegate, priority); + } + } } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 513f4d643ea..eb8ba235d82 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -356,6 +356,8 @@ public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { } private static final class RingHashPicker extends SubchannelPicker { + private static final PickResult RING_HASH_CONNECTING_RESULT = + PickResult.withNoResult("connecting", "ring_hash: waiting for connection"); private final SynchronizationContext syncContext; private final List ring; // Avoid synchronization between pickSubchannel and subchannel's connectivity state change, @@ -453,7 +455,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs // are failed unless there is a READY connection. if (subchannelView.connectivityState == CONNECTING) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } if (subchannelView.connectivityState == IDLE) { @@ -463,7 +465,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } }); - return PickResult.withNoResult(); // Indicates that this should be retried after backoff + // Indicates that this should be retried after backoff + return RING_HASH_CONNECTING_RESULT; } } } else { @@ -487,7 +490,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } } if (requestedConnection) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } } diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index 75a8411b5a4..656f358308f 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -107,7 +107,8 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer { private final Ticker ticker; private String locality = ""; private String backendService = ""; - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_round_robin: initializing")); // The metric instruments are only registered once and shared by all instances of this LB. static { @@ -227,7 +228,9 @@ protected void updateOverallBalancingState() { if (isConnecting) { updateBalancingState( - ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + ConnectivityState.CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_round_robin: connecting"))); } else { updateBalancingState( ConnectivityState.TRANSIENT_FAILURE, createReadyPicker(getChildLbStates())); diff --git a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java index 9468a9daf9d..1901e4ed0d5 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java @@ -165,7 +165,8 @@ private void updateOverallBalancingState() { if (overallState == TRANSIENT_FAILURE) { picker = new WeightedRandomPicker(errorPickers); } else { - picker = new FixedResultPicker(PickResult.withNoResult()); + picker = new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_target: connecting")); } } else { picker = new WeightedRandomPicker(childPickers); @@ -197,7 +198,8 @@ private static ConnectivityState aggregateState( private final class ChildHelper extends ForwardingLoadBalancerHelper { String name; ConnectivityState currentState = CONNECTING; - SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_target: initializing")); private ChildHelper(String name) { this.name = name; diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index ff4813fe6a8..83a169ba2c7 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -343,6 +343,35 @@ public void dynamicCluster() { assertThat(this.lastXdsConfig.getClusters()).doesNotContainKey(clusterName); } + @Test + public void discoverDynamicCluster_pending_emitsToken() { + String clusterName = "cluster2"; + CdsConfig cdsConfig = new CdsConfig(clusterName, /*dynamic=*/ true); + + XdsConfig xdsConfig = new XdsConfig(null, null, null, ImmutableMap.of()); + + loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CONFIG, xdsConfig) + .set( + XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, + new XdsConfig.XdsClusterSubscriptionRegistry() { + @Override + public XdsConfig.Subscription subscribeToCluster(String clusterName) { + return mock(XdsConfig.Subscription.class); + } + }) + .build()) + .setLoadBalancingPolicyConfig(cdsConfig) + .build()); + + verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getDelayType()).isEqualTo("cds_dynamic_discovery"); + assertThat(result.getDelayReason()).isEqualTo("cds: fetching xDS cluster metadata"); + } + @Test public void discoverAggregateCluster_createsPriorityLbPolicy() { CdsLoadBalancerProvider cdsLoadBalancerProvider = new CdsLoadBalancerProvider(lbRegistry); diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index 988bc720e45..364d2cbeb9c 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -634,7 +634,8 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - verify(helper).updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); + verify(helper, times(2)) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); Helper helper0 = Iterables.getOnlyElement(fooHelpers); @@ -650,7 +651,7 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { helper0.updateBalancingState( CONNECTING, EMPTY_PICKER); - verify(helper, times(2)) + verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // failover happens @@ -676,7 +677,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - inOrder.verify(helper) + inOrder.verify(helper, times(2)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); @@ -694,7 +695,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { fakeClock.forwardTime(5, TimeUnit.SECONDS); assertThat(fooBalancers).hasSize(2); assertThat(fooHelpers).hasSize(2); - inOrder.verify(helper, times(2)) + inOrder.verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); Helper helper1 = Iterables.getLast(fooHelpers); @@ -972,7 +973,7 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // any further balancing state update should be ignored. @@ -1010,7 +1011,38 @@ public void noDuplicateOverallBalancingStateUpdate() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper, times(4)).updateBalancingState(any(), any()); + verify(helper, times(6)).updateBalancingState(any(), any()); + } + + @Test + public void priorityPicker_prependsToken() throws Exception { + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(newChildConfig(fooLbProvider, new Object()), true); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0")); + + priorityLb.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + + Helper helper0 = Iterables.getOnlyElement(fooHelpers); // priority p0 + + SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); + when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("connecting", "child_reason")); + + helper0.updateBalancingState(CONNECTING, mockChildPicker); + + verify(helper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + SubchannelPicker priorityPicker = pickerCaptor.getValue(); + PickResult result = priorityPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("priority_p0:child_reason"); } private void assertLatestConnectivityState(ConnectivityState expectedState) { diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index b515ed81158..da11df24af4 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -160,6 +160,8 @@ public void subchannelLazyConnectUntilPicked() { PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("ring_hash: waiting for connection"); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() && !PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; @@ -524,6 +526,8 @@ public void pickWithRandomHash_atLeastOneSubchannelConnecting() { PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("ring_hash: waiting for connection"); verifyConnection(0); } @@ -546,6 +550,8 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("ring_hash: waiting for connection"); verifyConnection(1); }