Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,9 @@ public class PipeTaskCoordinatorLock {
public void lock() {
LOGGER.debug(
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
try {
semaphore.acquire();
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
Thread.currentThread().getName());
}
semaphore.acquireUninterruptibly();
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
}

public boolean tryLock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -60,6 +61,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
private volatile long lastUpdate;

private final AtomicReference<byte[]> result = new AtomicReference<>();
private final AtomicBoolean executing = new AtomicBoolean(false);
private volatile boolean locked = false;
private boolean lockedWhenLoading = false;

Expand Down Expand Up @@ -233,6 +235,16 @@ protected void releaseLock(Env env) {
// no op
}

/**
* Called after an execution attempt returns {@link ProcedureLockState#LOCK_EVENT_WAIT}. Override
* it to put the procedure into the corresponding lock wait queue.
*
* @param env env
*/
protected void waitForLock(Env env) {
// no op
}

/**
* Used to keep procedure lock even when the procedure is yielded or suspended.
*
Expand All @@ -254,6 +266,14 @@ protected boolean isYieldAfterExecution(Env env) {
}

// -------------------------Internal methods - called by the procedureExecutor------------------
final boolean tryAcquireExecution() {
return executing.compareAndSet(false, true);
}

final void releaseExecution() {
executing.set(false);
}

/**
* Internal method called by the ProcedureExecutor that starts the user-level code execute().
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
Expand All @@ -40,7 +39,6 @@
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -82,6 +80,16 @@ public class ProcedureExecutor<Env> {
private final Env environment;
private final IProcedureStore<Env> store;

private static final class LockStateResult<Env> {
private final ProcedureLockState lockState;
private final Procedure<Env> procedure;

private LockStateResult(ProcedureLockState lockState, Procedure<Env> procedure) {
this.lockState = lockState;
this.procedure = procedure;
}
}

public ProcedureExecutor(
final Env environment, final IProcedureStore<Env> store, final ProcedureScheduler scheduler) {
this.environment = environment;
Expand Down Expand Up @@ -320,32 +328,38 @@ private void executeProcedure(Procedure<Env> proc) {
return;
}
ProcedureLockState lockState = null;
Procedure<Env> lockEventWaitProcedure = null;
try {
do {
if (!rootProcStack.acquire()) {
if (rootProcStack.setRollback()) {
lockState = executeRootStackRollback(rootProcId, rootProcStack);
LockStateResult<Env> lockStateResult =
executeRootStackRollback(rootProcId, rootProcStack);
lockState = lockStateResult.lockState;
switch (lockState) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
LOG.info("LOCK_EVENT_WAIT rollback {}", proc);
LOG.info("LOCK_EVENT_WAIT rollback {}", lockStateResult.procedure);
rootProcStack.unsetRollback();
lockEventWaitProcedure = lockStateResult.procedure;
break;
case LOCK_YIELD_WAIT:
rootProcStack.unsetRollback();
scheduler.yield(proc);
scheduler.yield(lockStateResult.procedure);
break;
default:
throw new UnsupportedOperationException();
}
} else {
if (!proc.wasExecuted()) {
switch (executeRollback(proc)) {
lockState = executeRollback(proc);
switch (lockState) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
LOG.info("LOCK_EVENT_WAIT can't rollback child running for {}", proc);
lockEventWaitProcedure = proc;
break;
case LOCK_YIELD_WAIT:
scheduler.yield(proc);
Expand All @@ -357,19 +371,25 @@ private void executeProcedure(Procedure<Env> proc) {
}
break;
}
lockState = acquireLock(proc);
switch (lockState) {
case LOCK_ACQUIRED:
executeProcedure(rootProcStack, proc);
break;
case LOCK_YIELD_WAIT:
case LOCK_EVENT_WAIT:
LOG.info("{} lockstate is {}", proc, lockState);
break;
default:
throw new UnsupportedOperationException();
try {
lockState = acquireLock(proc);
switch (lockState) {
case LOCK_ACQUIRED:
executeProcedure(rootProcStack, proc);
break;
case LOCK_YIELD_WAIT:
case LOCK_EVENT_WAIT:
LOG.info("{} lockstate is {}", proc, lockState);
if (lockState == ProcedureLockState.LOCK_EVENT_WAIT) {
lockEventWaitProcedure = proc;
}
break;
default:
throw new UnsupportedOperationException();
}
} finally {
rootProcStack.release();
}
rootProcStack.release();

if (proc.isSuccess()) {
// update metrics on finishing the procedure
Expand All @@ -387,9 +407,9 @@ private void executeProcedure(Procedure<Env> proc) {
} finally {
// Only after procedure has completed execution can it be allowed to be rescheduled to prevent
// data races
if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
LOG.info("procedureId {} wait for lock.", proc.getProcId());
((ConfigNodeProcedureEnv) this.environment).getNodeLock().waitProcedure(proc);
if (lockEventWaitProcedure != null) {
LOG.info("procedureId {} wait for lock.", lockEventWaitProcedure.getProcId());
lockEventWaitProcedure.waitForLock(this.environment);
}
}
}
Expand All @@ -404,6 +424,7 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
if (proc.getState() != ProcedureState.RUNNABLE) {
LOG.error(
"The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc);
releaseLock(proc, false);
return;
}
boolean reExecute;
Expand Down Expand Up @@ -570,8 +591,8 @@ private void yieldProcedure(Procedure<Env> proc) {
* @param procedureStack root procedure stack
* @return lock state
*/
private ProcedureLockState executeRootStackRollback(
Long rootProcId, RootProcedureStack procedureStack) {
private LockStateResult<Env> executeRootStackRollback(
Long rootProcId, RootProcedureStack<Env> procedureStack) {
Procedure<Env> rootProcedure = procedures.get(rootProcId);
ProcedureException exception = rootProcedure.getException();
if (exception == null) {
Expand All @@ -590,19 +611,19 @@ private ProcedureLockState executeRootStackRollback(
}
ProcedureLockState lockState = acquireLock(procedure);
if (lockState != ProcedureLockState.LOCK_ACQUIRED) {
return lockState;
return new LockStateResult<>(lockState, procedure);
}
lockState = executeRollback(procedure);
releaseLock(procedure, false);

boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED;
abortRollback |= !isRunning() || !store.isRunning();
if (abortRollback) {
return lockState;
return new LockStateResult<>(lockState, procedure);
}

if (!procedure.isFinished() && procedure.isYieldAfterExecution(this.environment)) {
return ProcedureLockState.LOCK_YIELD_WAIT;
return new LockStateResult<>(ProcedureLockState.LOCK_YIELD_WAIT, procedure);
}

if (procedure != rootProcedure) {
Expand All @@ -612,7 +633,7 @@ private ProcedureLockState executeRootStackRollback(

LOG.info("Rolled back {}, time duration is {}", rootProcedure, rootProcedure.elapsedTime());
rootProcedureCleanup(rootProcedure);
return ProcedureLockState.LOCK_ACQUIRED;
return new LockStateResult<>(ProcedureLockState.LOCK_ACQUIRED, rootProcedure);
}

private ProcedureLockState acquireLock(Procedure<Env> proc) {
Expand Down Expand Up @@ -728,16 +749,33 @@ public void run() {
Thread.sleep(1000);
continue;
}
this.activeProcedure.set(procedure);
activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
executeProcedure(procedure);
activeExecutorCount.decrementAndGet();
LOG.trace(
"Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
this.activeProcedure.set(null);
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
boolean executionAcquired = false;
while (isRunning() && !(executionAcquired = procedure.tryAcquireExecution())) {
Thread.sleep(10);
}
if (!executionAcquired) {
continue;
}
try {
this.activeProcedure.set(procedure);
activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
try {
executeProcedure(procedure);
} finally {
procedure.releaseExecution();
activeExecutorCount.decrementAndGet();
LOG.trace(
"Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
this.activeProcedure.set(null);
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
}
} catch (Exception e) {
LOG.warn(
"Exception happened when worker {} execute procedure {}", getName(), procedure, e);
throw e;
}
}

} catch (Exception e) {
Expand All @@ -748,6 +786,7 @@ public void run() {
this.activeProcedure.get(),
e);
}
this.activeProcedure.set(null);
} finally {
LOG.info("Procedure worker {} terminated.", getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ public TimeoutExecutorThread(
}

public void add(Procedure<Env> procedure) {
queue.add(new ProcedureDelayContainer<>(procedure));
ProcedureDelayContainer<Env> delayTask = new ProcedureDelayContainer<>(procedure);
queue.remove(delayTask);
queue.add(delayTask);
}

public boolean remove(Procedure<Env> procedure) {
return queue.remove(new ProcedureDelayContainer<>(procedure));
return queue.remove(new ProcedureDelayContainer<>(procedure)) || procedure.isFinished();
}

private ProcedureDelayContainer<Env> takeQuietly() {
Expand All @@ -62,10 +64,15 @@ public void run() {
}
Procedure<Env> procedure = delayTask.getProcedure();
if (procedure instanceof InternalProcedure) {
if (procedure.isFinished()) {
continue;
}
InternalProcedure internal = (InternalProcedure) procedure;
internal.periodicExecute(executor.getEnvironment());
procedure.updateTimestamp();
queue.add(delayTask);
if (!procedure.isFinished()) {
procedure.updateTimestamp();
queue.add(delayTask);
}
} else {
if (procedure.setTimeoutFailure(executor.getEnvironment())) {
long rootProcId = executor.getRootProcedureId(procedure);
Expand All @@ -92,6 +99,23 @@ public Procedure<Env> getProcedure() {
return procedure;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ProcedureDelayContainer)) {
return false;
}
ProcedureDelayContainer<?> that = (ProcedureDelayContainer<?>) o;
return procedure == that.procedure;
}

@Override
public int hashCode() {
return System.identityHashCode(procedure);
}

@Override
public long getDelay(TimeUnit unit) {
long delay = procedure.getTimeoutTimestamp() - System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ private void addNextStateAndCalculateCycles() {
nextState);
}
}
if (getStateId(getCurrentState()) == stateToBeAdded) {
final TState currentState = getCurrentState();
if (currentState != null && getStateId(currentState) == stateToBeAdded) {
cycles++;
} else {
cycles = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public boolean equals(Object o) {
}
CreateCQProcedure that = (CreateCQProcedure) o;
return getProcId() == that.getProcId()
&& getCurrentState().equals(that.getCurrentState())
&& Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& isGeneratedByPipe == that.isGeneratedByPipe
&& firstExecutionTime == that.firstExecutionTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced
}
}

@Override
protected void waitForLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
configNodeProcedureEnv.getSchedulerLock().lock();
try {
configNodeProcedureEnv
.getNodeLock()
.waitProcedure(this, configNodeProcedureEnv.getScheduler());
} finally {
configNodeProcedureEnv.getSchedulerLock().unlock();
}
}

@Override
protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
configNodeProcedureEnv.getSchedulerLock().lock();
Expand Down
Loading
Loading