Synchronizing Java threads correctly with wait / notifyAll?

Here is a simplified version of my application showing what I am doing.

/*
in my app main():

    Runner run = new Runner();

    run.dowork();

*/

class Runner
{
    private int totalWorkers = 2;
    private int workersDone = 0;

    public synchronized void workerDone()
    {
        workersDone++;
        notifyAll();
    }

    public synchronized void dowork()
    {
        workersDone = 0;

        //<code for opening a file here, other setup here, etc>

        Worker a = new Worker(this);
        Worker b = new Worker(this);

        while ((line = reader.readLine()) != null)
        {
            //<a large amount of processing on 'line'>

            a.setData(line);
            b.setData(line);

            while (workersDone < totalWorkers)
            {
                wait();
            }               
        }
    }
}

class Worker implements Runnable
{
    private Runner runner;
    private String data;

    public Worker(Runner r)
    {
        this.runner = r;
        Thread t = new Thread(this);
        t.start();
    }

    public synchronized void setData(String s)
    {
        this.data = s;
        notifyAll();
    }

    public void run
    {
        while (true)
        {
            synchronized(this)
            {
                wait();

                //<do work with this.data here>

                this.runner.workerDone();
            }
        }
    }
}

      

The basic idea here is that I have a group of workers who process everything on the incoming data line, independently of each other, and write the data wherever it is - they don't need to communicate any data back to the main thread, or exchange data with each other.

The problem I am facing is this code is blocking. I am reading a file with over 1 million lines, and I was lucky to get 100 lines before the application stops responding.

The workers actually all do different jobs, so I want to wait until they are all finished before moving on to the next line.

I cannot let the workers run at different speeds and digitize the data from the inside out because the files I am processing are too large for that and will not fit into memory.

I can't give each worker their own FileReader to retrieve the string independently, because I do a ton of on-line processing before the workers see it and don't want to re-process every worker.

I know I am missing the rather simple aspect of synchronization in Java, but I am stuck at this point. If someone could explain what I am doing wrong, I would appreciate it. I believe I don't understand some aspect of synchronization, but I have no idea trying to fix this.

+1


source to share


2 answers


Working directly with synchronized

, wait()

and notify()

definitely challenging.

Fortunately, the Java Concurrency API provides some excellent control objects for this kind of thing that are much more intuitive. In particular, take a look CyclicBarrier

and CountDownLatch

; one of them will almost certainly be what you are looking for.

You may also find ThreadPoolExecutor

this to be handy for this situation.



Here's a simple example / transformation of your snippet that produces the following output (no deadlock of course):

Read Line: Line 1
Waiting for Online Shutdown: Line 1 Line
Work: Line 1 Line
Work: Line 1
Reading Line: Line 2
Waiting for Online Shutdown: Line 2 Line
Work: Line 2 Line
Work: Line 2
Reading Line: Line 3
Waiting for Online Shutdown: Line 3 Line
Work: Line 3 Line
Work: Line 3
All Work Completed!

public class Runner
{

    public static void main(String args[]) {
        Runner r = new Runner();
        try {
            r.dowork();
        } catch (IOException e) {
            // handle
            e.printStackTrace();
        }
    }

    CyclicBarrier barrier;
    ExecutorService executor;
    private int totalWorkers = 2;

    public Runner() {
        this.barrier = new CyclicBarrier(this.totalWorkers + 1);
        this.executor = Executors.newFixedThreadPool(this.totalWorkers);
    }

    public synchronized void dowork() throws IOException
    {
        //<code for opening a file here, other setup here, etc>
        //BufferedReader reader = null;
        //String line;

        final Worker worker = new Worker();

        for(String line : new String[]{"Line 1", "Line 2", "Line 3"})
        //while ((line = reader.readLine()) != null)
        {
            System.out.println("Read line: " + line);
            //<a large amount of processing on 'line'>

            for(int c = 0; c < this.totalWorkers; c++) {
                final String curLine = line;
                this.executor.submit(new Runnable() {
                    public void run() {
                        worker.doWork(curLine);
                    }
                });
            }

            try {
                System.out.println("Waiting for work to be complete on line: " + line);
                this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }

        System.out.println("All work complete!");
    }

    class Worker
    {
        public void doWork(String line)
        {
            //<do work with this.data here>
            System.out.println("Working on line: " + line);

            try {
                Runner.this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }
    }    
}

      

+3


source


IMHO you misplaced "workersDone = 0".



public synchronized void dowork()
        {
                // workersDone = 0;

                //<code for opening a file here, other setup here, etc>

                Worker a = new Worker(this);
                Worker b = new Worker(this);

                while ((line = reader.readLine()) != null)
                {
                        workersDone = 0;

                        //<a large amount of processing on 'line'>

                        a.setData(line);
                        b.setData(line);

                        while (workersDone < totalWorkers)
                        {
                                wait();
                        }                               
                }
        }

      

0


source







All Articles