Skip to content

Instantly share code, notes, and snippets.

@JoolsF
Last active June 2, 2022 21:21
Show Gist options
  • Save JoolsF/105e28a8082ca9c1e4be72320f601e74 to your computer and use it in GitHub Desktop.
Save JoolsF/105e28a8082ca9c1e4be72320f601e74 to your computer and use it in GitHub Desktop.
Akka streams example 2
import akka.actor._
import akka.stream._
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
/**
 * Example: accumulating results from an Akka stream with `runFoldAsync`.
 *
 * An unbounded sequence of integers starting at 1 is cut off once a value
 * exceeds 5, throttled to one element per 3 seconds, passed through
 * [[process]], and counted. When the stream completes, the count (or the
 * failure) is printed and the actor system is terminated.
 */
object TestStreamApp extends App {
  private implicit val executionContext: ExecutionContext = ExecutionContext.Implicits.global
  implicit val system: ActorSystem = ActorSystem("TestSystem")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * Placeholder for an asynchronous task; here it simply returns its input
   * in an already-completed future.
   *
   * @param page the value to process
   * @return a completed future holding `page`
   */
  def process(page: Int): Future[Int] =
    Future.successful(page)

  // Self-referential lazy sequence 1, 2, 3, ...; each element is the previous one plus 1.
  val l: Stream[Int] = 1 #:: l.map(_ + 1)

  /** Completes with the number of elements that were processed. */
  val streamedTasks: Future[Int] =
    Source(l)
      .takeWhile(_ <= 5)
      // Method-call form avoids the postfix-operator syntax (`3 seconds`),
      // which needs the scala.language.postfixOps feature import.
      .throttle(1, 3.seconds)
      .runFoldAsync(0) { (acc, v) =>
        // The processed value itself is not needed; only the count is accumulated.
        process(v).map(_ => acc + 1)
      }

  streamedTasks.onComplete { result =>
    result match {
      case Success(amount) => println(s"Processed $amount values")
      // Interpolate the exception: passing it as a second argument to println
      // would be auto-tupled and print "(message,exception)".
      case Failure(ex) => println(s"Failed to complete tasks: $ex")
    }
    // Shut the actor system down once the result has been reported; without
    // this the application presumably keeps running after the stream finishes.
    system.terminate()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment