Skip to content

Instantly share code, notes, and snippets.

@johanandren
Created September 3, 2018 17:35
Show Gist options
  • Save johanandren/685dee068f38f41bec7a1f6165b39f70 to your computer and use it in GitHub Desktop.
Save johanandren/685dee068f38f41bec7a1f6165b39f70 to your computer and use it in GitHub Desktop.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{GraphDSL, Partition, Sink, Source}
import akka.stream.{ActorMaterializer, SinkShape}
import scala.reflect.ClassTag
/**
 * Runnable demo: routes the elements of a stream to different sinks depending on the
 * runtime type of each element.
 *
 * Note that the builder class defined inside this object has the same name as the object
 * itself; inside the object body the simple name `PartitionOnType` refers to the builder.
 */
object PartitionOnType extends App {
  // Explicit types on implicit definitions keep implicit resolution predictable.
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * Immutable builder that collects (type, sink) pairs and turns them into a single
   * `Sink[T, NotUsed]` which dispatches every incoming element to the sink registered
   * for its type.
   *
   * @tparam T common supertype of all elements accepted by the resulting sink
   * @param typesAndSinks registered runtime classes and their sinks, most recently added first
   */
  case class PartitionOnType[T](private val typesAndSinks: List[(Class[_], Sink[_, _])] = Nil) {

    /**
     * Registers `sink` as the destination for elements of type `B`.
     *
     * @tparam B element type handled by `sink`; must be a subtype of `T`
     * @param sink the sink that receives the matching elements
     * @param ct   class tag used to capture the runtime class of `B`
     * @return a new builder that additionally contains this registration
     */
    def addSink[B <: T](sink: Sink[B, Any])(implicit ct: ClassTag[B]): PartitionOnType[T] =
      // Prepending is O(1); `build` reverses the list to restore registration order.
      new PartitionOnType[T]((ct.runtimeClass, sink) :: typesAndSinks)

    /**
     * Creates the combined sink from all registrations made so far.
     *
     * Routing rule for each element: a sink registered for the element's exact runtime
     * class wins; otherwise the first sink (in registration order) whose registered type
     * the element is an instance of is chosen. The fallback lets a sink registered for a
     * trait or superclass receive elements of its subclasses.
     *
     * The partitioning function throws an `IllegalArgumentException` for an element that
     * matches none of the registered types (presumably failing the running stream;
     * confirm against the configured supervision strategy).
     *
     * @return a sink accepting any `T`; it materializes `NotUsed`, the materialized values
     *         of the registered sinks are not exposed
     */
    def build: Sink[T, NotUsed] = {
      val partitions = typesAndSinks.reverse
      val registeredTypes = partitions.map(_._1).toArray

      // Maps an element to the index of the outlet (and thus sink) that should receive it.
      val outletFor: T => Int = { element =>
        val elementClass = element.getClass
        val exactIdx = registeredTypes.indexOf(elementClass)
        val idx =
          if (exactIdx >= 0) exactIdx
          // No exact match: accept the first registered supertype of the element.
          else registeredTypes.indexWhere(_.isInstance(element))
        if (idx >= 0) idx
        else throw new IllegalArgumentException(s"Unknown element type $elementClass")
      }

      Sink.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        val partition = builder.add(Partition[T](partitions.size, outletFor))
        partitions.zipWithIndex.foreach { case ((_, sink), idx) =>
          // The casts are safe: `addSink` only admits a sink together with the class of
          // its element type, and `outletFor` only sends instances of that class to it.
          partition.out(idx).as[AnyRef] ~> sink.asInstanceOf[Sink[AnyRef, AnyRef]]
        }
        SinkShape(partition.in)
      })
    }
  }

  // --- Demo -------------------------------------------------------------------------

  trait Animal
  case class Cat() extends Animal
  case class Dog() extends Animal

  val dogSink = Sink.foreach[Dog](d => println(s"Dog: $d"))
  val catSink = Sink.foreach[Cat](c => println(s"Cat: $c"))

  val sinks = new PartitionOnType[Animal]()
    .addSink(dogSink)
    .addSink(catSink)
    .build

  val animalSource = Source(List(Cat(), Dog(), Cat(), Dog()))
  // NOTE(review): the actor system is never terminated here, so the JVM presumably keeps
  // running after the stream has completed; `build` exposes no completion signal to hook
  // a shutdown onto.
  animalSource.to(sinks).run()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment