Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,21 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure
protected abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();

public CompletableFuture<Subscription> execute() {
return this.client.run(channel -> {
return this.client.runWithArgs(args -> {
CompletableFuture<Subscription> future = new CompletableFuture<>();

StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder()
.setOptions(createOptions())
.build();

StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(
StreamsGrpc.newStub(channel),
StreamsGrpc.newStub(args.getChannel()),
this.client.getSettings(),
this.options
);

ReadResponseObserver observer = createObserver(channel, future);
ReadResponseObserver observer = createObserver(args.getChannel(), future);
observer.onConnected(args);
streamsClient.read(readReq, observer);

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

abstract class AbstractSubscribePersistentSubscription {
private final static Logger logger = LoggerFactory.getLogger(AbstractSubscribePersistentSubscription.class);
protected static final Persistent.ReadReq.Options.Builder defaultReadOptions;
private final GrpcClient client;
private final String group;
Expand Down Expand Up @@ -119,11 +122,20 @@ public void onError(Throwable throwable) {
return;
}

String leaderHost = sre.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
String leaderPort = sre.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));

if (leaderHost != null && leaderPort != null) {
error = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
Metadata trailers = sre.getTrailers();
if (trailers != null) {
String leaderHost = trailers.get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
String leaderPort = trailers.get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));

if (leaderHost != null && leaderPort != null) {
try {
int port = Integer.parseInt(leaderPort);
args.reportNewLeader(leaderHost, port);
error = new NotLeaderException(leaderHost, port);
} catch (RuntimeException ex) {
logger.warn("failed to handle leader change notification", ex);
}
}
}
}

Expand Down
11 changes: 8 additions & 3 deletions src/main/java/io/kurrent/dbclient/ReadResponseObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,14 @@ public void onError(Throwable t) {
String leaderPort = trailers.get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));

if (leaderHost != null && leaderPort != null) {
int port = Integer.parseInt(leaderPort);
this.args.reportNewLeader(leaderHost, port);
t = new NotLeaderException(leaderHost, port);
try {
int port = Integer.parseInt(leaderPort);
if (this.args != null)
this.args.reportNewLeader(leaderHost, port);
t = new NotLeaderException(leaderHost, port);
} catch (RuntimeException ex) {
logger.warn("failed to handle leader change notification", ex);
}
}
}
}
Expand Down
88 changes: 88 additions & 0 deletions src/test/java/io/kurrent/dbclient/LeaderRedirectUnitTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.kurrent.dbclient;

import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* A NotLeader redirect on a subscription must not throw and must still deliver onCancelled.
* The bug: onError called reportNewLeader(...) on a null args, throwing before
* consumer.onCancelled(...) and swallowing the failure.
*/
public class LeaderRedirectUnitTest {
@Test
public void onErrorLeaderRedirectStillInvokesOnCancelled() {
AtomicBoolean cancelledFired = new AtomicBoolean(false);
AtomicReference<Throwable> cancelErr = new AtomicReference<>();

SubscriptionStreamConsumer consumer = new SubscriptionStreamConsumer(
new SubscriptionListener() {
@Override
public void onCancelled(Subscription subscription, Throwable exception) {
cancelledFired.set(true);
cancelErr.set(exception);
}
},
null,
new CompletableFuture<>(),
(id, ev, action) -> action.run()
);

// no onConnected(): args stays null
ReadResponseObserver observer = new ReadResponseObserver(SubscribeToStreamOptions.get(), consumer);

Metadata trailers = new Metadata();
trailers.put(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER), "127.0.0.1");
trailers.put(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER), "2113");
StatusRuntimeException leaderRedirect = new StatusRuntimeException(Status.UNAVAILABLE, trailers);

Assertions.assertDoesNotThrow(() -> observer.onError(leaderRedirect));
Assertions.assertTrue(cancelledFired.get(), "onCancelled must fire on a leader redirect");
Assertions.assertTrue(cancelErr.get() instanceof NotLeaderException,
"listener should receive a NotLeaderException, got: " + cancelErr.get());
}

@Test
public void onErrorLeaderRedirectReportsNewLeaderWhenConnected() {
AtomicReference<Throwable> cancelErr = new AtomicReference<>();

SubscriptionStreamConsumer consumer = new SubscriptionStreamConsumer(
new SubscriptionListener() {
@Override
public void onCancelled(Subscription subscription, Throwable exception) {
cancelErr.set(exception);
}
},
null,
new CompletableFuture<>(),
(id, ev, action) -> action.run()
);

LinkedBlockingQueue<Msg> queue = new LinkedBlockingQueue<>();
WorkItemArgs args = new WorkItemArgs(UUID.randomUUID(), null, null, null, queue);

ReadResponseObserver observer = new ReadResponseObserver(SubscribeToStreamOptions.get(), consumer);
observer.onConnected(args);

Metadata trailers = new Metadata();
trailers.put(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER), "127.0.0.1");
trailers.put(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER), "2113");

Assertions.assertDoesNotThrow(() ->
observer.onError(new StatusRuntimeException(Status.UNAVAILABLE, trailers)));

Msg enqueued = queue.poll();
Assertions.assertTrue(enqueued instanceof CreateChannel,
"leader redirect should enqueue a CreateChannel for the new leader, got: " + enqueued);
Assertions.assertTrue(cancelErr.get() instanceof NotLeaderException,
"listener should receive a NotLeaderException, got: " + cancelErr.get());
}
}
2 changes: 1 addition & 1 deletion src/test/java/io/kurrent/dbclient/MiscTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

@Suite
@SelectPackages("io.kurrent.dbclient.misc")
@SelectClasses(SubscriptionStreamConsumerTests.class)
@SelectClasses({SubscriptionStreamConsumerTests.class, LeaderRedirectUnitTest.class})
public class MiscTests {}
Loading