Skip to content

Instantly share code, notes, and snippets.

@skydoves
Last active September 20, 2024 13:22
Show Gist options
  • Save skydoves/fd60599cc81b8ed8c053fede01f47c77 to your computer and use it in GitHub Desktop.
Save skydoves/fd60599cc81b8ed8c053fede01f47c77 to your computer and use it in GitHub Desktop.
OnetimeWhileSubscribed
// Designed and developed by skydoves (Jaewoong Eum)
/**
 * A [SharingStarted] strategy that starts sharing at most once: on the first subscriber it observes.
 *
 * The command flow produced by [command] behaves as follows:
 * - When the subscription count is positive and this instance has not started yet,
 *   [SharingCommand.START] is emitted and the instance records that it has started.
 * - While the subscription count stays positive after that, no further command is emitted, so
 *   sharing is neither restarted nor stopped while subscribers are still attached.
 * - When the subscription count is zero, the strategy waits [stopTimeout] milliseconds, emits
 *   [SharingCommand.STOP], waits [replayExpiration] milliseconds and then emits
 *   [SharingCommand.STOP_AND_RESET_REPLAY_CACHE]. If [replayExpiration] is `0`, the intermediate
 *   STOP is skipped and the reset is emitted right after [stopTimeout].
 * - A subscriber that comes back cancels a pending stop/reset timer, but never causes a second
 *   START.
 *
 * The "already started" flag is stored on the instance rather than per [command] call, so once one
 * command flow has emitted START, any other command flow obtained from the same instance will
 * never emit START. Use a separate instance per shared flow.
 *
 * @param stopTimeout delay in milliseconds between the subscription count reaching zero and the
 *   stop command; must not be negative.
 * @param replayExpiration delay in milliseconds between the stop command and the command that also
 *   resets the replay cache; must not be negative. Defaults to [Long.MAX_VALUE].
 * @throws IllegalArgumentException if [stopTimeout] or [replayExpiration] is negative.
 */
public class OnetimeWhileSubscribed(
    private val stopTimeout: Long,
    private val replayExpiration: Long = Long.MAX_VALUE,
) : SharingStarted {

    // Becomes true right after the single START has been emitted; it is combined into the
    // command flow so that later subscription changes can tell "never started" from "started".
    private val hasCollected: MutableStateFlow<Boolean> = MutableStateFlow(false)

    init {
        require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" }
        require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" }
    }

    /**
     * Maps [subscriptionCount] to the sharing commands described in the class documentation.
     *
     * @param subscriptionCount the current number of subscribers of the shared flow.
     * @return a flow of commands that begins with [SharingCommand.START] and contains no
     *   consecutive duplicates.
     */
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        combine(hasCollected, subscriptionCount) { collected, count -> collected to count }
            .transformLatest { (collected, count) ->
                when {
                    count > 0 && !collected -> {
                        // Emit first, flip the flag second: changing the flag makes `combine`
                        // re-emit, which cancels this block at its next suspension point.
                        emit(SharingCommand.START)
                        hasCollected.value = true
                    }

                    // Already started and subscribers are still present: emit nothing. Reaching
                    // this branch also cancels (via transformLatest) any stop/reset timer that
                    // was pending from a moment with zero subscribers.
                    count > 0 -> Unit

                    else -> {
                        delay(stopTimeout)
                        if (replayExpiration > 0) {
                            emit(SharingCommand.STOP)
                            delay(replayExpiration)
                        }
                        emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
                    }
                }
            }
            // Don't emit any STOP/RESET to start with, only START.
            .dropWhile { it != SharingCommand.START }
            // Guard against leaking repeated identical commands downstream.
            .distinctUntilChanged()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment