Skip to content

Instantly share code, notes, and snippets.

@channingwalton
Created April 28, 2016 08:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save channingwalton/545d9f58662172a97e32e518adac8d3a to your computer and use it in GitHub Desktop.
Save channingwalton/545d9f58662172a97e32e518adac8d3a to your computer and use it in GitHub Desktop.
Some concurrency testing for Rx
package reactive
import java.util.concurrent.TimeUnit
import rx.lang.scala.{Observable, Subject}
import scala.concurrent.duration.Duration
/**
* The problem: can two threads banging away from two sources be merged and have predictable results?
*
* In the first example we use 'merge'
*
* In the second we use a subject into which the generators push values.
*
* The test is to see if a buffer collecting the resulting values has everything we expect
*
* The results show that merging is successfully, but using a Subject is not.
*/
object RxScalaTinkering1 extends App {
var buffer: List[Gen] = Nil
case class Gen(g: String, v: Long)
val generator1: Observable[Gen] =
Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x ⇒ Gen("g1", x))
val generator2: Observable[Gen] =
Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x ⇒ Gen("g2", x))
val sub = generator1.merge(generator2).subscribe(v ⇒ { buffer = v :: buffer })
Thread.sleep(1000)
val result = buffer.reverse
val g1 = result.filter(_.g == "g1").map(_.v)
val g2 = result.filter(_.g == "g2").map(_.v)
println(g1.zipWithIndex)
println(g2.zipWithIndex)
println("g1 " + g1.zipWithIndex.forall { case (v, i) ⇒ v == i })
println("g2 " + g2.zipWithIndex.forall { case (v, i) ⇒ v == i })
}
/**
* Using a Subject.
*/
object RxScalaTinkering2 extends App {
var buffer: List[Gen] = Nil
val subject = Subject[Gen]()
val subscription = subject.subscribe(v ⇒ buffer = v :: buffer)
case class Gen(g: String, v: Long)
val generator1: Observable[Gen] =
Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x ⇒ Gen("g1", x))
generator1.subscribe(subject.onNext _)
val generator2: Observable[Gen] =
Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x ⇒ Gen("g2", x))
generator2.subscribe(subject.onNext _)
Thread.sleep(1000)
val result = buffer.reverse
val g1 = result.filter(_.g == "g1").map(_.v)
val g2 = result.filter(_.g == "g2").map(_.v)
println(g1.zipWithIndex)
println(g2.zipWithIndex)
println("g1 " + g1.zipWithIndex.forall { case (v, i) ⇒ v == i })
println("g2 " + g2.zipWithIndex.forall { case (v, i) ⇒ v == i })
}
/**
* Using two subjects and merging is successful
*/
object RxScalaTinkering3 extends App {
var buffer: List[Gen] = Nil
val subject1 = Subject[Gen]()
val subject2 = Subject[Gen]()
val subscription = subject1.merge(subject2).subscribe(v ⇒ buffer = v :: buffer)
case class Gen(g: String, v: Long)
val generator1: Observable[Gen] =
Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x ⇒ Gen("g1", x))
generator1.subscribe(subject1.onNext _)
val generator2: Observable[Gen] =
Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x ⇒ Gen("g2", x))
generator2.subscribe(subject2.onNext _)
Thread.sleep(1000)
val result = buffer.reverse
val g1 = result.filter(_.g == "g1").map(_.v)
val g2 = result.filter(_.g == "g2").map(_.v)
println(g1.zipWithIndex)
println(g2.zipWithIndex)
println("g1 " + g1.zipWithIndex.forall { case (v, i) ⇒ v == i })
println("g2 " + g2.zipWithIndex.forall { case (v, i) ⇒ v == i })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment