Skip to content

Instantly share code, notes, and snippets.

@johanandren
Created April 11, 2013 08:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johanandren/5361674 to your computer and use it in GitHub Desktop.
Save johanandren/5361674 to your computer and use it in GitHub Desktop.
A version of Tomasz Nurkiewicz's file system watch actor (http://nurkiewicz.blogspot.se/2013/04/watchservice-combined-with-akka-actors.html) that does not use an extra thread; instead, the actor polls the watch service at regular intervals.
package markatta
import java.nio.file._
import java.nio.file.StandardWatchEventKinds._
import java.nio.file.attribute.BasicFileAttributes
import collection.JavaConversions._
import concurrent.duration._
import akka.actor.{Cancellable, ActorRef, Actor}
import markatta.FileSystemWatchActor._
/**
 * Message protocol of the [[FileSystemWatchActor]].
 *
 * The members are grouped into messages the actor accepts, messages it sends to
 * listeners, and a message it only sends to itself.
 */
object FileSystemWatchActor {
// --- messages accepted by the actor ---
// WatchDir: register `listener` for `path`; the actor then registers `path` and the
// directories below it with its watch service.
case class WatchDir(path: Path, listener: ActorRef)
// StopWatchingDir: remove a (path, listener) pair that was added with WatchDir.
case class StopWatchingDir(path: Path, listener: ActorRef)
// --- messages sent to listeners, one per watch event ---
// Created: sent for an ENTRY_CREATE event on `path`.
case class Created(path: Path)
// Deleted: sent for an ENTRY_DELETE event on `path`.
case class Deleted(path: Path)
// Changed: sent for an ENTRY_MODIFY event on `path`.
case class Changed(path: Path)
// --- internal ---
// CheckForNewEvents: periodic tick the actor schedules to itself; on receipt it polls
// the watch service for pending events.
case object CheckForNewEvents
}
/**
 * Actor that watches directory trees through a `java.nio.file.WatchService` and relays
 * the observed changes to registered listener actors.
 *
 * The watch service is never blocked on. Instead the actor sends itself a
 * `CheckForNewEvents` tick every 100 ms (scheduled in `preStart`) and polls the service
 * without blocking when the tick arrives.
 *
 * Accepted messages:
 *  - `WatchDir(path, listener)`: remembers the listener and registers `path` plus every
 *    directory below it with the watch service.
 *  - `StopWatchingDir(path, listener)`: forgets that listener registration. The
 *    directories themselves stay registered with the watch service.
 *
 * A listener receives `Created`, `Deleted` or `Changed` for every affected path that lies
 * under the directory it registered for.
 */
class FileSystemWatchActor extends Actor {

  import context.dispatcher

  /** Registered (watched root, listener) pairs, in registration order. */
  var listeners: Seq[(Path, ActorRef)] = Seq.empty

  /** The watch service; defined between `preStart` and `postStop`. */
  var watchService: Option[WatchService] = None

  /** Handle of the periodic `CheckForNewEvents` tick; defined between `preStart` and `postStop`. */
  var newEventsCancellable: Option[Cancellable] = None

  /** Creates the watch service and starts the periodic polling tick. */
  override def preStart(): Unit = {
    watchService = Some(FileSystems.getDefault.newWatchService())
    newEventsCancellable = Some(
      context.system.scheduler.schedule(100.milliseconds, 100.milliseconds)(self ! CheckForNewEvents))
  }

  /** Stops the polling tick and releases the watch service. */
  override def postStop(): Unit = {
    // Cancel the tick first so it is stopped even if closing the service throws.
    newEventsCancellable.foreach(_.cancel())
    newEventsCancellable = None
    watchService.foreach(_.close())
    watchService = None
  }

  def receive: Receive = {
    case WatchDir(path, listener) =>
      listeners = listeners :+ (path -> listener)
      watchRecursively(path)

    case StopWatchingDir(path, listener) =>
      listeners = listeners.filterNot(_ == (path -> listener))

    case CheckForNewEvents =>
      pollEvents()
  }

  /**
   * Sends `message` to every listener whose watched root contains `path`.
   *
   * @param message the notification to deliver
   * @param path    the path the notification is about
   */
  def informListeners(message: AnyRef, path: Path): Unit =
    listeners.foreach { case (watchedRoot, listener) =>
      // The pattern variable must not be called `path`: shadowing the parameter turns
      // the check into `x.startsWith(x)`, which is always true.
      if (path.startsWith(watchedRoot)) listener ! message
    }

  /**
   * Drains every key the watch service currently has signalled, without blocking, and
   * forwards the contained events to the interested listeners.
   */
  def pollEvents(): Unit = {
    // Explicit converters, imported locally so this method does not depend on
    // implicit collection conversions.
    import scala.collection.JavaConverters._

    watchService.foreach { service =>
      Iterator
        .continually(Option(service.poll())) // poll() returns null when nothing is pending
        .takeWhile(_.isDefined)
        .collect { case Some(key) => key }
        .foreach { key =>
          key.pollEvents().asScala.foreach(event => handleEvent(key, event))
          // Re-arm the key so that further events for its directory are reported.
          key.reset()
        }
    }
  }

  /** Translates one watch event into a listener notification, if it concerns a path. */
  private def handleEvent(key: WatchKey, event: WatchEvent[_]): Unit = {
    val watched: Any = key.watchable()
    val eventContext: Any = event.context()
    (watched, eventContext) match {
      case (directory: Path, relative: Path) =>
        val path = directory.resolve(relative)
        val message: Option[AnyRef] = event.kind() match {
          case ENTRY_CREATE =>
            // New directories have to be registered as well to keep the watch recursive.
            if (Files.isDirectory(path)) registerNewDirectory(path)
            Some(Created(path))
          case ENTRY_DELETE => Some(Deleted(path))
          case ENTRY_MODIFY => Some(Changed(path))
          case _            => None
        }
        message.foreach(informListeners(_, path))

      case _ =>
        // OVERFLOW events carry no path (their context is null): nothing to report.
        ()
    }
  }

  /** Registers a directory that was just reported as created. */
  private def registerNewDirectory(dir: Path): Unit =
    try watchRecursively(dir)
    catch {
      // The directory may already be gone again by the time it is registered.
      case _: NoSuchFileException => ()
    }

  /**
   * Registers `root` and every directory below it with the watch service.
   *
   * @param root the directory at the top of the tree to watch
   */
  def watchRecursively(root: Path): Unit = {
    // Registering the root up front makes a root that is not a directory fail here;
    // the walk below only registers directories and would not report it.
    watch(root)
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
      override def preVisitDirectory(dir: Path, attrs: BasicFileAttributes): FileVisitResult = {
        watch(dir)
        FileVisitResult.CONTINUE
      }
    })
  }

  /** Registers a single directory for create, delete and modify events. */
  private def watch(path: Path): Unit =
    watchService.foreach(service => path.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment