Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Backing code to Stack Overflow PublishSubject question
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import java.util.*
// see https://stackoverflow.com/questions/44634758/unexpected-behavior-with-rxjava2-publishsubject
fun withAutoConnect() {
val subject = PublishSubject.create<Int>()
val o1: Observable<String> =
subject.observeOn(Schedulers.newThread()).map { i: Int ->
println("${Thread.currentThread()} | ${Date()} | map => $i")
i.toString()
}.publish().autoConnect()
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 1")
subject.onNext(1)
Thread.sleep(1000)
println("${Thread.currentThread()} | ${Date()} | submitting 2")
subject.onNext(2)
Thread.sleep(1000)
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (4) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 3")
subject.onNext(3)
Thread.sleep(2000)
}
fun withOtherSubject() {
val subject = PublishSubject.create<Int>()
val o1 = PublishSubject.create<String>()
subject.observeOn(Schedulers.newThread()).subscribe { i: Int ->
println("${Thread.currentThread()} | ${Date()} | map => $i")
o1.onNext(i.toString())
}
o1.observeOn(Schedulers.newThread()).subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it")
}
o1.observeOn(Schedulers.newThread()).subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it")
}
o1.observeOn(Schedulers.newThread()).subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 1")
subject.onNext(1)
Thread.sleep(1000)
println("${Thread.currentThread()} | ${Date()} | submitting 2")
subject.onNext(2)
Thread.sleep(1000)
o1.observeOn(Schedulers.newThread()).subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (4) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 3")
subject.onNext(3)
Thread.sleep(2000)
}
fun main(args: Array<String>) {
println("======= Auto Connect =======")
withAutoConnect()
println("======= Other Subject =======")
withOtherSubject()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment