-
-
Save simonbasle/0167a1f833a19724646bc7eb27e4346b to your computer and use it in GitHub Desktop.
Example of loading a large file using Reactor and Java 8
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package snippets; | |
import java.io.Console; | |
import java.io.IOException; | |
import java.nio.file.Files; | |
import java.nio.file.Path; | |
import java.nio.file.Paths; | |
import java.util.ArrayList; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Scanner; | |
import java.util.stream.BaseStream; | |
import reactor.core.publisher.Flux; | |
/** | |
* @author Simon Baslé | |
*/ | |
public class ReadFile { | |
public static void main(String[] args) throws IOException { | |
String defaultFileName = System.getProperty("user.home") + "/bookshelf.txt"; | |
Scanner c = new Scanner(System.in); | |
System.out.printf("Please enter a path to a large text file [%s]:", defaultFileName); | |
String fileName = c.nextLine(); | |
c.close(); | |
if (fileName == null || fileName.isEmpty()) { | |
fileName = defaultFileName; | |
} | |
Path path = Paths.get(fileName); | |
if (!path.toFile().isFile()) { | |
System.err.println("The file " + path + " doesn't exist or is not a text file"); | |
System.exit(-1); | |
} | |
Runtime runtime = Runtime.getRuntime(); | |
System.err.printf("Found %s, of size %dMB\n", path, Files.size(path) / (1024 * 1024)); | |
System.gc(); | |
long beforeFlux = runtime.totalMemory() - runtime.freeMemory(); | |
System.err.printf("Memory in use before reading: %dMB\n\n", beforeFlux / (1024 * 1024)); | |
Flux<String> books = fluxVersion(path); | |
books.doOnNext(System.out::println) | |
.blockLast(); | |
listVersion(path); | |
} | |
private static Flux<String> fluxVersion(Path path) { | |
final Runtime runtime = Runtime.getRuntime(); | |
return fromPath(path) | |
.filter(s -> s.startsWith("Title: ") || s.startsWith("Author: ") | |
|| s.equalsIgnoreCase("##BOOKSHELF##")) | |
.map(s -> s.replaceFirst("Title: ", "")) | |
.map(s -> s.replaceFirst("Author: ", " by ")) | |
.windowWhile(s -> !s.contains("##")) | |
.flatMap(bookshelf -> bookshelf | |
.window(2) | |
.flatMap(bookInfo -> bookInfo.reduce(String::concat)) | |
.collectList() | |
.doOnNext(s -> System.gc()) | |
.flatMapMany(bookList -> Flux.just( | |
"\n\nFound new Bookshelf of " + bookList.size() + " books:", | |
bookList.toString(), | |
String.format("Memory in use while reading: %dMB\n", (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024)) | |
))); | |
} | |
private static Flux<String> fromPath(Path path) { | |
return Flux.using(() -> Files.lines(path), | |
Flux::fromStream, | |
BaseStream::close | |
); | |
} | |
private static void listVersion(Path path) throws IOException { | |
final Runtime runtime = Runtime.getRuntime(); | |
List<String> wholeData = Files.readAllLines(path); | |
List<String> books = new ArrayList<>(); | |
Iterator<String> iter = wholeData.iterator(); | |
String title = null; | |
while(iter.hasNext()) { | |
String line = iter.next(); | |
if (line.startsWith("Title: ")) { | |
title = line.replaceFirst("Title: ", ""); | |
} | |
else if (line.startsWith("Author: ")) { | |
String author = line.replaceFirst("Author: ", " by "); | |
books.add(title + author); | |
title = null; | |
} | |
else if (line.equalsIgnoreCase("##BOOKSHELF##")) { | |
System.gc(); | |
System.out.println("\n\nFound new bookshelf of " + books.size() + " books:"); | |
System.out.println(books); | |
System.out.printf("Memory in use while reading: %dMB\n", (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024)); | |
books.clear(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment