From a2edecf7a49c1827b3e2df338ad3cd8105f4dd9b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 18 Jun 2026 16:19:46 +0800 Subject: [PATCH] Improve IT cluster readiness diagnostics (#17903) * Improve IT cluster readiness diagnostics * Increase pipe IT cluster readiness retries (cherry picked from commit ef0d9f8534fbbb01efb294ce548f53361fac8ccb) --- .github/workflows/pipe-it-2cluster.yml | 6 + .../apache/iotdb/it/env/MultiEnvFactory.java | 1 + .../iotdb/it/env/cluster/env/AbstractEnv.java | 221 +++++++++++++++++- 3 files changed, 216 insertions(+), 12 deletions(-) diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml index 6c5f11bc35c61..89794a279f753 100644 --- a/.github/workflows/pipe-it-2cluster.yml +++ b/.github/workflows/pipe-it-2cluster.yml @@ -75,6 +75,7 @@ jobs: mvn clean verify \ -P with-integration-tests \ -DskipUTs \ + -DintegrationTest.clusterReadyRetryCount=90 \ -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ -pl integration-test \ @@ -161,6 +162,7 @@ jobs: mvn clean verify \ -P with-integration-tests \ -DskipUTs \ + -DintegrationTest.clusterReadyRetryCount=90 \ -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ -pl integration-test \ @@ -245,6 +247,7 @@ jobs: mvn clean verify \ -P with-integration-tests \ -DskipUTs \ + -DintegrationTest.clusterReadyRetryCount=90 \ -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ -pl integration-test \ @@ -329,6 +332,7 @@ jobs: mvn clean verify \ -P with-integration-tests \ -DskipUTs \ + -DintegrationTest.clusterReadyRetryCount=90 \ -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ -pl integration-test \ @@ -413,6 +417,7 @@ jobs: mvn clean verify \ -P with-integration-tests \ -DskipUTs \ + -DintegrationTest.clusterReadyRetryCount=90 \ -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ -pl integration-test \ @@ -498,6 +503,7 @@ jobs: mvn clean verify \ -P with-integration-tests \ -DskipUTs \ + -DintegrationTest.clusterReadyRetryCount=90 \ -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }},${{ matrix.cluster3 }} \ -pl integration-test \ diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java index 5832f1c485bc5..f2e5f9c1f903d 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java @@ -51,6 +51,7 @@ public static BaseEnv getEnv(final int index) throws IndexOutOfBoundsException { /** Create several environments according to the specific number. */ public static void createEnv(final int num) { // Not judge EnvType for individual test convenience + envList.clear(); final long startTime = System.currentTimeMillis(); for (int i = 0; i < num; ++i) { try { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 07b7364d0042d..f2b7a103f5201 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; @@ -79,6 +80,9 @@ public abstract class AbstractEnv implements BaseEnv { private static final Logger logger = IoTDBTestLogger.logger; + private static final int DEFAULT_CLUSTER_READY_RETRY_COUNT = 30; + private static final String CLUSTER_READY_RETRY_COUNT_PROPERTY = + "integrationTest.clusterReadyRetryCount"; private final Random rand = new Random(); protected List configNodeWrapperList = Collections.emptyList(); @@ -87,7 +91,7 @@ public abstract class AbstractEnv implements BaseEnv { protected String testMethodName = null; protected int index = 0; protected long startTime; - protected int retryCount = 30; + protected int retryCount = getDefaultClusterReadyRetryCount(); private IClientManager clientManager; private List configNodeKillPoints = new ArrayList<>(); private List dataNodeKillPoints = new ArrayList<>(); @@ -110,6 +114,12 @@ protected AbstractEnv(final long startTime) { this.clusterConfig = new MppClusterConfig(); } + private static int getDefaultClusterReadyRetryCount() { + final int configuredRetryCount = + Integer.getInteger(CLUSTER_READY_RETRY_COUNT_PROPERTY, DEFAULT_CLUSTER_READY_RETRY_COUNT); + return configuredRetryCount > 0 ? configuredRetryCount : DEFAULT_CLUSTER_READY_RETRY_COUNT; + } + @Override public ClusterConfig getConfig() { return clusterConfig; @@ -331,12 +341,14 @@ private Map countNodeStatus(final Map nodeStat } public void checkNodeInStatus(int nodeId, NodeStatus expectation) { - checkClusterStatus(nodeStatusMap -> expectation.getStatus().equals(nodeStatusMap.get(nodeId))); + checkClusterStatus( + nodeStatusMap -> expectation.getStatus().equals(nodeStatusMap.get(nodeId)), m -> true); } public void checkClusterStatusWithoutUnknown() { checkClusterStatus( - nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals)); + nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals), + processStatus -> processStatus.values().stream().noneMatch(i -> i != 0)); testJDBCConnection(); } @@ -346,6 +358,10 @@ public void checkClusterStatusOneUnknownOtherRunning() { Map count = countNodeStatus(nodeStatus); return count.getOrDefault("Unknown", 0) == 1 && count.getOrDefault("Running", 0) == nodeStatus.size() - 1; + }, + processStatus -> { + long aliveProcessCount = processStatus.values().stream().filter(i -> i == 0).count(); + return aliveProcessCount == processStatus.size() - 1; }); testJDBCConnection(); } @@ -357,19 +373,52 @@ public void checkClusterStatusOneUnknownOtherRunning() { * @param statusCheck the predicate to test the status of nodes */ public void checkClusterStatus(final Predicate> statusCheck) { + checkClusterStatus( + statusCheck, processStatus -> processStatus.values().stream().noneMatch(i -> i != 0)); + } + + /** + * check whether all nodes' status match the provided predicate with RPC. after retryCount times, + * if the status of all nodes still not match the predicate, throw AssertionError. + * + * @param nodeStatusCheck the predicate to test the status of nodes + * @param processStatusCheck the predicate to test the status of processes + */ + public void checkClusterStatus( + final Predicate> nodeStatusCheck, + final Predicate> processStatusCheck) { logger.info("Testing cluster environment..."); TShowClusterResp showClusterResp; Exception lastException = null; - boolean flag; + boolean passed; + boolean showClusterPassed = true; + boolean nodeSizePassed = true; + boolean nodeStatusPassed = true; + boolean processStatusPassed = true; + TSStatus showClusterStatus = null; + int actualNodeSize = 0; + Map lastNodeStatus = null; + Map processStatusMap = new HashMap<>(); + for (int i = 0; i < retryCount; i++) { try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { - flag = true; + passed = true; + showClusterPassed = true; + nodeSizePassed = true; + nodeStatusPassed = true; + processStatusPassed = true; + processStatusMap.clear(); + showClusterResp = client.showCluster(); + showClusterStatus = showClusterResp.getStatus(); + actualNodeSize = showClusterResp.getNodeStatusSize(); + lastNodeStatus = showClusterResp.getNodeStatus(); // Check resp status if (showClusterResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - flag = false; + passed = false; + showClusterPassed = false; } // Check the number of nodes @@ -377,18 +426,36 @@ public void checkClusterStatus(final Predicate> statusCheck != configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size()) { - flag = false; + passed = false; + nodeSizePassed = false; } // Check the status of nodes - if (flag) { - flag = statusCheck.test(showClusterResp.getNodeStatus()); + if (passed) { + passed = nodeStatusCheck.test(showClusterResp.getNodeStatus()); + if (!passed) { + nodeStatusPassed = false; + } + } + + collectProcessStatus(processStatusMap); + processStatusPassed = processStatusCheck.test(processStatusMap); + if (!processStatusPassed) { + passed = false; + handleProcessStatus(processStatusMap); } - if (flag) { + if (passed) { logger.info("The cluster is now ready for testing!"); return; } + logger.info( + "Retry {}: showClusterPassed={}, nodeSizePassed={}, nodeStatusPassed={}, processStatusPassed={}", + i, + showClusterPassed, + nodeSizePassed, + nodeStatusPassed, + processStatusPassed); } catch (final Exception e) { lastException = e; } @@ -405,8 +472,132 @@ public void checkClusterStatus(final Predicate> statusCheck lastException.getMessage(), lastException); } + if (!showClusterPassed) { + logger.error("Show cluster failed: {}", showClusterStatus); + } + if (!nodeSizePassed) { + logger.error("Only {} nodes detected", actualNodeSize); + } + if (!nodeStatusPassed) { + logger.error("Some node status incorrect: {}", lastNodeStatus); + } + if (!processStatusPassed) { + logger.error("Some process status incorrect: {}", formatProcessStatus(processStatusMap)); + } + + dumpTestJVMSnapshotQuietly("cluster status check failed"); throw new AssertionError( - String.format("After %d times retry, the cluster can't work!", retryCount)); + buildClusterStatusFailureMessage( + showClusterPassed, + nodeSizePassed, + nodeStatusPassed, + processStatusPassed, + showClusterStatus, + actualNodeSize, + lastNodeStatus, + processStatusMap, + lastException)); + } + + private void collectProcessStatus(final Map processStatusMap) + throws InterruptedException { + final List allNodeWrappers = new ArrayList<>(); + allNodeWrappers.addAll(configNodeWrapperList); + allNodeWrappers.addAll(dataNodeWrapperList); + allNodeWrappers.addAll(aiNodeWrapperList); + for (final AbstractNodeWrapper nodeWrapper : allNodeWrappers) { + final Process process = nodeWrapper.getInstance(); + if (process == null) { + processStatusMap.put(nodeWrapper, -1); + } else { + processStatusMap.put(nodeWrapper, process.isAlive() ? 0 : process.waitFor()); + } + } + } + + private String buildClusterStatusFailureMessage( + final boolean showClusterPassed, + final boolean nodeSizePassed, + final boolean nodeStatusPassed, + final boolean processStatusPassed, + final TSStatus showClusterStatus, + final int actualNodeSize, + final Map lastNodeStatus, + final Map processStatusMap, + final Exception lastException) { + final StringBuilder builder = + new StringBuilder( + String.format("After %d times retry, the cluster status check failed", retryCount)); + builder + .append(": showClusterPassed=") + .append(showClusterPassed) + .append(", nodeSizePassed=") + .append(nodeSizePassed) + .append(", nodeStatusPassed=") + .append(nodeStatusPassed) + .append(", processStatusPassed=") + .append(processStatusPassed) + .append(", expectedNodeSize=") + .append( + configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size()) + .append(", actualNodeSize=") + .append(actualNodeSize); + if (showClusterStatus != null) { + builder.append(", showClusterStatus=").append(showClusterStatus); + } + if (lastNodeStatus != null) { + builder.append(", lastNodeStatus=").append(lastNodeStatus); + } + if (!processStatusMap.isEmpty()) { + builder.append(", processStatus=").append(formatProcessStatus(processStatusMap)); + } + if (lastException != null) { + builder + .append(", lastException=") + .append(lastException.getClass().getName()) + .append(": ") + .append(lastException.getMessage()); + } + builder.append(", logDirs=").append(getClusterLogDirs()); + return builder.toString(); + } + + private Map formatProcessStatus( + final Map processStatusMap) { + final Map result = new LinkedHashMap<>(); + processStatusMap.forEach( + (nodeWrapper, statusCode) -> result.put(nodeWrapper.getId(), statusCode)); + return result; + } + + private List getClusterLogDirs() { + final List allNodeWrappers = new ArrayList<>(); + allNodeWrappers.addAll(configNodeWrapperList); + allNodeWrappers.addAll(dataNodeWrapperList); + allNodeWrappers.addAll(aiNodeWrapperList); + return allNodeWrappers.stream() + .map(AbstractNodeWrapper::getLogDirPath) + .distinct() + .collect(Collectors.toList()); + } + + private void dumpTestJVMSnapshotQuietly(final String reason) { + try { + logger.info("Dumping test JVM snapshots because {}.", reason); + dumpTestJVMSnapshot(); + } catch (final Exception e) { + logger.warn("Failed to dump test JVM snapshots after {}", reason, e); + } + } + + private void handleProcessStatus(final Map processStatusMap) { + for (final Map.Entry entry : processStatusMap.entrySet()) { + final Integer statusCode = entry.getValue(); + final AbstractNodeWrapper nodeWrapper = entry.getKey(); + if (statusCode != 0) { + logger.info("Node {} is not running due to {}", nodeWrapper.getId(), statusCode); + } + } } @Override @@ -698,6 +889,7 @@ protected void testJDBCConnection() { .collect(Collectors.toList()); final RequestDelegate testDelegate = new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this); + final Map lastConnectionFailures = Collections.synchronizedMap(new HashMap<>()); for (final DataNodeWrapper dataNode : dataNodeWrapperList) { final String dataNodeEndpoint = dataNode.getIpAndPortString(); testDelegate.addRequest( @@ -716,6 +908,8 @@ protected void testJDBCConnection() { return null; } catch (final Exception e) { lastException = e; + lastConnectionFailures.put( + dataNodeEndpoint, e.getClass().getName() + ": " + e.getMessage()); TimeUnit.SECONDS.sleep(1L); } } @@ -729,8 +923,11 @@ protected void testJDBCConnection() { testDelegate.requestAll(); } catch (final Exception e) { logger.error("exception in test Cluster with RPC, message: {}", e.getMessage(), e); + dumpTestJVMSnapshotQuietly("JDBC connection check failed"); throw new AssertionError( - String.format("After %d times retry, the cluster can't work!", retryCount)); + String.format( + "After %d times retry, JDBC connections to DataNodes are not ready. endpoints=%s, lastConnectionFailures=%s, logDirs=%s", + retryCount, endpoints, lastConnectionFailures, getClusterLogDirs())); } }