Skip to content

Instantly share code, notes, and snippets.

@ypujante
Created July 5, 2017 14:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ypujante/2ab3c3a135272ea4bc4554cbfc287ca7 to your computer and use it in GitHub Desktop.
Save ypujante/2ab3c3a135272ea4bc4554cbfc287ca7 to your computer and use it in GitHub Desktop.
Backing code to Stack Overflow PublishSubject question
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import java.util.*
// see https://stackoverflow.com/questions/44634758/unexpected-behavior-with-rxjava2-publishsubject
fun withAutoConnect() {
val subject = PublishSubject.create<Int>()
val o1: Observable<String> =
subject.observeOn(Schedulers.newThread()).map { i: Int ->
println("${Thread.currentThread()} | ${Date()} | map => $i")
i.toString()
}.publish().autoConnect()
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 1")
subject.onNext(1)
Thread.sleep(1000)
println("${Thread.currentThread()} | ${Date()} | submitting 2")
subject.onNext(2)
Thread.sleep(1000)
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (4) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 3")
subject.onNext(3)
Thread.sleep(2000)
}
fun withOtherSubject() {
val subject = PublishSubject.create<Int>()
val o1 = PublishSubject.create<String>()
subject.observeOn(Schedulers.newThread()).subscribe { i: Int ->
println("${Thread.currentThread()} | ${Date()} | map => $i")
o1.onNext(i.toString())
}
o1.observeOn(Schedulers.newThread()).subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it")
}
o1.observeOn(Schedulers.newThread()).subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it")
}
o1.observeOn(Schedulers.newThread()).subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 1")
subject.onNext(1)
Thread.sleep(1000)
println("${Thread.currentThread()} | ${Date()} | submitting 2")
subject.onNext(2)
Thread.sleep(1000)
o1.observeOn(Schedulers.newThread()).subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (4) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 3")
subject.onNext(3)
Thread.sleep(2000)
}
fun main(args: Array<String>) {
println("======= Auto Connect =======")
withAutoConnect()
println("======= Other Subject =======")
withOtherSubject()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment