From cda206c3225b6d1aa6d6277c147cd2746076c9dc Mon Sep 17 00:00:00 2001 From: w1am <33353798+w1am@users.noreply.github.com> Date: Thu, 25 Jun 2026 06:59:55 +0000 Subject: [PATCH 1/2] fix: invoke onCancelled on leader change for catch-up subscriptions --- .../dbclient/AbstractRegularSubscription.java | 7 ++- .../dbclient/ReadResponseObserver.java | 11 +++- .../dbclient/LeaderRedirectUnitTest.java | 55 +++++++++++++++++++ .../java/io/kurrent/dbclient/MiscTests.java | 2 +- 4 files changed, 68 insertions(+), 7 deletions(-) create mode 100644 src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java diff --git a/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java b/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java index 34446efd..9309929e 100644 --- a/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java +++ b/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java @@ -40,7 +40,7 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure protected abstract StreamsOuterClass.ReadReq.Options.Builder createOptions(); public CompletableFuture execute() { - return this.client.run(channel -> { + return this.client.runWithArgs(args -> { CompletableFuture future = new CompletableFuture<>(); StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder() @@ -48,12 +48,13 @@ public CompletableFuture execute() { .build(); StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub( - StreamsGrpc.newStub(channel), + StreamsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options ); - ReadResponseObserver observer = createObserver(channel, future); + ReadResponseObserver observer = createObserver(args.getChannel(), future); + observer.onConnected(args); streamsClient.read(readReq, observer); return future; diff --git a/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java b/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java index 468929db..328c8967 100644 --- a/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java +++ b/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java @@ -172,9 +172,14 @@ public void onError(Throwable t) { String leaderPort = trailers.get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); if (leaderHost != null && leaderPort != null) { - int port = Integer.parseInt(leaderPort); - this.args.reportNewLeader(leaderHost, port); - t = new NotLeaderException(leaderHost, port); + try { + int port = Integer.parseInt(leaderPort); + if (this.args != null) + this.args.reportNewLeader(leaderHost, port); + t = new NotLeaderException(leaderHost, port); + } catch (RuntimeException ex) { + logger.warn("failed to handle leader change notification", ex); + } } } } diff --git a/src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java b/src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java new file mode 100644 index 00000000..c3e69910 --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java @@ -0,0 +1,55 @@ +package io.kurrent.dbclient; + +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Regression guard for the leader-change NPE. When a subscription's onError receives + * leader-endpoint trailers (the NotLeader redirect), it must NOT throw and must still + * deliver onCancelled to the listener so the application can react/reconnect. + * + * Before the fix, ReadResponseObserver.onError called this.args.reportNewLeader(...) with + * args == null on the subscription path, throwing a NullPointerException before reaching + * consumer.onCancelled(...) and swallowing the failure. This test constructs the observer + * without onConnected (args == null, the worst case) to lock that behavior down. + */ +public class LeaderRedirectUnitTest { + @Test + public void onErrorLeaderRedirectStillInvokesOnCancelled() { + AtomicBoolean cancelledFired = new AtomicBoolean(false); + AtomicReference cancelErr = new AtomicReference<>(); + + SubscriptionStreamConsumer consumer = new SubscriptionStreamConsumer( + new SubscriptionListener() { + @Override + public void onCancelled(Subscription subscription, Throwable exception) { + cancelledFired.set(true); + cancelErr.set(exception); + } + }, + null, + new CompletableFuture<>(), + (id, ev, action) -> action.run() + ); + + // Subscription path with args unset (worst case): must not NPE, must notify listener. + ReadResponseObserver observer = new ReadResponseObserver(SubscribeToStreamOptions.get(), consumer); + + Metadata trailers = new Metadata(); + trailers.put(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER), "127.0.0.1"); + trailers.put(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER), "2113"); + StatusRuntimeException leaderRedirect = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + + Assertions.assertDoesNotThrow(() -> observer.onError(leaderRedirect)); + Assertions.assertTrue(cancelledFired.get(), "onCancelled must fire on a leader redirect"); + Assertions.assertTrue(cancelErr.get() instanceof NotLeaderException, + "listener should receive a NotLeaderException, got: " + cancelErr.get()); + } +} diff --git a/src/test/java/io/kurrent/dbclient/MiscTests.java b/src/test/java/io/kurrent/dbclient/MiscTests.java index 27566c09..658a2781 100644 --- a/src/test/java/io/kurrent/dbclient/MiscTests.java +++ b/src/test/java/io/kurrent/dbclient/MiscTests.java @@ -6,5 +6,5 @@ @Suite @SelectPackages("io.kurrent.dbclient.misc") -@SelectClasses(SubscriptionStreamConsumerTests.class) +@SelectClasses({SubscriptionStreamConsumerTests.class, LeaderRedirectUnitTest.class}) public class MiscTests {} From 794173e3fbf99f041c85b0279be2f20cd2ae273f Mon Sep 17 00:00:00 2001 From: w1am <33353798+w1am@users.noreply.github.com> Date: Thu, 25 Jun 2026 07:42:22 +0000 Subject: [PATCH 2/2] fix: handle NotLeader redirect in persistent subscription onError --- ...stractSubscribePersistentSubscription.java | 22 ++++++-- .../dbclient/LeaderRedirectUnitTest.java | 51 +++++++++++++++---- 2 files changed, 59 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java b/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java index 22a36804..93241510 100644 --- a/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java +++ b/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java @@ -9,10 +9,13 @@ import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; abstract class AbstractSubscribePersistentSubscription { + private final static Logger logger = LoggerFactory.getLogger(AbstractSubscribePersistentSubscription.class); protected static final Persistent.ReadReq.Options.Builder defaultReadOptions; private final GrpcClient client; private final String group; @@ -119,11 +122,20 @@ public void onError(Throwable throwable) { return; } - String leaderHost = sre.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); - String leaderPort = sre.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); - - if (leaderHost != null && leaderPort != null) { - error = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort)); + Metadata trailers = sre.getTrailers(); + if (trailers != null) { + String leaderHost = trailers.get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); + String leaderPort = trailers.get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); + + if (leaderHost != null && leaderPort != null) { + try { + int port = Integer.parseInt(leaderPort); + args.reportNewLeader(leaderHost, port); + error = new NotLeaderException(leaderHost, port); + } catch (RuntimeException ex) { + logger.warn("failed to handle leader change notification", ex); + } + } } } diff --git a/src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java b/src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java index c3e69910..0a37246a 100644 --- a/src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java +++ b/src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java @@ -6,19 +6,16 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** - * Regression guard for the leader-change NPE. When a subscription's onError receives - * leader-endpoint trailers (the NotLeader redirect), it must NOT throw and must still - * deliver onCancelled to the listener so the application can react/reconnect. - * - * Before the fix, ReadResponseObserver.onError called this.args.reportNewLeader(...) with - * args == null on the subscription path, throwing a NullPointerException before reaching - * consumer.onCancelled(...) and swallowing the failure. This test constructs the observer - * without onConnected (args == null, the worst case) to lock that behavior down. + * A NotLeader redirect on a subscription must not throw and must still deliver onCancelled. + * The bug: onError called reportNewLeader(...) on a null args, throwing before + * consumer.onCancelled(...) and swallowing the failure. */ public class LeaderRedirectUnitTest { @Test @@ -39,7 +36,7 @@ public void onCancelled(Subscription subscription, Throwable exception) { (id, ev, action) -> action.run() ); - // Subscription path with args unset (worst case): must not NPE, must notify listener. + // no onConnected(): args stays null ReadResponseObserver observer = new ReadResponseObserver(SubscribeToStreamOptions.get(), consumer); Metadata trailers = new Metadata(); @@ -52,4 +49,40 @@ public void onCancelled(Subscription subscription, Throwable exception) { Assertions.assertTrue(cancelErr.get() instanceof NotLeaderException, "listener should receive a NotLeaderException, got: " + cancelErr.get()); } + + @Test + public void onErrorLeaderRedirectReportsNewLeaderWhenConnected() { + AtomicReference cancelErr = new AtomicReference<>(); + + SubscriptionStreamConsumer consumer = new SubscriptionStreamConsumer( + new SubscriptionListener() { + @Override + public void onCancelled(Subscription subscription, Throwable exception) { + cancelErr.set(exception); + } + }, + null, + new CompletableFuture<>(), + (id, ev, action) -> action.run() + ); + + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + WorkItemArgs args = new WorkItemArgs(UUID.randomUUID(), null, null, null, queue); + + ReadResponseObserver observer = new ReadResponseObserver(SubscribeToStreamOptions.get(), consumer); + observer.onConnected(args); + + Metadata trailers = new Metadata(); + trailers.put(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER), "127.0.0.1"); + trailers.put(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER), "2113"); + + Assertions.assertDoesNotThrow(() -> + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE, trailers))); + + Msg enqueued = queue.poll(); + Assertions.assertTrue(enqueued instanceof CreateChannel, + "leader redirect should enqueue a CreateChannel for the new leader, got: " + enqueued); + Assertions.assertTrue(cancelErr.get() instanceof NotLeaderException, + "listener should receive a NotLeaderException, got: " + cancelErr.get()); + } }