Created
June 11, 2015 01:36
-
-
Save JavadocMD/13b647bb8c7f64951c44 to your computer and use it in GitHub Desktop.
Example of combining Observables and Actors for a workflow.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ornithoptergames.psav | |
import java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.duration._ | |
import com.beachape.filemanagement.RxMonitor | |
import com.ornithoptergames.psav.FrameInfoLoader._ | |
import com.ornithoptergames.psav.Messages._ | |
import com.ornithoptergames.psav.RxMessage.Implicits._ | |
import akka.actor.ActorSystem | |
import akka.util.Timeout | |
import com.beachape.filemanagement.Messages.EventAtPath | |
import com.beachape.filemanagement.Messages.EventAtPath | |
class FileManager(implicit system: ActorSystem) { | |
// fileWatcher will monitor a path and emit the file when it's modified. | |
val fileWatcher = RxMonitor() | |
// When we see a new file, change fileWatcher's watching path. | |
newFile.subscribe { file => | |
// Registration is "bossy" by default here: don't have to worry about unregistering old paths. | |
fileWatcher.registerPath(ENTRY_MODIFY, file.toPath()) | |
} | |
// When fileWatcher sees a reload, publish an updated file. | |
fileWatcher.observable.forwardTo(updateFile, (event: EventAtPath) => event.path.toFile) | |
implicit val timeout = Timeout(15 seconds) | |
val loader = system.actorSelection(system / FrameInfoLoader.actorName) | |
// When a file is updated or the frame-name exclude list changes, publish updated frames. | |
updateFile.combineLatest(frameFilter).map(Load.tupled) | |
.pipeThrough(loader, updateFrames, { case t => t.printStackTrace() }) | |
// When a new file is selected, publish new frames. | |
val mostRecentFrameFilter = new MostRecent(frameFilter) | |
newFile.map(f => Load(f, mostRecentFrameFilter.get)) | |
.pipeThrough(loader, newFrames, { case t => t.printStackTrace() }) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment