Skip to content

Instantly share code, notes, and snippets.

@Alotor
Last active January 16, 2018 13:14
Show Gist options
  • Save Alotor/0cebef8b8c3741b06700ab70c8556274 to your computer and use it in GitHub Desktop.
Save Alotor/0cebef8b8c3741b06700ab70c8556274 to your computer and use it in GitHub Desktop.
New files streaming

New files stream

This file creates a stream that emits a value for each new file created in the watched directory. Its only dependency is the RxJava library; internally it uses the java.nio (NIO.2) watch service to detect new files.

@Grab("io.reactivex.rxjava2:rxjava:2.1.8")
import java.nio.file.*
import io.reactivex.*
/**
 * Creates a stream that emits the path of every entry (file or directory)
 * created under {@code path}.
 *
 * Directories created below {@code path} are added to the watch list as soon
 * as their creation event is seen, so entries created inside them later are
 * reported too. Entries created inside a new directory before it has been
 * registered are not reported.
 *
 * The watch loop runs on the subscribing thread and ends once the subscriber
 * disposes (for example after {@code take(n)}); the watch service is closed
 * when the loop ends or fails.
 *
 * @param path the root directory to watch; it must exist and be a directory
 * @return an Observable of the paths of newly created entries
 */
Observable<Path> newFilesStream(Path path) {
    Observable.create { emitter ->
        println(">> Watching $path")
        // One watch service can track any number of directories; every key
        // reports the directory it was registered for through watchable().
        def watchService = FileSystems.getDefault().newWatchService()
        try {
            path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE)
            // Leave the loop once nobody is listening any more, otherwise the
            // subscribing thread would be stuck here forever.
            while (!emitter.isDisposed()) {
                // A timed poll idles without burning CPU and still wakes up
                // regularly to notice that the subscriber has disposed.
                def key = watchService.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS)
                if (key == null) {
                    continue
                }
                def watchedDir = (Path) key.watchable()
                key.pollEvents().each { ev ->
                    // OVERFLOW events carry no path (their context is null).
                    if (ev.kind() == StandardWatchEventKinds.OVERFLOW) {
                        return
                    }
                    def eventPath = watchedDir.resolve((Path) ev.context())
                    if (Files.isDirectory(eventPath)) {
                        // Registering a directory that is already watched just
                        // returns its existing key, so no bookkeeping is needed.
                        eventPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE)
                        println(">> Adding directory $eventPath to watch list")
                    }
                    emitter.onNext(eventPath)
                }
                // Re-arm the key so that further events for it get queued.
                key.reset()
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller and end the stream;
            // tryOnError is a no-op when the subscriber is already gone.
            Thread.currentThread().interrupt()
            emitter.tryOnError(e)
        } finally {
            watchService.close()
        }
    }
}
// Print the first ten paths emitted by the stream watching /tmp/watch.
def firstTen = newFilesStream(Paths.get("/tmp/watch")).take(10)
firstTen.forEach { Path created ->
    println(">>> $created")
}
@Grab("commons-io:commons-io:2.6")
@Grab("io.reactivex.rxjava2:rxjava:2.1.8")
import org.apache.commons.io.monitor.*
import java.nio.file.*
import io.reactivex.*
/**
 * Creates a stream that emits the path of every file or directory whose
 * creation is reported by a commons-io monitor polling {@code path}.
 *
 * The monitor is started on subscription with a polling interval of 1000 and
 * is stopped when the subscriber disposes.
 *
 * @param path the directory to observe
 * @return an Observable of the paths of newly created files and directories
 */
Observable<Path> newFilesStream(Path path) {
    Observable.create { emitter ->
        // Files and directories are reported the same way: forward the path.
        def creationListener = new FileAlterationListenerAdaptor() {
            @Override
            public void onFileCreate(File created) {
                emitter.onNext(created.toPath())
            }

            @Override
            public void onDirectoryCreate(File created) {
                emitter.onNext(created.toPath())
            }
        }

        def directoryObserver = new FileAlterationObserver(path.toFile())
        directoryObserver.addListener(creationListener)

        def monitor = new FileAlterationMonitor(1000, directoryObserver)
        // Install the stop hook before starting the monitor.
        emitter.setCancellable { monitor.stop() }
        monitor.start()
    }
}
// Print the first five paths emitted by the stream watching /mnt/shared.
def firstFive = newFilesStream(Paths.get("/mnt/shared")).take(5)
firstFive.forEach { Path created ->
    println(">>> $created")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment