Skip to content

Instantly share code, notes, and snippets.

@jvz
Created July 25, 2017 15:06
Show Gist options
  • Save jvz/a82227a8b6598ef0feaaeba3aed9b2b5 to your computer and use it in GitHub Desktop.
Save jvz/a82227a8b6598ef0feaaeba3aed9b2b5 to your computer and use it in GitHub Desktop.
import akka.NotUsed
import akka.actor.ActorRef
import akka.pattern.AskableActorRef
import akka.stream.scaladsl.Source
import akka.util.Timeout
import scala.reflect.ClassTag
/**
* Generic oracle that answers queries with streams.
*
* Implementations decide where a request is sent and how the resulting
* stream is obtained; callers only see the returned `Source`.
*/
trait StreamOracle {
/**
* Asks this oracle a request with an expected `Source[T, NotUsed]` response stream type.
*
* @param request the query to submit; typed as `Any`, so no constraint is placed on it here
* @param tag class tag for the expected `Source[T, NotUsed]` response type, resolved implicitly
* @tparam T element type of the expected response stream
* @return the response stream for `request`
*
* NOTE(review): the class tag is for `Source[T, NotUsed]` as a whole; because of JVM
* type erasure it presumably cannot verify the element type `T` at runtime — confirm
* before relying on it for type safety.
*/
def ?[T](request: Any)(implicit tag: ClassTag[Source[T, NotUsed]]): Source[T, NotUsed]
}
object StreamOracle {

  import scala.concurrent.duration._

  /**
    * Creates a `StreamOracle` that forwards every request to the given actor.
    *
    * @param actor   actor that receives the requests
    * @param timeout timeout handed to each ask; `Timeout(5.seconds)` unless overridden
    * @return an oracle backed by `actor`
    */
  def apply(actor: ActorRef, timeout: Timeout = Timeout(5.seconds)): StreamOracle = {
    val askable = new AskableActorRef(actor)
    new ActorStreamOracle(askable, timeout)
  }

  /**
    * Actor-backed oracle: each request is asked of the wrapped actor and the reply is
    * expected to be a `Source[T, NotUsed]`, which is then exposed as the result stream.
    *
    * @param actor   askable wrapper around the target actor
    * @param timeout implicit timeout picked up by the ask in `?`
    */
  private class ActorStreamOracle(actor: AskableActorRef, implicit val timeout: Timeout)
      extends StreamOracle {

    override def ?[T](request: Any)(implicit tag: ClassTag[Source[T, NotUsed]]): Source[T, NotUsed] = {
      // The ask is issued right here, when `?` is called, not when the returned Source
      // is later materialized.
      val reply = actor ? request
      // Narrow the untyped reply to the stream type the caller asked for.
      val eventualSource = reply.mapTo[Source[T, NotUsed]]
      // Flatten the future-of-source into a plain source and discard the
      // materialized value so callers see NotUsed.
      Source
        .fromFutureSource(eventualSource)
        .mapMaterializedValue(_ => NotUsed)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment