Skip to content

Instantly share code, notes, and snippets.

@swsnr
Last active August 9, 2019 11:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save swsnr/17edc1b73b3f0df0f9edd11a7d8b7716 to your computer and use it in GitHub Desktop.
Save swsnr/17edc1b73b3f0df0f9edd11a7d8b7716 to your computer and use it in GitHub Desktop.
Akka Stream operator to split a stream by type
import akka.NotUsed
import akka.http.scaladsl.model.ws
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL}
import akka.stream.{FanOutShape, Graph, Outlet}
import scala.collection.immutable
/**
* Partition websocket messages into binary and text messages.
*
* The flow takes websocket message and partitions them into two outlets, one
* for binary messages, the other for text messages.
*/
object MessageByType {
def apply(): Graph[Shape, NotUsed] =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[ws.Message](2))
val collectText = b.add(Flow[ws.Message].collectType[ws.TextMessage])
val collectBinary = b.add(Flow[ws.Message].collectType[ws.BinaryMessage])
bcast.out(0) ~> collectBinary.in
bcast.out(1) ~> collectText.in
new Shape(
FanOutShape
.Ports(
bcast.in,
immutable.Seq(collectText.out, collectBinary.out)
)
)
}
/**
* Shape of the partition by type graph.
*/
class Shape(init: FanOutShape.Init[ws.Message] = FanOutShape.Name("MessageByType"))
extends FanOutShape[ws.Message](init) {
override protected def construct(init: FanOutShape.Init[Message]): FanOutShape[Message] =
new Shape(init)
val textOut: Outlet[TextMessage] = newOutlet[ws.TextMessage]("textOut")
val binaryOut: Outlet[BinaryMessage] = newOutlet[ws.BinaryMessage]("binaryOut")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment