From c1ee977bed1b9e3252c1a238e435639045290923 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 16 Jun 2026 18:18:59 +0800 Subject: [PATCH] Fix duplicate scheduling in procedure execution (#17902) * Fix duplicate scheduling in procedure execution * Fix delayed procedure deduplication and semaphore release * Fix SQL parser error handler traversal * Fix pipe procedure lock release race * Fix procedure lock wait scheduling (cherry picked from commit c25849a093b466966353ecd7b63f722535b8d5b3) (cherry picked from commit 0a45a3b7b708c3b436e6b9b4c6aa9b65bede7076) --- .../task/PipeTaskCoordinatorLock.java | 13 +- .../iotdb/confignode/procedure/Procedure.java | 20 ++++ .../procedure/ProcedureExecutor.java | 113 ++++++++++++------ .../procedure/TimeoutExecutorThread.java | 32 ++++- .../procedure/impl/StateMachineProcedure.java | 3 +- .../procedure/impl/cq/CreateCQProcedure.java | 2 +- .../impl/node/AbstractNodeProcedure.java | 12 ++ .../pipe/AbstractOperatePipeProcedureV2.java | 23 ++-- .../plugin/CreatePipePluginProcedure.java | 2 +- .../pipe/plugin/DropPipePluginProcedure.java | 2 +- .../PipeHandleLeaderChangeProcedure.java | 2 +- .../PipeHandleMetaChangeProcedure.java | 2 +- .../impl/pipe/task/DropPipeProcedureV2.java | 2 +- .../impl/pipe/task/StartPipeProcedureV2.java | 2 +- .../impl/pipe/task/StopPipeProcedureV2.java | 2 +- .../impl/schema/DeleteDatabaseProcedure.java | 2 +- .../schema/DeleteLogicalViewProcedure.java | 2 +- .../schema/DeleteTimeSeriesProcedure.java | 2 +- .../impl/trigger/CreateTriggerProcedure.java | 2 +- .../impl/trigger/DropTriggerProcedure.java | 2 +- .../procedure/scheduler/LockQueue.java | 10 +- .../scheduler/SimpleProcedureScheduler.java | 37 +++++- .../task/PipeTaskCoordinatorLockTest.java | 60 ++++++++++ .../confignode/procedure/TestLockRegime.java | 23 ++++ .../procedure/TestProcedureExecutor.java | 92 ++++++++++++++ .../procedure/entity/SimpleLockProcedure.java | 11 +- .../PipeHandleLeaderChangeProcedureTest.java | 47 ++++++++ 27 files changed, 433 insertions(+), 89 deletions(-) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java index b86c556f20df8..58347f35af5a6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java @@ -43,16 +43,9 @@ public class PipeTaskCoordinatorLock { public void lock() { LOGGER.debug( "PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName()); - try { - semaphore.acquire(); - LOGGER.debug( - "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName()); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.error( - "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}", - Thread.currentThread().getName()); - } + semaphore.acquireUninterruptibly(); + LOGGER.debug( + "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName()); } public boolean tryLock() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java index 91af03d3971be..862bc449c7319 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -60,6 +61,7 @@ public abstract class Procedure implements Comparable> { private volatile long lastUpdate; private final AtomicReference result = new AtomicReference<>(); + private final AtomicBoolean executing = new AtomicBoolean(false); private volatile boolean locked = false; private boolean lockedWhenLoading = false; @@ -233,6 +235,16 @@ protected void releaseLock(Env env) { // no op } + /** + * Called after an execution attempt returns {@link ProcedureLockState#LOCK_EVENT_WAIT}. Override + * it to put the procedure into the corresponding lock wait queue. + * + * @param env env + */ + protected void waitForLock(Env env) { + // no op + } + /** * Used to keep procedure lock even when the procedure is yielded or suspended. * @@ -254,6 +266,14 @@ protected boolean isYieldAfterExecution(Env env) { } // -------------------------Internal methods - called by the procedureExecutor------------------ + final boolean tryAcquireExecution() { + return executing.compareAndSet(false, true); + } + + final void releaseExecution() { + executing.set(false); + } + /** * Internal method called by the ProcedureExecutor that starts the user-level code execute(). * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 0d8368583b4eb..1633f78eec903 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -21,7 +21,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler; import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; @@ -40,7 +39,6 @@ import java.util.Deque; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -82,6 +80,16 @@ public class ProcedureExecutor { private final Env environment; private final IProcedureStore store; + private static final class LockStateResult { + private final ProcedureLockState lockState; + private final Procedure procedure; + + private LockStateResult(ProcedureLockState lockState, Procedure procedure) { + this.lockState = lockState; + this.procedure = procedure; + } + } + public ProcedureExecutor( final Env environment, final IProcedureStore store, final ProcedureScheduler scheduler) { this.environment = environment; @@ -320,32 +328,38 @@ private void executeProcedure(Procedure proc) { return; } ProcedureLockState lockState = null; + Procedure lockEventWaitProcedure = null; try { do { if (!rootProcStack.acquire()) { if (rootProcStack.setRollback()) { - lockState = executeRootStackRollback(rootProcId, rootProcStack); + LockStateResult lockStateResult = + executeRootStackRollback(rootProcId, rootProcStack); + lockState = lockStateResult.lockState; switch (lockState) { case LOCK_ACQUIRED: break; case LOCK_EVENT_WAIT: - LOG.info("LOCK_EVENT_WAIT rollback {}", proc); + LOG.info("LOCK_EVENT_WAIT rollback {}", lockStateResult.procedure); rootProcStack.unsetRollback(); + lockEventWaitProcedure = lockStateResult.procedure; break; case LOCK_YIELD_WAIT: rootProcStack.unsetRollback(); - scheduler.yield(proc); + scheduler.yield(lockStateResult.procedure); break; default: throw new UnsupportedOperationException(); } } else { if (!proc.wasExecuted()) { - switch (executeRollback(proc)) { + lockState = executeRollback(proc); + switch (lockState) { case LOCK_ACQUIRED: break; case LOCK_EVENT_WAIT: LOG.info("LOCK_EVENT_WAIT can't rollback child running for {}", proc); + lockEventWaitProcedure = proc; break; case LOCK_YIELD_WAIT: scheduler.yield(proc); @@ -357,19 +371,25 @@ private void executeProcedure(Procedure proc) { } break; } - lockState = acquireLock(proc); - switch (lockState) { - case LOCK_ACQUIRED: - executeProcedure(rootProcStack, proc); - break; - case LOCK_YIELD_WAIT: - case LOCK_EVENT_WAIT: - LOG.info("{} lockstate is {}", proc, lockState); - break; - default: - throw new UnsupportedOperationException(); + try { + lockState = acquireLock(proc); + switch (lockState) { + case LOCK_ACQUIRED: + executeProcedure(rootProcStack, proc); + break; + case LOCK_YIELD_WAIT: + case LOCK_EVENT_WAIT: + LOG.info("{} lockstate is {}", proc, lockState); + if (lockState == ProcedureLockState.LOCK_EVENT_WAIT) { + lockEventWaitProcedure = proc; + } + break; + default: + throw new UnsupportedOperationException(); + } + } finally { + rootProcStack.release(); } - rootProcStack.release(); if (proc.isSuccess()) { // update metrics on finishing the procedure @@ -387,9 +407,9 @@ private void executeProcedure(Procedure proc) { } finally { // Only after procedure has completed execution can it be allowed to be rescheduled to prevent // data races - if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) { - LOG.info("procedureId {} wait for lock.", proc.getProcId()); - ((ConfigNodeProcedureEnv) this.environment).getNodeLock().waitProcedure(proc); + if (lockEventWaitProcedure != null) { + LOG.info("procedureId {} wait for lock.", lockEventWaitProcedure.getProcId()); + lockEventWaitProcedure.waitForLock(this.environment); } } } @@ -404,6 +424,7 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure p if (proc.getState() != ProcedureState.RUNNABLE) { LOG.error( "The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc); + releaseLock(proc, false); return; } boolean reExecute; @@ -570,8 +591,8 @@ private void yieldProcedure(Procedure proc) { * @param procedureStack root procedure stack * @return lock state */ - private ProcedureLockState executeRootStackRollback( - Long rootProcId, RootProcedureStack procedureStack) { + private LockStateResult executeRootStackRollback( + Long rootProcId, RootProcedureStack procedureStack) { Procedure rootProcedure = procedures.get(rootProcId); ProcedureException exception = rootProcedure.getException(); if (exception == null) { @@ -590,7 +611,7 @@ private ProcedureLockState executeRootStackRollback( } ProcedureLockState lockState = acquireLock(procedure); if (lockState != ProcedureLockState.LOCK_ACQUIRED) { - return lockState; + return new LockStateResult<>(lockState, procedure); } lockState = executeRollback(procedure); releaseLock(procedure, false); @@ -598,11 +619,11 @@ private ProcedureLockState executeRootStackRollback( boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED; abortRollback |= !isRunning() || !store.isRunning(); if (abortRollback) { - return lockState; + return new LockStateResult<>(lockState, procedure); } if (!procedure.isFinished() && procedure.isYieldAfterExecution(this.environment)) { - return ProcedureLockState.LOCK_YIELD_WAIT; + return new LockStateResult<>(ProcedureLockState.LOCK_YIELD_WAIT, procedure); } if (procedure != rootProcedure) { @@ -612,7 +633,7 @@ private ProcedureLockState executeRootStackRollback( LOG.info("Rolled back {}, time duration is {}", rootProcedure, rootProcedure.elapsedTime()); rootProcedureCleanup(rootProcedure); - return ProcedureLockState.LOCK_ACQUIRED; + return new LockStateResult<>(ProcedureLockState.LOCK_ACQUIRED, rootProcedure); } private ProcedureLockState acquireLock(Procedure proc) { @@ -728,16 +749,33 @@ public void run() { Thread.sleep(1000); continue; } - this.activeProcedure.set(procedure); - activeExecutorCount.incrementAndGet(); - startTime.set(System.currentTimeMillis()); - executeProcedure(procedure); - activeExecutorCount.decrementAndGet(); - LOG.trace( - "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); - this.activeProcedure.set(null); - lastUpdated = System.currentTimeMillis(); - startTime.set(lastUpdated); + boolean executionAcquired = false; + while (isRunning() && !(executionAcquired = procedure.tryAcquireExecution())) { + Thread.sleep(10); + } + if (!executionAcquired) { + continue; + } + try { + this.activeProcedure.set(procedure); + activeExecutorCount.incrementAndGet(); + startTime.set(System.currentTimeMillis()); + try { + executeProcedure(procedure); + } finally { + procedure.releaseExecution(); + activeExecutorCount.decrementAndGet(); + LOG.trace( + "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); + this.activeProcedure.set(null); + lastUpdated = System.currentTimeMillis(); + startTime.set(lastUpdated); + } + } catch (Exception e) { + LOG.warn( + "Exception happened when worker {} execute procedure {}", getName(), procedure, e); + throw e; + } } } catch (Exception e) { @@ -748,6 +786,7 @@ public void run() { this.activeProcedure.get(), e); } + this.activeProcedure.set(null); } finally { LOG.info("Procedure worker {} terminated.", getName()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java index 5aaf9a623f523..c998ad903c208 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java @@ -37,11 +37,13 @@ public TimeoutExecutorThread( } public void add(Procedure procedure) { - queue.add(new ProcedureDelayContainer<>(procedure)); + ProcedureDelayContainer delayTask = new ProcedureDelayContainer<>(procedure); + queue.remove(delayTask); + queue.add(delayTask); } public boolean remove(Procedure procedure) { - return queue.remove(new ProcedureDelayContainer<>(procedure)); + return queue.remove(new ProcedureDelayContainer<>(procedure)) || procedure.isFinished(); } private ProcedureDelayContainer takeQuietly() { @@ -62,10 +64,15 @@ public void run() { } Procedure procedure = delayTask.getProcedure(); if (procedure instanceof InternalProcedure) { + if (procedure.isFinished()) { + continue; + } InternalProcedure internal = (InternalProcedure) procedure; internal.periodicExecute(executor.getEnvironment()); - procedure.updateTimestamp(); - queue.add(delayTask); + if (!procedure.isFinished()) { + procedure.updateTimestamp(); + queue.add(delayTask); + } } else { if (procedure.setTimeoutFailure(executor.getEnvironment())) { long rootProcId = executor.getRootProcedureId(procedure); @@ -92,6 +99,23 @@ public Procedure getProcedure() { return procedure; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ProcedureDelayContainer)) { + return false; + } + ProcedureDelayContainer that = (ProcedureDelayContainer) o; + return procedure == that.procedure; + } + + @Override + public int hashCode() { + return System.identityHashCode(procedure); + } + @Override public long getDelay(TimeUnit unit) { long delay = procedure.getTimeoutTimestamp() - System.currentTimeMillis(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java index f698735ee95be..400da5653181c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java @@ -195,7 +195,8 @@ private void addNextStateAndCalculateCycles() { nextState); } } - if (getStateId(getCurrentState()) == stateToBeAdded) { + final TState currentState = getCurrentState(); + if (currentState != null && getStateId(currentState) == stateToBeAdded) { cycles++; } else { cycles = 0; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index af7f968e8a57c..76df7765c9a57 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -295,7 +295,7 @@ public boolean equals(Object o) { } CreateCQProcedure that = (CreateCQProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && isGeneratedByPipe == that.isGeneratedByPipe && firstExecutionTime == that.firstExecutionTime diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java index b141027917366..6cade537f1fbb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java @@ -56,6 +56,18 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced } } + @Override + protected void waitForLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { + configNodeProcedureEnv.getSchedulerLock().lock(); + try { + configNodeProcedureEnv + .getNodeLock() + .waitProcedure(this, configNodeProcedureEnv.getScheduler()); + } finally { + configNodeProcedureEnv.getSchedulerLock().unlock(); + } + } + @Override protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { configNodeProcedureEnv.getSchedulerLock().lock(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index 857d5733f6a7a..43620ac915796 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -133,12 +133,7 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced LOGGER.warn("ProcedureId {}: LOCK_EVENT_WAIT. Without acquiring pipe lock.", getProcId()); } else { LOGGER.debug("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be released.", getProcId()); - configNodeProcedureEnv - .getConfigManager() - .getPipeManager() - .getPipeTaskCoordinator() - .unlock(); - pipeTaskInfo = null; + releasePipeTaskCoordinatorLock(configNodeProcedureEnv); } break; default: @@ -152,12 +147,7 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced "ProcedureId {}: {}. Invalid lock state. Pipe lock will be released.", getProcId(), procedureLockState); - configNodeProcedureEnv - .getConfigManager() - .getPipeManager() - .getPipeTaskCoordinator() - .unlock(); - pipeTaskInfo = null; + releasePipeTaskCoordinatorLock(configNodeProcedureEnv); } break; } @@ -181,11 +171,16 @@ protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { } PipeProcedureMetrics.getInstance() .updateTimer(this.getOperation().getName(), this.elapsedTime()); - configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock(); - pipeTaskInfo = null; + releasePipeTaskCoordinatorLock(configNodeProcedureEnv); } } + private void releasePipeTaskCoordinatorLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { + // Clear before releasing the semaphore to avoid clobbering a re-scheduled execution's marker. + pipeTaskInfo = null; + configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock(); + } + protected abstract PipeTaskOperation getOperation(); /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java index f4fa738428d43..22fe0ef40f279 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java @@ -314,7 +314,7 @@ public boolean equals(Object that) { if (that instanceof CreatePipePluginProcedure) { CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure) that; return thatProcedure.getProcId() == getProcId() - && thatProcedure.getCurrentState().equals(getCurrentState()) + && Objects.equals(thatProcedure.getCurrentState(), getCurrentState()) && thatProcedure.getCycles() == getCycles() && thatProcedure.pipePluginMeta.equals(pipePluginMeta); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index efbe1ee6ccdda..5deb0ee4bf31f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -275,7 +275,7 @@ public boolean equals(Object that) { if (that instanceof DropPipePluginProcedure) { final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure) that; return thatProcedure.getProcId() == getProcId() - && thatProcedure.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProcedure.getCurrentState(), this.getCurrentState()) && thatProcedure.getCycles() == this.getCycles() && (thatProcedure.pluginName).equals(pluginName); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index 61f6f3cae2aaf..598d8192e06af 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -183,7 +183,7 @@ public boolean equals(Object o) { } PipeHandleLeaderChangeProcedure that = (PipeHandleLeaderChangeProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && this.regionGroupToOldAndNewLeaderPairMap.equals( that.regionGroupToOldAndNewLeaderPairMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index 401859f0a7e0a..def062359e4c4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -186,7 +186,7 @@ public boolean equals(Object o) { } PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && needWriteConsensusOnConfigNodes == that.needWriteConsensusOnConfigNodes && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java index 0c7042caf3f07..0f9a8bc0d8c84 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java @@ -165,7 +165,7 @@ public boolean equals(Object o) { } DropPipeProcedureV2 that = (DropPipeProcedureV2) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && pipeName.equals(that.pipeName); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java index fe36137b35f47..29cb2b51ab4e5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java @@ -184,7 +184,7 @@ public boolean equals(Object o) { } StartPipeProcedureV2 that = (StartPipeProcedureV2) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && pipeName.equals(that.pipeName); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java index b2e1a584ec54a..44ca8a7bd5c1a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java @@ -187,7 +187,7 @@ public boolean equals(Object o) { } StopPipeProcedureV2 that = (StopPipeProcedureV2) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && isStoppedByRuntimeExceptionBeforeStop == that.isStoppedByRuntimeExceptionBeforeStop && pipeName.equals(that.pipeName); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index 7288090e81559..b6ad21128af3b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -303,7 +303,7 @@ public boolean equals(Object that) { if (that instanceof DeleteDatabaseProcedure) { DeleteDatabaseProcedure thatProc = (DeleteDatabaseProcedure) that; return thatProc.getProcId() == this.getProcId() - && thatProc.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProc.getCurrentState(), this.getCurrentState()) && thatProc.getCycles() == this.getCycles() && thatProc.isGeneratedByPipe == this.isGeneratedByPipe && thatProc.deleteDatabaseSchema.equals(this.getDeleteDatabaseSchema()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java index aaf7bc1a7bff0..fd5d0d52f227c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java @@ -303,7 +303,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; DeleteLogicalViewProcedure that = (DeleteLogicalViewProcedure) o; return this.getProcId() == that.getProcId() - && this.getCurrentState().equals(that.getCurrentState()) + && Objects.equals(this.getCurrentState(), that.getCurrentState()) && this.getCycles() == that.getCycles() && isGeneratedByPipe == that.isGeneratedByPipe && patternTree.equals(that.patternTree); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java index 74f21abc72da2..0a223de1525f3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java @@ -367,7 +367,7 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) return false; final DeleteTimeSeriesProcedure that = (DeleteTimeSeriesProcedure) o; return this.getProcId() == that.getProcId() - && this.getCurrentState().equals(that.getCurrentState()) + && Objects.equals(this.getCurrentState(), that.getCurrentState()) && this.getCycles() == getCycles() && this.isGeneratedByPipe == that.isGeneratedByPipe && this.patternTree.equals(that.patternTree); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java index 74f4112075a7e..548ba1ee22a35 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java @@ -304,7 +304,7 @@ public boolean equals(Object that) { if (that instanceof CreateTriggerProcedure) { CreateTriggerProcedure thatProc = (CreateTriggerProcedure) that; return thatProc.getProcId() == this.getProcId() - && thatProc.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProc.getCurrentState(), this.getCurrentState()) && thatProc.getCycles() == this.getCycles() && thatProc.isGeneratedByPipe == this.isGeneratedByPipe && thatProc.triggerInformation.equals(this.triggerInformation); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java index 19bfcdc30d2ba..958000d2b3fd6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java @@ -178,7 +178,7 @@ public boolean equals(Object that) { if (that instanceof DropTriggerProcedure) { DropTriggerProcedure thatProc = (DropTriggerProcedure) that; return thatProc.getProcId() == this.getProcId() - && thatProc.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProc.getCurrentState(), this.getCurrentState()) && thatProc.getCycles() == this.getCycles() && thatProc.isGeneratedByPipe == this.isGeneratedByPipe && (thatProc.triggerName).equals(this.triggerName); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java index 832e339c0aede..e2f5935a909ab 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java @@ -45,7 +45,15 @@ public boolean releaseLock(Procedure procedure) { return true; } - public void waitProcedure(Procedure procedure) { + public void waitProcedure(Procedure procedure, ProcedureScheduler procedureScheduler) { + if (lockOwnerProcedure == null) { + procedureScheduler.addFront(procedure); + return; + } + if (deque.stream() + .anyMatch(waitingProcedure -> waitingProcedure.getProcId() == procedure.getProcId())) { + return; + } deque.addLast(procedure); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java index 3cd5ceacf4b6d..94b6f3119308a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java @@ -22,6 +22,7 @@ import org.apache.iotdb.confignode.procedure.Procedure; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.locks.ReentrantLock; /** Simple scheduler for procedures */ public class SimpleProcedureScheduler extends AbstractProcedureScheduler { @@ -48,6 +49,7 @@ public void clear() { schedLock(); try { runnables.clear(); + waitings.clear(); } finally { schedUnlock(); } @@ -68,12 +70,37 @@ public int queueSize() { return runnables.size(); } - public void addWaiting(Procedure proc) { - waitings.add(proc); + public void waitProcedure(Procedure proc, ReentrantLock lock) { + boolean signal = false; + schedLock(); + try { + if (lock.isLocked()) { + waitings.add(proc); + } else { + runnables.addFirst(proc); + signal = true; + } + } finally { + schedUnlock(); + } + if (signal) { + signalAll(); + } } - public void releaseWaiting() { - runnables.addAll(waitings); - waitings.clear(); + public void releaseWaiting(ReentrantLock lock) { + boolean signal; + schedLock(); + try { + lock.unlock(); + signal = !waitings.isEmpty(); + runnables.addAll(waitings); + waitings.clear(); + } finally { + schedUnlock(); + } + if (signal) { + signalAll(); + } } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java new file mode 100644 index 0000000000000..74d5d821d76c3 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.pipe.coordinator.task; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class PipeTaskCoordinatorLockTest { + + @Test + public void testInterruptedThreadDoesNotAcquireWithoutPermit() throws Exception { + PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock(); + lock.lock(); + + CountDownLatch waiting = new CountDownLatch(1); + AtomicBoolean acquired = new AtomicBoolean(false); + Thread thread = + new Thread( + () -> { + Thread.currentThread().interrupt(); + waiting.countDown(); + lock.lock(); + acquired.set(true); + lock.unlock(); + }); + thread.start(); + + Assert.assertTrue(waiting.await(3, TimeUnit.SECONDS)); + TimeUnit.MILLISECONDS.sleep(200); + Assert.assertFalse(acquired.get()); + + lock.unlock(); + thread.join(TimeUnit.SECONDS.toMillis(3)); + + Assert.assertFalse(thread.isAlive()); + Assert.assertTrue(acquired.get()); + Assert.assertFalse(lock.isLocked()); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java index 500a51e9e3d07..967611ca8eef1 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java @@ -19,7 +19,10 @@ package org.apache.iotdb.confignode.procedure; +import org.apache.iotdb.confignode.procedure.entity.NoopProcedure; import org.apache.iotdb.confignode.procedure.entity.SimpleLockProcedure; +import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; +import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil; import org.junit.Assert; @@ -43,4 +46,24 @@ public void testAcquireLock() { this.procExecutor, procIdList.stream().mapToLong(Long::longValue).toArray()); Assert.assertEquals(env.lockAcquireSeq.toString(), env.executeSeq.toString()); } + + @Test + public void testLockQueueDoesNotWakeDuplicateProcedure() { + LockQueue lockQueue = new LockQueue(); + SimpleProcedureScheduler scheduler = new SimpleProcedureScheduler(); + scheduler.start(); + + NoopProcedure lockOwner = new NoopProcedure(); + lockOwner.setProcId(0); + Assert.assertTrue(lockQueue.tryLock(lockOwner)); + + NoopProcedure procedure = new NoopProcedure(); + procedure.setProcId(1); + lockQueue.waitProcedure(procedure, scheduler); + lockQueue.waitProcedure(procedure, scheduler); + + Assert.assertEquals(1, lockQueue.wakeWaitingProcedures(scheduler)); + Assert.assertEquals(1, scheduler.size()); + scheduler.stop(); + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java index dce7f2ba5dc48..ba5f635507a4a 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java @@ -23,11 +23,14 @@ import org.apache.iotdb.confignode.procedure.entity.NoopProcedure; import org.apache.iotdb.confignode.procedure.entity.StuckProcedure; import org.apache.iotdb.confignode.procedure.env.TestProcEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.state.ProcedureState; import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -96,6 +99,37 @@ public void testWorkerThreadStuck() throws InterruptedException { ProcedureTestUtil.waitForProcedure(procExecutor, busyProcId2); } + @Test + public void testDuplicatedSchedulingDoesNotExecuteConcurrently() throws InterruptedException { + BlockingProcedure blockingProcedure = new BlockingProcedure(); + long procId = procExecutor.submitProcedure(blockingProcedure); + + Assert.assertTrue(blockingProcedure.awaitExecution(30, TimeUnit.SECONDS)); + + procExecutor.getScheduler().addFront(blockingProcedure); + boolean duplicated = blockingProcedure.awaitExecution(3, TimeUnit.SECONDS); + + blockingProcedure.releaseExecutions(duplicated ? 2 : 1); + ProcedureTestUtil.waitForProcedure(procExecutor, procId); + + Assert.assertFalse(duplicated); + Assert.assertEquals(1, blockingProcedure.getExecutionCount()); + } + + @Test + public void testInternalProcedureCanBeDeduplicatedAndRemoved() throws InterruptedException { + CompletingInternalProcedure internalProcedure = new CompletingInternalProcedure(); + + procExecutor.addInternalProcedure(internalProcedure); + procExecutor.addInternalProcedure(internalProcedure); + + Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS)); + Assert.assertFalse(internalProcedure.awaitExecution(300, TimeUnit.MILLISECONDS)); + Assert.assertEquals(1, internalProcedure.getExecutionCount()); + + Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure)); + } + private int waitThreadCount(final int expectedThreads) { long startTime = System.currentTimeMillis(); while (procExecutor.isRunning() @@ -107,4 +141,62 @@ private int waitThreadCount(final int expectedThreads) { } return procExecutor.getWorkerThreadCount(); } + + private static class BlockingProcedure extends Procedure { + + private final Semaphore entered = new Semaphore(0); + private final Semaphore finish = new Semaphore(0); + private final AtomicInteger executionCount = new AtomicInteger(); + + @Override + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { + executionCount.incrementAndGet(); + entered.release(); + finish.acquire(); + return null; + } + + @Override + protected void rollback(TestProcEnv env) + throws IOException, InterruptedException, ProcedureException { + // No state to roll back. + } + + private boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException { + return entered.tryAcquire(timeout, unit); + } + + private void releaseExecutions(int permits) { + finish.release(permits); + } + + private int getExecutionCount() { + return executionCount.get(); + } + } + + private static class CompletingInternalProcedure extends InternalProcedure { + + private final Semaphore entered = new Semaphore(0); + private final AtomicInteger executionCount = new AtomicInteger(); + + private CompletingInternalProcedure() { + super(0); + } + + @Override + protected void periodicExecute(TestProcEnv env) { + executionCount.incrementAndGet(); + entered.release(); + setState(ProcedureState.SUCCESS); + } + + private boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException { + return entered.tryAcquire(timeout, unit); + } + + private int getExecutionCount() { + return executionCount.get(); + } + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java index ce9fea39d5589..42badd700799e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java @@ -53,18 +53,21 @@ protected ProcedureLockState acquireLock(TestProcEnv testProcEnv) { return ProcedureLockState.LOCK_ACQUIRED; } - SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler(); - scheduler.addWaiting(this); System.out.println(procName + " wait for lock."); return ProcedureLockState.LOCK_EVENT_WAIT; } + @Override + protected void waitForLock(TestProcEnv testProcEnv) { + SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler(); + scheduler.waitProcedure(this, testProcEnv.getEnvLock()); + } + @Override protected void releaseLock(TestProcEnv testProcEnv) { System.out.println(procName + " release lock."); - testProcEnv.getEnvLock().unlock(); SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler(); - scheduler.releaseWaiting(); + scheduler.releaseWaiting(testProcEnv.getEnvLock()); } @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java index 75c0963a27fab..b2ec615fbcd5b 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java @@ -106,4 +106,51 @@ public void deserializeOldFormatConfigRegionTest() { fail(); } } + + @Test + public void completedProcedureEqualsTest() { + Map> leaderMap = new HashMap<>(); + leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), new Pair<>(1, 2)); + + try { + PipeHandleLeaderChangeProcedure proc = deserializeCompletedProcedure(leaderMap); + PipeHandleLeaderChangeProcedure proc2 = deserializeCompletedProcedure(leaderMap); + + assertEquals(proc, proc2); + assertEquals(proc.hashCode(), proc2.hashCode()); + } catch (Exception e) { + fail(); + } + } + + private PipeHandleLeaderChangeProcedure deserializeCompletedProcedure( + Map> leaderMap) throws Exception { + PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + + outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode()); + outputStream.writeLong(Procedure.NO_PROC_ID); + outputStream.writeInt(ProcedureState.SUCCESS.ordinal()); + outputStream.writeLong(0L); + outputStream.writeLong(0L); + outputStream.writeLong(Procedure.NO_PROC_ID); + outputStream.writeLong(Procedure.NO_TIMEOUT); + outputStream.writeInt(-1); + outputStream.write((byte) 0); + outputStream.writeInt(-1); + outputStream.write((byte) 0); + outputStream.writeInt(1); + outputStream.writeInt(Integer.MIN_VALUE); + outputStream.write((byte) 0); + outputStream.writeInt(leaderMap.size()); + for (Map.Entry> entry : leaderMap.entrySet()) { + outputStream.writeInt(entry.getKey().getId()); + outputStream.writeInt(entry.getValue().getLeft()); + outputStream.writeInt(entry.getValue().getRight()); + } + + ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + return (PipeHandleLeaderChangeProcedure) ProcedureFactory.getInstance().create(buffer); + } }