Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,44 @@

package org.apache.iotdb.it.env.cluster.env;

import org.apache.iotdb.it.env.cluster.node.AINodeStarter;
import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;

import java.util.List;

public class AIEnv extends AbstractEnv {

@Override
public void initClusterEnvironment() {
initClusterEnvironment(1, 1);
}

@Override
public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
super.initEnvironment(configNodesNum, dataNodesNum, 600, true);
super.initEnvironment(configNodesNum, dataNodesNum, 600);
}

@Override
public void initClusterEnvironment(
int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, true);
super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
}

@Override
protected void initExtraNodes(
final List<ConfigNodeWrapper> configNodeWrappers,
final List<DataNodeWrapper> dataNodeWrappers,
final String testClassName) {
AINodeStarter.startAINode(
configNodeWrappers.get(0).getIpAndPortString(),
dataNodeWrappers.get(0).getPort(),
testClassName,
testMethodName,
index,
startTime,
extraNodeKillPoints,
this::registerExtraNode,
this::dumpTestJVMSnapshot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.iotdb.it.env.cluster.config.MppConfigNodeConfig;
import org.apache.iotdb.it.env.cluster.config.MppDataNodeConfig;
import org.apache.iotdb.it.env.cluster.config.MppJVMConfig;
import org.apache.iotdb.it.env.cluster.node.AINodeWrapper;
import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
Expand Down Expand Up @@ -101,14 +100,15 @@ public abstract class AbstractEnv implements BaseEnv {
private final Random rand = new Random();
protected List<ConfigNodeWrapper> configNodeWrapperList = Collections.emptyList();
protected List<DataNodeWrapper> dataNodeWrapperList = Collections.emptyList();
protected List<AINodeWrapper> aiNodeWrapperList = Collections.emptyList();
protected List<AbstractNodeWrapper> extraNodeWrappers = Collections.emptyList();
protected String testMethodName = null;
protected int index = 0;
protected long startTime;
protected int retryCount = 30;
private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
private List<String> configNodeKillPoints = new ArrayList<>();
private List<String> dataNodeKillPoints = new ArrayList<>();
protected List<String> extraNodeKillPoints = new ArrayList<>();

/**
* This config object stores the properties set by developers during the test. It will be cleared
Expand Down Expand Up @@ -169,17 +169,10 @@ protected void initEnvironment(final int configNodesNum, final int dataNodesNum)

protected void initEnvironment(
final int configNodesNum, final int dataNodesNum, final int testWorkingRetryCount) {
initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, false);
}

protected void initEnvironment(
final int configNodesNum,
final int dataNodesNum,
final int retryCount,
final boolean addAINode) {
this.retryCount = retryCount;
this.retryCount = testWorkingRetryCount;
this.configNodeWrapperList = new ArrayList<>();
this.dataNodeWrapperList = new ArrayList<>();
this.extraNodeWrappers = new ArrayList<>();

clientManager =
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
Expand Down Expand Up @@ -258,14 +251,34 @@ protected void initEnvironment(
throw new AssertionError();
}

if (addAINode) {
this.aiNodeWrapperList = new ArrayList<>();
startAINode(seedConfigNode, this.dataNodeWrapperList.get(0).getPort(), testClassName);
}
initExtraNodes(configNodeWrapperList, dataNodeWrapperList, testClassName);

checkClusterStatusWithoutUnknown();
}

/**
* Hook method for subclasses to initialize and start extra node types beyond the core ConfigNode
* and DataNode (e.g., AINode, StreamNode, ProxyNode).
*
* <p>Subclasses should create node wrappers, add them to {@link #extraNodeWrappers}, configure
* kill points via {@link #extraNodeKillPoints}, and start the nodes. Subclasses have direct
* access to protected fields: {@code testMethodName}, {@code index}, {@code startTime}.
*
* @param configNodeWrappers list of all ConfigNode wrappers in the cluster (unmodifiable)
* @param dataNodeWrappers list of all DataNode wrappers in the cluster (unmodifiable)
* @param testClassName the test class name for logging and identification purposes
*/
protected void initExtraNodes(
final List<ConfigNodeWrapper> configNodeWrappers,
final List<DataNodeWrapper> dataNodeWrappers,
final String testClassName) {
// Default: no extra nodes. Subclasses override to add nodes.
}

protected void registerExtraNode(final AbstractNodeWrapper nodeWrapper) {
extraNodeWrappers.add(nodeWrapper);
}

private ConfigNodeWrapper newConfigNode() {
final ConfigNodeWrapper configNodeWrapper =
new ConfigNodeWrapper(
Expand Down Expand Up @@ -309,39 +322,6 @@ private DataNodeWrapper newDataNode() {
return dataNodeWrapper;
}

private void startAINode(
final String seedConfigNode, final int clusterIngressPort, final String testClassName) {
final String aiNodeEndPoint;
final AINodeWrapper aiNodeWrapper =
new AINodeWrapper(
seedConfigNode,
clusterIngressPort,
testClassName,
testMethodName,
index,
EnvUtils.searchAvailablePorts(),
startTime);
aiNodeWrapperList.add(aiNodeWrapper);
aiNodeEndPoint = aiNodeWrapper.getIpAndPortString();
aiNodeWrapper.createNodeDir();
aiNodeWrapper.createLogDir();
final RequestDelegate<Void> aiNodesDelegate =
new ParallelRequestDelegate<>(
Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT, this);

aiNodesDelegate.addRequest(
() -> {
aiNodeWrapper.start();
return null;
});

try {
aiNodesDelegate.requestAll();
} catch (final SQLException e) {
logger.error("Start aiNodes failed", e);
}
}

public String getTestClassName() {
final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
for (final StackTraceElement stackTraceElement : stack) {
Expand Down Expand Up @@ -433,7 +413,7 @@ public void checkClusterStatus(
if (showClusterResp.getNodeStatus().size()
!= configNodeWrapperList.size()
+ dataNodeWrapperList.size()
+ aiNodeWrapperList.size()) {
+ extraNodeWrappers.size()) {
passed = false;
nodeSizePassed = false;
actualNodeSize = showClusterResp.getNodeStatusSize();
Expand Down Expand Up @@ -465,7 +445,7 @@ public void checkClusterStatus(
processStatusMap.put(nodeWrapper, 0);
}
}
for (AINodeWrapper nodeWrapper : aiNodeWrapperList) {
for (AbstractNodeWrapper nodeWrapper : extraNodeWrappers) {
boolean alive = nodeWrapper.getInstance().isAlive();
if (!alive) {
processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor());
Expand Down Expand Up @@ -568,14 +548,14 @@ private void handleProcessStatus(Map<AbstractNodeWrapper, Integer> processStatus
configNodeWrapper.start();
}
}
for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) {
if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) {
for (AbstractNodeWrapper extraNodeWrapper : extraNodeWrappers) {
if (portOccupationMap.containsValue(extraNodeWrapper.getPid())) {
logger.info(
"A port is occupied by another AINode {}-{}, restart it",
aiNodeWrapper.getIpAndPortString(),
aiNodeWrapper.getPid());
aiNodeWrapper.stop();
aiNodeWrapper.start();
"A port is occupied by another node {}-{}, restart it",
extraNodeWrapper.getIpAndPortString(),
extraNodeWrapper.getPid());
extraNodeWrapper.stop();
extraNodeWrapper.start();
}
}
} catch (IOException e) {
Expand All @@ -592,8 +572,8 @@ private void handleProcessStatus(Map<AbstractNodeWrapper, Integer> processStatus
public void cleanClusterEnvironment() {
final List<AbstractNodeWrapper> allNodeWrappers =
Stream.concat(
dataNodeWrapperList.stream(),
Stream.concat(configNodeWrapperList.stream(), aiNodeWrapperList.stream()))
Stream.concat(configNodeWrapperList.stream(), dataNodeWrapperList.stream()),
extraNodeWrappers.stream())
.collect(Collectors.toList());
allNodeWrappers.stream()
.findAny()
Expand Down Expand Up @@ -1045,6 +1025,7 @@ public void dumpTestJVMSnapshot() {
public List<AbstractNodeWrapper> getNodeWrapperList() {
final List<AbstractNodeWrapper> result = new ArrayList<>(configNodeWrapperList);
result.addAll(dataNodeWrapperList);
result.addAll(extraNodeWrappers);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.it.env.cluster.node;

import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.framework.IoTDBTestLogger;
import org.apache.iotdb.itbase.runtime.ParallelRequestDelegate;
import org.apache.iotdb.itbase.runtime.RequestDelegate;

import org.slf4j.Logger;

import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

import static org.apache.iotdb.it.env.cluster.ClusterConstant.NODE_START_TIMEOUT;

public class AINodeStarter {
private static final Logger logger = IoTDBTestLogger.logger;

private AINodeStarter() {}

public static AINodeWrapper startAINode(
final String seedConfigNode,
final int clusterIngressPort,
final String testClassName,
final String testMethodName,
final int clusterIndex,
final long startTime,
final List<String> killPoints,
final Consumer<AINodeWrapper> nodeRegister,
final Runnable dumpTestJVMSnapshot) {
final AINodeWrapper aiNodeWrapper =
new AINodeWrapper(
seedConfigNode,
clusterIngressPort,
testClassName,
testMethodName,
clusterIndex,
EnvUtils.searchAvailablePorts(),
startTime);
nodeRegister.accept(aiNodeWrapper);
aiNodeWrapper.setKillPoints(killPoints);
aiNodeWrapper.createNodeDir();
aiNodeWrapper.createLogDir();

final RequestDelegate<Void> aiNodesDelegate =
new ParallelRequestDelegate<>(
Collections.singletonList(aiNodeWrapper.getIpAndPortString()),
NODE_START_TIMEOUT,
dumpTestJVMSnapshot);
aiNodesDelegate.addRequest(
() -> {
aiNodeWrapper.start();
return null;
});

try {
aiNodesDelegate.requestAll();
} catch (final SQLException e) {
logger.error("Start AINode {} failed", aiNodeWrapper.getId(), e);
throw new AssertionError();
}
return aiNodeWrapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,20 @@
*/
public class ParallelRequestDelegate<T> extends RequestDelegate<T> {
private final int taskTimeoutSeconds;
private final AbstractEnv env;
private final Runnable dumpTestJVMSnapshot;

public ParallelRequestDelegate(
final List<String> endpoints, final int taskTimeoutSeconds, final AbstractEnv env) {
this(endpoints, taskTimeoutSeconds, env::dumpTestJVMSnapshot);
}

public ParallelRequestDelegate(
final List<String> endpoints,
final int taskTimeoutSeconds,
final Runnable dumpTestJVMSnapshot) {
super(endpoints);
this.taskTimeoutSeconds = taskTimeoutSeconds;
this.env = env;
this.dumpTestJVMSnapshot = dumpTestJVMSnapshot;
}

public List<T> requestAll() throws SQLException {
Expand All @@ -60,7 +67,7 @@ public List<T> requestAll() throws SQLException {
} catch (ExecutionException e) {
exceptions[i] = e;
} catch (InterruptedException | TimeoutException e) {
env.dumpTestJVMSnapshot();
dumpTestJVMSnapshot.run();
for (int j = i; j < getEndpoints().size(); j++) {
resultFutures.get(j).cancel(true);
}
Expand Down
Loading