Skip to content

Instantly share code, notes, and snippets.

@orwir
Last active October 9, 2020 15:45
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save orwir/328ed47e2078e5c9483e0b67dde97f56 to your computer and use it in GitHub Desktop.
Save orwir/328ed47e2078e5c9483e0b67dde97f56 to your computer and use it in GitHub Desktop.
Reactive operators for LiveData
// see code files below
import android.arch.lifecycle.LiveData
import android.arch.lifecycle.MediatorLiveData
/**
 * Combines several [LiveData] sources into a single stream of value snapshots.
 *
 * Nothing is published until every source has delivered at least one value. From then on,
 * every new source value publishes an array holding the most recent value of each source,
 * ordered like [sources].
 *
 * @param sources the streams to combine
 * @return a [LiveData] of snapshot arrays
 */
fun combineDataOf(vararg sources: LiveData<*>): LiveData<Array<*>> {
    return Combine(sources)
}
/**
 * Mediator that remembers the latest value of each source by position and posts a snapshot
 * array once every source has reported at least once.
 *
 * @param sources the streams whose latest values are combined, in snapshot order
 */
internal class Combine(sources: Array<out LiveData<*>>) : MediatorLiveData<Array<*>>() {

    // Latest value per source index; an index is absent until that source emits for the first time.
    private val latest = HashMap<Int, Any?>()

    init {
        for ((position, liveData) in sources.withIndex()) {
            addSource(liveData) { emitted ->
                latest[position] = emitted
                // Every source must have posted data at least once before a snapshot is published.
                val everySourceReported = latest.size == sources.size
                if (everySourceReported) {
                    postValue(Array(latest.size) { slot -> latest[slot] })
                }
            }
        }
    }
}
import android.arch.lifecycle.LiveData
import android.arch.lifecycle.MediatorLiveData
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
/**
 * Returns a [LiveData] that emits a value of the receiver only after [delay] has elapsed
 * without a newer value arriving; each new value cancels the emission that was still pending.
 *
 * @param delay length of the quiet period
 * @param unit unit of [delay]; milliseconds by default
 * @return the debounced stream
 */
fun <T> LiveData<T>.debounce(delay: Long, unit: TimeUnit = TimeUnit.MILLISECONDS): LiveData<T> {
    return Debounce(this, delay, unit)
}
/**
 * Mediator that forwards a source value only after a quiet period.
 *
 * Every source value cancels the previously scheduled emission (interrupting its sleep if it
 * is already running) and schedules a new one on a dedicated single-thread executor.
 *
 * @param source the stream to debounce
 * @param delay length of the quiet period
 * @param unit unit of [delay]
 */
internal class Debounce<T>(source: LiveData<T>, delay: Long, unit: TimeUnit) : MediatorLiveData<T>() {

    private val quietPeriodMillis = unit.toMillis(delay)
    private val worker = Executors.newSingleThreadExecutor()

    // Emission scheduled for the most recent source value, if any.
    private var scheduled: Future<*>? = null

    init {
        addSource(source) { latest -> reschedule(latest) }
    }

    /** Drops the pending emission and schedules [item] to be posted after the quiet period. */
    private fun reschedule(item: T?) {
        scheduled?.cancel(true)
        scheduled = worker.submit(Runnable {
            Thread.sleep(quietPeriodMillis)
            postValue(item)
        })
    }
}
import android.arch.lifecycle.LiveData
import android.arch.lifecycle.MediatorLiveData
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
/**
 * Returns a [LiveData] that re-emits every value of the receiver no sooner than [delay]
 * after it was received, keeping the original order.
 *
 * @param delay how long each value is held back
 * @param unit unit of [delay]; milliseconds by default
 * @return the delayed stream
 */
fun <T> LiveData<T>.delay(delay: Long, unit: TimeUnit = TimeUnit.MILLISECONDS): LiveData<T> {
    return Delay(this, delay, unit)
}
/**
 * Mediator that re-posts every source value after a fixed delay.
 *
 * Values are handed to a single worker thread, which keeps them in arrival order. Each value
 * is held back relative to the moment it was received: the worker only sleeps for the part of
 * the delay that has not already elapsed while the value waited in the queue. Sleeping the full
 * delay per queued value would make the waits add up whenever values arrive closer together
 * than [delay].
 *
 * A zero or negative [delay] posts the value without sleeping.
 *
 * @param source the stream to delay
 * @param delay how long each value is held back
 * @param unit unit of [delay]
 */
internal class Delay<T>(source: LiveData<T>, delay: Long, unit: TimeUnit) : MediatorLiveData<T>() {

    private val executor = Executors.newSingleThreadExecutor()

    init {
        val delayNanos = unit.toNanos(delay)
        addSource(source) { item ->
            val receivedAt = System.nanoTime()
            executor.execute {
                // Subtract the time already spent queued behind earlier values.
                val remainingNanos = delayNanos - (System.nanoTime() - receivedAt)
                if (remainingNanos > 0) {
                    TimeUnit.NANOSECONDS.sleep(remainingNanos)
                }
                postValue(item)
            }
        }
    }
}
import android.arch.lifecycle.LiveData
import android.arch.lifecycle.MediatorLiveData
/**
 * Returns a [LiveData] that suppresses values of the receiver which are equal to the value
 * the result currently holds, so consecutive duplicates are not re-emitted.
 *
 * @return the de-duplicated stream
 */
fun <T> LiveData<T>.distinct(): LiveData<T> {
    return Distinct(this)
}
/**
 * Mediator that forwards a source value only when it differs from the previously forwarded one.
 *
 * The value is applied with `setValue` rather than `postValue`: the source callback already
 * runs on the main thread, and a posted value is not visible through `value` until later, so
 * comparing against `value` after a post would compare against stale data (for example a
 * synchronous A -> B -> A sequence would end with the result stuck on B).
 *
 * The very first source value is always forwarded, even when it is `null`.
 *
 * @param source the stream to de-duplicate
 */
internal class Distinct<T>(source: LiveData<T>) : MediatorLiveData<T>() {

    // False until the first source value has been forwarded; lets an initial null through.
    private var hasEmitted = false

    init {
        addSource(source) { item ->
            if (!hasEmitted || item != value) {
                hasEmitted = true
                value = item
            }
        }
    }
}
import android.arch.lifecycle.LifecycleOwner
import android.arch.lifecycle.LiveData
import android.arch.lifecycle.MediatorLiveData
import android.arch.lifecycle.Observer
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
 * Returns a [LiveData] that forwards only the first [count] values of the receiver.
 *
 * When the receiver emits again after [count] values were forwarded, the result detaches
 * from it and removes the observers that were registered on the result.
 *
 * @param count number of values to forward
 * @return the limited stream
 */
fun <T> LiveData<T>.take(count: Int): LiveData<T> {
    return Take(this, count)
}
/**
 * Mediator that forwards the first `count` source values and then shuts itself down.
 *
 * On the first source emission after `count` values were forwarded, the source is detached
 * and every observer still registered on this instance is removed.
 *
 * Registered observers are tracked so they can be removed on completion. [removeObserver] is
 * overridden to drop an observer from that bookkeeping as soon as it is removed, so observers
 * that were already unregistered are not kept strongly reachable by this instance.
 *
 * @param source the stream to limit
 * @param count number of values to forward
 */
internal class Take<T>(source: LiveData<T>, count: Int) : MediatorLiveData<T>() {

    // Number of source emissions seen so far.
    private var takes = 0

    // Observers currently registered on this instance.
    private val observers = HashSet<Observer<T>>()
    private val lock = ReentrantLock()

    init {
        addSource(source) { item ->
            if (takes++ < count) {
                value = item
            } else {
                lock.withLock {
                    removeSource(source)
                    // Iterate over a copy: removeObserver() also removes from `observers`.
                    observers.toList().forEach(::removeObserver)
                    observers.clear()
                }
            }
        }
    }

    /** Registers [observer] and records it so it can be removed once the limit is exceeded. */
    override fun observe(owner: LifecycleOwner, observer: Observer<T>) {
        lock.withLock {
            super.observe(owner, observer)
            observers.add(observer)
        }
    }

    /** Registers [observer] and records it so it can be removed once the limit is exceeded. */
    override fun observeForever(observer: Observer<T>) {
        lock.withLock {
            super.observeForever(observer)
            observers.add(observer)
        }
    }

    /** Unregisters [observer] and forgets it, so it is no longer referenced by this instance. */
    override fun removeObserver(observer: Observer<T>) {
        lock.withLock {
            super.removeObserver(observer)
            observers.remove(observer)
        }
    }
}
import android.arch.lifecycle.LiveData
import android.arch.lifecycle.MediatorLiveData
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
/**
 * Returns a [LiveData] that emits at most one value per [interval]: a value of the receiver
 * starts an interval, and the most recent value received by the time the interval ends is
 * the one that gets posted.
 *
 * @param interval length of one throttling window
 * @param unit unit of [interval]; milliseconds by default
 * @return the throttled stream
 */
fun <T> LiveData<T>.throttle(interval: Long, unit: TimeUnit = TimeUnit.MILLISECONDS): LiveData<T> {
    return Throttle(this, interval, unit)
}
/**
 * Mediator that posts at most one value per throttling window.
 *
 * The first source value received while no window is open opens one; when the window ends,
 * the most recent value received so far is posted and the window closes.
 *
 * The latest value and the window flag are guarded by one lock because they are touched from
 * both the source callback and the worker thread. Closing the window and reading the latest
 * value happen in a single critical section, so a value arriving at that moment is either
 * included in this emission or opens the next window. It can neither be lost nor leave the
 * throttle stuck with a window that never closes.
 *
 * @param source the stream to throttle
 * @param interval length of one throttling window
 * @param unit unit of [interval]
 */
internal class Throttle<T>(source: LiveData<T>, interval: Long, unit: TimeUnit) : MediatorLiveData<T>() {

    private val lock = Any()

    // Most recent source value; guarded by `lock`.
    private var element: T? = null

    // True while a worker task is waiting for the current window to end; guarded by `lock`.
    private var windowOpen = false

    private val executor = Executors.newSingleThreadExecutor()

    init {
        addSource(source) { item ->
            val opensWindow = synchronized(lock) {
                element = item
                val wasIdle = !windowOpen
                windowOpen = true
                wasIdle
            }
            if (opensWindow) {
                executor.submit(Runnable {
                    Thread.sleep(unit.toMillis(interval))
                    val latest = synchronized(lock) {
                        windowOpen = false
                        element
                    }
                    postValue(latest)
                })
            }
        }
    }
}
/**
 * Zips several [LiveData] sources: values are paired by arrival order, so the n-th emitted
 * array contains the n-th value of every source, ordered like [sources].
 *
 * An array is emitted only once each source has an unpaired value available.
 *
 * @param sources the streams to zip
 * @return a [LiveData] of value tuples
 */
fun zipDataOf(vararg sources: LiveData<*>): LiveData<Array<*>> {
    return Zip(sources)
}
/**
 * Mediator that pairs source values by arrival order.
 *
 * Each source has its own FIFO queue of values that have not been paired yet. Whenever every
 * queue holds at least one value, the head of each queue is removed and the resulting tuple
 * is emitted.
 *
 * Tuples are emitted with `setValue` rather than `postValue`: the source callbacks already run
 * on the main thread, and posted values are coalesced, which would silently drop tuples that
 * become ready in quick succession.
 *
 * Queues are unbounded, so a source that emits much faster than the others accumulates its
 * unpaired values.
 *
 * @param sources the streams to zip, in tuple order
 */
internal class Zip(sources: Array<out LiveData<*>>) : MediatorLiveData<Array<*>>() {

    // One queue of unpaired values per source; LinkedList is used because values may be null.
    private val queues = Array(sources.size) { java.util.LinkedList<Any?>() }

    init {
        sources.forEachIndexed { index, source ->
            addSource(source) { item ->
                queues[index].offer(item)
                if (queues.all { it.isNotEmpty() }) {
                    val tuple: Array<Any?> = Array(queues.size) { i -> queues[i].poll() }
                    value = tuple
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment