Created
February 13, 2017 00:34
-
-
Save raphw/3ba783c298cabd04dfc8070a70e8076e to your computer and use it in GitHub Desktop.
Monitors a folder and processes each file in it upon discovery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.nio.file.*; | |
import java.util.Comparator; | |
import java.util.HashSet; | |
import java.util.Set; | |
import java.util.function.Consumer; | |
import java.util.function.Predicate; | |
/**
 * A {@link Runnable} that watches one directory and hands files to a processor.
 *
 * <p>Each watch session first processes every regular, non-hidden file already
 * present in the directory (ordered by ascending last-modified time) and then
 * processes each regular, non-hidden file reported through an
 * {@code ENTRY_CREATE} watch event. When a session fails with an
 * {@link IOException} or a {@link RuntimeException}, the failure is passed to
 * the {@link ErrorHandler} and a new session is started from scratch, which
 * includes a fresh scan of the directory.
 *
 * <p>{@link #run()} returns once the executing thread is interrupted or the
 * error handler throws {@link InterruptedException}. In both cases the thread's
 * interrupt flag is set when {@code run()} returns, so the code that owns the
 * thread can still observe the interruption.
 */
public class FileMonitor implements Runnable {

    /** Directory being watched. */
    private final Path dir;

    /** Callback invoked on the monitoring thread for every discovered file. */
    private final Consumer<Path> processor;

    /** Callback invoked on the monitoring thread for every failed watch session. */
    private final ErrorHandler errorHandler;

    /**
     * Accepts only regular files that are not hidden. A path whose hidden
     * attribute cannot be read is rejected instead of failing the session.
     */
    private final Predicate<Path> filter = path -> {
        try {
            return Files.isRegularFile(path) && !Files.isHidden(path);
        } catch (IOException e) {
            return false;
        }
    };

    /** Current life-cycle state; volatile so other threads can poll it via {@link #getStatus()}. */
    private volatile Status status = Status.OFFLINE;

    /**
     * Creates a monitor; nothing is watched until {@link #run()} is invoked.
     *
     * @param dir          the directory to watch
     * @param processor    receives every discovered file
     * @param errorHandler receives every failure of a watch session
     */
    public FileMonitor(Path dir, Consumer<Path> processor, ErrorHandler errorHandler) {
        this.dir = dir;
        this.processor = processor;
        this.errorHandler = errorHandler;
    }

    /**
     * Demo entry point: monitors a fixed directory on the calling thread, prints
     * each discovered path and prints the stack trace of each failure.
     *
     * @param args ignored
     * @throws Exception declared for convenience
     */
    public static void main(String[] args) throws Exception {
        new FileMonitor(
                Paths.get("/home/rafael/foo"),
                path -> System.out.println("Processing: " + path),
                Throwable::printStackTrace
        ).run();
    }

    /**
     * Runs watch sessions until the current thread is interrupted or the error
     * handler throws {@link InterruptedException}.
     *
     * <p>The status is {@link Status#RUNNING} at the start of every session,
     * {@link Status#ERROR} while the error handler is executing, and
     * {@link Status#OFFLINE} once this method has returned.
     */
    @Override
    public void run() {
        try {
            // isInterrupted() leaves the flag in place, unlike Thread.interrupted().
            while (!Thread.currentThread().isInterrupted()) {
                status = Status.RUNNING;
                try {
                    watch();
                } catch (IOException | RuntimeException e) {
                    status = Status.ERROR;
                    errorHandler.accept(e);
                }
            }
        } catch (InterruptedException e) {
            // The exception consumed the flag; restore it for the owner of this thread.
            Thread.currentThread().interrupt();
        } finally {
            status = Status.OFFLINE;
        }
    }

    /**
     * Runs a single watch session: registers the directory, processes the files
     * that already exist, then processes newly created files until interrupted.
     *
     * <p>Registration happens before the directory is listed, so a file created
     * in between shows up both in the listing and as an event; the set of listed
     * paths is used to skip that event. Paths found by the listing stay in that
     * set for the whole session, so any later {@code ENTRY_CREATE} event for the
     * same path is skipped as well.
     *
     * @throws IOException          if the directory cannot be watched or listed,
     *                              if events were lost ({@code OVERFLOW}), or if
     *                              the watch key became invalid
     * @throws InterruptedException if the thread is interrupted while waiting for
     *                              the next event
     */
    private void watch() throws IOException, InterruptedException {
        // Use the directory's own file system so that non-default providers work too.
        try (WatchService watcher = dir.getFileSystem().newWatchService()) {
            dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);

            Set<Path> preexisting = new HashSet<>();
            try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, filter::test)) {
                files.forEach(preexisting::add);
            }
            preexisting.stream()
                    .sorted(Comparator.comparingLong(this::lastModifiedMillis))
                    .forEach(processor);

            while (!Thread.currentThread().isInterrupted()) {
                WatchKey key = watcher.take();
                for (WatchEvent<?> event : key.pollEvents()) {
                    WatchEvent.Kind<?> kind = event.kind();
                    if (kind == StandardWatchEventKinds.OVERFLOW) {
                        // Events were dropped; failing the session forces a full rescan.
                        throw new IOException("Too many concurrent operations in " + dir);
                    }
                    // Only ENTRY_CREATE is registered, so the context is the created entry's name.
                    @SuppressWarnings("unchecked")
                    Path file = dir.resolve(((WatchEvent<Path>) event).context());
                    if (!preexisting.contains(file) && filter.test(file)) {
                        processor.accept(file);
                    }
                }
                if (!key.reset()) {
                    throw new IOException("Could not watch folder " + dir);
                }
            }
        }
    }

    /**
     * Returns the last-modified time of {@code path} in milliseconds, or
     * {@link Long#MAX_VALUE} when it cannot be read so that such files sort last.
     */
    private long lastModifiedMillis(Path path) {
        try {
            return Files.getLastModifiedTime(path).toMillis();
        } catch (IOException e) {
            return Long.MAX_VALUE;
        }
    }

    /**
     * Returns the current life-cycle state of this monitor.
     *
     * @return the most recently published status
     */
    public Status getStatus() {
        return status;
    }

    /** Receives the failure that ended a watch session. */
    @FunctionalInterface
    public interface ErrorHandler {

        /**
         * Handles a failed watch session; when this method returns normally the
         * monitor starts a new session.
         *
         * @param e the {@link IOException} or {@link RuntimeException} that ended the session
         * @throws InterruptedException to make the monitor stop instead of retrying
         */
        void accept(Exception e) throws InterruptedException;
    }

    /** Life-cycle states reported by {@link #getStatus()}. */
    public enum Status {
        /** A watch session is being set up or is active. */
        RUNNING,
        /** {@link #run()} has not been started yet or has returned. */
        OFFLINE,
        /** A watch session failed and the error handler is being invoked. */
        ERROR
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment