Skip to content

Instantly share code, notes, and snippets.

Last active September 20, 2024 13:22
Show Gist options
  • Save skydoves/fd60599cc81b8ed8c053fede01f47c77 to your computer and use it in GitHub Desktop.
Save skydoves/fd60599cc81b8ed8c053fede01f47c77 to your computer and use it in GitHub Desktop.
// Designed and developed by skydoves (Jaewoong Eum)
public class OnetimeWhileSubscribed(
private val stopTimeout: Long,
private val replayExpiration: Long = Long.MAX_VALUE,
) : SharingStarted {
private val hasCollected: MutableStateFlow<Boolean> = MutableStateFlow(false)
init {
require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" }
require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" }
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
combine(hasCollected, subscriptionCount) { collected, counts ->
collected to counts
.transformLatest { pair ->
val (collected, count) = pair
if (count > 0 && !collected) {
hasCollected.value = true
} else {
if (replayExpiration > 0) {
.dropWhile {
it != SharingCommand.START
} // don't emit any STOP/RESET_BUFFER to start with, only START
.distinctUntilChanged() // just in case somebody forgets it, don't leak our multiple sending of START
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment