@csotiriou
Created May 21, 2020 06:55
Sample: processing a large file line by line with RxJava
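// RxJava 2 package shown; with RxJava 3 import io.reactivex.rxjava3.core.Flowable instead.
import io.reactivex.Flowable;
import java.io.BufferedReader;
import java.io.FileReader;

/**
 * Opens the file at the specified path and returns a Flowable that emits
 * its contents line by line.
 *
 * @param filepath the path of the file to open
 * @return a cold stream that emits one line of the file per emission; the file is opened on subscription
 */
public Flowable<String> openFileAsStream(String filepath) {
    return Flowable.using(
        // Resource supplier: opens the file once per subscriber.
        () -> new BufferedReader(new FileReader(filepath)),
        // Source: reads lines lazily as downstream requests them.
        reader -> Flowable.fromIterable(() -> reader.lines().iterator()),
        // Disposer: closes the reader on completion, error, or cancellation.
        BufferedReader::close
    );
}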
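
For comparison, the open / read lazily / close lifecycle that `Flowable.using` provides above can be sketched with the JDK alone. This is not the gist's RxJava code: `LineStreamSketch` and `forEachLine` are hypothetical names, and the sketch reads as UTF-8 via `Files.newBufferedReader`, whereas `FileReader` in the method above uses the platform default charset.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/** Plain-JDK sketch of the open / emit lines / close lifecycle (hypothetical helper). */
final class LineStreamSketch {

    /** Passes each line of the file to the handler and returns the number of lines read. */
    static long forEachLine(Path file, Consumer<String> handler) throws IOException {
        // try-with-resources plays the role of the disposer: the reader is
        // closed whether the loop finishes normally or the handler throws.
        try (BufferedReader reader = Files.newBufferedReader(file)) {
            long count = 0;
            String line;
            // One line is held in memory at a time, so large files are fine.
            while ((line = reader.readLine()) != null) {
                handler.accept(line);
                count++;
            }
            return count;
        }
    }
}
```

A call such as `LineStreamSketch.forEachLine(Paths.get("data.txt"), System.out::println)` would print each line in turn (the file name is only an illustration). Unlike the Flowable version, this loop is synchronous and has no backpressure or cancellation.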
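// Usage: processFileLineForOperations and backPressureBufferSize are not shown in this gist.
openFileAsStream(filePath)
    .parallel(4)                                   // split the lines across 4 rails
    .runOn(Schedulers.io())                        // run each rail on an I/O scheduler thread
    .map(this::processFileLineForOperations)       // process lines concurrently
    .sequential()                                  // merge the rails back into one Flowable (line order is not preserved)
    .buffer(100)                                   // group results into lists of up to 100
    .onBackpressureBuffer(backPressureBufferSize)  // queue batches the subscriber has not requested yet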
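
After `.buffer(100)` the downstream no longer receives single results but lists of up to 100 of them, with a shorter final list when the total is not a multiple of 100. A minimal plain-Java sketch of that grouping, assuming the items are already in a list (`BatchSketch` and `batch` are hypothetical names, and this is not how RxJava implements `buffer` internally):

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of count-based batching (hypothetical helper, not RxJava). */
final class BatchSketch {

    /** Splits items into consecutive batches of at most {@code size} elements. */
    static <T> List<List<T>> batch(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            // Copy the window so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```

With 250 processed lines and a size of 100, this yields three batches of 100, 100, and 50 items, which is the shape a subscriber to the pipeline above would see.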