Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Experimentation with Rx (2)
import rx.Observable
import rx.schedulers.Schedulers
import java.io.File
import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
/** Debug switch: set it to `true` to have some log lines printed during computation. */
val DEBUG: Boolean = true
/**
 * Basic data structure attaching extra information to a file reference.
 *
 * The two counters are mutable so that they can be updated in place.
 *
 * @property file the file reference being described.
 * @property bytes number of bytes recorded for [file]; `0` unless specified.
 * @property filesCount number of files recorded for [file]; `1` unless specified.
 */
data class FileInfo(
    val file: File,
    var bytes: Long = 0,
    var filesCount: Long = 1
)
/**
 * Thread pool fixed to N threads, where N is the number of available processors.
 * It is kept in its own value so that [main] can shut it down once the work is over.
 */
private val schedulerPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
/** A scheduler based on the fixed thread pool held by [schedulerPool]. */
val scheduler = Schedulers.from(schedulerPool)
/**
 * Main function.
 *
 * Computes the [FileInfo] of a hard-coded root path and prints it together with the name of the
 * thread that delivered it; errors are reported by printing their stack trace.
 * The function blocks until the stream terminates (error or completion), then shuts the
 * scheduler's thread pool down.
 *
 * @param args command line arguments (unused).
 */
fun main(args: Array<String>) {
    // Released by the terminal event of the stream, whichever it is.
    val finished = CountDownLatch(1)
    Observable.just(File("/Users/joan/workspace/"))
        .flatMap({ fileToFileInfo(it) })
        .subscribe(
            { fileInfo -> println("[${Thread.currentThread().getName()}] $fileInfo") },
            { throwable ->
                throwable.printStackTrace()
                finished.countDown()
            },
            { finished.countDown() })
    // Wait for the real end of the computation rather than sleeping for an arbitrary duration.
    finished.await()
    // Threads created by Executors.newFixedThreadPool are not daemon threads: the pool has to be
    // shut down explicitly, otherwise its idle workers keep the JVM alive forever.
    schedulerPool.shutdown()
}
/**
 * Recursive function to compute additional information on a file.
 *
 * - Regular file: emits a single [FileInfo] carrying the file length.
 * - Directory: lists its children when subscribed to (on [scheduler]), recursively computes their
 *   [FileInfo] and folds the results into one [FileInfo] for the directory. The accumulator starts
 *   from `FileInfo(file)` and adds every child's `bytes` and `filesCount` to it.
 * - Anything else: emits an [IllegalArgumentException].
 *
 * @param file the file or directory to inspect.
 * @return an observable emitting exactly one [FileInfo] for [file], or an error: an
 * [IllegalArgumentException] when [file] is neither a file nor a directory, an [IOException] when
 * the content of a directory cannot be listed.
 */
fun fileToFileInfo(file: File): Observable<FileInfo> = when {
    file.isFile() ->
        Observable.just(FileInfo(file, file.length()))
            .print({ "emit\t\t${it.file}" })

    file.isDirectory() ->
        // defer: the listing and the mutable accumulator are created anew for each subscription.
        Observable.defer({
            // listFiles() returns null (not an empty array) when the directory cannot be read or
            // an I/O error occurs; report that explicitly instead of failing with a bare
            // NullPointerException inside Observable.from.
            val children = file.listFiles()
            if (children == null) {
                Observable.error<FileInfo>(IOException("Cannot list the content of $file"))
            } else {
                Observable.from(children)
                    .flatMap({ fileToFileInfo(it) })
                    .reduce(FileInfo(file), { acc, b ->
                        acc.bytes += b.bytes
                        acc.filesCount += b.filesCount
                        acc
                    })
                    .print({ "reduced\t${it.file}" })
            }
        }).subscribeOn(scheduler)

    else ->
        Observable.error<FileInfo>(IllegalArgumentException("$file is neither a file or a directory"))
}
/**
 * Utility extension that logs, for every item going through an Rx stream, the name of the current
 * thread followed by a message built from that item.
 *
 * When [DEBUG] is `false` the receiver is returned untouched and nothing is logged.
 *
 * @param message builds the text to log for an item; produces an empty string by default.
 * @return the receiver itself when logging is disabled, otherwise a stream emitting the same items
 * that prints one line per item.
 */
fun <T> Observable<T>.print(message: (T) -> String = { "" }): Observable<T> =
    if (DEBUG) {
        this.map({ item ->
            println("[${Thread.currentThread().getName()}] ${message(item)}")
            item
        })
    } else {
        this
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment