dataNodeWrappers,
+ final String testClassName) {
+ String seedConfigNode = configNodeWrappers.get(0).getIpAndPortString();
+ int dataNodePort = dataNodeWrappers.get(0).getPort();
+ startAINode(seedConfigNode, dataNodePort, testClassName);
}
private void startAINode(
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 97a32e4653374..74c34392fb75f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -251,21 +251,27 @@ protected void initEnvironment(
throw new AssertionError();
}
- initExtraNodes(seedConfigNode, this.dataNodeWrapperList.get(0).getPort());
+ initExtraNodes(configNodeWrapperList, dataNodeWrapperList, testClassName);
checkClusterStatusWithoutUnknown();
}
/**
- * Hook for subclasses to create and start extra node types (e.g., AINode, StreamNode) beyond the
- * core ConfigNode and DataNode. Subclasses should create node wrappers, add them to {@link
- * #extraNodeWrappers}, and start them.
+ * Hook method for subclasses to initialize and start extra node types beyond the core ConfigNode
+ * and DataNode (e.g., AINode, StreamNode, ProxyNode).
*
- * @param seedConfigNode the ip:port of the seed ConfigNode
- * @param dataNodePort the port of the first DataNode (useful for nodes that need it, e.g.,
- * AINode)
+ * Subclasses should create node wrappers, add them to {@link #extraNodeWrappers}, configure
+ * kill points via {@link #extraNodeKillPoints}, and start the nodes. Subclasses have direct
+ * access to protected fields: {@code testMethodName}, {@code index}, {@code startTime}.
+ *
+ * @param configNodeWrappers list of all ConfigNode wrappers in the cluster (unmodifiable)
+ * @param dataNodeWrappers list of all DataNode wrappers in the cluster (unmodifiable)
+ * @param testClassName the test class name for logging and identification purposes
*/
- protected void initExtraNodes(final String seedConfigNode, final int dataNodePort) {
+ protected void initExtraNodes(
+ final List configNodeWrappers,
+ final List dataNodeWrappers,
+ final String testClassName) {
// Default: no extra nodes. Subclasses override to add nodes.
}
From 12897881bd905c7ac4473fc9635f1f64a27ecd81 Mon Sep 17 00:00:00 2001
From: clsu <404083629@qq.com>
Date: Wed, 17 Jun 2026 20:40:40 +0800
Subject: [PATCH 6/6] fix code review
---
.../iotdb/it/env/cluster/env/AIEnv.java | 59 +++----------
.../iotdb/it/env/cluster/env/AbstractEnv.java | 4 +
.../it/env/cluster/node/AINodeStarter.java | 84 +++++++++++++++++++
.../runtime/ParallelRequestDelegate.java | 13 ++-
4 files changed, 109 insertions(+), 51 deletions(-)
create mode 100644 integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java
index cf9520ed6a258..a15a6243b3c74 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java
@@ -19,24 +19,13 @@
package org.apache.iotdb.it.env.cluster.env;
-import org.apache.iotdb.it.env.cluster.EnvUtils;
-import org.apache.iotdb.it.env.cluster.node.AINodeWrapper;
+import org.apache.iotdb.it.env.cluster.node.AINodeStarter;
import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
-import org.apache.iotdb.it.framework.IoTDBTestLogger;
-import org.apache.iotdb.itbase.runtime.ParallelRequestDelegate;
-import org.apache.iotdb.itbase.runtime.RequestDelegate;
-import org.slf4j.Logger;
-
-import java.sql.SQLException;
-import java.util.Collections;
import java.util.List;
-import static org.apache.iotdb.it.env.cluster.ClusterConstant.NODE_START_TIMEOUT;
-
public class AIEnv extends AbstractEnv {
- private static final Logger logger = IoTDBTestLogger.logger;
@Override
public void initClusterEnvironment() {
@@ -59,41 +48,15 @@ protected void initExtraNodes(
final List configNodeWrappers,
final List dataNodeWrappers,
final String testClassName) {
- String seedConfigNode = configNodeWrappers.get(0).getIpAndPortString();
- int dataNodePort = dataNodeWrappers.get(0).getPort();
- startAINode(seedConfigNode, dataNodePort, testClassName);
- }
-
- private void startAINode(
- final String seedConfigNode, final int clusterIngressPort, final String testClassName) {
- final AINodeWrapper aiNodeWrapper =
- new AINodeWrapper(
- seedConfigNode,
- clusterIngressPort,
- testClassName,
- testMethodName,
- index,
- EnvUtils.searchAvailablePorts(),
- startTime);
- extraNodeWrappers.add(aiNodeWrapper);
- aiNodeWrapper.setKillPoints(extraNodeKillPoints);
- final String aiNodeEndPoint = aiNodeWrapper.getIpAndPortString();
- aiNodeWrapper.createNodeDir();
- aiNodeWrapper.createLogDir();
- final RequestDelegate aiNodesDelegate =
- new ParallelRequestDelegate<>(
- Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT, this);
-
- aiNodesDelegate.addRequest(
- () -> {
- aiNodeWrapper.start();
- return null;
- });
-
- try {
- aiNodesDelegate.requestAll();
- } catch (final SQLException e) {
- logger.error("Start aiNodes failed", e);
- }
+ AINodeStarter.startAINode(
+ configNodeWrappers.get(0).getIpAndPortString(),
+ dataNodeWrappers.get(0).getPort(),
+ testClassName,
+ testMethodName,
+ index,
+ startTime,
+ extraNodeKillPoints,
+ this::registerExtraNode,
+ this::dumpTestJVMSnapshot);
}
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 74c34392fb75f..77ccc25d9368a 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -275,6 +275,10 @@ protected void initExtraNodes(
// Default: no extra nodes. Subclasses override to add nodes.
}
+ protected void registerExtraNode(final AbstractNodeWrapper nodeWrapper) {
+ extraNodeWrappers.add(nodeWrapper);
+ }
+
private ConfigNodeWrapper newConfigNode() {
final ConfigNodeWrapper configNodeWrapper =
new ConfigNodeWrapper(
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java
new file mode 100644
index 0000000000000..cde3aaa923523
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.it.env.cluster.node;
+
+import org.apache.iotdb.it.env.cluster.EnvUtils;
+import org.apache.iotdb.it.framework.IoTDBTestLogger;
+import org.apache.iotdb.itbase.runtime.ParallelRequestDelegate;
+import org.apache.iotdb.itbase.runtime.RequestDelegate;
+
+import org.slf4j.Logger;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.iotdb.it.env.cluster.ClusterConstant.NODE_START_TIMEOUT;
+
+public class AINodeStarter {
+ private static final Logger logger = IoTDBTestLogger.logger;
+
+ private AINodeStarter() {}
+
+ public static AINodeWrapper startAINode(
+ final String seedConfigNode,
+ final int clusterIngressPort,
+ final String testClassName,
+ final String testMethodName,
+ final int clusterIndex,
+ final long startTime,
+ final List killPoints,
+ final Consumer nodeRegister,
+ final Runnable dumpTestJVMSnapshot) {
+ final AINodeWrapper aiNodeWrapper =
+ new AINodeWrapper(
+ seedConfigNode,
+ clusterIngressPort,
+ testClassName,
+ testMethodName,
+ clusterIndex,
+ EnvUtils.searchAvailablePorts(),
+ startTime);
+ nodeRegister.accept(aiNodeWrapper);
+ aiNodeWrapper.setKillPoints(killPoints);
+ aiNodeWrapper.createNodeDir();
+ aiNodeWrapper.createLogDir();
+
+ final RequestDelegate aiNodesDelegate =
+ new ParallelRequestDelegate<>(
+ Collections.singletonList(aiNodeWrapper.getIpAndPortString()),
+ NODE_START_TIMEOUT,
+ dumpTestJVMSnapshot);
+ aiNodesDelegate.addRequest(
+ () -> {
+ aiNodeWrapper.start();
+ return null;
+ });
+
+ try {
+ aiNodesDelegate.requestAll();
+ } catch (final SQLException e) {
+ logger.error("Start AINode {} failed", aiNodeWrapper.getId(), e);
+ throw new AssertionError();
+ }
+ return aiNodeWrapper;
+ }
+}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
index 437a6da6e5b13..26fd41b466c0e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
@@ -37,13 +37,20 @@
*/
public class ParallelRequestDelegate extends RequestDelegate {
private final int taskTimeoutSeconds;
- private final AbstractEnv env;
+ private final Runnable dumpTestJVMSnapshot;
public ParallelRequestDelegate(
final List endpoints, final int taskTimeoutSeconds, final AbstractEnv env) {
+ this(endpoints, taskTimeoutSeconds, env::dumpTestJVMSnapshot);
+ }
+
+ public ParallelRequestDelegate(
+ final List endpoints,
+ final int taskTimeoutSeconds,
+ final Runnable dumpTestJVMSnapshot) {
super(endpoints);
this.taskTimeoutSeconds = taskTimeoutSeconds;
- this.env = env;
+ this.dumpTestJVMSnapshot = dumpTestJVMSnapshot;
}
public List requestAll() throws SQLException {
@@ -60,7 +67,7 @@ public List requestAll() throws SQLException {
} catch (ExecutionException e) {
exceptions[i] = e;
} catch (InterruptedException | TimeoutException e) {
- env.dumpTestJVMSnapshot();
+ dumpTestJVMSnapshot.run();
for (int j = i; j < getEndpoints().size(); j++) {
resultFutures.get(j).cancel(true);
}