Threaded sorting is slower than non-threaded sorting
I am trying to sort files using threads. Here is Sort.java:
This function sorts using threads:
public static String[] threadedSort(File[] files) throws IOException {
String sortedData[] = new String[0];
int counter = 0;
boolean allThreadsTerminated = false;
SortingThread[] threadList = new SortingThread[files.length];
for (File file : files) {
String[] data = getData(file);
threadList[counter] = new SortingThread(data);
threadList[counter].start();
counter++;
}
while(!allThreadsTerminated) {
allThreadsTerminated = true;
for(counter=0; counter<files.length; counter++) {
if(threadList[counter].getState() != Thread.State.TERMINATED) {
allThreadsTerminated = false;
}
}
}
for(counter=0; counter<files.length; counter++) {
sortedData = MergeSort.merge(sortedData, threadList[counter].data);
}
return sortedData;
}
This function sorts sequentially, without threads:
public static String[] sort(File[] files) throws IOException {
String[] sortedData = new String[0];
for (File file : files) {
String[] data = getData(file);
data = MergeSort.mergeSort(data);
sortedData = MergeSort.merge(sortedData, data);
}
return sortedData;
}
When I sort with both methods, the sequential sort is faster than the threaded version. What could be the reason for this? Did I miss something?
My SortingThread looks something like this:
public class SortingThread extends Thread {
String[] data;
SortingThread(String[] data) {
this.data = data;
}
public void run() {
data = MergeSort.mergeSort(data);
}
}
When I compare the performance of my threaded implementation with the original non-threaded one, the non-threaded one is faster. What could be the reason for this behavior? If I'm not mistaken, the threaded implementation should be the faster one.
EDIT: Assume I have a correctly working MergeSort; there is no point posting its code here. The getData() function just reads an input file into an array of lines. I suspect the problem is that I read each whole file into a single array, and that I should give different lines to different threads. Here is getData():
private static String[] getData(File file) throws IOException {
ArrayList<String> data = new ArrayList<String>();
BufferedReader in = new BufferedReader(new FileReader(file));
while (true) {
String line = in.readLine();
if (line == null) {
break;
}
else {
data.add(line);
}
}
in.close();
return data.toArray(new String[0]);
}
How many CPUs / cores do you have? One problem with this code is that the main thread wastes CPU time in the while (!allThreadsTerminated) loop, repeatedly polling the state of each thread. If you have only one processor, that polling takes CPU time away from the actual sorting.
Replace the while loop with:
for(counter=0; counter<files.length; counter++) {
threadList[counter].join();
}
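Note that join() throws InterruptedException, so threadedSort would have to declare or handle it. Below is a self-contained sketch of the join-based version. It assumes the data is already loaded into arrays rather than read from files, and because getData and MergeSort are not shown in the question, Arrays.sort and a hypothetical merge helper stand in for them; JoinExample is likewise a made-up name.

```java
import java.util.Arrays;

public class JoinExample {

    // Mirrors the question's SortingThread; Arrays.sort stands in for MergeSort.mergeSort (assumption).
    static class SortingThread extends Thread {
        String[] data;

        SortingThread(String[] data) {
            this.data = data;
        }

        @Override
        public void run() {
            Arrays.sort(data);
        }
    }

    // Same shape as threadedSort, but takes already-loaded arrays instead of files
    // and waits with join() instead of polling getState() in a loop.
    public static String[] threadedSort(String[][] inputs) throws InterruptedException {
        SortingThread[] threadList = new SortingThread[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            threadList[i] = new SortingThread(inputs[i]);
            threadList[i].start();
        }
        String[] sortedData = new String[0];
        for (SortingThread t : threadList) {
            t.join(); // sleeps until t finishes and makes t.data safely visible here
            sortedData = merge(sortedData, t.data);
        }
        return sortedData;
    }

    // Hypothetical two-way merge of two sorted arrays, standing in for MergeSort.merge.
    static String[] merge(String[] a, String[] b) {
        String[] out = new String[a.length + b.length];
        int i = 0, j = 0, k = 0;
        while (i < a.length && j < b.length) {
            out[k++] = a[i].compareTo(b[j]) <= 0 ? a[i++] : b[j++];
        }
        while (i < a.length) out[k++] = a[i++];
        while (j < b.length) out[k++] = b[j++];
        return out;
    }
}
```

Unlike the polling loop, a thread blocked in join() consumes no CPU, so on a single core all of the processor time goes to the sorting threads.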
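You could use Streams and the standard sort instead:
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.stream.Stream;

static String[] sort(File[] files, boolean parallel) {
return (parallel ? Stream.of(files).parallel() : Stream.of(files))
.flatMap(f -> {
try {
return Files.lines(f.toPath()); // flatMap closes each per-file stream once it is consumed
} catch (IOException e) {
e.printStackTrace();
return Stream.<String>empty(); // skip an unreadable file instead of returning null
}
})
.sorted()
.toArray(String[]::new);
}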
static String[] sort(File[] files) {
return sort(files, false);
}
static String[] threadSort(File[] files) {
return sort(files, true);
}
threadSort is faster in my environment:
sort:
files=511 sorted lines=104419 elapse=4784ms
threadSort:
files=511 sorted lines=104419 elapse=3060ms
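Along the same lines, the asker's idea of giving different lines to different threads is what Java 8's Arrays.parallelSort does internally: its Javadoc describes a parallel sort-merge that breaks the array into sub-arrays, sorts them and then merges them. A minimal sketch, assuming Java 8+ and that all lines fit in memory at once (ParallelSortExample and sortAllLines are hypothetical names):

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParallelSortExample {

    // Reads every file into one list, then lets Arrays.parallelSort split the
    // combined array across worker threads (arrays below its minimum granularity
    // are sorted sequentially with Arrays.sort).
    public static String[] sortAllLines(File[] files) throws IOException {
        List<String> all = new ArrayList<>();
        for (File file : files) {
            // Default charset, as FileReader uses in the question's getData()
            all.addAll(Files.readAllLines(file.toPath(), Charset.defaultCharset()));
        }
        String[] data = all.toArray(new String[0]);
        Arrays.parallelSort(data); // parallel sort-merge over sub-arrays
        return data;
    }
}
```

Unlike FileReader, Files.readAllLines throws an exception on bytes that are invalid in the chosen charset instead of silently substituting characters.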
You can use java.util.concurrent.ExecutorService, which runs all your tasks on a given number of threads. Once all tasks have finished, you get a list of Future objects holding the result of each task. The Futures in that list are in the same order as the Callable objects in the list you submitted.
To do this, SortingThread needs to implement the Callable interface so that you can get the output of each thread. Every Callable must implement a call() method, and the type call() returns (here String[]) is the type parameter of your Future objects: Future.get() hands back exactly what call() returned.
import java.util.concurrent.Callable;

public class SortingThread implements Callable<String[]> {
String[] data;
SortingThread(String[] data) {
this.data = data;
}
@Override
public String[] call() throws Exception {
data = MergeSort.mergeSort(data);
return data;
}
}
Next, use an ExecutorService to manage the threads:
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public static String[] sortingExampleWithMultiThreads(File[] files)
throws IOException, InterruptedException, ExecutionException {
String[] sortedData = new String[0];
List<Callable<String[]>> callableList = new ArrayList<Callable<String[]>>();
for (File file : files) {
String[] data = getData(file);
callableList.add(new SortingThread(data)); // Prepare the Callable list passed to invokeAll()
}
// Fixed-size pool, one thread per file; Math.max avoids IllegalArgumentException for an empty array
ExecutorService service = Executors.newFixedThreadPool(Math.max(1, files.length));
try {
// invokeAll() blocks until every call() has returned; the futures are in submission order
List<Future<String[]>> futureObjects = service.invokeAll(callableList);
for (Future<String[]> future : futureObjects) {
sortedData = MergeSort.merge(sortedData, future.get()); // get() returns the String[] from call()
}
} finally {
service.shutdown(); // otherwise the pool's non-daemon threads keep the JVM alive
}
return sortedData;
}
This method avoids the busy-waiting while loop, which burns CPU time just polling thread states and so slows the sort down: on a single-core processor it can take up to 100% of the CPU, and on a dual-core processor up to 50%.
Also, letting an ExecutorService manage the threads is generally a better approach in multithreaded code than starting threads yourself and polling them for results, so you can expect better performance.
I haven't run it, so you may need to adjust it here and there, but it shows the approach that should suit you.
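The ordering guarantee of invokeAll can be checked in isolation. The following is a self-contained sketch, assuming in-memory arrays instead of files and Arrays.sort in place of MergeSort.mergeSort; a lambda plays the role of SortingThread's call(), and InvokeAllDemo and sortEach are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo {

    // Sorts each input array in its own task and returns the results in submission order.
    public static List<String[]> sortEach(List<String[]> inputs)
            throws InterruptedException, ExecutionException {
        List<Callable<String[]>> tasks = new ArrayList<>();
        for (String[] input : inputs) {
            tasks.add(() -> {              // a Callable<String[]>: call() returns a sorted copy
                String[] copy = input.clone();
                Arrays.sort(copy);         // stand-in for MergeSort.mergeSort
                return copy;
            });
        }
        ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            List<String[]> results = new ArrayList<>();
            // invokeAll waits for every task; its futures follow the order of `tasks`
            for (Future<String[]> f : pool.invokeAll(tasks)) {
                results.add(f.get());      // get() yields the value call() returned
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

Sizing the pool by availableProcessors() rather than by the number of files is a design choice of this sketch: more threads than cores cannot sort faster, they only add scheduling overhead.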
PS: When measuring performance, always start a new JVM instance for each run to get accurate and consistent results.
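A minimal timing sketch in that spirit, assuming the mode is passed on the command line and each mode is launched in its own `java` process so JIT and GC state from one run cannot bias the other. SortBenchmark, timeSort and the random-string input are hypothetical; a single timed run is still noisy, and a dedicated harness such as JMH gives more reliable numbers.

```java
import java.util.Random;
import java.util.stream.Stream;

public class SortBenchmark {

    // Sorts n random strings once, sequentially or in parallel, and returns elapsed milliseconds.
    public static long timeSort(boolean parallel, int n, long seed) {
        Random rnd = new Random(seed); // fixed seed: identical input on every run
        String[] data = new String[n];
        for (int i = 0; i < n; i++) {
            data[i] = Long.toHexString(rnd.nextLong());
        }
        long start = System.nanoTime();
        Stream<String> s = parallel ? Stream.of(data).parallel() : Stream.of(data);
        String[] sorted = s.sorted().toArray(String[]::new);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Use the result so the sort cannot be treated as dead code
        if (sorted.length != n) {
            throw new IllegalStateException("lost elements");
        }
        return elapsedMs;
    }

    // Run once per mode, each in a fresh JVM:
    //   java SortBenchmark sequential
    //   java SortBenchmark parallel
    public static void main(String[] args) {
        boolean parallel = args.length > 0 && args[0].equals("parallel");
        System.out.println((parallel ? "parallel" : "sequential") + ": "
                + timeSort(parallel, 1_000_000, 42L) + " ms");
    }
}
```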