diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java index f812e9db3d160..a15a6243b3c74 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java @@ -19,7 +19,14 @@ package org.apache.iotdb.it.env.cluster.env; +import org.apache.iotdb.it.env.cluster.node.AINodeStarter; +import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; + +import java.util.List; + public class AIEnv extends AbstractEnv { + @Override public void initClusterEnvironment() { initClusterEnvironment(1, 1); @@ -27,12 +34,29 @@ public void initClusterEnvironment() { @Override public void initClusterEnvironment(int configNodesNum, int dataNodesNum) { - super.initEnvironment(configNodesNum, dataNodesNum, 600, true); + super.initEnvironment(configNodesNum, dataNodesNum, 600); } @Override public void initClusterEnvironment( int configNodesNum, int dataNodesNum, int testWorkingRetryCount) { - super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, true); + super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount); + } + + @Override + protected void initExtraNodes( + final List configNodeWrappers, + final List dataNodeWrappers, + final String testClassName) { + AINodeStarter.startAINode( + configNodeWrappers.get(0).getIpAndPortString(), + dataNodeWrappers.get(0).getPort(), + testClassName, + testMethodName, + index, + startTime, + extraNodeKillPoints, + this::registerExtraNode, + this::dumpTestJVMSnapshot); } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 0e7eefb3a4188..77ccc25d9368a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -47,7 +47,6 @@ import org.apache.iotdb.it.env.cluster.config.MppConfigNodeConfig; import org.apache.iotdb.it.env.cluster.config.MppDataNodeConfig; import org.apache.iotdb.it.env.cluster.config.MppJVMConfig; -import org.apache.iotdb.it.env.cluster.node.AINodeWrapper; import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper; import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; @@ -101,7 +100,7 @@ public abstract class AbstractEnv implements BaseEnv { private final Random rand = new Random(); protected List configNodeWrapperList = Collections.emptyList(); protected List dataNodeWrapperList = Collections.emptyList(); - protected List aiNodeWrapperList = Collections.emptyList(); + protected List extraNodeWrappers = Collections.emptyList(); protected String testMethodName = null; protected int index = 0; protected long startTime; @@ -109,6 +108,7 @@ public abstract class AbstractEnv implements BaseEnv { private IClientManager clientManager; private List configNodeKillPoints = new ArrayList<>(); private List dataNodeKillPoints = new ArrayList<>(); + protected List extraNodeKillPoints = new ArrayList<>(); /** * This config object stores the properties set by developers during the test. It will be cleared @@ -169,17 +169,10 @@ protected void initEnvironment(final int configNodesNum, final int dataNodesNum) protected void initEnvironment( final int configNodesNum, final int dataNodesNum, final int testWorkingRetryCount) { - initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, false); - } - - protected void initEnvironment( - final int configNodesNum, - final int dataNodesNum, - final int retryCount, - final boolean addAINode) { - this.retryCount = retryCount; + this.retryCount = testWorkingRetryCount; this.configNodeWrapperList = new ArrayList<>(); this.dataNodeWrapperList = new ArrayList<>(); + this.extraNodeWrappers = new ArrayList<>(); clientManager = new IClientManager.Factory() @@ -258,14 +251,34 @@ protected void initEnvironment( throw new AssertionError(); } - if (addAINode) { - this.aiNodeWrapperList = new ArrayList<>(); - startAINode(seedConfigNode, this.dataNodeWrapperList.get(0).getPort(), testClassName); - } + initExtraNodes(configNodeWrapperList, dataNodeWrapperList, testClassName); checkClusterStatusWithoutUnknown(); } + /** + * Hook method for subclasses to initialize and start extra node types beyond the core ConfigNode + * and DataNode (e.g., AINode, StreamNode, ProxyNode). + * + *

Subclasses should create node wrappers, add them to {@link #extraNodeWrappers}, configure + * kill points via {@link #extraNodeKillPoints}, and start the nodes. Subclasses have direct + * access to protected fields: {@code testMethodName}, {@code index}, {@code startTime}. + * + * @param configNodeWrappers list of all ConfigNode wrappers in the cluster (unmodifiable) + * @param dataNodeWrappers list of all DataNode wrappers in the cluster (unmodifiable) + * @param testClassName the test class name for logging and identification purposes + */ + protected void initExtraNodes( + final List configNodeWrappers, + final List dataNodeWrappers, + final String testClassName) { + // Default: no extra nodes. Subclasses override to add nodes. + } + + protected void registerExtraNode(final AbstractNodeWrapper nodeWrapper) { + extraNodeWrappers.add(nodeWrapper); + } + private ConfigNodeWrapper newConfigNode() { final ConfigNodeWrapper configNodeWrapper = new ConfigNodeWrapper( @@ -309,39 +322,6 @@ private DataNodeWrapper newDataNode() { return dataNodeWrapper; } - private void startAINode( - final String seedConfigNode, final int clusterIngressPort, final String testClassName) { - final String aiNodeEndPoint; - final AINodeWrapper aiNodeWrapper = - new AINodeWrapper( - seedConfigNode, - clusterIngressPort, - testClassName, - testMethodName, - index, - EnvUtils.searchAvailablePorts(), - startTime); - aiNodeWrapperList.add(aiNodeWrapper); - aiNodeEndPoint = aiNodeWrapper.getIpAndPortString(); - aiNodeWrapper.createNodeDir(); - aiNodeWrapper.createLogDir(); - final RequestDelegate aiNodesDelegate = - new ParallelRequestDelegate<>( - Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT, this); - - aiNodesDelegate.addRequest( - () -> { - aiNodeWrapper.start(); - return null; - }); - - try { - aiNodesDelegate.requestAll(); - } catch (final SQLException e) { - logger.error("Start aiNodes failed", e); - } - } - public String getTestClassName() { final StackTraceElement[] stack = Thread.currentThread().getStackTrace(); for (final StackTraceElement stackTraceElement : stack) { @@ -433,7 +413,7 @@ public void checkClusterStatus( if (showClusterResp.getNodeStatus().size() != configNodeWrapperList.size() + dataNodeWrapperList.size() - + aiNodeWrapperList.size()) { + + extraNodeWrappers.size()) { passed = false; nodeSizePassed = false; actualNodeSize = showClusterResp.getNodeStatusSize(); @@ -465,7 +445,7 @@ public void checkClusterStatus( processStatusMap.put(nodeWrapper, 0); } } - for (AINodeWrapper nodeWrapper : aiNodeWrapperList) { + for (AbstractNodeWrapper nodeWrapper : extraNodeWrappers) { boolean alive = nodeWrapper.getInstance().isAlive(); if (!alive) { processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); @@ -568,14 +548,14 @@ private void handleProcessStatus(Map processStatus configNodeWrapper.start(); } } - for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) { - if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) { + for (AbstractNodeWrapper extraNodeWrapper : extraNodeWrappers) { + if (portOccupationMap.containsValue(extraNodeWrapper.getPid())) { logger.info( - "A port is occupied by another AINode {}-{}, restart it", - aiNodeWrapper.getIpAndPortString(), - aiNodeWrapper.getPid()); - aiNodeWrapper.stop(); - aiNodeWrapper.start(); + "A port is occupied by another node {}-{}, restart it", + extraNodeWrapper.getIpAndPortString(), + extraNodeWrapper.getPid()); + extraNodeWrapper.stop(); + extraNodeWrapper.start(); } } } catch (IOException e) { @@ -592,8 +572,8 @@ private void handleProcessStatus(Map processStatus public void cleanClusterEnvironment() { final List allNodeWrappers = Stream.concat( - dataNodeWrapperList.stream(), - Stream.concat(configNodeWrapperList.stream(), aiNodeWrapperList.stream())) + Stream.concat(configNodeWrapperList.stream(), dataNodeWrapperList.stream()), + extraNodeWrappers.stream()) .collect(Collectors.toList()); allNodeWrappers.stream() .findAny() @@ -1045,6 +1025,7 @@ public void dumpTestJVMSnapshot() { public List getNodeWrapperList() { final List result = new ArrayList<>(configNodeWrapperList); result.addAll(dataNodeWrapperList); + result.addAll(extraNodeWrappers); return result; } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java new file mode 100644 index 0000000000000..cde3aaa923523 --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.it.env.cluster.node; + +import org.apache.iotdb.it.env.cluster.EnvUtils; +import org.apache.iotdb.it.framework.IoTDBTestLogger; +import org.apache.iotdb.itbase.runtime.ParallelRequestDelegate; +import org.apache.iotdb.itbase.runtime.RequestDelegate; + +import org.slf4j.Logger; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.iotdb.it.env.cluster.ClusterConstant.NODE_START_TIMEOUT; + +public class AINodeStarter { + private static final Logger logger = IoTDBTestLogger.logger; + + private AINodeStarter() {} + + public static AINodeWrapper startAINode( + final String seedConfigNode, + final int clusterIngressPort, + final String testClassName, + final String testMethodName, + final int clusterIndex, + final long startTime, + final List killPoints, + final Consumer nodeRegister, + final Runnable dumpTestJVMSnapshot) { + final AINodeWrapper aiNodeWrapper = + new AINodeWrapper( + seedConfigNode, + clusterIngressPort, + testClassName, + testMethodName, + clusterIndex, + EnvUtils.searchAvailablePorts(), + startTime); + nodeRegister.accept(aiNodeWrapper); + aiNodeWrapper.setKillPoints(killPoints); + aiNodeWrapper.createNodeDir(); + aiNodeWrapper.createLogDir(); + + final RequestDelegate aiNodesDelegate = + new ParallelRequestDelegate<>( + Collections.singletonList(aiNodeWrapper.getIpAndPortString()), + NODE_START_TIMEOUT, + dumpTestJVMSnapshot); + aiNodesDelegate.addRequest( + () -> { + aiNodeWrapper.start(); + return null; + }); + + try { + aiNodesDelegate.requestAll(); + } catch (final SQLException e) { + logger.error("Start AINode {} failed", aiNodeWrapper.getId(), e); + throw new AssertionError(); + } + return aiNodeWrapper; + } +} diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java index 437a6da6e5b13..26fd41b466c0e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java @@ -37,13 +37,20 @@ */ public class ParallelRequestDelegate extends RequestDelegate { private final int taskTimeoutSeconds; - private final AbstractEnv env; + private final Runnable dumpTestJVMSnapshot; public ParallelRequestDelegate( final List endpoints, final int taskTimeoutSeconds, final AbstractEnv env) { + this(endpoints, taskTimeoutSeconds, env::dumpTestJVMSnapshot); + } + + public ParallelRequestDelegate( + final List endpoints, + final int taskTimeoutSeconds, + final Runnable dumpTestJVMSnapshot) { super(endpoints); this.taskTimeoutSeconds = taskTimeoutSeconds; - this.env = env; + this.dumpTestJVMSnapshot = dumpTestJVMSnapshot; } public List requestAll() throws SQLException { @@ -60,7 +67,7 @@ public List requestAll() throws SQLException { } catch (ExecutionException e) { exceptions[i] = e; } catch (InterruptedException | TimeoutException e) { - env.dumpTestJVMSnapshot(); + dumpTestJVMSnapshot.run(); for (int j = i; j < getEndpoints().size(); j++) { resultFutures.get(j).cancel(true); }