Skip to content

Instantly share code, notes, and snippets.

@fmasion
Created March 21, 2018 13:06
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fmasion/24707eaeb728ddaec8522989ed7d9b4d to your computer and use it in GitHub Desktop.
Save fmasion/24707eaeb728ddaec8522989ed7d9b4d to your computer and use it in GitHub Desktop.
package com.lightbend.akka.http.sample
import java.util.UUID
import akka.NotUsed
import akka.actor.{ ActorSystem, Cancellable }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
/**
 * Demo: merges several tick-paced sources into one stream that, on every
 * incoming element, emits the latest value seen from each source.
 *
 * Each source is tagged with a UUID so that, after merging, an element can be
 * attributed to the source it came from. A running `Map[UUID, String]` keeps the
 * most recent value per source; each update is emitted as a `List[String]`
 * ordered like `listSource` (sources that have not emitted yet are omitted).
 * The first emitted element is therefore the empty list.
 */
object MergeSource extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContext = system.dispatcher

  case object TICK

  // Materialized value used only when there are no sources to merge: there is
  // no scheduled tick to cancel, so it reports itself as already cancelled.
  private val nothingToCancel: Cancellable = new Cancellable {
    override def cancel(): Boolean = false
    override def isCancelled: Boolean = true
  }

  // Pacemakers: each emits TICK immediately and then at its own fixed interval.
  val ticker1: Source[TICK.type, Cancellable] = Source.tick(0.second, 1200.millisecond, TICK)
  val ticker3: Source[TICK.type, Cancellable] = Source.tick(0.second, 3.second, TICK)
  val ticker10: Source[TICK.type, Cancellable] = Source.tick(0.second, 10500.millisecond, TICK)

  // Endless value generators; on their own they would emit as fast as demand allows.
  val cycler123: Source[String, NotUsed] = Source.cycle(() => List("1", "2", "3").iterator)
  val cyclerABC: Source[String, NotUsed] = Source.cycle(() => List("A1", "B1", "C1").iterator)
  val cyclerABC2: Source[String, NotUsed] = Source.cycle(() => List("A2", "B2", "C2").iterator)

  // Zipping with a ticker throttles each cycler to that ticker's pace; the TICK itself is dropped.
  val t123: Source[String, Cancellable] = ticker1.zip(cycler123).map(_._2)
  val tABC: Source[String, Cancellable] = ticker3.zip(cyclerABC).map(_._2)
  val tABC10: Source[String, Cancellable] = ticker10.zip(cyclerABC2).map(_._2)

  val listSource: List[Source[String, Cancellable]] = List(tABC, t123, tABC10)

  // Assign a UUID to each source; the id list fixes the order of the emitted values.
  private val taggedSources: List[(UUID, Source[String, Cancellable])] =
    listSource.map(src => UUID.randomUUID() -> src)
  private val sourceIds: List[UUID] = taggedSources.map(_._1)

  val source: Source[List[String], Cancellable] = taggedSources
    // Each source now emits its values paired with the source's UUID.
    .map { case (id, src) => src.map(value => id -> value) }
    // Merge into one source of tuples. `reduceOption` avoids the exception that
    // `reduce` throws on an empty list; the fallback is an empty stream.
    // NOTE(review): `merge` keeps the left-hand materialized value, so the resulting
    // Cancellable presumably only controls the first source's ticker - confirm before relying on it.
    .reduceOption(_.merge(_))
    .getOrElse(Source.empty[(UUID, String)].mapMaterializedValue(_ => nothingToCancel))
    // The scan seed must be immutable: it is created once and shared by every
    // materialization, and every intermediate state is emitted downstream.
    .scan(Map.empty[UUID, String])((latest, tagged) => latest + tagged)
    // Emit the latest values in source order instead of hash order.
    .map(latest => sourceIds.flatMap(id => latest.get(id)))

  // The tickers never complete, so this normally runs until the process is killed.
  // If the stream does end (failure or no sources), shut the actor system down so
  // the JVM can exit instead of hanging on idle dispatcher threads.
  source.runForeach(println).onComplete { outcome =>
    outcome.failed.foreach(cause => Console.err.println(s"Stream failed: $cause"))
    system.terminate()
  }
}
@fmasion
Copy link
Author

fmasion commented Mar 21, 2018

This emits output like the following:
List()
List(A)
List(A, A)
List(1, A, A)
List(2, A, A)
List(3, A, A)
List(3, B, A)
List(1, B, A)
List(2, B, A)
List(3, B, A)
List(3, C, A)
List(1, C, A)
List(2, C, A)
List(2, A, A)
List(3, A, A)
List(3, A, B)
List(1, A, B)
List(1, B, B)
List(2, B, B)
List(3, B, B)
List(1, B, B)
List(1, C, B)
List(2, C, B)
List(3, C, B)
List(1, C, B)
List(1, A, B)
List(2, A, B)
List(3, A, B)
List(3, A, C)
List(3, B, C)
List(1, B, C)
List(2, B, C)
List(2, C, C)
List(3, C, C)
List(1, C, C)
List(2, C, C)
List(2, A, C)
List(3, A, C)
List(1, A, C)
List(2, A, C)
List(2, B, C)
List(3, B, C)
List(3, B, A)
List(1, B, A)
List(1, C, A)
List(2, C, A)
List(3, C, A)
List(1, C, A)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment