Skip to content

Instantly share code, notes, and snippets.

@robvadai
Last active May 8, 2018 21:34
Show Gist options
  • Save robvadai/d9269a56945d3acb96eef21d4000d76c to your computer and use it in GitHub Desktop.
Save robvadai/d9269a56945d3acb96eef21d4000d76c to your computer and use it in GitHub Desktop.
Queue implementation in Akka Streams
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.logging.log4j.LogManager
import scala.concurrent.Future
/**
 * Demo application wiring two Akka Streams together through a `Source.queue`.
 *
 *  - `writingFlow` is the consumer side: messages offered to its queue are
 *    batched into groups of four, "written" (simulated 2 s sleep) and then
 *    "uploaded" (simulated 10 s sleep, one upload at a time).
 *  - `messageStreamingFlow` is the producer side: an actor-backed source that
 *    logs every received message and forwards it to the queue of `writingFlow`.
 *
 * The main body then sends the numbers 1 to 1000 (as strings) to the
 * actor-backed source, one every 500 ms.
 */
object TestQueue extends App {

  // Explicit types on implicits keep implicit resolution predictable.
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher

  private val Logger = LogManager.getLogger(getClass)

  /** Materialized queue feeding the simulated write-then-upload pipeline. */
  val writingFlow = Source
    .queue[String](8, OverflowStrategy.backpressure)
    .grouped(4)
    .map { msgs =>
      Logger.info(s"Writing file of ${msgs.length} messages")
      Logger.info(s"Messages: $msgs")
      // Simulates a slow, synchronous file write.
      Thread.sleep(2000L)
      Logger.info("Finished writing")
    }
    .mapAsync(1) { _ =>
      Future {
        Logger.info("Uploading file")
        // Simulates a slow upload; marked as blocking so the execution
        // context is told this thread is about to be parked.
        scala.concurrent.blocking(Thread.sleep(10000L))
        Logger.info("Finished uploading")
      }
    }
    .to(Sink.ignore)
    .run()

  /** Actor reference that accepts messages and forwards them to `writingFlow`. */
  val messageStreamingFlow = Source
    .actorRef[String](10, OverflowStrategy.fail)
    .map { msg =>
      Logger.info(s"Received $msg")
      msg
    }
    // Wait for each offer to resolve before issuing the next one. With
    // OverflowStrategy.backpressure the queue rejects an offer made while a
    // previous one is still pending, so firing offers from Sink.foreach and
    // discarding the returned Future silently loses messages.
    .mapAsync(1) { msg =>
      writingFlow.offer(msg).map {
        case akka.stream.QueueOfferResult.Enqueued => ()
        case other => Logger.warn(s"Message $msg was not enqueued: $other")
      }
    }
    .to(Sink.ignore)
    .run()

  // Producer loop: one message every 500 ms.
  (1 to 1000).foreach { msg =>
    messageStreamingFlow ! s"$msg"
    Thread.sleep(500L)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment