Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/pipe-it-2cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \
-pl integration-test \
Expand Down Expand Up @@ -161,6 +162,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-pl integration-test \
Expand Down Expand Up @@ -245,6 +247,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-pl integration-test \
Expand Down Expand Up @@ -329,6 +332,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-pl integration-test \
Expand Down Expand Up @@ -413,6 +417,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-pl integration-test \
Expand Down Expand Up @@ -498,6 +503,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }},${{ matrix.cluster3 }} \
-pl integration-test \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static BaseEnv getEnv(final int index) throws IndexOutOfBoundsException {
/** Create several environments according to the specific number. */
public static void createEnv(final int num) {
// Not judge EnvType for individual test convenience
envList.clear();
final long startTime = System.currentTimeMillis();
for (int i = 0; i < num; ++i) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
Expand Down Expand Up @@ -79,6 +80,9 @@

public abstract class AbstractEnv implements BaseEnv {
private static final Logger logger = IoTDBTestLogger.logger;
private static final int DEFAULT_CLUSTER_READY_RETRY_COUNT = 30;
private static final String CLUSTER_READY_RETRY_COUNT_PROPERTY =
"integrationTest.clusterReadyRetryCount";

private final Random rand = new Random();
protected List<ConfigNodeWrapper> configNodeWrapperList = Collections.emptyList();
Expand All @@ -87,7 +91,7 @@ public abstract class AbstractEnv implements BaseEnv {
protected String testMethodName = null;
protected int index = 0;
protected long startTime;
protected int retryCount = 30;
protected int retryCount = getDefaultClusterReadyRetryCount();
private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
private List<String> configNodeKillPoints = new ArrayList<>();
private List<String> dataNodeKillPoints = new ArrayList<>();
Expand All @@ -110,6 +114,12 @@ protected AbstractEnv(final long startTime) {
this.clusterConfig = new MppClusterConfig();
}

private static int getDefaultClusterReadyRetryCount() {
final int configuredRetryCount =
Integer.getInteger(CLUSTER_READY_RETRY_COUNT_PROPERTY, DEFAULT_CLUSTER_READY_RETRY_COUNT);
return configuredRetryCount > 0 ? configuredRetryCount : DEFAULT_CLUSTER_READY_RETRY_COUNT;
}

@Override
public ClusterConfig getConfig() {
return clusterConfig;
Expand Down Expand Up @@ -331,12 +341,14 @@ private Map<String, Integer> countNodeStatus(final Map<Integer, String> nodeStat
}

public void checkNodeInStatus(int nodeId, NodeStatus expectation) {
checkClusterStatus(nodeStatusMap -> expectation.getStatus().equals(nodeStatusMap.get(nodeId)));
checkClusterStatus(
nodeStatusMap -> expectation.getStatus().equals(nodeStatusMap.get(nodeId)), m -> true);
}

public void checkClusterStatusWithoutUnknown() {
checkClusterStatus(
nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals));
nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals),
processStatus -> processStatus.values().stream().noneMatch(i -> i != 0));
testJDBCConnection();
}

Expand All @@ -346,6 +358,10 @@ public void checkClusterStatusOneUnknownOtherRunning() {
Map<String, Integer> count = countNodeStatus(nodeStatus);
return count.getOrDefault("Unknown", 0) == 1
&& count.getOrDefault("Running", 0) == nodeStatus.size() - 1;
},
processStatus -> {
long aliveProcessCount = processStatus.values().stream().filter(i -> i == 0).count();
return aliveProcessCount == processStatus.size() - 1;
});
testJDBCConnection();
}
Expand All @@ -357,38 +373,89 @@ public void checkClusterStatusOneUnknownOtherRunning() {
* @param statusCheck the predicate to test the status of nodes
*/
public void checkClusterStatus(final Predicate<Map<Integer, String>> statusCheck) {
checkClusterStatus(
statusCheck, processStatus -> processStatus.values().stream().noneMatch(i -> i != 0));
}

/**
* check whether all nodes' status match the provided predicate with RPC. after retryCount times,
* if the status of all nodes still not match the predicate, throw AssertionError.
*
* @param nodeStatusCheck the predicate to test the status of nodes
* @param processStatusCheck the predicate to test the status of processes
*/
public void checkClusterStatus(
final Predicate<Map<Integer, String>> nodeStatusCheck,
final Predicate<Map<AbstractNodeWrapper, Integer>> processStatusCheck) {
logger.info("Testing cluster environment...");
TShowClusterResp showClusterResp;
Exception lastException = null;
boolean flag;
boolean passed;
boolean showClusterPassed = true;
boolean nodeSizePassed = true;
boolean nodeStatusPassed = true;
boolean processStatusPassed = true;
TSStatus showClusterStatus = null;
int actualNodeSize = 0;
Map<Integer, String> lastNodeStatus = null;
Map<AbstractNodeWrapper, Integer> processStatusMap = new HashMap<>();

for (int i = 0; i < retryCount; i++) {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
flag = true;
passed = true;
showClusterPassed = true;
nodeSizePassed = true;
nodeStatusPassed = true;
processStatusPassed = true;
processStatusMap.clear();

showClusterResp = client.showCluster();
showClusterStatus = showClusterResp.getStatus();
actualNodeSize = showClusterResp.getNodeStatusSize();
lastNodeStatus = showClusterResp.getNodeStatus();

// Check resp status
if (showClusterResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
flag = false;
passed = false;
showClusterPassed = false;
}

// Check the number of nodes
if (showClusterResp.getNodeStatus().size()
!= configNodeWrapperList.size()
+ dataNodeWrapperList.size()
+ aiNodeWrapperList.size()) {
flag = false;
passed = false;
nodeSizePassed = false;
}

// Check the status of nodes
if (flag) {
flag = statusCheck.test(showClusterResp.getNodeStatus());
if (passed) {
passed = nodeStatusCheck.test(showClusterResp.getNodeStatus());
if (!passed) {
nodeStatusPassed = false;
}
}

collectProcessStatus(processStatusMap);
processStatusPassed = processStatusCheck.test(processStatusMap);
if (!processStatusPassed) {
passed = false;
handleProcessStatus(processStatusMap);
}

if (flag) {
if (passed) {
logger.info("The cluster is now ready for testing!");
return;
}
logger.info(
"Retry {}: showClusterPassed={}, nodeSizePassed={}, nodeStatusPassed={}, processStatusPassed={}",
i,
showClusterPassed,
nodeSizePassed,
nodeStatusPassed,
processStatusPassed);
} catch (final Exception e) {
lastException = e;
}
Expand All @@ -405,8 +472,132 @@ public void checkClusterStatus(final Predicate<Map<Integer, String>> statusCheck
lastException.getMessage(),
lastException);
}
if (!showClusterPassed) {
logger.error("Show cluster failed: {}", showClusterStatus);
}
if (!nodeSizePassed) {
logger.error("Only {} nodes detected", actualNodeSize);
}
if (!nodeStatusPassed) {
logger.error("Some node status incorrect: {}", lastNodeStatus);
}
if (!processStatusPassed) {
logger.error("Some process status incorrect: {}", formatProcessStatus(processStatusMap));
}

dumpTestJVMSnapshotQuietly("cluster status check failed");
throw new AssertionError(
String.format("After %d times retry, the cluster can't work!", retryCount));
buildClusterStatusFailureMessage(
showClusterPassed,
nodeSizePassed,
nodeStatusPassed,
processStatusPassed,
showClusterStatus,
actualNodeSize,
lastNodeStatus,
processStatusMap,
lastException));
}

private void collectProcessStatus(final Map<AbstractNodeWrapper, Integer> processStatusMap)
throws InterruptedException {
final List<AbstractNodeWrapper> allNodeWrappers = new ArrayList<>();
allNodeWrappers.addAll(configNodeWrapperList);
allNodeWrappers.addAll(dataNodeWrapperList);
allNodeWrappers.addAll(aiNodeWrapperList);
for (final AbstractNodeWrapper nodeWrapper : allNodeWrappers) {
final Process process = nodeWrapper.getInstance();
if (process == null) {
processStatusMap.put(nodeWrapper, -1);
} else {
processStatusMap.put(nodeWrapper, process.isAlive() ? 0 : process.waitFor());
}
}
}

private String buildClusterStatusFailureMessage(
final boolean showClusterPassed,
final boolean nodeSizePassed,
final boolean nodeStatusPassed,
final boolean processStatusPassed,
final TSStatus showClusterStatus,
final int actualNodeSize,
final Map<Integer, String> lastNodeStatus,
final Map<AbstractNodeWrapper, Integer> processStatusMap,
final Exception lastException) {
final StringBuilder builder =
new StringBuilder(
String.format("After %d times retry, the cluster status check failed", retryCount));
builder
.append(": showClusterPassed=")
.append(showClusterPassed)
.append(", nodeSizePassed=")
.append(nodeSizePassed)
.append(", nodeStatusPassed=")
.append(nodeStatusPassed)
.append(", processStatusPassed=")
.append(processStatusPassed)
.append(", expectedNodeSize=")
.append(
configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size())
.append(", actualNodeSize=")
.append(actualNodeSize);
if (showClusterStatus != null) {
builder.append(", showClusterStatus=").append(showClusterStatus);
}
if (lastNodeStatus != null) {
builder.append(", lastNodeStatus=").append(lastNodeStatus);
}
if (!processStatusMap.isEmpty()) {
builder.append(", processStatus=").append(formatProcessStatus(processStatusMap));
}
if (lastException != null) {
builder
.append(", lastException=")
.append(lastException.getClass().getName())
.append(": ")
.append(lastException.getMessage());
}
builder.append(", logDirs=").append(getClusterLogDirs());
return builder.toString();
}

private Map<String, Integer> formatProcessStatus(
final Map<AbstractNodeWrapper, Integer> processStatusMap) {
final Map<String, Integer> result = new LinkedHashMap<>();
processStatusMap.forEach(
(nodeWrapper, statusCode) -> result.put(nodeWrapper.getId(), statusCode));
return result;
}

private List<String> getClusterLogDirs() {
final List<AbstractNodeWrapper> allNodeWrappers = new ArrayList<>();
allNodeWrappers.addAll(configNodeWrapperList);
allNodeWrappers.addAll(dataNodeWrapperList);
allNodeWrappers.addAll(aiNodeWrapperList);
return allNodeWrappers.stream()
.map(AbstractNodeWrapper::getLogDirPath)
.distinct()
.collect(Collectors.toList());
}

private void dumpTestJVMSnapshotQuietly(final String reason) {
try {
logger.info("Dumping test JVM snapshots because {}.", reason);
dumpTestJVMSnapshot();
} catch (final Exception e) {
logger.warn("Failed to dump test JVM snapshots after {}", reason, e);
}
}

private void handleProcessStatus(final Map<AbstractNodeWrapper, Integer> processStatusMap) {
for (final Map.Entry<AbstractNodeWrapper, Integer> entry : processStatusMap.entrySet()) {
final Integer statusCode = entry.getValue();
final AbstractNodeWrapper nodeWrapper = entry.getKey();
if (statusCode != 0) {
logger.info("Node {} is not running due to {}", nodeWrapper.getId(), statusCode);
}
}
}

@Override
Expand Down Expand Up @@ -698,6 +889,7 @@ protected void testJDBCConnection() {
.collect(Collectors.toList());
final RequestDelegate<Void> testDelegate =
new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
final Map<String, String> lastConnectionFailures = Collections.synchronizedMap(new HashMap<>());
for (final DataNodeWrapper dataNode : dataNodeWrapperList) {
final String dataNodeEndpoint = dataNode.getIpAndPortString();
testDelegate.addRequest(
Expand All @@ -716,6 +908,8 @@ protected void testJDBCConnection() {
return null;
} catch (final Exception e) {
lastException = e;
lastConnectionFailures.put(
dataNodeEndpoint, e.getClass().getName() + ": " + e.getMessage());
TimeUnit.SECONDS.sleep(1L);
}
}
Expand All @@ -729,8 +923,11 @@ protected void testJDBCConnection() {
testDelegate.requestAll();
} catch (final Exception e) {
logger.error("exception in test Cluster with RPC, message: {}", e.getMessage(), e);
dumpTestJVMSnapshotQuietly("JDBC connection check failed");
throw new AssertionError(
String.format("After %d times retry, the cluster can't work!", retryCount));
String.format(
"After %d times retry, JDBC connections to DataNodes are not ready. endpoints=%s, lastConnectionFailures=%s, logDirs=%s",
retryCount, endpoints, lastConnectionFailures, getClusterLogDirs()));
}
}

Expand Down
Loading