Skip to content

Instantly share code, notes, and snippets.

@simonbasle
Created October 26, 2017 10:22
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save simonbasle/0167a1f833a19724646bc7eb27e4346b to your computer and use it in GitHub Desktop.
Save simonbasle/0167a1f833a19724646bc7eb27e4346b to your computer and use it in GitHub Desktop.
Example of loading a large file using Reactor and Java 8
package snippets;
import java.io.Console;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import java.util.stream.BaseStream;
import reactor.core.publisher.Flux;
/**
 * Demo that extracts book listings from a text file in two ways and prints the
 * JVM memory in use along the way: a streaming pipeline built on Reactor's
 * {@link Flux}, and an eager version that first loads every line into a {@link List}.
 *
 * <p>Three kinds of lines are recognised; everything else is ignored:
 * <ul>
 *   <li>lines starting with {@code "Title: "},</li>
 *   <li>lines starting with {@code "Author: "},</li>
 *   <li>the marker line {@code ##BOOKSHELF##} (compared case-insensitively), which
 *       separates one bookshelf from the next.</li>
 * </ul>
 *
 * @author Simon Baslé
 */
public class ReadFile {

    /** Prefix of a line carrying a book title. */
    private static final String TITLE_PREFIX = "Title: ";

    /** Prefix of a line carrying a book author. */
    private static final String AUTHOR_PREFIX = "Author: ";

    /** Text that replaces {@link #AUTHOR_PREFIX} when a book is rendered. */
    private static final String AUTHOR_SEPARATOR = " by ";

    /** Line that separates two bookshelves (matched ignoring case). */
    private static final String SHELF_MARKER = "##BOOKSHELF##";

    private static final long BYTES_PER_MB = 1024L * 1024L;

    /**
     * Asks for a file path on standard input (falling back to
     * {@code ~/bookshelf.txt} when nothing is entered), then runs the Flux-based
     * version followed by the list-based version on that file.
     *
     * <p>Exits the JVM with status {@code -1} if the path is not a regular file.
     *
     * @param args ignored
     * @throws IOException if the file size cannot be read or the list-based version fails to read the file
     */
    public static void main(String[] args) throws IOException {
        String defaultFileName = System.getProperty("user.home") + "/bookshelf.txt";

        String fileName;
        // Closing the Scanner also closes System.in; it is not read again afterwards.
        try (Scanner console = new Scanner(System.in)) {
            System.out.printf("Please enter a path to a large text file [%s]:", defaultFileName);
            // nextLine() throws NoSuchElementException on end-of-input (e.g. stdin
            // redirected from an empty source); treat that like an empty answer.
            fileName = console.hasNextLine() ? console.nextLine().trim() : "";
        }
        if (fileName.isEmpty()) {
            fileName = defaultFileName;
        }

        Path path = Paths.get(fileName);
        if (!Files.isRegularFile(path)) {
            System.err.println("The file " + path + " doesn't exist or is not a text file");
            System.exit(-1);
        }

        System.err.printf("Found %s, of size %dMB\n", path, Files.size(path) / BYTES_PER_MB);
        // Request a GC first so the baseline figure is less affected by startup garbage.
        System.gc();
        System.err.printf("Memory in use before reading: %dMB\n\n", usedMemoryMb());

        fluxVersion(path)
                .doOnNext(System.out::println)
                .blockLast();

        listVersion(path);
    }

    /**
     * Builds the streaming pipeline: for each bookshelf found in the file it emits
     * a header line, the list of books rendered as {@code title + " by " + author},
     * and a line reporting the memory in use.
     *
     * <p>Within a bookshelf, lines are paired two by two, so the pipeline expects
     * every title line to be followed by its author line.
     *
     * @param path the file to read
     * @return a cold {@link Flux} of the lines to print; the file is opened on subscription
     */
    private static Flux<String> fluxVersion(Path path) {
        return fromPath(path)
                .filter(ReadFile::isRelevantLine)
                // Split on the raw marker line, before any rewriting, using the same
                // test as the filter above: a title or author that merely contains
                // "##" must not end the current bookshelf.
                .windowWhile(line -> !isShelfMarker(line))
                // concatMap subscribes to the windows one after the other, which keeps
                // bookshelves (and the books inside them) in file order.
                .concatMap(bookshelf -> bookshelf
                        .map(ReadFile::formatLine)
                        .window(2)
                        .concatMap(bookInfo -> bookInfo.reduce(String::concat))
                        .collectList()
                        .doOnNext(bookList -> System.gc())
                        .flatMapMany(bookList -> Flux.just(
                                "\n\nFound new Bookshelf of " + bookList.size() + " books:",
                                bookList.toString(),
                                String.format("Memory in use while reading: %dMB\n", usedMemoryMb())
                        )));
    }

    /**
     * Exposes the lines of a file as a {@link Flux}.
     *
     * @param path the file to read
     * @return a {@link Flux} that opens the file with {@link Files#lines(Path)} when
     *         subscribed to and closes the underlying stream through the cleanup
     *         callback given to {@code Flux.using}
     */
    private static Flux<String> fromPath(Path path) {
        return Flux.using(() -> Files.lines(path),
                Flux::fromStream,
                BaseStream::close
        );
    }

    /**
     * Eager counterpart of {@link #fluxVersion(Path)}: reads the whole file into
     * memory with {@link Files#readAllLines(Path)}, then prints each bookshelf to
     * standard output together with the memory in use.
     *
     * <p>A bookshelf is printed whenever the marker line is met, and once more at
     * the end of the file if books remain after the last marker, so that the
     * trailing bookshelf is reported just like in the Flux version.
     *
     * @param path the file to read
     * @throws IOException if the file cannot be read
     */
    private static void listVersion(Path path) throws IOException {
        List<String> wholeData = Files.readAllLines(path);
        List<String> books = new ArrayList<>();
        String title = null;

        for (String line : wholeData) {
            if (line.startsWith(TITLE_PREFIX)) {
                title = formatLine(line);
            }
            else if (line.startsWith(AUTHOR_PREFIX)) {
                // An author line with no preceding title must not print the text "null".
                books.add((title == null ? "" : title) + formatLine(line));
                title = null;
            }
            else if (isShelfMarker(line)) {
                printShelf(books);
                books.clear();
            }
        }

        // Books that follow the last marker would otherwise be silently dropped.
        if (!books.isEmpty()) {
            printShelf(books);
        }
    }

    /** Prints one bookshelf (header, books, memory in use) to standard output. */
    private static void printShelf(List<String> books) {
        System.gc();
        System.out.println("\n\nFound new bookshelf of " + books.size() + " books:");
        System.out.println(books);
        System.out.printf("Memory in use while reading: %dMB\n", usedMemoryMb());
    }

    /** Returns {@code true} if the line is the bookshelf separator, ignoring case. */
    private static boolean isShelfMarker(String line) {
        return SHELF_MARKER.equalsIgnoreCase(line);
    }

    /** Returns {@code true} for title lines, author lines and the bookshelf separator. */
    private static boolean isRelevantLine(String line) {
        return line.startsWith(TITLE_PREFIX)
                || line.startsWith(AUTHOR_PREFIX)
                || isShelfMarker(line);
    }

    /**
     * Strips the title prefix, or replaces the author prefix with {@code " by "};
     * any other line is returned unchanged. Only the leading prefix is touched.
     */
    private static String formatLine(String line) {
        if (line.startsWith(TITLE_PREFIX)) {
            return line.substring(TITLE_PREFIX.length());
        }
        if (line.startsWith(AUTHOR_PREFIX)) {
            return AUTHOR_SEPARATOR + line.substring(AUTHOR_PREFIX.length());
        }
        return line;
    }

    /** Returns the JVM's total memory minus its free memory, in whole megabytes. */
    private static long usedMemoryMb() {
        Runtime runtime = Runtime.getRuntime();
        return (runtime.totalMemory() - runtime.freeMemory()) / BYTES_PER_MB;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment