Java 8 Stream: read a file 100 lines at a time
Let's say I have a huge file and I would like to read it 100 lines at a time and perform one operation per chunk. (I want to concatenate the 100 lines and send them as a REST request.)
In Java 7 I would do something like below.
try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
    String line;
    int count = 0;
    List<String> list = new ArrayList<>();
    while ((line = br.readLine()) != null) {
        list.add(line);
        count++;
        if (count % 100 == 0) {
            // do the operation on list
            list = new ArrayList<>();
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}
Is there something in the Java 8 Stream API that I can use here? I know I can do something like the following, but it works on every single line rather than on batches of 100 lines, so I think forEach is not an option here.
try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
    stream.forEach(System.out::println);
} catch (IOException e) {
    e.printStackTrace();
}
If you don't like the skip/limit approach shown in the other answer below, you can use the following one instead; the drawback is that you cannot create the partial streams in parallel, since the lines have to be read sequentially. For example:
split(Paths.get("file"), 100).forEach(this::sendRequest);

void sendRequest(List<String> each) {
    // then you must send the REST request in parallel here
}
Stream<List<String>> split(Path path, int limit) throws IOException {
    // true: skip the remaining lines if fewer than `limit` are left
    return split(Files.lines(path), limit, true);
}

<T> Stream<List<T>> split(Stream<T> source, int limit, boolean skipRemainingElements) {
    // preserve the source spliterator's size estimate and characteristics
    Spliterator<T> it = source.spliterator();
    long size = it.estimateSize();
    int c = it.characteristics();

    return StreamSupport.stream(new Spliterators.AbstractSpliterator<List<T>>(size, c) {
        private int thresholds = skipRemainingElements ? limit : 1;

        @Override
        @SuppressWarnings("StatementWithEmptyBody")
        public boolean tryAdvance(Consumer<? super List<T>> action) {
            List<T> each = new ArrayList<>(limit);
            while (each.size() < limit && it.tryAdvance(each::add)) ;
            if (each.size() < thresholds) return false;
            action.accept(each);
            return true;
        }
    }, false).onClose(source::close);
}
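Since the question's goal is to concatenate each block of 100 lines and send it as a REST request, the sendRequest stub above could be filled in roughly as in the sketch below; the endpoint URL and the plain-text body are assumptions for illustration, not part of the original answer.

// Hypothetical sketch: POST the joined lines to a REST endpoint.
// The URL "http://localhost:8080/lines" and the content type are assumptions.
void sendRequest(List<String> each) {
    String body = String.join("\n", each);
    try {
        HttpURLConnection con =
                (HttpURLConnection) new URL("http://localhost:8080/lines").openConnection();
        con.setRequestMethod("POST");
        con.setDoOutput(true);
        con.setRequestProperty("Content-Type", "text/plain; charset=utf-8");
        try (OutputStream out = con.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        con.getResponseCode(); // force the request and check the status
        con.disconnect();
    } catch (IOException e) {
        // todo: handle error
    }
}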
You can split the stream using Stream#skip and Stream#limit and then send the REST requests for the blocks of 100 lines in parallel. For example:
split(Paths.get("file"), 100).parallel().forEach(this::sendRequest);

Stream<Stream<String>> split(Path path, int limit) throws IOException {
    // LongStream.range produces one index per block of `limit` lines
    return LongStream.range(0, lines(path) / limit).parallel()
            .map(it -> it * limit)
            .mapToObj(offset -> {
                try {
                    return Files.lines(path).skip(offset).limit(limit);
                } catch (IOException e) {
                    throw new RejectedExecutionException(e);
                }
            });
}
long lines(Path path) throws IOException {
    try (LineNumberReader in = open(path)) {
        in.skip(Long.MAX_VALUE); // read through the file so getLineNumber reflects the total
        return in.getLineNumber();
    }
}

LineNumberReader open(Path path) throws IOException {
    return new LineNumberReader(Files.newBufferedReader(path));
}
void sendRequest(Stream<String> each) {
    try (BufferedWriter out = null) { // todo: create the output writer
        each.forEach(line -> {
            try {
                out.write(line);
            } catch (IOException e) {
                // todo: handle error
            }
        });
    } catch (IOException ex) {
        // todo: handle error
    }
}
If you want to improve performance, you should implement your own algorithms in the split and lines methods. Also note that LineNumberReader#getLineNumber returns an int, not a long. For counting lines, there are many open-source projects that can count the total number of lines in a file in parallel.
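If the int return type of LineNumberReader#getLineNumber is a concern, one simple alternative (not the fastest, but it returns a long) is to count the lines with the Stream API itself; a minimal sketch:

// Counts lines via the Stream API; Stream#count returns a long,
// so this avoids the int limit of LineNumberReader#getLineNumber.
long lines(Path path) throws IOException {
    try (Stream<String> lines = Files.lines(path)) {
        return lines.count();
    }
}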
And this is just a skeleton; if you want the highest performance, you first need to index the line information (totalLines and the byte offset of each line) in parallel (with an algorithm similar to merge sort), keeping it in memory or on disk if needed. Then you can use RandomAccessFile to jump directly to an offset.

The indexed line-info file format is as follows:

total_lines|offset1|offset2|...|offsetN

Note: there is no actual | delimiter in the line-info file, and every value must be written as a long with DataOutputStream#writeLong. Because the file uses this fixed-width format, the byte position of offsetM is simply 8*M (M = 1..N), so you can obtain offsetM by reading 8 bytes from that position.
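As a rough illustration of the index layout described above (the file names and helper methods here are assumptions, not part of the original answer), writing the index and then jumping to a line could look like this:

// Writes the index: total_lines followed by one long offset per line.
// All values are written with writeLong, so each field is exactly 8 bytes.
void writeIndex(Path indexFile, long totalLines, long[] offsets) throws IOException {
    try (DataOutputStream out = new DataOutputStream(
            new BufferedOutputStream(Files.newOutputStream(indexFile)))) {
        out.writeLong(totalLines);
        for (long offset : offsets) {
            out.writeLong(offset);
        }
    }
}

// Reads offsetM (M is 1-based) from byte position 8 * M of the index,
// then jumps to that byte offset in the data file and reads one line.
String readLineAt(Path indexFile, Path dataFile, long m) throws IOException {
    long offset;
    try (RandomAccessFile index = new RandomAccessFile(indexFile.toFile(), "r")) {
        index.seek(8 * m);
        offset = index.readLong();
    }
    try (RandomAccessFile data = new RandomAccessFile(dataFile.toFile(), "r")) {
        data.seek(offset);
        return data.readLine();
    }
}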
Ideally, the index file should be created at the same time the huge file is created (and split itself if it grows too large), so that this work is not wasted and the index can be reused later.