Skip to content

Instantly share code, notes, and snippets.

@satorg
Created February 14, 2018 18:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save satorg/840f0ba50025281b98642167cd7a8869 to your computer and use it in GitHub Desktop.
Save satorg/840f0ba50025281b98642167cd7a8869 to your computer and use it in GitHub Desktop.
Monix Observable.zip3 bug
/*
* Java 1.8.0_162
* Scala 2.12.4
* SBT 1.1.0
* Monix 3.0.0-M3
*/
package satorg.monix_gs
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
import scala.concurrent._
import scala.concurrent.duration._
/**
 * Reproduction case for a reported problem with `Observable.zip3`
 * (library versions are listed in the header comment at the top of this file).
 *
 * Three observables are built from ranges of different lengths (1 to 3, 1 to 4 and
 * 1 to 5); each one prints every element that passes through its `map` stage so the
 * emission order can be followed on stdout. The observables are then combined with
 * `zip2` (twice) and with `zip3`, and the program blocks on the result of each
 * `foreach` via `Await.result` with `Duration.Inf`, i.e. without any timeout.
 *
 * Per the author's notes below, the two `zip2` examples complete, while the `zip3`
 * example either hangs or fails with
 * `IllegalStateException: Promise already completed`.
 */
object BugObservableZip3App extends App {
// Source observables of unequal length; the println inside each `map` traces every element.
val ob1 = Observable.fromIterable(1 to 3).map { x => println(s"ob1: $x"); x }
val ob2 = Observable.fromIterable(1 to 4).map { x => println(s"ob2: $x"); x }
val ob3 = Observable.fromIterable(1 to 5).map { x => println(s"ob3: $x"); x }
// EXAMPLE1: zip the 3-element and 4-element observables and print each zipped result.
println("\nEXAMPLE1:")
Await.result(Observable.zip2(ob1, ob2).foreach(xx => println(s"res: $xx")), Duration.Inf)
// example1 works...
// EXAMPLE2: zip the 4-element and 5-element observables and print each zipped result.
println("\nEXAMPLE2:")
Await.result(Observable.zip2(ob2, ob3).foreach(xx => println(s"res: $xx")), Duration.Inf)
// example2 works as well...
// EXAMPLE3: zip all three observables at once; this is the case that misbehaves.
println("\nEXAMPLE3:")
Await.result(Observable.zip3(ob1, ob2, ob3).foreach(xx => println(s"res: $xx")), Duration.Inf)
// example3 either hangs or fails with the exception:
//
// java.lang.IllegalStateException: Promise already completed.
// at scala.concurrent.Promise.complete(Promise.scala:49)
// at scala.concurrent.Promise.complete$(Promise.scala:48)
// at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:183)
// at scala.concurrent.Promise.success(Promise.scala:82)
// at scala.concurrent.Promise.success$(Promise.scala:82)
// at scala.concurrent.impl.Promise$DefaultPromise.success(Promise.scala:183)
// at monix.reactive.internal.builders.Zip3Observable.monix$reactive$internal$builders$Zip3Observable$$signalOnComplete$1(Zip3Observable.scala:133)
// at monix.reactive.internal.builders.Zip3Observable$$anon$2.onComplete(Zip3Observable.scala:183)
// at monix.reactive.internal.operators.MapOperator$$anon$1.onComplete(MapOperator.scala:61)
// at monix.reactive.internal.builders.IteratorAsObservable.fastLoop(IteratorAsObservable.scala:176)
// at monix.reactive.internal.builders.IteratorAsObservable.$anonfun$reschedule$1(IteratorAsObservable.scala:109)
// at monix.reactive.internal.builders.IteratorAsObservable.$anonfun$reschedule$1$adapted(IteratorAsObservable.scala:104)
// at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
// at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
// ...
println("DONE") // is never printed, per the author's report, because EXAMPLE3 hangs or throws :)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment