Skip to content

Instantly share code, notes, and snippets.

@magnusart
Last active August 29, 2015 14:24
Show Gist options
  • Save magnusart/0802295c0fafdf9b5028 to your computer and use it in GitHub Desktop.
Save magnusart/0802295c0fafdf9b5028 to your computer and use it in GitHub Desktop.
Merge from two differently typed channels
import akka.stream.scaladsl._
import akka.stream._
import scala.util._
import FanInShape._
class EitherMergeShape[L, R]( _init: Init[Either[L, R]] = Name( "EitherMerge" ) )
extends FanInShape[Either[L, R]]( _init ) {
val left = newInlet[L]( "left" )
val right = newInlet[R]( "right" )
protected override def construct( i: Init[Either[L, R]] ) = new EitherMergeShape( i )
}
class EitherMerge[L, R] extends FlexiMerge[Either[L, R], EitherMergeShape[L, R]](
new EitherMergeShape, Attributes.name( "EitherMerge" )
) {
import FlexiMerge._
override def createMergeLogic( p: PortT ) = new MergeLogic[Either[L, R]] {
override def initialState =
State[Any]( ReadPreferred[Any]( p.right, p.left ) ) {
// ^-- Here I get a compilation error. ReadPreferered inlets must have the same type. Why this constraint?!
( ctx, _, element ) ⇒
val either = element match {
case left: L ⇒ Left( left )
case right: R ⇒ Right( right )
}
ctx.emit( either )
SameState
}
}
}
package apoyo
package storage
import akka.stream.scaladsl._
import akka.stream._
import scala.util._
import scala.reflect._
import FanInShape._
class EitherMergeShape[L, R]( _init: Init[Either[L, R]] = Name( "EitherMerge" ) )
extends FanInShape[Either[L, R]]( _init ) {
val left = newInlet[L]( "left" )
val right = newInlet[R]( "right" )
protected override def construct( i: Init[Either[L, R]] ) = new EitherMergeShape( i )
}
class EitherMerge[L: ClassTag, R: ClassTag] extends FlexiMerge[Either[L, R], EitherMergeShape[L, R]](
new EitherMergeShape, Attributes.name( "EitherMerge" )
) {
import FlexiMerge._
override def createMergeLogic( p: PortT ) = new MergeLogic[Either[L, R]] {
override def initialState =
State[Any]( ReadPreferred[Nothing]( p.right, p.left ).asInstanceOf[ReadPreferred[Any]] ) {
( ctx, _, element ) ⇒
val either = element match {
case left: L ⇒ Left( left )
case right: R ⇒ Right( right )
}
ctx.emit( either )
SameState
}
}
}
@magnusart
Copy link
Author

Added modified version based on Joe Edwards answer on akka-users list.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment