Last active
August 29, 2015 14:24
-
-
Save magnusart/0802295c0fafdf9b5028 to your computer and use it in GitHub Desktop.
Merge from two differently typed channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream.scaladsl._ | |
import akka.stream._ | |
import scala.util._ | |
import FanInShape._ | |
class EitherMergeShape[L, R]( _init: Init[Either[L, R]] = Name( "EitherMerge" ) ) | |
extends FanInShape[Either[L, R]]( _init ) { | |
val left = newInlet[L]( "left" ) | |
val right = newInlet[R]( "right" ) | |
protected override def construct( i: Init[Either[L, R]] ) = new EitherMergeShape( i ) | |
} | |
class EitherMerge[L, R] extends FlexiMerge[Either[L, R], EitherMergeShape[L, R]]( | |
new EitherMergeShape, Attributes.name( "EitherMerge" ) | |
) { | |
import FlexiMerge._ | |
override def createMergeLogic( p: PortT ) = new MergeLogic[Either[L, R]] { | |
override def initialState = | |
State[Any]( ReadPreferred[Any]( p.right, p.left ) ) { | |
// ^-- Here I get a compilation error. ReadPreferered inlets must have the same type. Why this constraint?! | |
( ctx, _, element ) ⇒ | |
val either = element match { | |
case left: L ⇒ Left( left ) | |
case right: R ⇒ Right( right ) | |
} | |
ctx.emit( either ) | |
SameState | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package apoyo | |
package storage | |
import akka.stream.scaladsl._ | |
import akka.stream._ | |
import scala.util._ | |
import scala.reflect._ | |
import FanInShape._ | |
class EitherMergeShape[L, R]( _init: Init[Either[L, R]] = Name( "EitherMerge" ) ) | |
extends FanInShape[Either[L, R]]( _init ) { | |
val left = newInlet[L]( "left" ) | |
val right = newInlet[R]( "right" ) | |
protected override def construct( i: Init[Either[L, R]] ) = new EitherMergeShape( i ) | |
} | |
class EitherMerge[L: ClassTag, R: ClassTag] extends FlexiMerge[Either[L, R], EitherMergeShape[L, R]]( | |
new EitherMergeShape, Attributes.name( "EitherMerge" ) | |
) { | |
import FlexiMerge._ | |
override def createMergeLogic( p: PortT ) = new MergeLogic[Either[L, R]] { | |
override def initialState = | |
State[Any]( ReadPreferred[Nothing]( p.right, p.left ).asInstanceOf[ReadPreferred[Any]] ) { | |
( ctx, _, element ) ⇒ | |
val either = element match { | |
case left: L ⇒ Left( left ) | |
case right: R ⇒ Right( right ) | |
} | |
ctx.emit( either ) | |
SameState | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Added modified version based on Joe Edwards answer on akka-users list.