Last active
January 4, 2017 15:43
-
-
Save kretes/acbdcf918dcc4144d6b0e449262e49d2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.actor.ActorSystem | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.{Flow, Keep, Sink, Source} | |
import pl.allegro.offer.imagescore.floki.streams.TupledFlow | |
import scala.concurrent.duration.DurationInt | |
import scala.concurrent.{Await, ExecutionContext, Future} | |
implicit val as = ActorSystem() | |
implicit val m = ActorMaterializer() | |
implicit val ec = ExecutionContext.global | |
def doLong() = { | |
Range(1, Integer.MAX_VALUE / 20).foreach(_ + 2) | |
} | |
doLong() | |
// bez niczego - 23 sec | |
val bef = System.currentTimeMillis() | |
val result: Future[_] = Source(Range(1,16)).via(Flow[Int]. | |
map(a => { | |
doLong() | |
println(s"1st flow done long ($a)") | |
a | |
}) | |
map(a => { | |
doLong() | |
println(s"2st flow done long ($a)") | |
a | |
})).toMat(Sink.seq)(Keep.right).run() | |
println(s"result = ${Await.result(result,1 minute)}") | |
println(s"time = ${(System.currentTimeMillis() - bef)}") | |
//sam async - rownolegle ida 1 i 2 ale po jednym - 14 sec | |
val bef = System.currentTimeMillis() | |
val result: Future[_] = Source(Range(1,16)).via(Flow[Int]. | |
map(a => { | |
doLong() | |
println(s"1st flow done long ($a)") | |
a | |
}).async | |
map(a => { | |
doLong() | |
println(s"2st flow done long ($a)") | |
a | |
})).toMat(Sink.seq)(Keep.right).run() | |
println(s"result = ${Await.result(result,1 minute)}") | |
println(s"time = ${(System.currentTimeMillis() - bef)}") | |
//mapAsync(4) - pierwsze idzie rownlegle w 4, drugie idzie potem - 14 sec | |
val bef = System.currentTimeMillis() | |
val result: Future[_] = Source(Range(1,16)).via(Flow[Int]. | |
mapAsync(4)(a => Future.apply({ | |
doLong() | |
println(s"1st flow done long ($a)") | |
a | |
})). | |
map(a => { | |
doLong() | |
println(s"2st flow done long ($a)") | |
a | |
})).toMat(Sink.seq)(Keep.right).run() | |
println(s"result = ${Await.result(result,1 minute)}") | |
println(s"time = ${(System.currentTimeMillis() - bef)}") | |
//mapAsync(4) + async - 16 sec | |
val bef = System.currentTimeMillis() | |
val result: Future[_] = Source(Range(1,16)).via(Flow[Int]. | |
mapAsync(4)(a => Future.apply({ | |
doLong() | |
println(s"1st flow done long ($a)") | |
a | |
})).async | |
map(a => { | |
doLong() | |
println(s"2st flow done long ($a)") | |
a | |
})).toMat(Sink.seq)(Keep.right).run() | |
println(s"result = ${Await.result(result,1 minute)}") | |
println(s"time = ${(System.currentTimeMillis() - bef)}") | |
//2 x mapAsync(4) - 12 sec | |
val bef = System.currentTimeMillis() | |
val result: Future[_] = Source(Range(1,16)).via(Flow[Int]. | |
mapAsync(4)(a => Future.apply({ | |
doLong() | |
println(s"1st flow done long ($a)") | |
a | |
})) | |
.mapAsync(4)(a => Future.apply({ | |
doLong() | |
println(s"2st flow done long ($a)") | |
a | |
}))).toMat(Sink.seq)(Keep.right).run() | |
println(s"result = ${Await.result(result,1 minute)}") | |
println(s"time = ${(System.currentTimeMillis() - bef)}") | |
//2 x mapAsync(4) z tupledflow - 15 sec | |
val bef = System.currentTimeMillis() | |
val result: Future[_] = Source(Range(1,16)).via(Flow[Int]. | |
mapAsync(4)(a => Future.apply({ | |
doLong() | |
println(s"1st flow done long ($a)") | |
a | |
})).via(TupledFlow.tupledFlow(Flow[Int] | |
.mapAsync(4)(a => Future.apply({ | |
doLong() | |
println(s"2st flow done long ($a)") | |
a | |
}))))).toMat(Sink.seq)(Keep.right).run() | |
println(s"result = ${Await.result(result,1 minute)}") | |
println(s"time = ${(System.currentTimeMillis() - bef)}") | |
def tupledFlowAsync[In, Out](parallelism: Int)(flow: Flow[In, Out, _])(implicit ec: ExecutionContext): Flow[In, (In, Out), NotUsed] = { | |
Flow[In].mapAsync(parallelism)(in => Future(in)).flatMapConcat(in => Source.single(in).via(flow).map(out => in -> out)) | |
} | |
def tupledFlowAsync4[In, Out](flow: Flow[In, Out, _]): Flow[In, (In, Out), NotUsed] = { | |
tupledFlowAsync(4)(flow)(ExecutionContext.global) | |
} | |
//2 x mapAsync(4) z tupledflowAsync - 16 sec | |
val bef = System.currentTimeMillis() | |
val result: Future[_] = Source(Range(1,16)).via(Flow[Int]. | |
mapAsync(4)(a => Future.apply({ | |
doLong() | |
println(s"1st flow done long ($a)") | |
a | |
})).via(tupledFlowAsync4(Flow[Int] | |
.mapAsync(4)(a => Future.apply({ | |
doLong() | |
println(s"2st flow done long ($a)") | |
a | |
}))))).toMat(Sink.seq)(Keep.right).run() | |
println(s"result = ${Await.result(result,1 minute)}") | |
println(s"time = ${(System.currentTimeMillis() - bef)}") | |
//mapAsync(4) z tupledflowAsync + map = 16 sec | |
val bef = System.currentTimeMillis() | |
val result: Future[_] = Source(Range(1,16)).via(Flow[Int]. | |
mapAsync(4)(a => Future.apply({ | |
doLong() | |
println(s"1st flow done long ($a)") | |
a | |
})).via(tupledFlowAsync4(Flow[Int] | |
.map(a => { | |
doLong() | |
println(s"2st flow done long ($a)") | |
a | |
})))).toMat(Sink.seq)(Keep.right).run() | |
println(s"result = ${Await.result(result,1 minute)}") | |
println(s"time = ${(System.currentTimeMillis() - bef)}") | |
def tupledMapAsync4[In, Out](map: (In => Future[Out])): Flow[In, (In, Out), NotUsed] = { | |
Flow[In].mapAsync(4)(in => map(in).map(out => in -> out)) | |
} | |
//mapAsync(4) z tupledMapAsync - 12 sec | |
val bef = System.currentTimeMillis() | |
val result: Future[_] = Source(Range(1,16)).via(Flow[Int]. | |
mapAsync(4)(a => Future.apply({ | |
doLong() | |
println(s"1st flow done long ($a)") | |
a | |
})).via(tupledMapAsync4(a => Future.apply({ | |
doLong() | |
println(s"2st flow done long ($a)") | |
a | |
})))).toMat(Sink.seq)(Keep.right).run() | |
println(s"result = ${Await.result(result,1 minute)}") | |
println(s"time = ${(System.currentTimeMillis() - bef)}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment