Skip to content

Instantly share code, notes, and snippets.

@raphw
Created February 13, 2017 00:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save raphw/3ba783c298cabd04dfc8070a70e8076e to your computer and use it in GitHub Desktop.
Save raphw/3ba783c298cabd04dfc8070a70e8076e to your computer and use it in GitHub Desktop.
Monitors a folder and processes each file in it upon discovery
import java.io.IOException;
import java.nio.file.*;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
 * Watches a single directory and hands every visible regular file in it to a processor.
 *
 * <p>Each monitoring session first registers a {@link WatchService} for
 * {@code ENTRY_CREATE} events, then processes the files that already exist (ordered by
 * last-modified time, oldest first) and finally processes files as creation events arrive.
 * Files that were present when the session started are not reported again during that
 * session, even if a creation event for the same path is received later.
 *
 * <p>If a session fails with an {@link IOException} or {@link RuntimeException}, the
 * {@link ErrorHandler} is notified and a fresh session is started, which processes all
 * files in the directory again. There is no delay between retries; an error handler that
 * wants back-off must block itself, and one that wants to stop monitoring may throw
 * {@link InterruptedException}.
 *
 * <p>{@link #run()} blocks until the executing thread is interrupted (or the error handler
 * throws {@code InterruptedException}); the thread's interrupt flag is left set on return.
 */
public class FileMonitor implements Runnable {

    private final Path dir;
    private final Consumer<Path> processor;
    private final ErrorHandler errorHandler;

    /** Accepts regular, non-hidden files; a file whose attributes cannot be read is rejected. */
    private final Predicate<Path> filter = path -> {
        try {
            return Files.isRegularFile(path) && !Files.isHidden(path);
        } catch (IOException e) {
            return false;
        }
    };

    /** Written by the thread executing {@link #run()}, readable from any thread. */
    private volatile Status status = Status.OFFLINE;

    /**
     * Creates a monitor; nothing is watched until {@link #run()} is invoked.
     *
     * @param dir          the directory to watch
     * @param processor    invoked on the monitoring thread for every discovered file
     * @param errorHandler invoked on the monitoring thread whenever a session fails
     * @throws NullPointerException if any argument is {@code null}
     */
    public FileMonitor(Path dir, Consumer<Path> processor, ErrorHandler errorHandler) {
        // Fail fast: a null here would otherwise surface as an endless retry loop in run().
        this.dir = java.util.Objects.requireNonNull(dir, "dir");
        this.processor = java.util.Objects.requireNonNull(processor, "processor");
        this.errorHandler = java.util.Objects.requireNonNull(errorHandler, "errorHandler");
    }

    /**
     * Demo entry point: prints every discovered file and dumps stack traces on errors.
     *
     * @param args optional; {@code args[0]} is the directory to watch, defaulting to
     *             {@code /home/rafael/foo} when absent
     */
    public static void main(String[] args) throws Exception {
        String folder = args.length > 0 ? args[0] : "/home/rafael/foo";
        new FileMonitor(
                Paths.get(folder),
                path -> System.out.println("Processing: " + path),
                Throwable::printStackTrace
        ).run();
    }

    /**
     * Runs monitoring sessions back to back until the current thread is interrupted or the
     * error handler throws {@link InterruptedException}.
     *
     * <p>The status is {@link Status#RUNNING} while a session is active,
     * {@link Status#ERROR} while the error handler is executing and
     * {@link Status#OFFLINE} once this method has returned.
     */
    @Override
    public void run() {
        Thread current = Thread.currentThread();
        try {
            // isInterrupted() does not clear the flag, so it stays visible to our caller.
            while (!current.isInterrupted()) {
                status = Status.RUNNING;
                try {
                    watch();
                } catch (IOException | RuntimeException e) {
                    status = Status.ERROR;
                    errorHandler.accept(e);
                }
            }
        } catch (InterruptedException e) {
            // The exception cleared the flag (or came from the error handler); restore it
            // so that the owner of this thread can still observe the stop request.
            current.interrupt();
        } finally {
            status = Status.OFFLINE;
        }
    }

    /**
     * Returns the current life-cycle state of this monitor.
     *
     * @return the most recently published status
     */
    public Status getStatus() {
        return status;
    }

    /**
     * Runs one monitoring session: registers a watcher, processes the existing files and
     * then processes newly created files until the thread is interrupted.
     *
     * @throws IOException          if the directory cannot be watched or listed, if the
     *                              watch key becomes invalid or if events overflowed
     * @throws InterruptedException if the thread is interrupted while waiting for events
     */
    private void watch() throws IOException, InterruptedException {
        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
            // Register before listing so that files created in between are not missed;
            // the 'preexisting' set filters the resulting duplicate notifications.
            dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);

            Set<Path> preexisting = new HashSet<>();
            try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, filter::test)) {
                files.forEach(preexisting::add);
            }
            preexisting.stream()
                    .sorted(Comparator.comparingLong(FileMonitor::lastModifiedMillis))
                    .forEach(processor);

            while (!Thread.currentThread().isInterrupted()) {
                WatchKey key = watcher.take();
                for (WatchEvent<?> event : key.pollEvents()) {
                    if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
                        // Events were lost; fail the session so that it is restarted.
                        throw new IOException("Too many concurrent operations in " + dir);
                    }
                    // Only ENTRY_CREATE is registered, whose context is the relative path.
                    Path file = dir.resolve((Path) event.context());
                    if (!preexisting.contains(file) && filter.test(file)) {
                        processor.accept(file);
                    }
                }
                if (!key.reset()) {
                    throw new IOException("Could not watch folder " + dir);
                }
            }
        }
    }

    /**
     * Sort key for the initial pass over existing files.
     *
     * @return the last-modified time in epoch milliseconds, or {@link Long#MAX_VALUE}
     *         (sorting the file last) when the time cannot be read
     */
    private static long lastModifiedMillis(Path path) {
        try {
            return Files.getLastModifiedTime(path).toMillis();
        } catch (IOException e) {
            return Long.MAX_VALUE;
        }
    }

    /** Callback notified when a monitoring session fails. */
    @FunctionalInterface
    public interface ErrorHandler {

        /**
         * Handles a failed monitoring session; a new session starts once this returns.
         *
         * @param e the {@link IOException} or {@link RuntimeException} that ended the session
         * @throws InterruptedException to stop monitoring instead of retrying
         */
        void accept(Exception e) throws InterruptedException;
    }

    /** Life-cycle state of a {@link FileMonitor}. */
    public enum Status {
        RUNNING, OFFLINE, ERROR
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment