How to release a resource in a cancelled CompletableFuture

Use case

Suppose we start execution with CompletableFuture.runAsync(..), and the runnable contains a try-with-resources block (we use some resource that must be closed no matter what happens). At some moment, while execution inside the try block has not yet finished, we cancel the future. Although the future is cancelled, the resource that should be closed is not closed: close() of the AutoCloseable is never called.


Question

Is this a Java issue, or is there a way to do this properly, without hacky workarounds such as falling back to plain Futures (which support interruption, etc.)? And if this is the expected behavior, how should one handle a similar situation when a CompletableFuture that cannot be interrupted is cancelled?


Code

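import java.util.concurrent.CompletableFuture;

import org.junit.Test; // assuming JUnit 4; with JUnit 5 use org.junit.jupiter.api.Test

public class AutoClosableResourceTest {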

    public static class SomeService{
        public void connect(){
            System.out.println("connect");
        }

        public Integer disconnect(){
            System.out.println("disconnect");
            return null;
        }
    }

    public static class AutoClosableResource<T> implements AutoCloseable {

        private final T resource;
        private final Runnable closeFunction;

        private AutoClosableResource(T resource, Runnable closeFunction){
            this.resource = resource;
            this.closeFunction = closeFunction;
        }

        public T get(){
            return resource;
        }

        @Override
        public void close() throws Exception {
            closeFunction.run();
        }
    }

    @Test
    public void testTryWithResource() throws InterruptedException {
        SomeService service  = new SomeService();

        CompletableFuture<Void> async = CompletableFuture.runAsync(() -> {
            try (AutoClosableResource<SomeService> resource = new AutoClosableResource<>(service, service::disconnect)) {
                resource.get().connect();
                while (true) {
                    Thread.sleep(1000);
                    System.out.println("working...");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        Thread.sleep(2500);
        async.cancel(true);
        Thread.sleep(2500);

    }
}

      

It produces:

connect
working...
working...
working...
working...

      

As you can see, close() is never called ("disconnect" is never printed) and the resource is left open.


6 answers


You seem to have misunderstood the purpose of CompletableFuture. Take a look at the first sentence of its class documentation:

A Future that may be explicitly completed (setting its value and status), ...

Thus, unlike FutureTask, which is completed by the thread executing its run method, a CompletableFuture can be completed by any thread that sets its value/status at an arbitrary point in time. The CompletableFuture doesn't know which thread will complete it, and doesn't even know whether any thread is currently working on its completion.

Therefore, CompletableFuture cannot interrupt the right thread when it is cancelled. This is a fundamental part of its design.

If you want a worker thread that you can interrupt, you are better off using FutureTask / ThreadPoolExecutor. A task scheduled that way may still complete a CompletableFuture at its end.
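As a rough illustration of that last suggestion, here is a minimal sketch. It assumes a hypothetical TrackedResource that merely records whether close() ran, and a hypothetical helper named cancelAndCheckClosed; neither comes from the question. The task is submitted to an ExecutorService, so cancel(true) on the returned Future interrupts the worker, try-with-resources closes the resource, and the task completes a separate CompletableFuture at its end.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptibleWorkerSketch {

    /** Hypothetical resource that only records whether close() ran. */
    static class TrackedResource implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        @Override public void close() { closed.set(true); }
    }

    /** Cancels an interruptible worker and reports whether its resource was closed. */
    static boolean cancelAndCheckClosed() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TrackedResource resource = new TrackedResource();
        CountDownLatch started = new CountDownLatch(1);
        CompletableFuture<String> result = new CompletableFuture<>();

        // submit() wraps the task in a FutureTask, so cancel(true) interrupts the worker thread.
        Future<?> worker = pool.submit(() -> {
            try (TrackedResource r = resource) {
                started.countDown();
                while (true) {
                    Thread.sleep(100); // throws InterruptedException once interrupted
                }
            } catch (InterruptedException e) {
                // try-with-resources has already closed the resource at this point;
                // the task itself completes the CompletableFuture at its end
                result.complete("interrupted");
            }
        });

        try {
            started.await();      // make sure the worker is inside the try block
            worker.cancel(true);  // interrupts the worker
            String how = result.get(5, TimeUnit.SECONDS);
            return resource.closed.get() && "interrupted".equals(how);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("resource closed after cancel: " + cancelAndCheckClosed());
    }
}
```

The design choice here is that cancellation goes through the Future returned by submit(), which knows its worker thread, while consumers can still chain on the CompletableFuture.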



The following code is stuck in an infinite loop. Calling async.cancel does nothing to tell this loop that it should stop.

while (true) {
    Thread.sleep(1000);
    System.out.println("working...");
}

      

The test fails to exit because the thread stuck in this loop is not a daemon thread.

Replace the while loop condition with the following, which checks the isCancelled flag on each iteration. Calling CompletableFuture.cancel() marks the future as cancelled, but it does not interrupt the thread that was started via runAsync.

// keep working only while the future has NOT been cancelled
while (!isCancelled()) {
    Thread.sleep(1000);
    System.out.println("working...");
}
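The flag check can be wired up roughly as follows. This is only a sketch under stated assumptions: TrackedResource, handle and cancelAndCheckClosed are hypothetical names, and the future that gets cancelled is created separately from the runAsync call, because the lambda cannot read the local variable whose initializer it is part of.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CooperativeCancelSketch {

    /** Hypothetical resource that only records whether close() ran. */
    static class TrackedResource implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        @Override public void close() { closed.set(true); }
    }

    /** Starts a polling worker, cancels the handle, and reports whether the resource was closed. */
    static boolean cancelAndCheckClosed() {
        TrackedResource resource = new TrackedResource();
        CountDownLatch finished = new CountDownLatch(1);

        // Created first so that the worker lambda can refer to it.
        CompletableFuture<Void> handle = new CompletableFuture<>();

        CompletableFuture.runAsync(() -> {
            try (TrackedResource r = resource) {
                while (!handle.isCancelled()) { // poll the flag on every iteration
                    Thread.sleep(50);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown(); // runs after the resource has been closed
            }
        });

        try {
            Thread.sleep(200);
            handle.cancel(true); // only flips the flag; no thread is interrupted
            return finished.await(5, TimeUnit.SECONDS) && resource.closed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("resource closed after cancel: " + cancelAndCheckClosed());
    }
}
```

Because the loop exits normally, try-with-resources closes the resource without any interrupt. The cost is that cancellation is only noticed at the next poll.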

      



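You can use the complete method of CompletableFuture to finish the future from another thread. Note that, as the output shows, this completes the future but the worker thread itself keeps running.

Below is a simple program showing the behavior: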

package com.ardevco;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureTest3 {
  public static void main(String[] args) throws Exception {

     ExecutorService pool = Executors.newFixedThreadPool(5);

     CompletableFuture<Integer> longRunningcompletableFuture = CompletableFuture.supplyAsync(() -> {
        for (int i = 0; i < 1; i--) {
           System.out.println("i " + i);
           sleep();
        }
        return 1; // we will never reach this line, so the worker thread stays stuck in the loop
     });

     CompletableFuture<Integer> completor = CompletableFuture.supplyAsync(() -> {
        System.out.println("completing the longRunningcompletableFuture");
        longRunningcompletableFuture.complete(1000);
        System.out.println("completed the longRunningcompletableFuture");
        return 10;
     });

     Thread.sleep(10000);

     System.out.println("completor...");
     int i = completor.get();
     System.out.println("completor i:" + i);
     System.out.println("completor...");

     System.out.println("completableFutureToBeCompleted2...");
     int i2 = longRunningcompletableFuture.get();
     System.out.println("completableFutureToBeCompleted2: " + i2);
     System.out.println("completableFutureToBeCompleted2...");

  }

  private static void sleep() {
     try {Thread.sleep(1000);}catch (Exception e) {}
  }

      

}

Output:

i 0
completing the longRunningcompletableFuture
completed the longRunningcompletableFuture
i -1
i -2
i -3
i -4
i -5
i -6
i -7
i -8
i -9
i -10
completor...
completor i:10
completor...
completableFutureToBeCompleted2...
completableFutureToBeCompleted2: 1000
completableFutureToBeCompleted2...


Although there is an answer marked as correct, the actual reason is quite different: see the documentation of the CompletableFuture.cancel(mayInterruptIfRunning) method and read the article "CompletableFuture cannot be interrupted" to better understand the problem.

This problem is addressed in my Tascalate Concurrent library. The changes to your code should be:

From: CompletableFuture<Void> async = CompletableFuture.runAsync(() -> { ... });

To: Promise<Void> async = CompletableTask.runAsync(() -> { ... }, someExplicitExecutor);

... and you will get the expected behavior (the executor thread is interrupted, the AutoCloseable is closed, and async completes with a CancellationException).

More information about the library can be found in my blog.


I ran into this problem in Java SE 8 as well. It is important for me not to use third-party libraries.

The documentation of cancel(mayInterruptIfRunning) says: this value has no effect in this implementation because interrupts are not used to control processing.

The idea is to call Thread.interrupt() when cancel() is called, but only while the Runnable is actually running.

/** Enable and disable the interrupt */
private static class Interruptor {

    volatile boolean interrupted;
    volatile Runnable interrupt;

    /** Enable interrupt support */
    synchronized boolean start() {
        if (interrupted) {
            return false;
        }
        Thread runThread = Thread.currentThread();
        interrupt = () -> {
            if (runThread != Thread.currentThread()) {
                runThread.interrupt();
            }
        };
        return true;
    }

    /** Interrupt Runnable */
    synchronized void interrupt() {
        if (interrupted) {
            return;
        }
        interrupted = true;
        if (interrupt != null) {
            interrupt.run();
            interrupt = null;
        }
    }

    /** Disable interrupt support */
    synchronized void finish() {
        interrupt = null;
    }
}


/** CompletableFuture with interrupt support */
public static CompletableFuture<Void> runAsyncInterrupted(Runnable run) {

    final Interruptor interruptor = new Interruptor();

    Runnable wrap = () -> {
        if (!interruptor.start()) { // allow interruption
            return; // was canceled before the thread started
        }
        try {
            run.run(); // can be interrupted
        } finally {
            interruptor.finish(); // can no longer be interrupted
        }
    };

    CompletableFuture<Void> cfRun = CompletableFuture.runAsync(wrap);

    // cancel() completes cfRun with a CancellationException, which is caught here
    cfRun.whenComplete((r, t) -> {
        if (t instanceof CancellationException) {
            interruptor.interrupt();
        }
    });

    return cfRun;
}

      

Usage example

Runnable mySlowIoRun = () -> {
    try {
        InputStream is = openSomeResource(); // open resource
        try {
            // there may be problem (#1) with reading,
            // such as loss of network connection
            int bt = is.read();
            // ..
            // .. some code
        } finally {
            is.close(); // problem (#2): releases any system resources associated with the stream
        }
    } catch (Throwable th) {
        throw new RuntimeException(th);
    }
};

CompletableFuture<Void> cf = runAsyncInterrupted(mySlowIoRun);

try {
    cf.get(5, TimeUnit.SECONDS); // 5 sec timeout
} catch (Throwable th) {
    cf.cancel(true); // cancel with interrupt mySlowIoRun
    throw th;
}

      



Here is a generalization of how I usually deal with the problem: pass in a cancellable state, and close resources IMMEDIATELY in the stage that follows the one opening them.

private static BufferedReader openFile(String fn) {
    try {
        return Files.newBufferedReader(Paths.get(fn));
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

static class Util {
    static void closeQuietly(AutoCloseable c) {
        if (c == null) return;
        try {
            c.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    static <T extends AutoCloseable, R> R runThenCloseQuietly(T c, Function<T,R> cb) {
        try {
            return cb.apply(c);
        } finally {
            closeQuietly(c);
        }
    }

    static <T extends AutoCloseable, R> Optional<R> runThenCloseQuietlyCancellable(BooleanSupplier cancelled
        , T c, Function<T,Optional<R>> cb) {
        if (c == null) return Optional.empty(); // safe doesn't throw
        try {
            if (cancelled.getAsBoolean()) return Optional.empty(); // might throw, wrap for safety
            return cb.apply(c); // might throw
        } finally {
            closeQuietly(c); // might throw, but at least we're closed
        }
    }

    private static Optional<String> emptyString() {
        return Optional.empty();
    }
}

interface Cancellable {
    boolean isCancelled();
    void cancel();
}

static class CancellableAB implements Cancellable {
    private final AtomicBoolean cancelled;

    CancellableAB(AtomicBoolean cancelled) {
        this.cancelled = cancelled;
    }

    @Override
    public boolean isCancelled() {
        return cancelled.get();
    }

    @Override
    public void cancel() {
        cancelled.set(true);
    }
}
static class CancellableArray implements Cancellable {
    private final boolean[] cancelled;
    private final int idx;
    CancellableArray(boolean[] cancelled) {
        this(cancelled, 0);
    }
    CancellableArray(boolean[] cancelled, int idx) {
        this.cancelled = cancelled;
        this.idx = idx;
    }

    @Override
    public boolean isCancelled() {
        return cancelled[idx];
    }

    @Override
    public void cancel() {
        cancelled[idx]=true;
    }
}

static class CancellableV implements Cancellable {
    volatile boolean cancelled;

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public void cancel() {
        this.cancelled = true;
    }
}

/**
 * The only reason this is a class is because we need SOME external object for the lambda to check for mutated
 * cancelled state.
 * This gives the added benefit that we can directly call cancel on the resource.
 * We allow a cancellable to be passed in to CHAIN-IN cancellable state.  e.g. if cancellation should affect MULTIPLE
 * CompletableFuture states, we don't want other promises to tie references to this task.. So the cancellable
 * object can be externalized.
 * 
 * Normally you don't need this much genericism, you can directly implement a volatile 'cancel boolean'.
 * But this allows you to create a C.F. task as a 3rd party library call - gives maximum flexibility to invoker.
 *
 */
static class FooTask {
    volatile Cancellable cancelled;
    String fileName;

    public FooTask(String fileName) {
        this.fileName = fileName;
        this.cancelled = new CancellableV();
    }

    public FooTask(String fileName, Cancellable cancelled) {
        this.fileName = fileName; // was missing: without it run1..run3 would open a null path
        this.cancelled = cancelled;
    }


    public boolean isCancelled() {
        return cancelled.isCancelled();
    }

    public void cancel() {
        cancelled.cancel();
    }

    /**
     * asynchronously opens file, scans for first valid line (closes file), then processes the line.
     * Note: if an exception happens, it is the same as not finding any lines. Don't need to special case.
     * Use of utility functions is mostly for generic-mapping
     * (avoiding annoying double-type-casting plus editor warnings)
     */
    CompletableFuture<Optional<Long>> run1() {
        return
            CompletableFuture.supplyAsync(() -> openFile(fileName))
                .thenApplyAsync(c ->  { // this stage MUST close the prior stage
                        try {
                            if (c == null || cancelled.isCancelled()) return Util.emptyString(); // shouldn't throw
                            return c
                                .lines()
                                .filter(line -> !cancelled.isCancelled())
                                .filter(line -> !line.startsWith("#"))
                                .findFirst();
                        } finally {
                            Util.closeQuietly(c); // always close: success, cancellation, or failure (null-safe)
                        }
                    }
                )
                .thenApplyAsync(oLine -> // this stage doesn't need closing
                    oLine
                        .map(line -> line.split(":"))
                        .map(cols -> cols[2])
                        .map(Long::valueOf)
                        )
            ;
    }


    /**
     * Same as run1 but avoids messy brackets + try-finally
     */
    CompletableFuture<Optional<Long>> run2() {
        return
            CompletableFuture.supplyAsync(() -> openFile(fileName))
                .thenApplyAsync(c ->  // this stage MUST close the prior stage
                    Util.runThenCloseQuietly(
                        c
                        , r -> cancelled.isCancelled() ? Util.emptyString() // shouldn't throw
                            : r
                            .lines()
                            .filter(line -> !cancelled.isCancelled())
                            .filter(line -> !line.startsWith("#"))
                            .findFirst()
                    ))
                .thenApplyAsync(oLine -> // this stage doesn't need closing
                    oLine
                        .map(line -> line.split(":"))
                        .map(cols -> cols[2])
                        .map(Long::valueOf)
                        )
            ;
    }

    /**
     * Same as run2 but avoids needing the ternary operator - says Cancellable in func-name so is more readable
     */
    CompletableFuture<Optional<Long>> run3() {
        return
            CompletableFuture.supplyAsync(() -> openFile(fileName))
                .thenApplyAsync(c ->  // this stage MUST close the prior stage
                    Util.runThenCloseQuietlyCancellable(
                    cancelled::isCancelled // lambda here is slightly easier to read than explicit if-statement
                    , c
                    , r ->  r
                            .lines()
                            .filter(line -> !cancelled.isCancelled())
                            .filter(line -> !line.startsWith("#"))
                            .findFirst()
                ))
                .thenApplyAsync(oLine -> // this stage doesn't need closing
                    oLine
                        .map(line -> line.split(":"))
                        .map(cols -> cols[2])
                        .map(Long::valueOf)
                        )
        ;
    }

}

@Test
public void testFooGood() {
    var task = new FooTask("/etc/passwd");
    var cf = task.run3();

    var oVal = cf.join();
    assertTrue(oVal.isPresent());
    System.out.println(oVal.get()); // should not throw
}

@Test
public void testFooCancel() {
    var task = new FooTask("/etc/passwd");
    var cf = task.run3();
    task.cancel();

    var oVal = cf.join();
    assertTrue(oVal.isEmpty());
}

      
