From b50ca84e30c151d7f30c8917a470acbfc7ffa126 Mon Sep 17 00:00:00 2001 From: agravator Date: Wed, 13 May 2026 22:15:34 +0530 Subject: [PATCH 1/7] core,api,xds: Implement load balancing policy delay plumbing This commit implements the plumbing required to propagate delay reason tokens from load balancing policies up to the transport layer and tracers, as specified in the LB policy delay design. --- .../main/java/io/grpc/ClientStreamTracer.java | 17 +++++++ api/src/main/java/io/grpc/LoadBalancer.java | 31 ++++++++++-- .../grpc/internal/DelayedClientTransport.java | 47 ++++++++++++++++-- .../grpc/internal/PickFirstLoadBalancer.java | 5 +- .../internal/DelayedClientTransportTest.java | 48 +++++++++++++++++++ .../internal/PickFirstLoadBalancerTest.java | 5 +- .../java/io/grpc/rls/CachingRlsLbClient.java | 2 +- .../java/io/grpc/rls/RlsLoadBalancerTest.java | 2 + .../io/grpc/util/RoundRobinLoadBalancer.java | 5 +- .../grpc/util/RoundRobinLoadBalancerTest.java | 2 +- .../java/io/grpc/xds/CdsLoadBalancer2.java | 1 + .../io/grpc/xds/PriorityLoadBalancer.java | 12 ++++- .../io/grpc/xds/RingHashLoadBalancer.java | 8 ++-- .../io/grpc/xds/CdsLoadBalancer2Test.java | 22 +++++++++ .../io/grpc/xds/PriorityLoadBalancerTest.java | 30 ++++++++++++ .../io/grpc/xds/RingHashLoadBalancerTest.java | 41 ++++++++++++++++ 16 files changed, 256 insertions(+), 22 deletions(-) diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 42e1fdfebea..07ceb11fa59 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -57,6 +57,23 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat public void createPendingStream() { } + /** + * A delay segment started with a specific reason during load balancing. + * + * @param reasonToken the reason for the delay, e.g., "pick_first:connecting" + * @since 1.82.0 + */ + public void delayStarted(String reasonToken) { + } + + /** + * The current delay segment ended. + * + * @since 1.82.0 + */ + public void delayEnded() { + } + /** * Headers has been sent to the socket. */ diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 3187ae8ef1b..d3af8822058 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -549,25 +549,30 @@ public static final class PickResult { // True if the result is created by withDrop() private final boolean drop; @Nullable private final String authorityOverride; + @Nullable private final String delayReasonToken; private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop) { - this.subchannel = subchannel; - this.streamTracerFactory = streamTracerFactory; - this.status = checkNotNull(status, "status"); - this.drop = drop; - this.authorityOverride = null; + this(subchannel, streamTracerFactory, status, drop, null, null); } private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop, @Nullable String authorityOverride) { + this(subchannel, streamTracerFactory, status, drop, authorityOverride, null); + } + + private PickResult( + @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, + Status status, boolean drop, @Nullable String authorityOverride, + @Nullable String delayReasonToken) { this.subchannel = subchannel; this.streamTracerFactory = streamTracerFactory; this.status = checkNotNull(status, "status"); this.drop = drop; this.authorityOverride = authorityOverride; + this.delayReasonToken = delayReasonToken; } /** @@ -727,6 +732,22 @@ public static PickResult withNoResult() { return NO_RESULT; } + /** + * No decision could be made. The RPC will stay buffered with a specific reason. + * + * @since 1.82.0 + */ + public static PickResult withNoResult(String delayReasonToken) { + Preconditions.checkNotNull(delayReasonToken, "delayReasonToken"); + return new PickResult(null, null, Status.OK, false, null, delayReasonToken); + } + + /** Returns the delay reason token if any. */ + @Nullable + public String getDelayReasonToken() { + return delayReasonToken; + } + /** Returns the authority override if any. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656") @Nullable diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 5569e1eecf8..d979f50a648 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -157,7 +157,8 @@ public final ClientStream newStream( synchronized (lock) { PickerState newerState = pickerState; if (state == newerState) { - return createPendingStream(args, tracers, pickResult); + String token = pickResult != null ? pickResult.getDelayReasonToken() : null; + return createPendingStream(args, tracers, pickResult, token); } state = newerState; } @@ -173,8 +174,8 @@ public final ClientStream newStream( */ @GuardedBy("lock") private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, - PickResult pickResult) { - PendingStream pendingStream = new PendingStream(args, tracers); + PickResult pickResult, @Nullable String delayReasonToken) { + PendingStream pendingStream = new PendingStream(args, tracers, delayReasonToken); if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { pendingStream.lastPickStatus = pickResult.getStatus(); } @@ -303,6 +304,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { + stream.endDelay(); Executor executor = defaultAppExecutor; // createRealStream may be expensive. It will start real streams on the transport. If // there are pending requests, they will be serialized too, which may be expensive. Since @@ -315,7 +317,9 @@ final void reprocess(@Nullable SubchannelPicker picker) { executor.execute(runnable); } toRemove.add(stream); - } // else: stay pending + } else { // stay pending + stream.updateDelayReason(pickResult.getDelayReasonToken()); + } } synchronized (lock) { @@ -361,11 +365,43 @@ private class PendingStream extends DelayedStream { private final Context context = Context.current(); private final ClientStreamTracer[] tracers; private volatile Status lastPickStatus; + @Nullable private String delayReasonToken; - private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { + private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, @Nullable String initialToken) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; + this.delayReasonToken = initialToken; + if (initialToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayStarted(initialToken); + } + } + } + + void updateDelayReason(String newToken) { + if (!java.util.Objects.equals(delayReasonToken, newToken)) { + if (delayReasonToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + } + delayReasonToken = newToken; + if (newToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayStarted(newToken); + } + } + } + } + + void endDelay() { + if (delayReasonToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + delayReasonToken = null; + } } /** Runnable may be null. */ @@ -391,6 +427,7 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve @Override public void cancel(Status reason) { + endDelay(); super.cancel(reason); synchronized (lock) { if (reportTransportTerminated != null) { diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index cf4b4c94e04..b8e501da561 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,6 +38,7 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { + private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("pick_first:connecting"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; @@ -83,7 +84,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -135,7 +136,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo case CONNECTING: // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave // the current picker in-place. But ignoring the potential optimization is simpler. - picker = new FixedResultPicker(PickResult.withNoResult()); + picker = new FixedResultPicker(CONNECTING_RESULT); break; case READY: picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index d7e1d4ca4f6..5891d26f344 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -772,6 +772,54 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure( + " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]"); } + @Test + public void streamDelayMetrics() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + + InOrder inOrder = inOrder(mockTracer); + inOrder.verify(mockTracer).delayStarted("pick_first:connecting"); + + SubchannelPicker customDelayPicker = mock(SubchannelPicker.class); + when(customDelayPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("rls:lookup_pending")); + + delayedTransport.reprocess(customDelayPicker); + + inOrder.verify(mockTracer).delayEnded(); + inOrder.verify(mockTracer).delayStarted("rls:lookup_pending"); + + delayedTransport.reprocess(mockPicker); + + inOrder.verify(mockTracer).delayEnded(); + } + + @Test + public void streamDelayMetrics_cancelled() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + + verify(mockTracer).delayStarted("pick_first:connecting"); + + stream.cancel(Status.CANCELLED); + + verify(mockTracer).delayEnded(); + } + private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 1e130423a45..5bfabd7ea0e 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -147,8 +147,9 @@ public void pickAfterResolved() throws Exception { verify(mockSubchannel).requestConnection(); // Calling pickSubchannel() twice gave the same result - assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), - pickerCaptor.getValue().pickSubchannel(mockArgs)); + PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(result.getDelayReasonToken()).isEqualTo("pick_first:connecting"); + assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs)); verifyNoMoreInteractions(mockHelper); } diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index a2846fd04c8..2748a2679f1 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -1050,7 +1050,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); } else { - return PickResult.withNoResult(); + return PickResult.withNoResult("rls:lookup_pending"); } } diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index a52390743a6..d5d94c4dd6e 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -262,6 +262,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); @@ -493,6 +494,7 @@ public void lb_working_withoutDefaultTarget() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 22940e875ac..33097cce31f 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,8 +41,9 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { + private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("round_robin:connecting"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT); public RoundRobinLoadBalancer(Helper helper) { super(helper); @@ -68,7 +69,7 @@ protected void updateOverallBalancingState() { } if (isConnecting) { - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); } else { updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates())); } diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 18854ca1bb6..895cf9b4251 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -86,7 +86,7 @@ public class RoundRobinLoadBalancerTest { private static final Attributes.Key MAJOR_KEY = Attributes.Key.create("major-key"); private static final SubchannelPicker EMPTY_PICKER = - new FixedResultPicker(PickResult.withNoResult()); + new FixedResultPicker(PickResult.withNoResult("round_robin:connecting")); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index f6ee60ab1ef..8bda76a5e68 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -119,6 +119,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { errorPrefix() + "Unable to find non-dynamic cluster")); } // The dynamic cluster must not have loaded yet + helper.updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index 6e4566de76d..d5d000c0dec 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -322,7 +322,17 @@ public void updateBalancingState(final ConnectivityState newState, } ConnectivityState oldState = connectivityState; connectivityState = newState; - picker = newPicker; + picker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult childResult = newPicker.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { + return PickResult.withNoResult( + "priority_" + priority + ":" + childResult.getDelayReasonToken()); + } + return childResult; + } + }; if (deletionTimer != null && deletionTimer.isPending()) { return; diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 513f4d643ea..5f15658128b 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -356,6 +356,8 @@ public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { } private static final class RingHashPicker extends SubchannelPicker { + private static final PickResult RING_HASH_CONNECTING_RESULT = + PickResult.withNoResult("ring_hash:connecting"); private final SynchronizationContext syncContext; private final List ring; // Avoid synchronization between pickSubchannel and subchannel's connectivity state change, @@ -453,7 +455,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs // are failed unless there is a READY connection. if (subchannelView.connectivityState == CONNECTING) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } if (subchannelView.connectivityState == IDLE) { @@ -463,7 +465,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } }); - return PickResult.withNoResult(); // Indicates that this should be retried after backoff + return RING_HASH_CONNECTING_RESULT; // Indicates that this should be retried after backoff } } } else { @@ -487,7 +489,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } } if (requestedConnection) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index ff4813fe6a8..e179446f715 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -343,6 +343,28 @@ public void dynamicCluster() { assertThat(this.lastXdsConfig.getClusters()).doesNotContainKey(clusterName); } + @Test + public void discoverDynamicCluster_pending_emitsToken() { + String clusterName = "cluster2"; + CdsConfig cdsConfig = new CdsConfig(clusterName, /*dynamic=*/ true); + + XdsConfig mockXdsConfig = mock(XdsConfig.class); + when(mockXdsConfig.getClusters()).thenReturn(ImmutableMap.of()); + + loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CONFIG, mockXdsConfig) + .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, xdsDepManager) + .build()) + .setLoadBalancingPolicyConfig(cdsConfig) + .build()); + + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getDelayReasonToken()).isEqualTo("cds:discovery_pending"); + } + @Test public void discoverAggregateCluster_createsPriorityLbPolicy() { CdsLoadBalancerProvider cdsLoadBalancerProvider = new CdsLoadBalancerProvider(lbRegistry); diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index beb568be9ce..56feb08f02b 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -910,6 +910,36 @@ public void noDuplicateOverallBalancingStateUpdate() { verify(helper, times(4)).updateBalancingState(any(), any()); } + @Test + public void priorityPicker_prependsToken() throws Exception { + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(newChildConfig(fooLbProvider, new Object()), true); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0")); + + priorityLb.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + + Helper helper0 = Iterables.getOnlyElement(fooHelpers); // priority p0 + + SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); + when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("child_token")); + + helper0.updateBalancingState(CONNECTING, mockChildPicker); + + verify(helper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + SubchannelPicker priorityPicker = pickerCaptor.getValue(); + PickResult result = priorityPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + + assertThat(result.getDelayReasonToken()).isEqualTo("priority_p0:child_token"); + } + private void assertLatestConnectivityState(ConnectivityState expectedState) { verify(helper, atLeastOnce()) .updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index b515ed81158..d7a7892c57f 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -160,6 +160,7 @@ public void subchannelLazyConnectUntilPicked() { PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() && !PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; @@ -524,6 +525,7 @@ public void pickWithRandomHash_atLeastOneSubchannelConnecting() { PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); verifyConnection(0); } @@ -546,6 +548,7 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); verifyConnection(1); } @@ -1161,6 +1164,44 @@ public void tfWithReadyChild_doesNotTriggerIdleChildConnection() { assertThat(connectionRequestedQueue.poll()).isNull(); } + @Test + public void ringHashPicker_passesThroughChildToken() throws Exception { + final SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); + when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("child_delay_token")); + + loadBalancer = new RingHashLoadBalancer(helper, random) { + @Override + protected ChildLbState createChildLbState(Object key) { + return new ChildLbState(key, pickFirstLbProvider) { + @Override + public SubchannelPicker getCurrentPicker() { + return mockChildPicker; + } + + @Override + public ConnectivityState getCurrentState() { + return READY; + } + }; + } + }; + + RingHashConfig config = new RingHashConfig(10, 100, ""); + List servers = createWeightedServerAddrs(1); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + + verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + + PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + + assertThat(result.getDelayReasonToken()).isEqualTo("child_delay_token"); + } + private List initializeLbSubchannels(RingHashConfig config, List servers, InitializationFlags... initFlags) { From c38ce1dd8f2df6c00a566db8f6cf7b50ee2d3c51 Mon Sep 17 00:00:00 2001 From: agravator Date: Thu, 14 May 2026 09:55:07 +0530 Subject: [PATCH 2/7] fix: tests --- .../grpc/internal/DelayedClientTransport.java | 3 +- .../ForwardingClientStreamTracer.java | 10 ++++ .../grpc/internal/PickFirstLoadBalancer.java | 3 +- .../internal/DelayedClientTransportTest.java | 3 +- .../util/ForwardingClientStreamTracer.java | 10 ++++ .../io/grpc/util/RoundRobinLoadBalancer.java | 3 +- .../java/io/grpc/xds/CdsLoadBalancer2.java | 4 +- .../io/grpc/xds/PriorityLoadBalancer.java | 53 +++++++++++++++---- .../io/grpc/xds/RingHashLoadBalancer.java | 3 +- .../io/grpc/xds/CdsLoadBalancer2Test.java | 16 ++++-- .../io/grpc/xds/PriorityLoadBalancerTest.java | 13 ++--- .../io/grpc/xds/RingHashLoadBalancerTest.java | 37 ------------- 12 files changed, 93 insertions(+), 65 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index d979f50a648..b9269350088 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -367,7 +367,8 @@ private class PendingStream extends DelayedStream { private volatile Status lastPickStatus; @Nullable private String delayReasonToken; - private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, @Nullable String initialToken) { + private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, + @Nullable String initialToken) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java index e7679ea14cc..e4c2b3b9933 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java @@ -39,6 +39,16 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayStarted(String reasonToken) { + delegate().delayStarted(reasonToken); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index b8e501da561..4111700fffe 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,7 +38,8 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { - private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("pick_first:connecting"); + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("pick_first:connecting"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 5891d26f344..8f34295a701 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -782,7 +782,7 @@ public void streamDelayMetrics() { .thenReturn(PickResult.withNoResult("pick_first:connecting")); delayedTransport.reprocess(connectingPicker); - ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + delayedTransport.newStream(method, headers, callOptions, customTracers); InOrder inOrder = inOrder(mockTracer); inOrder.verify(mockTracer).delayStarted("pick_first:connecting"); @@ -812,6 +812,7 @@ public void streamDelayMetrics_cancelled() { delayedTransport.reprocess(connectingPicker); ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + stream.start(streamListener); verify(mockTracer).delayStarted("pick_first:connecting"); diff --git a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java index 9c9998571e5..1bf24b12a19 100644 --- a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java +++ b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java @@ -38,6 +38,16 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayStarted(String reasonToken) { + delegate().delayStarted(reasonToken); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 33097cce31f..ab0b2c49c21 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,7 +41,8 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { - private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("round_robin:connecting"); + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("round_robin:connecting"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT); diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 8bda76a5e68..8be155ec0f8 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; @@ -119,7 +120,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { errorPrefix() + "Unable to find non-dynamic cluster")); } // The dynamic cluster must not have loaded yet - helper.updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); + helper.updateBalancingState( + CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index d5d000c0dec..ea26c8cc2bc 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -322,17 +322,11 @@ public void updateBalancingState(final ConnectivityState newState, } ConnectivityState oldState = connectivityState; connectivityState = newState; - picker = new SubchannelPicker() { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - PickResult childResult = newPicker.pickSubchannel(args); - if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { - return PickResult.withNoResult( - "priority_" + priority + ":" + childResult.getDelayReasonToken()); - } - return childResult; - } - }; + if (newState == CONNECTING || newState == IDLE) { + picker = new PriorityPicker(newPicker, priority); + } else { + picker = newPicker; + } if (deletionTimer != null && deletionTimer.isPending()) { return; @@ -367,4 +361,41 @@ protected Helper delegate() { } } } + + private static final class PriorityPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final String priority; + + PriorityPicker(SubchannelPicker delegate, String priority) { + this.delegate = com.google.common.base.Preconditions.checkNotNull(delegate, "delegate"); + this.priority = com.google.common.base.Preconditions.checkNotNull(priority, "priority"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult childResult = delegate.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { + return PickResult.withNoResult( + "priority_" + priority + ":" + childResult.getDelayReasonToken()); + } + return childResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PriorityPicker that = (PriorityPicker) o; + return delegate.equals(that.delegate) && priority.equals(that.priority); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(delegate, priority); + } + } } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 5f15658128b..15cd5dba621 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -465,7 +465,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } }); - return RING_HASH_CONNECTING_RESULT; // Indicates that this should be retried after backoff + // Indicates that this should be retried after backoff + return RING_HASH_CONNECTING_RESULT; } } } else { diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index e179446f715..51e0d08f223 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -348,19 +348,25 @@ public void discoverDynamicCluster_pending_emitsToken() { String clusterName = "cluster2"; CdsConfig cdsConfig = new CdsConfig(clusterName, /*dynamic=*/ true); - XdsConfig mockXdsConfig = mock(XdsConfig.class); - when(mockXdsConfig.getClusters()).thenReturn(ImmutableMap.of()); + XdsConfig xdsConfig = new XdsConfig(null, null, null, ImmutableMap.of()); loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(Collections.emptyList()) .setAttributes(Attributes.newBuilder() - .set(XdsAttributes.XDS_CONFIG, mockXdsConfig) - .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, xdsDepManager) + .set(XdsAttributes.XDS_CONFIG, xdsConfig) + .set( + XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, + new XdsConfig.XdsClusterSubscriptionRegistry() { + @Override + public XdsConfig.Subscription subscribeToCluster(String clusterName) { + return mock(XdsConfig.Subscription.class); + } + }) .build()) .setLoadBalancingPolicyConfig(cdsConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); assertThat(result.getDelayReasonToken()).isEqualTo("cds:discovery_pending"); } diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index 56feb08f02b..6f0db55a8a7 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -531,7 +531,8 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - verify(helper).updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); + verify(helper, times(2)) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); Helper helper0 = Iterables.getOnlyElement(fooHelpers); @@ -547,7 +548,7 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { helper0.updateBalancingState( CONNECTING, EMPTY_PICKER); - verify(helper, times(2)) + verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // failover happens @@ -573,7 +574,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - inOrder.verify(helper) + inOrder.verify(helper, times(2)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); @@ -591,7 +592,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { fakeClock.forwardTime(5, TimeUnit.SECONDS); assertThat(fooBalancers).hasSize(2); assertThat(fooHelpers).hasSize(2); - inOrder.verify(helper, times(2)) + inOrder.verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); Helper helper1 = Iterables.getLast(fooHelpers); @@ -869,7 +870,7 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // any further balancing state update should be ignored. @@ -907,7 +908,7 @@ public void noDuplicateOverallBalancingStateUpdate() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper, times(4)).updateBalancingState(any(), any()); + verify(helper, times(6)).updateBalancingState(any(), any()); } @Test diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index d7a7892c57f..931d1f4df8e 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -1164,43 +1164,6 @@ public void tfWithReadyChild_doesNotTriggerIdleChildConnection() { assertThat(connectionRequestedQueue.poll()).isNull(); } - @Test - public void ringHashPicker_passesThroughChildToken() throws Exception { - final SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); - when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withNoResult("child_delay_token")); - - loadBalancer = new RingHashLoadBalancer(helper, random) { - @Override - protected ChildLbState createChildLbState(Object key) { - return new ChildLbState(key, pickFirstLbProvider) { - @Override - public SubchannelPicker getCurrentPicker() { - return mockChildPicker; - } - - @Override - public ConnectivityState getCurrentState() { - return READY; - } - }; - } - }; - - RingHashConfig config = new RingHashConfig(10, 100, ""); - List servers = createWeightedServerAddrs(1); - - loadBalancer.acceptResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); - - verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); - PickResult result = pickerCaptor.getValue().pickSubchannel(args); - - assertThat(result.getDelayReasonToken()).isEqualTo("child_delay_token"); - } private List initializeLbSubchannels(RingHashConfig config, List servers, InitializationFlags... initFlags) { From a992bdfb45d39d831f6ee19958f13ab750fcedce Mon Sep 17 00:00:00 2001 From: agravator Date: Tue, 19 May 2026 13:40:22 +0530 Subject: [PATCH 3/7] fix: minor changes --- .../main/java/io/grpc/internal/DelayedClientTransport.java | 3 ++- xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java | 6 +++--- xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java | 1 - 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index b9269350088..bde573c7508 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -37,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.Objects; import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -381,7 +382,7 @@ private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, } void updateDelayReason(String newToken) { - if (!java.util.Objects.equals(delayReasonToken, newToken)) { + if (!Objects.equals(delayReasonToken, newToken)) { if (delayReasonToken != null) { for (ClientStreamTracer tracer : tracers) { tracer.delayEnded(); diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index ea26c8cc2bc..ab84c2b96e5 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -367,8 +367,8 @@ private static final class PriorityPicker extends SubchannelPicker { private final String priority; PriorityPicker(SubchannelPicker delegate, String priority) { - this.delegate = com.google.common.base.Preconditions.checkNotNull(delegate, "delegate"); - this.priority = com.google.common.base.Preconditions.checkNotNull(priority, "priority"); + this.delegate = checkNotNull(delegate, "delegate"); + this.priority = checkNotNull(priority, "priority"); } @Override @@ -395,7 +395,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return java.util.Objects.hash(delegate, priority); + return Objects.hash(delegate, priority); } } } diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 931d1f4df8e..387bc525043 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -1164,7 +1164,6 @@ public void tfWithReadyChild_doesNotTriggerIdleChildConnection() { assertThat(connectionRequestedQueue.poll()).isNull(); } - private List initializeLbSubchannels(RingHashConfig config, List servers, InitializationFlags... initFlags) { From 6a55ff212359e70a7940624c8e8d28f7450b296e Mon Sep 17 00:00:00 2001 From: agrawalabhi Date: Mon, 8 Jun 2026 11:06:05 +0000 Subject: [PATCH 4/7] add missing endDelay() --- .../grpc/internal/DelayedClientTransport.java | 9 +++++++-- .../internal/DelayedClientTransportTest.java | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index bde573c7508..6032147d0b1 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -247,7 +247,7 @@ public final void shutdownNow(Status status) { } if (savedReportTransportTerminated != null) { for (PendingStream stream : savedPendingStreams) { - Runnable runnable = stream.setStream( + Runnable runnable = stream.setStreamAndEndDelay( new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers)); if (runnable != null) { // Drain in-line instead of using an executor as failing stream just throws everything @@ -406,6 +406,11 @@ void endDelay() { } } + Runnable setStreamAndEndDelay(ClientStream stream) { + endDelay(); + return setStream(stream); + } + /** Runnable may be null. */ private Runnable createRealStream(ClientTransport transport, String authorityOverride) { ClientStream realStream; @@ -424,7 +429,7 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve // been called on the delayed stream. realStream.setAuthority(authorityOverride); } - return setStream(realStream); + return setStreamAndEndDelay(realStream); } @Override diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 8f34295a701..aebf0bf5b96 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -821,6 +821,26 @@ public void streamDelayMetrics_cancelled() { verify(mockTracer).delayEnded(); } + @Test + public void streamDelayMetrics_shutdownNow() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + stream.start(streamListener); + + verify(mockTracer).delayStarted("pick_first:connecting"); + + delayedTransport.shutdownNow(Status.UNAVAILABLE); + + verify(mockTracer).delayEnded(); + } + private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override From 389b96f3f0fb52cc404e97874361ce721acb4f0c Mon Sep 17 00:00:00 2001 From: agrawalabhi Date: Fri, 19 Jun 2026 10:59:06 +0000 Subject: [PATCH 5/7] core,api,rls,util,xds: Implement dual Load Balancer delay APIs and cadence invariants - Refactor ClientStreamTracer to expose delayTypeStarted(String) and delayReasonAttached(String) - Enhance PickResult with separate delayType and delayReason diagnostic fields - Implement Mark Roth's hybrid telemetry cadence model in DelayedClientTransport.PendingStream - Support channel fallback delay states (client_channel_init, subchannel_state_mismatch, wait_for_ready_failed) - Simplify leaf and container LB policies to emit canonical unified connecting metric labels --- .../main/java/io/grpc/ClientStreamTracer.java | 15 +++- api/src/main/java/io/grpc/LoadBalancer.java | 39 +++++--- .../grpc/internal/DelayedClientTransport.java | 90 +++++++++++++++---- .../ForwardingClientStreamTracer.java | 9 +- .../grpc/internal/PickFirstLoadBalancer.java | 4 +- .../internal/DelayedClientTransportTest.java | 20 +++-- .../internal/PickFirstLoadBalancerTest.java | 3 +- .../java/io/grpc/rls/CachingRlsLbClient.java | 2 +- .../java/io/grpc/rls/RlsLoadBalancerTest.java | 6 +- .../util/ForwardingClientStreamTracer.java | 9 +- .../io/grpc/util/RoundRobinLoadBalancer.java | 4 +- .../grpc/util/RoundRobinLoadBalancerTest.java | 3 +- .../java/io/grpc/xds/CdsLoadBalancer2.java | 5 +- .../io/grpc/xds/PriorityLoadBalancer.java | 7 +- .../io/grpc/xds/RingHashLoadBalancer.java | 4 +- .../io/grpc/xds/CdsLoadBalancer2Test.java | 3 +- .../io/grpc/xds/PriorityLoadBalancerTest.java | 5 +- .../io/grpc/xds/RingHashLoadBalancerTest.java | 9 +- 18 files changed, 169 insertions(+), 68 deletions(-) diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 07ceb11fa59..833716b6412 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -58,12 +58,21 @@ public void createPendingStream() { } /** - * A delay segment started with a specific reason during load balancing. + * A delay segment started with a canonical root cause. * - * @param reasonToken the reason for the delay, e.g., "pick_first:connecting" + * @param delayType the canonical root cause label (e.g., "connecting", "client_channel_init") * @since 1.82.0 */ - public void delayStarted(String reasonToken) { + public void delayTypeStarted(String delayType) { + } + + /** + * High-cardinality diagnostic context attached to the active delay span. + * + * @param delayReason verbose diagnostic description of the delay + * @since 1.82.0 + */ + public void delayReasonAttached(String delayReason) { } /** diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index d3af8822058..65c3eba5e2c 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -549,30 +549,32 @@ public static final class PickResult { // True if the result is created by withDrop() private final boolean drop; @Nullable private final String authorityOverride; - @Nullable private final String delayReasonToken; + @Nullable private final String delayType; + @Nullable private final String delayReason; private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop) { - this(subchannel, streamTracerFactory, status, drop, null, null); + this(subchannel, streamTracerFactory, status, drop, null, null, null); } private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop, @Nullable String authorityOverride) { - this(subchannel, streamTracerFactory, status, drop, authorityOverride, null); + this(subchannel, streamTracerFactory, status, drop, authorityOverride, null, null); } private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop, @Nullable String authorityOverride, - @Nullable String delayReasonToken) { + @Nullable String delayType, @Nullable String delayReason) { this.subchannel = subchannel; this.streamTracerFactory = streamTracerFactory; this.status = checkNotNull(status, "status"); this.drop = drop; this.authorityOverride = authorityOverride; - this.delayReasonToken = delayReasonToken; + this.delayType = delayType; + this.delayReason = delayReason; } /** @@ -684,7 +686,7 @@ public static PickResult withSubchannel(Subchannel subchannel) { */ public PickResult copyWithSubchannel(Subchannel subchannel) { return new PickResult(checkNotNull(subchannel, "subchannel"), streamTracerFactory, - status, drop, authorityOverride); + status, drop, authorityOverride, delayType, delayReason); } /** @@ -695,7 +697,7 @@ public PickResult copyWithSubchannel(Subchannel subchannel) { */ public PickResult copyWithStreamTracerFactory( @Nullable ClientStreamTracer.Factory streamTracerFactory) { - return new PickResult(subchannel, streamTracerFactory, status, drop, authorityOverride); + return new PickResult(subchannel, streamTracerFactory, status, drop, authorityOverride, delayType, delayReason); } /** @@ -733,19 +735,28 @@ public static PickResult withNoResult() { } /** - * No decision could be made. The RPC will stay buffered with a specific reason. + * No decision could be made. The RPC will stay buffered with a specific delay type and reason. * + * @param delayType low-cardinality root cause label (e.g., "connecting") + * @param delayReason high-cardinality diagnostic string for trace events * @since 1.82.0 */ - public static PickResult withNoResult(String delayReasonToken) { - Preconditions.checkNotNull(delayReasonToken, "delayReasonToken"); - return new PickResult(null, null, Status.OK, false, null, delayReasonToken); + public static PickResult withNoResult(String delayType, String delayReason) { + Preconditions.checkNotNull(delayType, "delayType"); + Preconditions.checkNotNull(delayReason, "delayReason"); + return new PickResult(null, null, Status.OK, false, null, delayType, delayReason); } - /** Returns the delay reason token if any. */ + /** Returns the delay type label if any. */ @Nullable - public String getDelayReasonToken() { - return delayReasonToken; + public String getDelayType() { + return delayType; + } + + /** Returns the diagnostic delay reason if any. */ + @Nullable + public String getDelayReason() { + return delayReason; } /** Returns the authority override if any. */ diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 6032147d0b1..5c4fd4b2f95 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -158,8 +158,9 @@ public final ClientStream newStream( synchronized (lock) { PickerState newerState = pickerState; if (state == newerState) { - String token = pickResult != null ? pickResult.getDelayReasonToken() : null; - return createPendingStream(args, tracers, pickResult, token); + String delayType = determineQueuingDelayType(pickResult, callOptions.isWaitForReady()); + String delayReason = determineQueuingDelayReason(pickResult, callOptions.isWaitForReady()); + return createPendingStream(args, tracers, pickResult, delayType, delayReason); } state = newerState; } @@ -175,8 +176,8 @@ public final ClientStream newStream( */ @GuardedBy("lock") private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, - PickResult pickResult, @Nullable String delayReasonToken) { - PendingStream pendingStream = new PendingStream(args, tracers, delayReasonToken); + PickResult pickResult, @Nullable String delayType, @Nullable String delayReason) { + PendingStream pendingStream = new PendingStream(args, tracers, delayType, delayReason); if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { pendingStream.lastPickStatus = pickResult.getStatus(); } @@ -319,7 +320,11 @@ final void reprocess(@Nullable SubchannelPicker picker) { } toRemove.add(stream); } else { // stay pending - stream.updateDelayReason(pickResult.getDelayReasonToken()); + String delayType = determineQueuingDelayType( + pickResult, stream.args.getCallOptions().isWaitForReady()); + String delayReason = determineQueuingDelayReason( + pickResult, stream.args.getCallOptions().isWaitForReady()); + stream.updateDelay(delayType, delayReason); } } @@ -361,48 +366,97 @@ public InternalLogId getLogId() { return logId; } + private static String determineQueuingDelayType( + @Nullable PickResult pickResult, boolean isWaitForReady) { + if (pickResult == null) { + return "client_channel_init"; + } + if (pickResult.getSubchannel() != null) { + return "subchannel_state_mismatch"; + } + if (!pickResult.getStatus().isOk()) { + return "wait_for_ready_failed"; + } + if (pickResult.getDelayType() != null) { + return pickResult.getDelayType(); + } + return "client_channel_init"; + } + + private static String determineQueuingDelayReason( + @Nullable PickResult pickResult, boolean isWaitForReady) { + if (pickResult == null) { + return "client channel: created LB policy."; + } + if (pickResult.getSubchannel() != null) { + return "subchannel returned by LB picker has no connected subchannel"; + } + if (!pickResult.getStatus().isOk()) { + return "wait_for_ready RPC failed with status: " + pickResult.getStatus(); + } + if (pickResult.getDelayReason() != null) { + return pickResult.getDelayReason(); + } + return "client channel: waiting for picker"; + } + private class PendingStream extends DelayedStream { private final PickSubchannelArgs args; private final Context context = Context.current(); private final ClientStreamTracer[] tracers; private volatile Status lastPickStatus; - @Nullable private String delayReasonToken; + @Nullable private String activeDelayType; + @Nullable private String activeDelayReason; private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, - @Nullable String initialToken) { + @Nullable String initialType, @Nullable String initialReason) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; - this.delayReasonToken = initialToken; - if (initialToken != null) { + this.activeDelayType = initialType; + this.activeDelayReason = initialReason; + if (initialType != null) { for (ClientStreamTracer tracer : tracers) { - tracer.delayStarted(initialToken); + tracer.delayTypeStarted(initialType); + } + } + if (initialReason != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayReasonAttached(initialReason); } } } - void updateDelayReason(String newToken) { - if (!Objects.equals(delayReasonToken, newToken)) { - if (delayReasonToken != null) { + void updateDelay(@Nullable String newType, @Nullable String newReason) { + if (!Objects.equals(activeDelayType, newType)) { + if (activeDelayType != null) { for (ClientStreamTracer tracer : tracers) { tracer.delayEnded(); } } - delayReasonToken = newToken; - if (newToken != null) { + activeDelayType = newType; + activeDelayReason = null; + if (newType != null) { for (ClientStreamTracer tracer : tracers) { - tracer.delayStarted(newToken); + tracer.delayTypeStarted(newType); } } } + if (newType != null && newReason != null && !Objects.equals(activeDelayReason, newReason)) { + activeDelayReason = newReason; + for (ClientStreamTracer tracer : tracers) { + tracer.delayReasonAttached(newReason); + } + } } void endDelay() { - if (delayReasonToken != null) { + if (activeDelayType != null) { for (ClientStreamTracer tracer : tracers) { tracer.delayEnded(); } - delayReasonToken = null; + activeDelayType = null; + activeDelayReason = null; } } diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java index e4c2b3b9933..2553f1ea6ec 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java @@ -40,8 +40,13 @@ public void createPendingStream() { } @Override - public void delayStarted(String reasonToken) { - delegate().delayStarted(reasonToken); + public void delayTypeStarted(String delayType) { + delegate().delayTypeStarted(delayType); + } + + @Override + public void delayReasonAttached(String delayReason) { + delegate().delayReasonAttached(delayReason); } @Override diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index 4111700fffe..7b12b1254ce 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,8 +38,8 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { - private static final PickResult CONNECTING_RESULT = - PickResult.withNoResult("pick_first:connecting"); + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("connecting", "pick_first: attempting to connect"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index aebf0bf5b96..2d0fc729c9a 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -779,22 +779,24 @@ public void streamDelayMetrics() { SubchannelPicker connectingPicker = mock(SubchannelPicker.class); when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withNoResult("pick_first:connecting")); + .thenReturn(PickResult.withNoResult("connecting", "pick_first: attempting to connect")); delayedTransport.reprocess(connectingPicker); delayedTransport.newStream(method, headers, callOptions, customTracers); InOrder inOrder = inOrder(mockTracer); - inOrder.verify(mockTracer).delayStarted("pick_first:connecting"); + inOrder.verify(mockTracer).delayTypeStarted("connecting"); + inOrder.verify(mockTracer).delayReasonAttached("pick_first: attempting to connect"); SubchannelPicker customDelayPicker = mock(SubchannelPicker.class); when(customDelayPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withNoResult("rls:lookup_pending")); + .thenReturn(PickResult.withNoResult("rls_lookup_pending", "RLS request pending.")); delayedTransport.reprocess(customDelayPicker); inOrder.verify(mockTracer).delayEnded(); - inOrder.verify(mockTracer).delayStarted("rls:lookup_pending"); + inOrder.verify(mockTracer).delayTypeStarted("rls_lookup_pending"); + inOrder.verify(mockTracer).delayReasonAttached("RLS request pending."); delayedTransport.reprocess(mockPicker); @@ -808,13 +810,14 @@ public void streamDelayMetrics_cancelled() { SubchannelPicker connectingPicker = mock(SubchannelPicker.class); when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withNoResult("pick_first:connecting")); + .thenReturn(PickResult.withNoResult("connecting", "pick_first: attempting to connect")); delayedTransport.reprocess(connectingPicker); ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); stream.start(streamListener); - verify(mockTracer).delayStarted("pick_first:connecting"); + verify(mockTracer).delayTypeStarted("connecting"); + verify(mockTracer).delayReasonAttached("pick_first: attempting to connect"); stream.cancel(Status.CANCELLED); @@ -828,13 +831,14 @@ public void streamDelayMetrics_shutdownNow() { SubchannelPicker connectingPicker = mock(SubchannelPicker.class); when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withNoResult("pick_first:connecting")); + .thenReturn(PickResult.withNoResult("connecting", "pick_first: attempting to connect")); delayedTransport.reprocess(connectingPicker); ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); stream.start(streamListener); - verify(mockTracer).delayStarted("pick_first:connecting"); + verify(mockTracer).delayTypeStarted("connecting"); + verify(mockTracer).delayReasonAttached("pick_first: attempting to connect"); delayedTransport.shutdownNow(Status.UNAVAILABLE); diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 5bfabd7ea0e..1c3182237d3 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -148,7 +148,8 @@ public void pickAfterResolved() throws Exception { // Calling pickSubchannel() twice gave the same result PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs); - assertThat(result.getDelayReasonToken()).isEqualTo("pick_first:connecting"); + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("pick_first: attempting to connect"); assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs)); verifyNoMoreInteractions(mockHelper); diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index 2748a2679f1..92bf052ba51 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -1050,7 +1050,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); } else { - return PickResult.withNoResult("rls:lookup_pending"); + return PickResult.withNoResult("rls_lookup_pending", "RLS request pending."); } } diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index d5d94c4dd6e..e7925ff8986 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -262,7 +262,8 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); - assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); + assertThat(res.getDelayType()).isEqualTo("rls_lookup_pending"); + assertThat(res.getDelayReason()).isEqualTo("RLS request pending."); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); @@ -494,7 +495,8 @@ public void lb_working_withoutDefaultTarget() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); - assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); + assertThat(res.getDelayType()).isEqualTo("rls_lookup_pending"); + assertThat(res.getDelayReason()).isEqualTo("RLS request pending."); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); diff --git a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java index 1bf24b12a19..5f58b4321ae 100644 --- a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java +++ b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java @@ -39,8 +39,13 @@ public void createPendingStream() { } @Override - public void delayStarted(String reasonToken) { - delegate().delayStarted(reasonToken); + public void delayTypeStarted(String delayType) { + delegate().delayTypeStarted(delayType); + } + + @Override + public void delayReasonAttached(String delayReason) { + delegate().delayReasonAttached(delayReason); } @Override diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index ab0b2c49c21..8adce59407e 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,8 +41,8 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { - private static final PickResult CONNECTING_RESULT = - PickResult.withNoResult("round_robin:connecting"); + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("connecting", "round_robin: attempting to connect"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT); diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 895cf9b4251..b3c1454ead5 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -86,7 +86,8 @@ public class RoundRobinLoadBalancerTest { private static final Attributes.Key MAJOR_KEY = Attributes.Key.create("major-key"); private static final SubchannelPicker EMPTY_PICKER = - new FixedResultPicker(PickResult.withNoResult("round_robin:connecting")); + new FixedResultPicker( + PickResult.withNoResult("connecting", "round_robin: attempting to connect")); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 8be155ec0f8..f03b29651e0 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -121,7 +121,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { } // The dynamic cluster must not have loaded yet helper.updateBalancingState( - CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult( + "cds_dynamic_discovery", "cds: fetching xDS cluster metadata"))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index ab84c2b96e5..dde9bc9011c 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -374,9 +374,10 @@ private static final class PriorityPicker extends SubchannelPicker { @Override public PickResult pickSubchannel(PickSubchannelArgs args) { PickResult childResult = delegate.pickSubchannel(args); - if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { - return PickResult.withNoResult( - "priority_" + priority + ":" + childResult.getDelayReasonToken()); + if (!childResult.hasResult() && childResult.getDelayType() != null) { + String childReason = childResult.getDelayReason(); + String reason = "priority_" + priority + ":" + (childReason != null ? childReason : ""); + return PickResult.withNoResult(childResult.getDelayType(), reason); } return childResult; } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 15cd5dba621..eb8ba235d82 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -356,8 +356,8 @@ public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { } private static final class RingHashPicker extends SubchannelPicker { - private static final PickResult RING_HASH_CONNECTING_RESULT = - PickResult.withNoResult("ring_hash:connecting"); + private static final PickResult RING_HASH_CONNECTING_RESULT = + PickResult.withNoResult("connecting", "ring_hash: waiting for connection"); private final SynchronizationContext syncContext; private final List ring; // Avoid synchronization between pickSubchannel and subchannel's connectivity state change, diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 51e0d08f223..83a169ba2c7 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -368,7 +368,8 @@ public XdsConfig.Subscription subscribeToCluster(String clusterName) { verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getDelayReasonToken()).isEqualTo("cds:discovery_pending"); + assertThat(result.getDelayType()).isEqualTo("cds_dynamic_discovery"); + assertThat(result.getDelayReason()).isEqualTo("cds: fetching xDS cluster metadata"); } @Test diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index 6f0db55a8a7..b4be288c6f0 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -928,7 +928,7 @@ public void priorityPicker_prependsToken() throws Exception { SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withNoResult("child_token")); + .thenReturn(PickResult.withNoResult("connecting", "child_reason")); helper0.updateBalancingState(CONNECTING, mockChildPicker); @@ -938,7 +938,8 @@ public void priorityPicker_prependsToken() throws Exception { SubchannelPicker priorityPicker = pickerCaptor.getValue(); PickResult result = priorityPicker.pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getDelayReasonToken()).isEqualTo("priority_p0:child_token"); + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("priority_p0:child_reason"); } private void assertLatestConnectivityState(ConnectivityState expectedState) { diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 387bc525043..da11df24af4 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -160,7 +160,8 @@ public void subchannelLazyConnectUntilPicked() { PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); - assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("ring_hash: waiting for connection"); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() && !PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; @@ -525,7 +526,8 @@ public void pickWithRandomHash_atLeastOneSubchannelConnecting() { PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request - assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("ring_hash: waiting for connection"); verifyConnection(0); } @@ -548,7 +550,8 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request - assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("ring_hash: waiting for connection"); verifyConnection(1); } From 5e56f3883137fac6daf36b2d19f9057d850071a9 Mon Sep 17 00:00:00 2001 From: agrawalabhi Date: Fri, 19 Jun 2026 11:06:13 +0000 Subject: [PATCH 6/7] core: Add 100% test coverage for dual LB delay APIs and cadence rules --- .../internal/DelayedClientTransportTest.java | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 2d0fc729c9a..72096a43b5e 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -845,6 +845,83 @@ public void streamDelayMetrics_shutdownNow() { verify(mockTracer).delayEnded(); } + @Test + public void streamDelayMetrics_cadenceReasonUpdate_doesNotStartNewTypeSegment() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker picker1 = mock(SubchannelPicker.class); + when(picker1.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("connecting", "attempt 1")); + + delayedTransport.reprocess(picker1); + delayedTransport.newStream(method, headers, callOptions, customTracers); + + verify(mockTracer, times(1)).delayTypeStarted("connecting"); + verify(mockTracer).delayReasonAttached("attempt 1"); + + SubchannelPicker picker2 = mock(SubchannelPicker.class); + when(picker2.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("connecting", "attempt 2")); + + delayedTransport.reprocess(picker2); + + verify(mockTracer, times(1)).delayTypeStarted("connecting"); + verify(mockTracer).delayReasonAttached("attempt 2"); + verify(mockTracer, never()).delayEnded(); + } + + @Test + public void streamDelayMetrics_channelFallback_clientChannelInit() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + // No picker reprocessed yet (lastPicker == null) + delayedTransport.newStream(method, headers, callOptions, customTracers); + + verify(mockTracer).delayTypeStarted("client_channel_init"); + verify(mockTracer).delayReasonAttached("client channel: created LB policy."); + } + + @Test + public void streamDelayMetrics_channelFallback_subchannelStateMismatch() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + io.grpc.LoadBalancer.Subchannel disconnectedSubchannel = mock(io.grpc.LoadBalancer.Subchannel.class); + when(disconnectedSubchannel.getInternalSubchannel()) + .thenReturn(newTransportProvider(null)); + + SubchannelPicker stalePicker = mock(SubchannelPicker.class); + when(stalePicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withSubchannel(disconnectedSubchannel)); + + delayedTransport.reprocess(stalePicker); + delayedTransport.newStream(method, headers, callOptions, customTracers); + + verify(mockTracer).delayTypeStarted("subchannel_state_mismatch"); + verify(mockTracer).delayReasonAttached( + "subchannel returned by LB picker has no connected subchannel"); + } + + @Test + public void streamDelayMetrics_channelFallback_waitForReadyFailed() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker failPicker = mock(SubchannelPicker.class); + when(failPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withError(Status.UNAVAILABLE)); + + delayedTransport.reprocess(failPicker); + CallOptions wfrOptions = callOptions.withWaitForReady(); + delayedTransport.newStream(method, headers, wfrOptions, customTracers); + + verify(mockTracer).delayTypeStarted("wait_for_ready_failed"); + verify(mockTracer).delayReasonAttached( + "wait_for_ready RPC failed with status: " + Status.UNAVAILABLE); + } + private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override From 6a3572bec7edcb3add972423ecf6abc43413e583 Mon Sep 17 00:00:00 2001 From: agrawalabhi Date: Mon, 22 Jun 2026 11:08:40 +0000 Subject: [PATCH 7/7] opentelemetry: Implement dual Load Balancer delay spans and metrics --- api/src/main/java/io/grpc/LoadBalancer.java | 4 +- .../grpc/internal/DelayedClientTransport.java | 16 ++---- .../internal/PickFirstLeafLoadBalancer.java | 13 ++++- .../grpc/internal/PickFirstLoadBalancer.java | 3 +- .../internal/DelayedClientTransportTest.java | 3 +- .../main/java/io/grpc/grpclb/GrpclbState.java | 3 +- .../grpc/opentelemetry/GrpcOpenTelemetry.java | 10 ++++ .../OpenTelemetryMetricsModule.java | 56 +++++++++++++++++++ .../OpenTelemetryMetricsResource.java | 5 ++ .../OpenTelemetryTracingModule.java | 33 +++++++++++ .../OpenTelemetryTracingModuleTest.java | 53 ++++++++++++++++++ .../grpc/util/RoundRobinLoadBalancerTest.java | 10 ++++ .../io/grpc/xds/ClusterImplLoadBalancer.java | 3 +- .../java/io/grpc/xds/LazyLoadBalancer.java | 3 +- .../io/grpc/xds/LeastRequestLoadBalancer.java | 11 +++- .../xds/WeightedRoundRobinLoadBalancer.java | 7 ++- .../grpc/xds/WeightedTargetLoadBalancer.java | 6 +- 17 files changed, 213 insertions(+), 26 deletions(-) diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 65c3eba5e2c..2b64973e169 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -697,7 +697,9 @@ public PickResult copyWithSubchannel(Subchannel subchannel) { */ public PickResult copyWithStreamTracerFactory( @Nullable ClientStreamTracer.Factory streamTracerFactory) { - return new PickResult(subchannel, streamTracerFactory, status, drop, authorityOverride, delayType, delayReason); + return new PickResult( + subchannel, streamTracerFactory, status, drop, authorityOverride, delayType, + delayReason); } /** diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 5c4fd4b2f95..e5ad2f0e912 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -158,8 +158,8 @@ public final ClientStream newStream( synchronized (lock) { PickerState newerState = pickerState; if (state == newerState) { - String delayType = determineQueuingDelayType(pickResult, callOptions.isWaitForReady()); - String delayReason = determineQueuingDelayReason(pickResult, callOptions.isWaitForReady()); + String delayType = determineQueuingDelayType(pickResult); + String delayReason = determineQueuingDelayReason(pickResult); return createPendingStream(args, tracers, pickResult, delayType, delayReason); } state = newerState; @@ -320,10 +320,8 @@ final void reprocess(@Nullable SubchannelPicker picker) { } toRemove.add(stream); } else { // stay pending - String delayType = determineQueuingDelayType( - pickResult, stream.args.getCallOptions().isWaitForReady()); - String delayReason = determineQueuingDelayReason( - pickResult, stream.args.getCallOptions().isWaitForReady()); + String delayType = determineQueuingDelayType(pickResult); + String delayReason = determineQueuingDelayReason(pickResult); stream.updateDelay(delayType, delayReason); } } @@ -366,8 +364,7 @@ public InternalLogId getLogId() { return logId; } - private static String determineQueuingDelayType( - @Nullable PickResult pickResult, boolean isWaitForReady) { + private static String determineQueuingDelayType(@Nullable PickResult pickResult) { if (pickResult == null) { return "client_channel_init"; } @@ -383,8 +380,7 @@ private static String determineQueuingDelayType( return "client_channel_init"; } - private static String determineQueuingDelayReason( - @Nullable PickResult pickResult, boolean isWaitForReady) { + private static String determineQueuingDelayReason(@Nullable PickResult pickResult) { if (pickResult == null) { return "client channel: created LB policy."; } diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index f8f5c94f5ba..d947526114c 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -167,7 +167,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (noOldAddrs) { // Make tests happy; they don't properly assume starting in CONNECTING rawConnectivityState = CONNECTING; - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "pick_first: address list updated"))); } if (rawConnectivityState == READY) { @@ -333,7 +336,10 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo case CONNECTING: rawConnectivityState = CONNECTING; - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "pick_first: attempting to connect"))); break; case READY: @@ -653,7 +659,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (connectionRequested.compareAndSet(false, true)) { helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection); } - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "pick_first: requesting connection"); } } diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index 7b12b1254ce..562debfd862 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -180,7 +180,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (connectionRequested.compareAndSet(false, true)) { helper.getSynchronizationContext().execute(PickFirstLoadBalancer.this::requestConnection); } - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "pick_first: requesting connection"); } } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 72096a43b5e..66a0c090be0 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -888,7 +888,8 @@ public void streamDelayMetrics_channelFallback_subchannelStateMismatch() { ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; - io.grpc.LoadBalancer.Subchannel disconnectedSubchannel = mock(io.grpc.LoadBalancer.Subchannel.class); + io.grpc.LoadBalancer.Subchannel disconnectedSubchannel = + mock(io.grpc.LoadBalancer.Subchannel.class); when(disconnectedSubchannel.getInternalSubchannel()) .thenReturn(newTransportProvider(null)); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 5ed84ade2f8..024ea84f60e 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -124,7 +124,8 @@ final class GrpclbState { static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { @Override public PickResult picked(PickSubchannelArgs args) { - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "grpclb: waiting for backend server list"); } @Override diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 87ad61c9f27..b4ac28eebf3 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -232,6 +232,16 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter, .build()); } + if (isMetricEnabled("grpc.client.attempt.delay", enableMetrics, disableDefault)) { + builder.clientAttemptDelayCounter( + meter.histogramBuilder( + "grpc.client.attempt.delay") + .setUnit("s") + .setDescription("Time taken to complete a client call attempt delay") + .setExplicitBucketBoundariesAdvice(LATENCY_BUCKETS) + .build()); + } + if (isMetricEnabled("grpc.client.attempt.sent_total_compressed_message_size", enableMetrics, disableDefault)) { builder.clientTotalSentCompressedMessageSizeCounter( diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index f783b9495dd..14be5b917c9 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -203,6 +203,8 @@ private static final class ClientTracer extends ClientStreamTracer { volatile String backendService; long attemptNanos; Code statusCode; + @Nullable private volatile Stopwatch activeDelayStopwatch; + @Nullable private volatile String activeDelayType; ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module, StreamInfo info, String target, String fullMethodName, @@ -216,6 +218,59 @@ private static final class ClientTracer extends ClientStreamTracer { this.stopwatch = module.stopwatchSupplier.get().start(); } + @Override + public void streamCreated(io.grpc.Attributes transportAtts, Metadata headers) { + delayEnded(); + } + + @Override + public void delayTypeStarted(String delayType) { + delayEnded(); + activeDelayType = delayType; + activeDelayStopwatch = module.stopwatchSupplier.get().start(); + } + + @Override + public void delayEnded() { + Stopwatch delayStopwatch = activeDelayStopwatch; + String delayType = activeDelayType; + if (delayStopwatch != null && delayType != null) { + delayStopwatch.stop(); + long delayNanos = delayStopwatch.elapsed(TimeUnit.NANOSECONDS); + activeDelayStopwatch = null; + activeDelayType = null; + if (module.resource.clientAttemptDelayCounter() != null) { + AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, fullMethodName) + .put(TARGET_KEY, target) + .put("grpc.delay_type", delayType); + if (module.localityEnabled) { + String savedLocality = locality; + if (savedLocality == null) { + savedLocality = ""; + } + builder.put(LOCALITY_KEY, savedLocality); + } + if (module.backendServiceEnabled) { + String savedBackendService = backendService; + if (savedBackendService == null) { + savedBackendService = ""; + } + builder.put(BACKEND_SERVICE_KEY, savedBackendService); + } + if (module.customLabelEnabled) { + builder.put( + CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL)); + } + for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) { + plugin.addLabels(builder); + } + module.resource.clientAttemptDelayCounter() + .record(delayNanos * SECONDS_PER_NANO, builder.build(), attemptsState.otelContext); + } + } + } + @Override public void inboundHeaders(Metadata headers) { for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) { @@ -262,6 +317,7 @@ public void inboundTrailers(Metadata trailers) { @Override public void streamClosed(Status status) { + delayEnded(); stopwatch.stop(); attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); Deadline deadline = info.getCallOptions().getDeadline(); diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java index d32ae1e67f5..085498d746e 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java @@ -35,6 +35,9 @@ abstract class OpenTelemetryMetricsResource { @Nullable abstract DoubleHistogram clientAttemptDurationCounter(); + @Nullable + abstract DoubleHistogram clientAttemptDelayCounter(); + @Nullable abstract LongHistogram clientTotalSentCompressedMessageSizeCounter(); @@ -79,6 +82,8 @@ abstract static class Builder { abstract Builder clientAttemptDurationCounter(DoubleHistogram counter); + abstract Builder clientAttemptDelayCounter(DoubleHistogram counter); + abstract Builder clientTotalSentCompressedMessageSizeCounter(LongHistogram counter); abstract Builder clientTotalReceivedCompressedMessageSizeCounter( diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java index d214e99bd75..3fc9e3767f5 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -192,6 +192,7 @@ private final class ClientTracer extends ClientStreamTracer { private final Span parentSpan; volatile int seqNo; boolean isPendingStream; + @Nullable private volatile Span activeDelaySpan; ClientTracer(Span span, Span parentSpan) { this.span = checkNotNull(span, "span"); @@ -200,6 +201,7 @@ private final class ClientTracer extends ClientStreamTracer { @Override public void streamCreated(Attributes transportAtts, Metadata headers) { + delayEnded(); contextPropagators.getTextMapPropagator().inject(Context.current().with(span), headers, metadataSetter); if (isPendingStream) { @@ -212,6 +214,36 @@ public void createPendingStream() { isPendingStream = true; } + @Override + public void delayTypeStarted(String delayType) { + if (activeDelaySpan != null) { + activeDelaySpan.end(); + } + activeDelaySpan = otelTracer.spanBuilder("Attempt Delay: " + delayType) + .setParent(Context.current().with(span)) + .setAttribute("grpc.delay_type", delayType) + .startSpan(); + } + + @Override + public void delayReasonAttached(String delayReason) { + Span delaySpan = activeDelaySpan; + if (delaySpan != null) { + delaySpan.addEvent(delayReason); + } else { + span.addEvent("delay_reason: " + delayReason); + } + } + + @Override + public void delayEnded() { + Span delaySpan = activeDelaySpan; + if (delaySpan != null) { + delaySpan.end(); + activeDelaySpan = null; + } + } + @Override public void outboundMessageSent( int seqNo, long optionalWireSize, long optionalUncompressedSize) { @@ -238,6 +270,7 @@ public void inboundUncompressedSize(long bytes) { @Override public void streamClosed(io.grpc.Status status) { + delayEnded(); endSpanWithStatus(span, status); } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java index e6759aadb1e..dcbbe760ff2 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java @@ -274,6 +274,31 @@ public void clientBasicTracingMocking() { inOrder.verifyNoMoreInteractions(); } + @Test + public void clientDelayTracingMocking() { + Span mockDelaySpan = mock(Span.class); + when(mockSpanBuilder.setAttribute( + org.mockito.ArgumentMatchers.anyString(), + org.mockito.ArgumentMatchers.anyString())) + .thenReturn(mockSpanBuilder); + when(mockSpanBuilder.startSpan()).thenReturn(mockAttemptSpan, mockDelaySpan); + + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(mockClientSpan, method); + ClientStreamTracer clientStreamTracer = + callTracer.newClientStreamTracer(STREAM_INFO, new Metadata()); + + clientStreamTracer.delayTypeStarted("connecting"); + clientStreamTracer.delayReasonAttached("pick_first: attempting to connect"); + clientStreamTracer.delayEnded(); + + verify(mockTracer).spanBuilder(eq("Attempt Delay: connecting")); + verify(mockSpanBuilder).setAttribute(eq("grpc.delay_type"), eq("connecting")); + verify(mockDelaySpan).addEvent(eq("pick_first: attempting to connect")); + verify(mockDelaySpan).end(); + } + @Test public void clientBasicTracingRule() { OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( @@ -381,6 +406,34 @@ public void clientBasicTracingRule() { assertEquals(attemptSpanData.hasEnded(), true); } + @Test + public void clientDelayTracingRule() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + Span clientSpan = tracerRule.spanBuilder("test-client-span").startSpan(); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(clientSpan, method); + ClientStreamTracer clientStreamTracer = + callTracer.newClientStreamTracer(STREAM_INFO, new Metadata()); + + clientStreamTracer.delayTypeStarted("connecting"); + clientStreamTracer.delayReasonAttached("pick_first: attempting to connect"); + clientStreamTracer.delayEnded(); + clientStreamTracer.streamClosed(Status.OK); + callTracer.callEnded(Status.OK); + clientSpan.end(); + + List spans = openTelemetryRule.getSpans(); + assertEquals(3, spans.size()); + SpanData delaySpanData = spans.get(0); + + assertEquals("Attempt Delay: connecting", delaySpanData.getName()); + assertEquals("connecting", delaySpanData.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("grpc.delay_type"))); + assertEquals(1, delaySpanData.getEvents().size()); + assertEquals("pick_first: attempting to connect", delaySpanData.getEvents().get(0).getName()); + } + @Test public void clientInterceptor() { testClientInterceptors(false); diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index b3c1454ead5..d9b8b4a85f2 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -577,6 +577,16 @@ private void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo testHelperInst.deliverSubchannelState(subchannel, newState); } + @Test + public void roundRobin_delayAttributes() { + acceptAddresses(servers, affinity); + verify(mockHelper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + PickResult res = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(res.getDelayType()).isEqualTo("connecting"); + assertThat(res.getDelayReason()).contains("attempting to connect"); + } + private static class FakeSocketAddress extends SocketAddress { final String name; diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 64105144240..3a0eaf2638e 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -200,7 +200,8 @@ public void shutdown() { private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper { private final AtomicLong inFlights; private ConnectivityState currentState = ConnectivityState.IDLE; - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "cluster_impl: initializing")); private List dropPolicies = Collections.emptyList(); private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; @Nullable diff --git a/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java b/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java index b5f09c4ea93..8dbf021775b 100644 --- a/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java @@ -107,7 +107,8 @@ private final class LazyPicker extends SubchannelPicker { public PickResult pickSubchannel(PickSubchannelArgs args) { // activate() is a no-op after shutdown() helper.getSynchronizationContext().execute(LazyDelegate.this::activate); - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "lazy: waiting for connection"); } } } diff --git a/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java b/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java index 1f23f2a4af5..e71d1dbe403 100644 --- a/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java @@ -54,7 +54,8 @@ final class LeastRequestLoadBalancer extends MultiChildLoadBalancer { private final ThreadSafeRandom random; - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "least_request: initializing")); private int choiceCount = DEFAULT_CHOICE_COUNT; LeastRequestLoadBalancer(Helper helper) { @@ -113,7 +114,10 @@ protected void updateOverallBalancingState() { } } if (isConnecting) { - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "least_request: connecting"))); } else { // Give it all the failing children and let it randomly pick among them updateBalancingState(TRANSIENT_FAILURE, @@ -246,7 +250,8 @@ public boolean equals(Object o) { static final class EmptyPicker extends SubchannelPicker { @Override public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "least_request: waiting for subchannel"); } @Override diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index 6744903de35..5a60a91ff31 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -107,7 +107,8 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer { private final Ticker ticker; private String locality = ""; private String backendService = ""; - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_round_robin: initializing")); // The metric instruments are only registered once and shared by all instances of this LB. static { @@ -227,7 +228,9 @@ protected void updateOverallBalancingState() { if (isConnecting) { updateBalancingState( - ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + ConnectivityState.CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_round_robin: connecting"))); } else { updateBalancingState( ConnectivityState.TRANSIENT_FAILURE, createReadyPicker(getChildLbStates())); diff --git a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java index 9468a9daf9d..1901e4ed0d5 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java @@ -165,7 +165,8 @@ private void updateOverallBalancingState() { if (overallState == TRANSIENT_FAILURE) { picker = new WeightedRandomPicker(errorPickers); } else { - picker = new FixedResultPicker(PickResult.withNoResult()); + picker = new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_target: connecting")); } } else { picker = new WeightedRandomPicker(childPickers); @@ -197,7 +198,8 @@ private static ConnectivityState aggregateState( private final class ChildHelper extends ForwardingLoadBalancerHelper { String name; ConnectivityState currentState = CONNECTING; - SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_target: initializing")); private ChildHelper(String name) { this.name = name;