Created
July 25, 2017 15:06
-
-
Save jvz/a82227a8b6598ef0feaaeba3aed9b2b5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.actor.ActorRef | |
import akka.pattern.AskableActorRef | |
import akka.stream.scaladsl.Source | |
import akka.util.Timeout | |
import scala.reflect.ClassTag | |
/**
 * An oracle whose answers to queries are delivered as streams.
 */
trait StreamOracle {

  /**
   * Poses `request` to this oracle, expecting a `Source[T, NotUsed]` stream in response.
   *
   * @tparam T      element type of the expected response stream
   * @param request the query to send; any value is accepted
   * @param tag     class tag for the expected `Source[T, NotUsed]` response type
   * @return the response stream
   */
  def ?[T](request: Any)(
    implicit tag: ClassTag[Source[T, NotUsed]]
  ): Source[T, NotUsed]
}
object StreamOracle {
  import scala.concurrent.duration._

  /**
   * Creates a [[StreamOracle]] that answers requests by asking the given actor.
   *
   * @param actor   the actor every request is sent to
   * @param timeout ask timeout used for each request; defaults to five seconds
   * @return an oracle backed by `actor`
   */
  def apply(actor: ActorRef, timeout: Timeout = Timeout(5.seconds)): StreamOracle = {
    val askable = new AskableActorRef(actor)
    new ActorStreamOracle(askable, timeout)
  }

  /**
   * [[StreamOracle]] implementation that asks an actor and streams the `Source` it replies with.
   *
   * The constructor's `timeout` is implicit so that it is picked up by the ask call below.
   */
  private class ActorStreamOracle(
    actor: AskableActorRef,
    implicit val timeout: Timeout
  ) extends StreamOracle {

    override def ?[T](request: Any)(
      implicit tag: ClassTag[Source[T, NotUsed]]
    ): Source[T, NotUsed] = {
      // Ask the actor and narrow the eventual reply to the expected stream type.
      val eventualStream = (actor ? request).mapTo[Source[T, NotUsed]]
      // Flatten the future-of-source into a source, discarding the future materialized value.
      Source
        .fromFutureSource(eventualStream)
        .mapMaterializedValue(_ => NotUsed)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment