Synchronizing Java threads correctly with wait / notifyAll?
Here is a simplified version of my application showing what I am doing.
/*
in my app main():
    Runner run = new Runner();
    run.dowork();
*/
class Runner
{
    private int totalWorkers = 2;
    private int workersDone = 0;
    public synchronized void workerDone()
    {
        workersDone++;
        notifyAll();
    }
    public synchronized void dowork()
    {
        workersDone = 0;
        //<code for opening a file here, other setup here, etc>
        Worker a = new Worker(this);
        Worker b = new Worker(this);
        while ((line = reader.readLine()) != null)
        {
            //<a large amount of processing on 'line'>
            a.setData(line);
            b.setData(line);
            while (workersDone < totalWorkers)
            {
                wait();
            }
        }
    }
}
class Worker implements Runnable
{
    private Runner runner;
    private String data;
    public Worker(Runner r)
    {
        this.runner = r;
        Thread t = new Thread(this);
        t.start();
    }
    public synchronized void setData(String s)
    {
        this.data = s;
        notifyAll();
    }
    public void run()
    {
        while (true)
        {
            synchronized(this)
            {
                wait();
                //<do work with this.data here>
                this.runner.workerDone();
            }
        }
    }
}
The basic idea here is that I have a group of workers that each process the incoming line of data, independently of each other, and write the results wherever they need to go. They don't need to communicate any data back to the main thread or exchange data with each other.
The problem I am facing is that this code deadlocks. I am reading a file with over 1 million lines, and I am lucky to get 100 lines into it before the application stops responding.
The workers actually all do different jobs, so I want to wait until they are all finished before moving on to the next line.
I cannot let the workers run at different speeds and buffer the data internally, because the files I am processing are too large for that and will not fit into memory.
I can't give each worker its own FileReader to retrieve the line independently, because I do a ton of processing on the line before the workers see it and don't want to redo that processing in every worker.
I know I am missing some fairly simple aspect of synchronization in Java, but I am stuck at this point and out of ideas for fixing it. If someone could explain what I am doing wrong, I would appreciate it.
Working directly with synchronized, wait() and notify() is definitely tricky.
Fortunately, the Java Concurrency API provides some excellent control objects for this kind of thing that are much more intuitive. In particular, take a look at CyclicBarrier and CountDownLatch; one of them will almost certainly be what you are looking for.
You may also find ThreadPoolExecutor handy for this situation.
Here's a simple example / transformation of your snippet that produces the following output (no deadlock of course):
Read line: Line 1
Waiting for work to be complete on line: Line 1
Working on line: Line 1
Working on line: Line 1
Read line: Line 2
Waiting for work to be complete on line: Line 2
Working on line: Line 2
Working on line: Line 2
Read line: Line 3
Waiting for work to be complete on line: Line 3
Working on line: Line 3
Working on line: Line 3
All work complete!
import java.io.IOException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Runner
{
    public static void main(String args[]) {
        Runner r = new Runner();
        try {
            r.dowork();
        } catch (IOException e) {
            // handle
            e.printStackTrace();
        }
    }
    CyclicBarrier barrier;
    ExecutorService executor;
    private int totalWorkers = 2;
    public Runner() {
        // +1 because the main thread is a party to the barrier too
        this.barrier = new CyclicBarrier(this.totalWorkers + 1);
        this.executor = Executors.newFixedThreadPool(this.totalWorkers);
    }
    public synchronized void dowork() throws IOException
    {
        //<code for opening a file here, other setup here, etc>
        //BufferedReader reader = null;
        //String line;
        final Worker worker = new Worker();
        for(String line : new String[]{"Line 1", "Line 2", "Line 3"})
        //while ((line = reader.readLine()) != null)
        {
            System.out.println("Read line: " + line);
            //<a large amount of processing on 'line'>
            for(int c = 0; c < this.totalWorkers; c++) {
                final String curLine = line;
                this.executor.submit(new Runnable() {
                    public void run() {
                        worker.doWork(curLine);
                    }
                });
            }
            try {
                System.out.println("Waiting for work to be complete on line: " + line);
                // blocks until every worker has reached the barrier for this line
                this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }
        // let the pool threads exit so the JVM can terminate
        this.executor.shutdown();
        System.out.println("All work complete!");
    }
    class Worker
    {
        public void doWork(String line)
        {
            //<do work with 'line' here>
            System.out.println("Working on line: " + line);
            try {
                // signal that this worker is finished with the current line
                Runner.this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }
    }
}
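For comparison, here is roughly what the CountDownLatch option mentioned above could look like. This is only a minimal sketch: the class name LatchPerLineDemo, the processAll method and the counter are made up for illustration and are not part of the question's code. A latch is one-shot, so the sketch creates a fresh one for every line.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class LatchPerLineDemo {

    // Runs workerCount jobs per line and returns how many jobs finished in total.
    static int processAll(List<String> lines, int workerCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (String line : lines) {
                // A latch cannot be reset, so each line gets a new one.
                CountDownLatch done = new CountDownLatch(workerCount);
                for (int i = 0; i < workerCount; i++) {
                    pool.submit(() -> {
                        try {
                            // <the worker's real job on 'line' would go here>
                            completed.incrementAndGet();
                        } finally {
                            done.countDown(); // always signal, even if the job throws
                        }
                    });
                }
                // The reading thread blocks here until all jobs for this line are done.
                done.await();
            }
        } finally {
            pool.shutdown(); // let the pool threads exit
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int n = processAll(Arrays.asList("Line 1", "Line 2", "Line 3"), 2);
        System.out.println("Completed jobs: " + n); // prints "Completed jobs: 6"
    }
}
```

Compared with the barrier version, the workers never block on each other here; only the reading thread waits, which may be closer to what the question describes.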
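IMHO you misplaced "workersDone = 0". It has to be reset for every line, inside the read loop; otherwise the inner while loop never waits again after the first line.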
public synchronized void dowork()
{
    // workersDone = 0;
    //<code for opening a file here, other setup here, etc>
    Worker a = new Worker(this);
    Worker b = new Worker(this);
    while ((line = reader.readLine()) != null)
    {
        workersDone = 0;
        //<a large amount of processing on 'line'>
        a.setData(line);
        b.setData(line);
        while (workersDone < totalWorkers)
        {
            wait();
        }
    }
}
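Even with the counter reset in the right place, the bare wait() in Worker.run() may still miss a notifyAll() that arrives before the worker is actually waiting, and calling workerDone() while holding the worker's own lock nests two monitors. One way to avoid both, sketched here under assumptions: wait in a loop on a real condition (here, data != null) and do the work outside the lock. The class name GuardedWaitDemo, the stop() method and the processed counter are hypothetical additions for the demo, not part of the original code.

```java
// Hypothetical sketch of the wait/notifyAll design with guarded waits.
class GuardedWaitDemo {
    private final int totalWorkers = 2;
    private int workersDone = 0; // guarded by 'this'

    synchronized void workerDone() {
        workersDone++;
        notifyAll();
    }

    // Not synchronized as a whole: the lock is only held around the counter.
    int doWork(String[] lines) throws InterruptedException {
        Worker a = new Worker(this);
        Worker b = new Worker(this);
        Thread ta = new Thread(a);
        Thread tb = new Thread(b);
        ta.start();
        tb.start();
        for (String line : lines) {
            synchronized (this) { workersDone = 0; } // reset for every line
            a.setData(line);
            b.setData(line);
            synchronized (this) {
                while (workersDone < totalWorkers) {
                    wait();
                }
            }
        }
        a.stop();
        b.stop();
        ta.join(); // join() makes the workers' counters visible here
        tb.join();
        return a.processed + b.processed;
    }

    static class Worker implements Runnable {
        private final GuardedWaitDemo runner;
        private String data;     // guarded by 'this'; null means "nothing to do"
        private boolean stopped; // guarded by 'this'
        int processed;           // read by the main thread only after join()

        Worker(GuardedWaitDemo r) { this.runner = r; }

        synchronized void setData(String s) { data = s; notifyAll(); }
        synchronized void stop() { stopped = true; notifyAll(); }

        public void run() {
            try {
                while (true) {
                    String line;
                    synchronized (this) {
                        // Guarded wait: a notification sent early is not lost,
                        // because the condition is re-checked before waiting.
                        while (data == null && !stopped) {
                            wait();
                        }
                        if (data == null) return; // stopped, nothing pending
                        line = data;
                        data = null;
                    }
                    // <work on 'line' happens here, outside the worker's lock>
                    processed++;
                    runner.workerDone();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int n = new GuardedWaitDemo().doWork(new String[]{"Line 1", "Line 2", "Line 3"});
        System.out.println("Lines processed by all workers: " + n); // prints 6
    }
}
```

In this sketch neither thread ever holds one monitor while trying to take the other, so the lock-ordering deadlock cannot occur, at least as far as this simplified version goes.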