How to cancel a ScheduledFuture and wait for the runnable to complete if it is executing at the time of cancellation?

Scheduling a command at a fixed rate on any ScheduledExecutorService returns a ScheduledFuture, which can be cancelled. But cancel does not guarantee that the command is no longer executing once cancel has returned, for example because the command was already in the middle of execution when cancel was called.

For most use cases this functionality is sufficient. But I am dealing with a use case where the current thread must block after cancelling if the command is already running, and wait until that run completes. In other words, the thread that requested the cancellation should not proceed while the command is still running. Cancelling with mayInterruptIfRunning = true is also not appropriate, because I don't want to disrupt the current run; I just need to wait for it to finish normally.
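To make the gap concrete, here is a minimal sketch of the behaviour described above. The class and method names are hypothetical, and it assumes a single-threaded scheduler with a task held open by a latch to simulate a long run. It shows that cancel(false) returns while the run is still in progress, and that get() on the cancelled future throws CancellationException instead of waiting.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original question.
public class CancelDoesNotWaitDemo {

    /** Returns true if the task was still running after cancel(false) returned and get() refused to wait. */
    public static boolean stillRunningAfterCancel() throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                started.countDown();
                try {
                    release.await(); // simulate a long-running execution
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.set(true);
            }, 0, 1, TimeUnit.SECONDS);

            started.await();        // the task is now in the middle of a run
            future.cancel(false);   // returns without waiting for that run
            boolean running = !finished.get();

            boolean getThrew = false;
            try {
                future.get();       // a cancelled future does not wait either
            } catch (CancellationException e) {
                getThrew = true;
            }
            return running && getThrew;
        } finally {
            release.countDown();    // let the in-flight run finish
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("still running after cancel: " + stillRunningAfterCancel());
    }
}
```

The latch is only released in the finally block, so the run cannot have finished at the moment cancel(false) returns.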

I haven't found a way to meet these requirements with the standard JDK classes. Question1: Am I wrong, and does such functionality exist?

So I decided to implement it on my own:

import java.util.concurrent.*;

public class GracefullyStoppingScheduledFutureDecorator implements ScheduledFuture {

/**
 * @return a scheduled future with a special implementation of the "cancel" method which,
 * in addition to the standard behavior,
 * strongly guarantees that the command is not in the middle of execution when "cancel" returns
 */
public static ScheduledFuture schedule(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
    CancellableCommand cancellableCommand = new CancellableCommand(command);
    ScheduledFuture future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit);
    return new GracefullyStoppingScheduledFutureDecorator(future, cancellableCommand);
}

private GracefullyStoppingScheduledFutureDecorator(ScheduledFuture targetFuture, CancellableCommand command) {
    this.targetFuture = targetFuture;
    this.runnable = command;
}

private final ScheduledFuture targetFuture;
private final CancellableCommand runnable;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    runnable.cancel();
    return targetFuture.cancel(mayInterruptIfRunning);
}

@Override
public long getDelay(TimeUnit unit) {
    return targetFuture.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
    return targetFuture.compareTo(o);
}

@Override
public boolean isCancelled() {
    return targetFuture.isCancelled();
}

@Override
public boolean isDone() {
    return targetFuture.isDone();
}

@Override
public Object get() throws InterruptedException, ExecutionException {
    return targetFuture.get();
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    return targetFuture.get(timeout, unit);
}

private static class CancellableCommand implements Runnable {

    private final Object monitor = new Object();
    private final Runnable target;
    private boolean cancelled = false;

    private CancellableCommand(Runnable target) {
        this.target = target;
    }

    public void cancel() {
        synchronized (monitor) {
            cancelled = true;
        }
    }

    @Override
    public void run() {
        synchronized (monitor) {
            if (!cancelled) {
                target.run();
            }
        }
    }

    }

}

      

Question2: Can anyone find errors in the code above?



1 answer


Question2: Can anyone find errors in the code above?

There is a hypothetical deadlock that can be reproduced in the following scenario:

  • Thread T1 holds monitor M1.
  • The scheduled task is running on thread T2 (holding monitor M2, the decorator's internal monitor) and wants to enter M1, so T2 has to wait until T1 exits monitor M1.
  • T1 decides to cancel the task, but cancel needs monitor M2, which is held by the running task itself, so we have a deadlock.
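The scenario above can be sketched in isolation. This is only an illustration under assumptions: the class name is hypothetical, two plain objects stand in for M1 and M2, and daemon threads are used so that the JVM can still exit once the deadlock has been observed. It does not use the decorator itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: m1 and m2 stand in for the monitors M1 and M2 from the scenario.
public class MonitorDeadlockSketch {

    /** Returns true once both threads are observed BLOCKED on each other's monitor. */
    public static boolean reproducesDeadlock() throws InterruptedException {
        final Object m1 = new Object(); // monitor held by the cancelling thread T1
        final Object m2 = new Object(); // stands in for the decorator's internal monitor
        CountDownLatch t1HoldsM1 = new CountDownLatch(1);
        CountDownLatch t2HoldsM2 = new CountDownLatch(1);

        Thread t2 = new Thread(() -> {
            synchronized (m2) {         // like run(): M2 is held while the task executes
                t2HoldsM2.countDown();
                await(t1HoldsM1);
                synchronized (m1) { }   // the task wants to enter M1
            }
        }, "T2");

        Thread t1 = new Thread(() -> {
            synchronized (m1) {
                t1HoldsM1.countDown();
                await(t2HoldsM2);
                synchronized (m2) { }   // like cancel(): it needs M2
            }
        }, "T1");

        // Daemon threads, so the permanently blocked threads do not keep the JVM alive.
        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();

        // Poll for up to 5 seconds until both threads are blocked on a monitor.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (System.nanoTime() < deadline) {
            if (t1.getState() == Thread.State.BLOCKED && t2.getState() == Thread.State.BLOCKED) {
                return true;
            }
            Thread.sleep(10);
        }
        return false;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlock reproduced: " + reproducesDeadlock());
    }
}
```

The latches only force the unlucky interleaving; in real code the same ordering would have to arise by chance, which is why the scenario is hard to hit in practice.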


Most likely, the above scenario is unrealistic, but to protect against all possible cases, I decided to rewrite the code without using locks:

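import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class GracefullyStoppingScheduledFuture {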

/**
 * @return a scheduled future with a special implementation of the "cancel" method which,
 * in addition to the standard behavior,
 * strongly guarantees that the command is not in the middle of execution when "cancel" returns
 */
public static GracefullyStoppingScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
    CancellableCommand cancellableCommand = new CancellableCommand(command);
    ScheduledFuture future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit);
    return new GracefullyStoppingScheduledFuture(future, cancellableCommand);
}

private GracefullyStoppingScheduledFuture(ScheduledFuture targetFuture, CancellableCommand command) {
    this.targetFuture = targetFuture;
    this.runnable = command;
}

private final ScheduledFuture targetFuture;
private final CancellableCommand runnable;

public void cancelAndBeSureOfTermination(boolean mayInterruptIfRunning) throws InterruptedException, ExecutionException {
    try {
        targetFuture.cancel(mayInterruptIfRunning);
    } finally {
        runnable.cancel();
    }
}

private static class CancellableCommand implements Runnable {

    private static final int NOT_EXECUTING = 0;
    private static final int IN_PROGRESS = 1;
    private static final int CANCELLED_WITHOUT_OBSTRUCTION = 2;
    private static final int CANCELLED_IN_MIDDLE_OF_PROGRESS = 3;

    private final AtomicInteger state = new AtomicInteger(NOT_EXECUTING);
    private final AtomicReference<Thread> executionThread = new AtomicReference<>();
    private final CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();
    private final Runnable target;

    private CancellableCommand(Runnable target) {
        this.target = target;
    }

    public void cancel() throws ExecutionException, InterruptedException {
        if (executionThread.get() == Thread.currentThread()) {
            // cancel method was called from target by itself
            state.set(CANCELLED_IN_MIDDLE_OF_PROGRESS);
            return;
        }
        while (true) {
            if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) {
                return;
            }
            if (state.get() == CANCELLED_IN_MIDDLE_OF_PROGRESS) {
                cancellationFuture.get();
                return;
            }
            if (state.compareAndSet(NOT_EXECUTING, CANCELLED_WITHOUT_OBSTRUCTION)) {
                return;
            }
            if (state.compareAndSet(IN_PROGRESS, CANCELLED_IN_MIDDLE_OF_PROGRESS)) {
                cancellationFuture.get();
                return;
            }
        }
    }

    @Override
    public void run() {
        if (!state.compareAndSet(NOT_EXECUTING, IN_PROGRESS)) {
            notifyWaiters();
            return;
        }

        try {
            executionThread.set(Thread.currentThread());
            target.run();
        } finally {
            executionThread.set(null);
            if (!state.compareAndSet(IN_PROGRESS, NOT_EXECUTING)) {
                notifyWaiters();
            }
        }
    }

    private void notifyWaiters() {
        if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) {
            // no need to notify anything
            return;
        }
        // someone waits for cancelling
        cancellationFuture.complete(null);
        return;
    }

}

      
