diff --git a/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java b/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java index 34446efd..9309929e 100644 --- a/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java +++ b/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java @@ -40,7 +40,7 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure protected abstract StreamsOuterClass.ReadReq.Options.Builder createOptions(); public CompletableFuture execute() { - return this.client.run(channel -> { + return this.client.runWithArgs(args -> { CompletableFuture future = new CompletableFuture<>(); StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder() @@ -48,12 +48,13 @@ public CompletableFuture execute() { .build(); StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub( - StreamsGrpc.newStub(channel), + StreamsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options ); - ReadResponseObserver observer = createObserver(channel, future); + ReadResponseObserver observer = createObserver(args.getChannel(), future); + observer.onConnected(args); streamsClient.read(readReq, observer); return future; diff --git a/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java b/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java index 22a36804..93241510 100644 --- a/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java +++ b/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java @@ -9,10 +9,13 @@ import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; abstract class AbstractSubscribePersistentSubscription { + private final static Logger logger = LoggerFactory.getLogger(AbstractSubscribePersistentSubscription.class); protected static final Persistent.ReadReq.Options.Builder defaultReadOptions; private final GrpcClient client; private final String group; @@ -119,11 +122,20 @@ public void onError(Throwable throwable) { return; } - String leaderHost = sre.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); - String leaderPort = sre.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); - - if (leaderHost != null && leaderPort != null) { - error = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort)); + Metadata trailers = sre.getTrailers(); + if (trailers != null) { + String leaderHost = trailers.get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); + String leaderPort = trailers.get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); + + if (leaderHost != null && leaderPort != null) { + try { + int port = Integer.parseInt(leaderPort); + args.reportNewLeader(leaderHost, port); + error = new NotLeaderException(leaderHost, port); + } catch (RuntimeException ex) { + logger.warn("failed to handle leader change notification", ex); + } + } } } diff --git a/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java b/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java index 468929db..328c8967 100644 --- a/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java +++ b/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java @@ -172,9 +172,14 @@ public void onError(Throwable t) { String leaderPort = trailers.get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); if (leaderHost != null && leaderPort != null) { - int port = Integer.parseInt(leaderPort); - this.args.reportNewLeader(leaderHost, port); - t = new NotLeaderException(leaderHost, port); + try { + int port = Integer.parseInt(leaderPort); + if (this.args != null) + this.args.reportNewLeader(leaderHost, port); + t = new NotLeaderException(leaderHost, port); + } catch (RuntimeException ex) { + logger.warn("failed to handle leader change notification", ex); + } } } } diff --git a/src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java b/src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java new file mode 100644 index 00000000..0a37246a --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java @@ -0,0 +1,88 @@ +package io.kurrent.dbclient; + +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A NotLeader redirect on a subscription must not throw and must still deliver onCancelled. + * The bug: onError called reportNewLeader(...) on a null args, throwing before + * consumer.onCancelled(...) and swallowing the failure. + */ +public class LeaderRedirectUnitTest { + @Test + public void onErrorLeaderRedirectStillInvokesOnCancelled() { + AtomicBoolean cancelledFired = new AtomicBoolean(false); + AtomicReference cancelErr = new AtomicReference<>(); + + SubscriptionStreamConsumer consumer = new SubscriptionStreamConsumer( + new SubscriptionListener() { + @Override + public void onCancelled(Subscription subscription, Throwable exception) { + cancelledFired.set(true); + cancelErr.set(exception); + } + }, + null, + new CompletableFuture<>(), + (id, ev, action) -> action.run() + ); + + // no onConnected(): args stays null + ReadResponseObserver observer = new ReadResponseObserver(SubscribeToStreamOptions.get(), consumer); + + Metadata trailers = new Metadata(); + trailers.put(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER), "127.0.0.1"); + trailers.put(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER), "2113"); + StatusRuntimeException leaderRedirect = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + + Assertions.assertDoesNotThrow(() -> observer.onError(leaderRedirect)); + Assertions.assertTrue(cancelledFired.get(), "onCancelled must fire on a leader redirect"); + Assertions.assertTrue(cancelErr.get() instanceof NotLeaderException, + "listener should receive a NotLeaderException, got: " + cancelErr.get()); + } + + @Test + public void onErrorLeaderRedirectReportsNewLeaderWhenConnected() { + AtomicReference cancelErr = new AtomicReference<>(); + + SubscriptionStreamConsumer consumer = new SubscriptionStreamConsumer( + new SubscriptionListener() { + @Override + public void onCancelled(Subscription subscription, Throwable exception) { + cancelErr.set(exception); + } + }, + null, + new CompletableFuture<>(), + (id, ev, action) -> action.run() + ); + + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + WorkItemArgs args = new WorkItemArgs(UUID.randomUUID(), null, null, null, queue); + + ReadResponseObserver observer = new ReadResponseObserver(SubscribeToStreamOptions.get(), consumer); + observer.onConnected(args); + + Metadata trailers = new Metadata(); + trailers.put(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER), "127.0.0.1"); + trailers.put(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER), "2113"); + + Assertions.assertDoesNotThrow(() -> + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE, trailers))); + + Msg enqueued = queue.poll(); + Assertions.assertTrue(enqueued instanceof CreateChannel, + "leader redirect should enqueue a CreateChannel for the new leader, got: " + enqueued); + Assertions.assertTrue(cancelErr.get() instanceof NotLeaderException, + "listener should receive a NotLeaderException, got: " + cancelErr.get()); + } +} diff --git a/src/test/java/io/kurrent/dbclient/MiscTests.java b/src/test/java/io/kurrent/dbclient/MiscTests.java index 27566c09..658a2781 100644 --- a/src/test/java/io/kurrent/dbclient/MiscTests.java +++ b/src/test/java/io/kurrent/dbclient/MiscTests.java @@ -6,5 +6,5 @@ @Suite @SelectPackages("io.kurrent.dbclient.misc") -@SelectClasses(SubscriptionStreamConsumerTests.class) +@SelectClasses({SubscriptionStreamConsumerTests.class, LeaderRedirectUnitTest.class}) public class MiscTests {}