Skip to content

Instantly share code, notes, and snippets.

@mathieuancelin
Created June 22, 2017 08:15
Show Gist options
  • Save mathieuancelin/b6185632b933624b514774579300f7f5 to your computer and use it in GitHub Desktop.
Save mathieuancelin/b6185632b933624b514774579300f7f5 to your computer and use it in GitHub Desktop.
import java.util.concurrent.atomic.AtomicInteger
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Source}
import scala.concurrent.duration._
implicit val actorSystem = ActorSystem("Pouet")
implicit val materializer = ActorMaterializer.create(actorSystem)
implicit val ec = actorSystem.dispatcher
val count = new AtomicInteger(0)
val producer = Source.tick(1.second, 1.second, s"New message ").map(str => str + count.incrementAndGet())
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
producer.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
val fromProducer: Source[String, NotUsed] = runnableGraph.run()
fromProducer.runForeach(msg => println("consumer1: " + msg))
actorSystem.scheduler.scheduleOnce(3.seconds)(fromProducer.runForeach(msg => println("consumer2: " + msg)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment