Parallel check if collection is empty

I have this piece of code:

private ConcurrentLinkedQueue<Interval> intervals = new ConcurrentLinkedQueue();
@Override
public void run(){
  while(!intervals.isEmpty()){
    //remove one interval
    //do calculations
    //add some intervals
  }
}

      

This code is executed by a certain number of threads at the same time. As you can see, the loop should continue until there are no more intervals in the collection, but there is a problem. At the beginning of each iteration, the interval is removed from the collection, and at the end, a number of intervals can be added back to one collection.

The problem is that even though one thread is inside the loop, the collection can become empty, so other threads that try to enter the loop will not be able to do so and will end their work ahead of schedule, although the collection may be filled with values ​​after the first the thread will complete the iteration. I want the number of threads to remain constant (or at most some number n) until all jobs are finished.

This means that no thread is running in a loop and there are no items in the collection. What are the possible ways to achieve this? Any ideas are appreciated.

One way to solve this problem in my particular case is to give each theme a separate chunk of the original collection. But after one thread finishes its work, it will no longer be used by the program, although it can help other threads with their calculations, so I don't like this solution, because it is important to use all the cores of the machine in my problem.

This is the simplest minimal working example I could think of. It can be lengthy.

public class Test{   
   private ConcurrentLinkedQueue<Interval> intervals = new ConcurrentLinkedQueue();
   private int threadNumber;
   private Thread[] threads;
   private double result;

   public Test(int threadNumber){
      intervals.add(new Interval(0, 1));
      this.threadNumber = threadNumber;
      threads = new Thread[threadNumber];
   }

   public double find(){
      for(int i = 0; i < threadNumber; i++){
         threads[i] = new Thread(new Finder());
         threads[i].start();
      }
      try{
         for(int i = 0; i < threadNumber; i++){
            threads[i].join();
         }
      }
      catch(InterruptedException e){
         System.err.println(e);
      }
      return result;
   }   

   private class Finder implements Runnable{   
      @Override
      public void run(){
         while(!intervals.isEmpty()){
            Interval interval = intervals.poll();
            if(interval.high - interval.low > 1e-6){    
               double middle = (interval.high + interval.low) / 2;
               boolean something = true;
               if(something){
                  intervals.add(new Interval(interval.low + 0.1, middle - 0.1));
                  intervals.add(new Interval(middle + 0.1, interval.high - 0.1));
               }
               else{
                  intervals.add(new Interval(interval.low + 0.1, interval.high - 0.1));
               }
            }
         }
      }
   }

   private class Interval{
      double low;
      double high;
      public Interval(double low, double high){
         this.low = low;
         this.high = high;
      }
   }
}

      

What you may need to know about the program: After each iteration interval must either disappear (because it is too small), it will decrease, or split into two smaller intervals. The job ends after there are no gaps left. Also, I should be able to limit the number of threads that do this job to some number n. The actual program looks for the maximum value of some function by dividing the intervals and discarding parts of those intervals that cannot contain the maximum value using some rules, but this shouldn't really be relevant for my problem.

+3


source to share


5 answers


You can use an atomic flag, i.e .:

private ConcurrentLinkedQueue<Interval> intervals = new ConcurrentLinkedQueue<>();
private AtomicBoolean inUse = new AtomicBoolean();

@Override
public void run() {
    while (!intervals.isEmpty() && inUse.compareAndSet(false, true)) {
        // work
        inUse.set(false);
    }
}

      

UPD

The question has been updated so I would give you a better solution. This is a more "classic" solution using a blocking queue;



private BlockingQueue<Interval> intervals = new ArrayBlockingQueue<Object>();
private volatile boolean finished = false;

@Override
public void run() {
    try {
        while (!finished) {
            Interval next = intervals.take();
            // put work there
            // after you decide work is finished just set finished = true
            intervals.put(interval); // anyway, return interval to queue
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

      

UPD2

It is now better to rewrite the solution and split the range into sub-ranges for each stream.

+2


source


CompletableFuture is also an interesting solution for such tasks. It automatically distributes the workload across multiple worker threads.



static CompletableFuture<Integer> fibonacci(int n) {
  if(n < 2) return CompletableFuture.completedFuture(n);
  else {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println(Thread.currentThread());
      CompletableFuture<Integer> f1 = fibonacci(n - 1);
      CompletableFuture<Integer> f2 = fibonacci(n - 2);
      return f1.thenCombineAsync(f2, (a, b) -> a + b);
    }).thenComposeAsync(f -> f);
  }
}

public static void main(String[] args) throws Exception {
  int fib = fibonacci(10).get();
  System.out.println(fib);
}

      

+2


source


Your problem looks like a recursive one - processing one task (interval) may result in some sub-tasks (auxiliary intervals).

For this purpose, I would use ForkJoinPool

and RecursiveTask

:

class Interval {
    ...
}

class IntervalAction extends RecursiveAction {
    private Interval interval;

    private IntervalAction(Interval interval) {
        this.interval = interval;
    }

    @Override
    protected void compute() {
        if (...) {
            // we need two sub-tasks
            IntervalAction sub1 = new IntervalAction(new Interval(...));
            IntervalAction sub2 = new IntervalAction(new Interval(...));
            sub1.fork();
            sub2.fork();
            sub1.join();
            sub2.join();
        } else if (...) {
            // we need just one sub-task
            IntervalAction sub3 = new IntervalAction(new Interval(...));
            sub3.fork();
            sub3.join();
        } else {
            // current task doesn't need any sub-tasks, just return
        }
    }
}

public static void compute(Interval initial) {
    ForkJoinPool pool = new ForkJoinPool();
    pool.invoke(new IntervalAction(initial));
    // invoke will return when all the processing is completed
}

      

+1


source


Then I would suggest a master / worker approach.

The master process walks through the intervals and assigns the calculations of that interval to another process. It also removes / adds as needed. Thus, all cores are used, and only when all intervals are completed, the process is executed. This is also known as dynamic work allocation.

Possible example:

public void run(){
  while(!intervals.isEmpty()){
    //remove one interval
    Thread t = new Thread(new Runnable()
    {
        //do calculations
    });
    t.run();
    //add some intervals
  }
}

      

The possible solution you provided is known as static allocation and you are correct, it will finish as fast as the slowest processor, but the dynamic approach will use all the memory.

0


source


I faced this problem. The way I solved it was to use AtomicInteger to find out what is in the queue. Before each sentence (), add an integer. After each poll (), decrement integer. CLQ has no real isEmpty () as it has to look at the head / tail nodes and this can change atomically (CAS).

This does not 100% guarantee that some streams can grow after another stream shrinks, so you need to check again until you finish the stream. It's better than relying on while (... isEmpty ())

In addition, you may need to sync.

0


source







All Articles