Skip to content

Instantly share code, notes, and snippets.

@l15k4
Created January 16, 2016 18:30
Show Gist options
  • Save l15k4/633b31525f0728392fe0 to your computer and use it in GitHub Desktop.
Save l15k4/633b31525f0728392fe0 to your computer and use it in GitHub Desktop.
import akka.stream.SourceShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.stream.testkit.TestSubscriber
class BypassMergeTestSuite extends AkkaSuite {
import scala.language.implicitConversions
def assertBypassWorks[O, B](output: List[O], bypassed: List[B])(launch: TestSubscriber.ManualProbe[O] => Unit) = {
val source = Source.fromGraph(
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(new BypassingMerge[O, B])
Source(output) ~> merge.in0
Source(bypassed) ~> merge.in1
SourceShape(merge.out)
}
)
val probe = TestSubscriber.manualProbe[O]
source.runWith(Sink.asPublisher(false))(materializer).subscribe(probe)
probe.expectSubscription().request(10)
launch(probe)
}
"Bypass string source with" - {
"more element" in assertBypassWorks(List(1,2), List("1","2","3","4")) { probe =>
List(1,2).foreach(probe.expectNext)
probe.expectComplete()
}
"the same element count" in assertBypassWorks(List(1,2,3,4), List("1","2","3","4")) { probe =>
List(1,2,3,4).foreach(probe.expectNext)
probe.expectComplete()
}
"less elements" in assertBypassWorks(List(1,2,3,4), List("1","2")) { probe =>
List(1,2,3,4).foreach(probe.expectNext)
probe.expectComplete()
}
"both elements empty" in assertBypassWorks(List.empty[Int], List.empty[String]) { probe =>
probe.expectComplete()
}
"bypassed elements empty" in assertBypassWorks(List(1,2), List.empty[String]) { probe =>
List(1,2).foreach(probe.expectNext)
probe.expectComplete()
}
"input elements empty" in assertBypassWorks(List.empty[Int], List("1","2")) { probe =>
probe.expectComplete()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment