Skip to content

Instantly share code, notes, and snippets.

@kretes
Last active January 4, 2017 15:43
Show Gist options
  • Save kretes/acbdcf918dcc4144d6b0e449262e49d2 to your computer and use it in GitHub Desktop.
Save kretes/acbdcf918dcc4144d6b0e449262e49d2 to your computer and use it in GitHub Desktop.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import pl.allegro.offer.imagescore.floki.streams.TupledFlow
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
// Shared infrastructure picked up implicitly by the stream experiments in this script.
// Every implicit carries an explicit type: un-annotated implicits depend on inference
// order and are rejected outright by Scala 3. The annotated types are exactly the ones
// the compiler inferred before, so nothing is widened.
// NOTE(review): the visible script never terminates the actor system; call
// `as.terminate()` once the experiments are done if the JVM should exit.
implicit val as: ActorSystem = ActorSystem()
implicit val m: ActorMaterializer = ActorMaterializer()
implicit val ec: scala.concurrent.ExecutionContextExecutor = ExecutionContext.global
/** Burns CPU time by iterating over the half-open range [1, Integer.MAX_VALUE / 20).
  *
  * Each step computes `i + 2` and throws the value away; the method exists only to
  * stand in for a slow, synchronous piece of work inside the stream stages.
  */
def doLong(): Unit = {
  val upperBound = Integer.MAX_VALUE / 20
  (1 until upperBound).foreach(i => i + 2)
}
// One untimed call before the first measurement starts
// (presumably a JIT warm-up - TODO confirm the intent).
doLong()
// Baseline: two plain `map` stages, no async boundary and no mapAsync.
// Original author's measurement: 23 sec.
val bef = System.currentTimeMillis()
val result: Future[_] = Source(Range(1, 16)).via(
  Flow[Int]
    .map { a =>
      doLong()
      println(s"1st flow done long ($a)")
      a
    }
    // Explicit `.map`: the stage no longer depends on a leading infix call that
    // only parses because newlines are suppressed inside the `via(...)` parentheses.
    .map { a =>
      doLong()
      println(s"2st flow done long ($a)")
      a
    }
).toMat(Sink.seq)(Keep.right).run()
// `1.minute` instead of postfix `1 minute`: postfix operators require
// `scala.language.postfixOps`, which this script does not import.
println(s"result = ${Await.result(result, 1.minute)}")
println(s"time = ${System.currentTimeMillis() - bef}")
// Async boundary only: `.async` is inserted between the two `map` stages.
// Original author's observation: stages 1 and 2 run in parallel, but each handles
// one element at a time - 14 sec.
val bef = System.currentTimeMillis()
val result: Future[_] = Source(Range(1, 16)).via(
  Flow[Int]
    .map { a =>
      doLong()
      println(s"1st flow done long ($a)")
      a
    }
    .async
    // Explicit `.map`: previously an infix call on the line after `.async`, which only
    // parsed because newlines are suppressed inside the `via(...)` parentheses.
    .map { a =>
      doLong()
      println(s"2st flow done long ($a)")
      a
    }
).toMat(Sink.seq)(Keep.right).run()
// `1.minute` instead of postfix `1 minute`: postfix operators require
// `scala.language.postfixOps`, which this script does not import.
println(s"result = ${Await.result(result, 1.minute)}")
println(s"time = ${System.currentTimeMillis() - bef}")
// mapAsync(4) followed by a plain `map`.
// Original author's observation: the first stage runs 4-wide in parallel, the second
// stage runs after it - 14 sec.
val bef = System.currentTimeMillis()
val result: Future[_] = Source(Range(1, 16)).via(
  Flow[Int]
    .mapAsync(4) { a =>
      Future {
        doLong()
        println(s"1st flow done long ($a)")
        a
      }
    }
    .map { a =>
      doLong()
      println(s"2st flow done long ($a)")
      a
    }
).toMat(Sink.seq)(Keep.right).run()
// `1.minute` instead of postfix `1 minute`: postfix operators require
// `scala.language.postfixOps`, which this script does not import.
println(s"result = ${Await.result(result, 1.minute)}")
println(s"time = ${System.currentTimeMillis() - bef}")
// mapAsync(4), then an `.async` boundary, then a plain `map`.
// Original author's measurement: 16 sec.
val bef = System.currentTimeMillis()
val result: Future[_] = Source(Range(1, 16)).via(
  Flow[Int]
    .mapAsync(4) { a =>
      Future {
        doLong()
        println(s"1st flow done long ($a)")
        a
      }
    }
    .async
    // Explicit `.map`: previously an infix call on the line after `.async`, which only
    // parsed because newlines are suppressed inside the `via(...)` parentheses.
    .map { a =>
      doLong()
      println(s"2st flow done long ($a)")
      a
    }
).toMat(Sink.seq)(Keep.right).run()
// `1.minute` instead of postfix `1 minute`: postfix operators require
// `scala.language.postfixOps`, which this script does not import.
println(s"result = ${Await.result(result, 1.minute)}")
println(s"time = ${System.currentTimeMillis() - bef}")
// Two consecutive mapAsync(4) stages.
// Original author's measurement: 12 sec.
val bef = System.currentTimeMillis()
val result: Future[_] = Source(Range(1, 16)).via(
  Flow[Int]
    .mapAsync(4) { a =>
      Future {
        doLong()
        println(s"1st flow done long ($a)")
        a
      }
    }
    .mapAsync(4) { a =>
      Future {
        doLong()
        println(s"2st flow done long ($a)")
        a
      }
    }
).toMat(Sink.seq)(Keep.right).run()
// `1.minute` instead of postfix `1 minute`: postfix operators require
// `scala.language.postfixOps`, which this script does not import.
println(s"result = ${Await.result(result, 1.minute)}")
println(s"time = ${System.currentTimeMillis() - bef}")
// mapAsync(4), then a second mapAsync(4) flow wrapped in the project's
// `TupledFlow.tupledFlow` helper (defined outside this script).
// Original author's measurement: 15 sec.
val bef = System.currentTimeMillis()
val result: Future[_] = Source(Range(1, 16)).via(
  Flow[Int]
    .mapAsync(4) { a =>
      Future {
        doLong()
        println(s"1st flow done long ($a)")
        a
      }
    }
    .via(TupledFlow.tupledFlow(
      Flow[Int].mapAsync(4) { a =>
        Future {
          doLong()
          println(s"2st flow done long ($a)")
          a
        }
      }
    ))
).toMat(Sink.seq)(Keep.right).run()
// `1.minute` instead of postfix `1 minute`: postfix operators require
// `scala.language.postfixOps`, which this script does not import.
println(s"result = ${Await.result(result, 1.minute)}")
println(s"time = ${System.currentTimeMillis() - bef}")
/** Wraps `flow` so that every output is emitted together with the input that produced it.
  *
  * Each incoming element first passes through a `mapAsync(parallelism)` stage that merely
  * re-wraps it in a `Future`; it is then pushed through `flow` as a single-element source,
  * and every result is paired with that element.
  *
  * NOTE(review): `flatMapConcat` appears to drain one sub-source at a time (confirm against
  * the Akka docs), so `parallelism` may not parallelise `flow` itself.
  *
  * @param parallelism parallelism passed to the leading `mapAsync` stage
  * @param flow        the flow whose outputs are paired with their inputs
  * @param ec          execution context used to create the re-wrapping `Future`
  * @return a flow emitting `(input, output)` pairs
  */
def tupledFlowAsync[In, Out](parallelism: Int)(flow: Flow[In, Out, _])(implicit ec: ExecutionContext): Flow[In, (In, Out), NotUsed] = {
  // Runs one element through `flow` and tags every result with that element.
  def pairWithOutputs(element: In): Source[(In, Out), NotUsed] =
    Source.single(element).via(flow).map(produced => (element, produced))

  Flow[In]
    .mapAsync(parallelism)(element => Future(element))
    .flatMapConcat(element => pairWithOutputs(element))
}
/** Convenience variant of `tupledFlowAsync` fixed to a parallelism of 4 and running on
  * `ExecutionContext.global`.
  *
  * @param flow the flow whose outputs are paired with their inputs
  * @return a flow emitting `(input, output)` pairs
  */
def tupledFlowAsync4[In, Out](flow: Flow[In, Out, _]): Flow[In, (In, Out), NotUsed] =
  tupledFlowAsync[In, Out](parallelism = 4)(flow)(ExecutionContext.global)
// mapAsync(4), then a second mapAsync(4) flow wrapped in `tupledFlowAsync4`.
// Original author's measurement: 16 sec.
val bef = System.currentTimeMillis()
val result: Future[_] = Source(Range(1, 16)).via(
  Flow[Int]
    .mapAsync(4) { a =>
      Future {
        doLong()
        println(s"1st flow done long ($a)")
        a
      }
    }
    .via(tupledFlowAsync4(
      Flow[Int].mapAsync(4) { a =>
        Future {
          doLong()
          println(s"2st flow done long ($a)")
          a
        }
      }
    ))
).toMat(Sink.seq)(Keep.right).run()
// `1.minute` instead of postfix `1 minute`: postfix operators require
// `scala.language.postfixOps`, which this script does not import.
println(s"result = ${Await.result(result, 1.minute)}")
println(s"time = ${System.currentTimeMillis() - bef}")
// mapAsync(4), then a plain `map` flow wrapped in `tupledFlowAsync4`.
// Original author's measurement: 16 sec.
val bef = System.currentTimeMillis()
val result: Future[_] = Source(Range(1, 16)).via(
  Flow[Int]
    .mapAsync(4) { a =>
      Future {
        doLong()
        println(s"1st flow done long ($a)")
        a
      }
    }
    .via(tupledFlowAsync4(
      Flow[Int].map { a =>
        doLong()
        println(s"2st flow done long ($a)")
        a
      }
    ))
).toMat(Sink.seq)(Keep.right).run()
// `1.minute` instead of postfix `1 minute`: postfix operators require
// `scala.language.postfixOps`, which this script does not import.
println(s"result = ${Await.result(result, 1.minute)}")
println(s"time = ${System.currentTimeMillis() - bef}")
/** Builds a `mapAsync(4)` flow that applies the asynchronous function `map` to every
  * element and emits the element paired with the function's result.
  *
  * The `Future` transformation relies on an implicit `ExecutionContext` taken from the
  * enclosing scope.
  *
  * @param map asynchronous function applied to each input element
  * @return a flow emitting `(input, output)` pairs
  */
def tupledMapAsync4[In, Out](map: (In => Future[Out])): Flow[In, (In, Out), NotUsed] = {
  Flow[In].mapAsync(4) { in =>
    for (out <- map(in)) yield (in, out)
  }
}
// mapAsync(4), then a second asynchronous step expressed through `tupledMapAsync4`.
// Original author's measurement: 12 sec.
val bef = System.currentTimeMillis()
val result: Future[_] = Source(Range(1, 16)).via(
  Flow[Int]
    .mapAsync(4) { a =>
      Future {
        doLong()
        println(s"1st flow done long ($a)")
        a
      }
    }
    .via(tupledMapAsync4 { a =>
      Future {
        doLong()
        println(s"2st flow done long ($a)")
        a
      }
    })
).toMat(Sink.seq)(Keep.right).run()
// `1.minute` instead of postfix `1 minute`: postfix operators require
// `scala.language.postfixOps`, which this script does not import.
println(s"result = ${Await.result(result, 1.minute)}")
println(s"time = ${System.currentTimeMillis() - bef}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment