Skip to content

Instantly share code, notes, and snippets.

@wafer-li
Last active August 9, 2021 12:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wafer-li/94f2424e92b6b2ad705c663a34b99926 to your computer and use it in GitHub Desktop.
Save wafer-li/94f2424e92b6b2ad705c663a34b99926 to your computer and use it in GitHub Desktop.
Kotlin Flow emit when the upstream emit the same value [windowSize] times
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicInteger
private val defaultKeySelector: (Any?) -> Any? = { it }
private val defaultAreEquivalent: (Any?, Any?) -> Boolean = { old, new -> old == new }
fun <T> Flow<T>.equivalentUntilWindowed(windowSize: Int): Flow<T> {
return this.equivalentUntilWindowedBy(
windowSize = windowSize,
keySelector = defaultKeySelector,
areEquivalent = defaultAreEquivalent
)
}
@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.equivalentUntilWindowed(
windowSize: Int,
areEquivalent: (old: T, new: T) -> Boolean
): Flow<T> = equivalentUntilWindowedBy(
windowSize = windowSize,
keySelector = defaultKeySelector,
areEquivalent = areEquivalent as (Any?, Any?) -> Boolean,
)
fun <T, K> Flow<T>.equivalentUntilWindowedBy(windowSize: Int, keySelector: (T) -> K): Flow<T> =
equivalentUntilWindowedBy(
windowSize = windowSize,
keySelector = keySelector,
areEquivalent = defaultAreEquivalent
)
fun <T> Flow<T>.equivalentUntilWindowedBy(
windowSize: Int,
keySelector: (T) -> Any?,
areEquivalent: (old: Any?, new: Any?) -> Boolean
): Flow<T> = when {
this is EquivalentUntilWindowedFlowImpl<*>
&& this.keySelector == keySelector
&& this.areEquivalent == areEquivalent
&& this.size == windowSize -> this // same
else -> EquivalentUntilWindowedFlowImpl(
upstream = this,
size = windowSize,
keySelector = keySelector,
areEquivalent = areEquivalent
)
}
class EquivalentUntilWindowedFlowImpl<T>(
private val upstream: Flow<T>,
val size: Int,
@JvmField val keySelector: (T) -> Any?,
@JvmField val areEquivalent: (old: Any?, new: Any?) -> Boolean
) : Flow<T> {
private val count = AtomicInteger()
@InternalCoroutinesApi
override suspend fun collect(collector: FlowCollector<T>) {
var previousKey: Any? = null
upstream.collect upstreamCollect@{ value ->
val key = keySelector(value)
if (areEquivalent(previousKey, key)) {
val originCount = count.get()
if (originCount == size) {
return@upstreamCollect
}
val newCount = count.incrementAndGet()
if (checkCount(newCount)) {
collector.emit(value)
}
} else {
previousKey = key
count.set(0)
}
}
}
private fun checkCount(newCount: Int): Boolean {
return newCount == size
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment