Skip to content

Instantly share code, notes, and snippets.

@staakk
Last active June 17, 2018 21:23
Show Gist options
  • Save staakk/4f277023cd789fe216926d0199c395b4 to your computer and use it in GitHub Desktop.
Save staakk/4f277023cd789fe216926d0199c395b4 to your computer and use it in GitHub Desktop.
Processing only the latest emitted element.
import io.reactivex.BackpressureStrategy
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import io.reactivex.subjects.Subject
import java.util.concurrent.TimeUnit
/**
 * Entry point: connects one [Producer] and one [Consumer] through a shared
 * serialized [PublishSubject], then runs the producer loop on the calling thread.
 */
fun main(args: Array<String>) {
    val events = PublishSubject.create<Int>().toSerialized()
    val source = Producer(events)
    // The consumer subscribes from its constructor, so the instance is not kept.
    Consumer(events)
    source.run()
}
/**
 * Slow consumer that subscribes to [subject] as soon as it is constructed.
 *
 * The subject is turned into a Flowable with [BackpressureStrategy.LATEST] and
 * observed with a buffer size of 1, so that while [onNext] is busy the values
 * emitted in the meantime are not queued up; the intent is to process only the
 * most recent one next.
 *
 * @param subject source of the values to consume.
 */
class Consumer(subject: Subject<Int>) {
    init {
        subject.toFlowable(BackpressureStrategy.LATEST)
            // delayError = false, bufferSize = 1: keep at most one pending item.
            .observeOn(Schedulers.newThread(), false, 1)
            .subscribeOn(Schedulers.newThread())
            .subscribe(
                this::onNext,
                // Without an error callback RxJava reports failures as
                // OnErrorNotImplementedException instead of handing them to us.
                this::onError
            )
    }

    /** Prints the received value and then simulates 1.6 s of work. */
    private fun onNext(i: Int) {
        try {
            println("${Thread.currentThread().id}: subscribe\t$i")
            Thread.sleep(1600) // do some work
        } catch (e: InterruptedException) {
            // Catching the exception clears the interrupt status; set it again so
            // whoever owns this thread can still see that it was interrupted.
            Thread.currentThread().interrupt()
        }
    }

    /** Reports a stream failure on standard error. */
    private fun onError(error: Throwable) {
        System.err.println("${Thread.currentThread().id}: error\t$error")
    }
}
/**
 * Emits an increasing counter (starting at 0) to [subject] every 500 ms until
 * the running thread is interrupted, at which point the subject is completed
 * and [run] returns.
 *
 * @property subject sink that receives the emitted values.
 */
class Producer(val subject: Subject<Int>) : Runnable {
    private var count = 0

    /**
     * Blocks the calling thread, emitting one value per 500 ms tick.
     * On interruption it completes [subject], restores the interrupt status
     * and stops.
     */
    override fun run() {
        while (true) {
            try {
                Thread.sleep(500)
                subject.onNext(count)
                println("${Thread.currentThread().id}: emit\t\t$count")
                count++
            } catch (e: InterruptedException) {
                subject.onComplete()
                // Catching the exception clears the interrupt status; set it again for the caller.
                Thread.currentThread().interrupt()
                // Stop here: continuing the loop would keep calling onNext on a
                // completed subject and the producer would never terminate.
                return
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment