diff --git a/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java b/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java index c345fb35d0a..f26caedc1b3 100644 --- a/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java +++ b/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java @@ -74,7 +74,7 @@ public void configureServerBuilder(ServerBuilder serverBuilder) { } @VisibleForTesting - void configureChannelBuilder(ManagedChannelBuilder builder) { + public void configureChannelBuilder(ManagedChannelBuilder builder) { delegate.configureChannelBuilder(builder); } @@ -115,6 +115,14 @@ public Builder sdk(OpenTelemetry sdk) { return this; } + /** + * Enables or disables tracing. + */ + public Builder enableTracing(boolean enable) { + InternalGrpcOpenTelemetry.enableTracing(delegate, enable); + return this; + } + /** * Adds optionalLabelKey to all the metrics that can provide value for the * optionalLabelKey. diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 705026a3fe3..df18ce249a1 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -123,6 +123,7 @@ opencensus-exporter-trace-stackdriver = { module = "io.opencensus:opencensus-exp opencensus-impl = { module = "io.opencensus:opencensus-impl", version.ref = "opencensus" } opentelemetry-api = "io.opentelemetry:opentelemetry-api:1.60.1" opentelemetry-exporter-prometheus = "io.opentelemetry:opentelemetry-exporter-prometheus:1.60.1-alpha" +opentelemetry-exporter-otlp = "io.opentelemetry:opentelemetry-exporter-otlp:1.60.1" opentelemetry-gcp-resources = "io.opentelemetry.contrib:opentelemetry-gcp-resources:1.54.0-alpha" opentelemetry-sdk-extension-autoconfigure = "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.60.1" opentelemetry-sdk-testing = "io.opentelemetry:opentelemetry-sdk-testing:1.60.1" diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 5160759460c..b7b68482e0a 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -42,6 +42,7 @@ dependencies { libraries.netty.tcnative, libraries.netty.tcnative.classes, libraries.opentelemetry.exporter.prometheus, // For xds interop client + libraries.opentelemetry.exporter.otlp, project(':grpc-googleapis'), project(':grpc-grpclb'), project(':grpc-rls') diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 125d876b705..8b4582308ab 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -79,6 +79,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import io.grpc.gcp.csm.observability.CsmObservability; /** * Application that starts a client for the {@link TestServiceGrpc.TestServiceImplBase} and runs @@ -99,6 +100,13 @@ public class TestServiceClient { public static void main(String[] args) throws Exception { final TestServiceClient client = new TestServiceClient(); client.parseArgs(args); + if (client.enableOpentelemetryTracing) { + io.opentelemetry.api.OpenTelemetry otel = io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + io.grpc.opentelemetry.GrpcOpenTelemetry gotel = io.grpc.opentelemetry.GrpcOpenTelemetry.newBuilder() + .sdk(otel) + .build(); + gotel.registerGlobal(); + } customBackendMetricsLoadBalancerProvider = new CustomBackendMetricsLoadBalancerProvider(); LoadBalancerRegistry.getDefaultRegistry().register(customBackendMetricsLoadBalancerProvider); client.setUp(); @@ -107,6 +115,10 @@ public static void main(String[] args) throws Exception { client.run(); } finally { client.tearDown(); + if (client.enableOpentelemetryTracing) { + System.out.println("Sleeping to flush spans..."); + Thread.sleep(2000); + } } } @@ -136,6 +148,7 @@ public static void main(String[] args) throws Exception { private int soakResponseSize = 314159; private int numThreads = 1; private String additionalMetadata = ""; + private boolean enableOpentelemetryTracing = false; private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider; private Tester tester = new Tester(); @@ -167,6 +180,8 @@ void parseArgs(String[] args) throws Exception { serverHostOverride = value; } else if ("server_port".equals(key)) { serverPort = Integer.parseInt(value); + } else if ("enable_opentelemetry_tracing".equals(key)) { + enableOpentelemetryTracing = Boolean.parseBoolean(value); } else if ("test_case".equals(key)) { testCase = value; } else if ("num_times".equals(key)) { @@ -599,6 +614,9 @@ private class Tester extends AbstractInteropTest { @Override protected ManagedChannelBuilder createChannelBuilder() { boolean useGeneric = false; + if (enableOpentelemetryTracing) { + useGeneric = true; + } ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index fc4cdf9178f..9431790ee74 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -17,6 +17,7 @@ package io.grpc.testing.integration; import com.google.common.annotations.VisibleForTesting; +import io.grpc.gcp.csm.observability.CsmObservability; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.BindableService; import io.grpc.Grpc; @@ -46,6 +47,13 @@ public class TestServiceServer { public static void main(String[] args) throws Exception { final TestServiceServer server = new TestServiceServer(); server.parseArgs(args); + if (server.enableOpentelemetryTracing) { + io.opentelemetry.api.OpenTelemetry otel = io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + io.grpc.opentelemetry.GrpcOpenTelemetry gotel = io.grpc.opentelemetry.GrpcOpenTelemetry.newBuilder() + .sdk(otel) + .build(); + gotel.registerGlobal(); + } if (server.useTls) { System.out.println( "\nUsing fake CA for TLS certificate. Test clients should expect host\n" @@ -75,6 +83,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; + private boolean enableOpentelemetryTracing = false; private ScheduledExecutorService executor; private Server server; @@ -106,6 +115,8 @@ void parseArgs(String[] args) { port = Integer.parseInt(value); } else if ("use_tls".equals(key)) { useTls = Boolean.parseBoolean(value); + } else if ("enable_opentelemetry_tracing".equals(key)) { + enableOpentelemetryTracing = Boolean.parseBoolean(value); } else if ("use_alts".equals(key)) { useAlts = Boolean.parseBoolean(value); } else if ("local_handshaker_port".equals(key)) { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java index 89519041a79..26a7b187064 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java @@ -39,6 +39,7 @@ import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureServerCredentials; import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Server; @@ -60,6 +61,7 @@ import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleResponse; import io.grpc.xds.XdsChannelCredentials; +import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import java.util.ArrayList; import java.util.Collections; @@ -104,6 +106,7 @@ public final class XdsTestClient { private long currentRequestId; private ListeningScheduledExecutorService exec; private CsmObservability csmObservability; + private OpenTelemetrySdk openTelemetrySdk; /** * The main application allowing this client to be launched from the command line. @@ -265,14 +268,23 @@ private static RpcType parseRpc(String rpc) { @IgnoreJRERequirement // OpenTelemetry uses Java 8+ APIs private void run() { if (enableCsmObservability) { + Map props = new HashMap<>(); + props.put("otel.logs.exporter", "none"); + props.put("otel.metrics.exporter", "otlp"); + String tracesExporter = System.getenv("OTEL_TRACES_EXPORTER"); + if (tracesExporter != null) { + props.put("otel.traces.exporter", tracesExporter); + } else { + props.put("otel.traces.exporter", "none"); + } + + AutoConfiguredOpenTelemetrySdk autoSdk = AutoConfiguredOpenTelemetrySdk.builder() + .addPropertiesSupplier(() -> props) + .build(); + openTelemetrySdk = autoSdk.getOpenTelemetrySdk(); csmObservability = CsmObservability.newBuilder() - .sdk(AutoConfiguredOpenTelemetrySdk.builder() - .addPropertiesSupplier(() -> ImmutableMap.of( - "otel.logs.exporter", "none", - "otel.metrics.exporter", "prometheus", - "otel.traces.exporter", "none")) - .build() - .getOpenTelemetrySdk()) + .sdk(openTelemetrySdk) + .enableTracing(!"none".equals(props.get("otel.traces.exporter"))) .build(); csmObservability.registerGlobal(); } @@ -289,14 +301,16 @@ private void run() { try { statsServer.start(); for (int i = 0; i < numChannels; i++) { - channels.add( - Grpc.newChannelBuilder( + ManagedChannelBuilder builder = Grpc.newChannelBuilder( server, secureMode ? XdsChannelCredentials.create(InsecureChannelCredentials.create()) : InsecureChannelCredentials.create()) - .enableRetry() - .build()); + .enableRetry(); + if (enableCsmObservability) { + csmObservability.configureChannelBuilder(builder); + } + channels.add(builder.build()); } exec = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor()); Payload requestPayload = Payload.newBuilder() @@ -325,6 +339,9 @@ private void stop() throws InterruptedException { if (csmObservability != null) { csmObservability.close(); } + if (openTelemetrySdk != null) { + openTelemetrySdk.close(); + } } @@ -373,6 +390,13 @@ public void start(Listener responseListener, Metadata headers) { @Override public void onHeaders(Metadata headers) { hostnameRef.set(headers.get(XdsTestServer.HOSTNAME_KEY)); + io.opentelemetry.api.trace.Span currentSpan = io.opentelemetry.api.trace.Span.current(); + for (String key : config.metadata.keys()) { + String value = config.metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + if (value != null) { + currentSpan.setAttribute("custom.metadata." + key, value); + } + } super.onHeaders(headers); } }, @@ -406,44 +430,56 @@ public void onNext(EmptyProtos.Empty response) {} .setPayload(requestPayload) .setResponseSize(responseSize) .build(); - stub.unaryCall( - request, - new StreamObserver() { - @Override - public void onCompleted() { - handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers); - } - @Override - public void onError(Throwable t) { - if (printResponse) { - logger.log(Level.WARNING, "Rpc failed", t); + io.opentelemetry.api.baggage.BaggageBuilder baggageBuilder = io.opentelemetry.api.baggage.Baggage.builder(); + for (String key : config.metadata.keys()) { + String value = config.metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + if (value != null) { + baggageBuilder.put(key, value); + } + } + io.opentelemetry.api.baggage.Baggage baggage = baggageBuilder.build(); + + try (io.opentelemetry.context.Scope scope = io.opentelemetry.context.Context.current().with(baggage).makeCurrent()) { + stub.unaryCall( + request, + new StreamObserver() { + @Override + public void onCompleted() { + handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers); } - handleRpcError(requestId, config.rpcType, Status.fromThrowable(t), - savedWatchers); - } - @Override - public void onNext(SimpleResponse response) { - // TODO(ericgribkoff) Currently some test environments cannot access the stats RPC - // service and rely on parsing stdout. - if (printResponse) { - System.out.println( - "Greeting: Hello world, this is " - + response.getHostname() - + ", from " - + clientCallRef - .get() - .getAttributes() - .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + @Override + public void onError(Throwable t) { + if (printResponse) { + logger.log(Level.WARNING, "Rpc failed", t); + } + handleRpcError(requestId, config.rpcType, Status.fromThrowable(t), + savedWatchers); } - // Use the hostname from the response if not present in the metadata. - // TODO(ericgribkoff) Delete when server is deployed that sets metadata value. - if (hostnameRef.get() == null) { - hostnameRef.set(response.getHostname()); + + @Override + public void onNext(SimpleResponse response) { + // TODO(ericgribkoff) Currently some test environments cannot access the stats RPC + // service and rely on parsing stdout. + if (printResponse) { + System.out.println( + "Greeting: Hello world, this is " + + response.getHostname() + + ", from " + + clientCallRef + .get() + .getAttributes() + .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + } + // Use the hostname from the response if not present in the metadata. + // TODO(ericgribkoff) Delete when server is deployed that sets metadata value. + if (hostnameRef.get() == null) { + hostnameRef.set(response.getHostname()); + } } - } - }); + }); + } } else { throw new AssertionError("Unknown RPC type: " + config.rpcType); } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java index 88f1bf468b6..5b48e59f8c2 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java @@ -46,13 +46,16 @@ import io.grpc.testing.integration.Messages.SimpleResponse; import io.grpc.xds.XdsServerBuilder; import io.grpc.xds.XdsServerCredentials; +import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Locale; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -92,6 +95,7 @@ public final class XdsTestServer { private String host; private Util.AddressType addressType = Util.AddressType.IPV4_IPV6; private CsmObservability csmObservability; + private OpenTelemetrySdk openTelemetrySdk; /** * The main application allowing this client to be launched from the command line. @@ -197,14 +201,23 @@ void parseArgs(String[] args) { @IgnoreJRERequirement // OpenTelemetry uses Java 8+ APIs void start() throws Exception { if (enableCsmObservability) { + Map props = new HashMap<>(); + props.put("otel.logs.exporter", "none"); + props.put("otel.metrics.exporter", "otlp"); + String tracesExporter = System.getenv("OTEL_TRACES_EXPORTER"); + if (tracesExporter != null) { + props.put("otel.traces.exporter", tracesExporter); + } else { + props.put("otel.traces.exporter", "none"); + } + + AutoConfiguredOpenTelemetrySdk autoSdk = AutoConfiguredOpenTelemetrySdk.builder() + .addPropertiesSupplier(() -> props) + .build(); + openTelemetrySdk = autoSdk.getOpenTelemetrySdk(); csmObservability = CsmObservability.newBuilder() - .sdk(AutoConfiguredOpenTelemetrySdk.builder() - .addPropertiesSupplier(() -> ImmutableMap.of( - "otel.logs.exporter", "none", - "otel.metrics.exporter", "prometheus", - "otel.traces.exporter", "none")) - .build() - .getOpenTelemetrySdk()) + .sdk(openTelemetrySdk) + .enableTracing(!"none".equals(props.get("otel.traces.exporter"))) .build(); csmObservability.registerGlobal(); } @@ -301,6 +314,9 @@ void stop() throws Exception { if (csmObservability != null) { csmObservability.close(); } + if (openTelemetrySdk != null) { + openTelemetrySdk.close(); + } } private void blockUntilShutdown() throws InterruptedException { diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryTracingInteropPocTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryTracingInteropPocTest.java new file mode 100644 index 00000000000..8232f7a00c0 --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryTracingInteropPocTest.java @@ -0,0 +1,139 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static org.junit.Assert.assertTrue; + +import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.opentelemetry.GrpcOpenTelemetry; +import io.grpc.opentelemetry.InternalGrpcOpenTelemetry; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class OpenTelemetryTracingInteropPocTest extends AbstractInteropTest { + + private TestSpanExporter spanExporter; + private OpenTelemetrySdk openTelemetrySdk; + private GrpcOpenTelemetry grpcOpenTelemetry; + + private static class TestSpanExporter implements SpanExporter { + private final List spans = new ArrayList<>(); + + @Override + public CompletableResultCode export(Collection spans) { + this.spans.addAll(spans); + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } + + public List getSpans() { + return spans; + } + } + + @Before + @Override + public void setUp() { + spanExporter = new TestSpanExporter(); + openTelemetrySdk = OpenTelemetrySdk.builder() + .setTracerProvider(SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build()) + .build(); + + GrpcOpenTelemetry.Builder grpcOpentelemetryBuilder = GrpcOpenTelemetry.newBuilder() + .sdk(openTelemetrySdk); + InternalGrpcOpenTelemetry.enableTracing(grpcOpentelemetryBuilder, true); + grpcOpenTelemetry = grpcOpentelemetryBuilder.build(); + + super.setUp(); + } + + @After + @Override + public void tearDown() { + super.tearDown(); + if (openTelemetrySdk != null) { + openTelemetrySdk.close(); + } + } + + @Override + protected ServerBuilder getServerBuilder() { + NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create()) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + grpcOpenTelemetry.configureServerBuilder(builder); + return builder; + } + + @Override + protected ManagedChannelBuilder createChannelBuilder() { + NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) + .usePlaintext(); + grpcOpenTelemetry.configureChannelBuilder(builder); + return builder; + } + + @Override + protected boolean metricsExpected() { + return false; + } + + @Test + public void verifySpansGenerated() throws Exception { + blockingStub.emptyCall(io.grpc.testing.integration.EmptyProtos.Empty.getDefaultInstance()); + + // Wait a bit for spans to be exported (SimpleSpanProcessor is synchronous, so they should be there) + Thread.sleep(500); + + List spans = spanExporter.getSpans(); + System.out.println("Captured spans: " + spans.size()); + for (SpanData span : spans) { + System.out.println("Span: " + span.getName()); + } + + assertTrue("Expected at least one span", spans.size() > 0); + } +}