-
-
Save skydoves/fd60599cc81b8ed8c053fede01f47c77 to your computer and use it in GitHub Desktop.
OnetimeWhileSubscribed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Designed and developed by skydoves (Jaewoong Eum) | |
public class OnetimeWhileSubscribed( | |
private val stopTimeout: Long, | |
private val replayExpiration: Long = Long.MAX_VALUE, | |
) : SharingStarted { | |
private val hasCollected: MutableStateFlow<Boolean> = MutableStateFlow(false) | |
init { | |
require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" } | |
require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" } | |
} | |
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = | |
combine(hasCollected, subscriptionCount) { collected, counts -> | |
collected to counts | |
} | |
.transformLatest { pair -> | |
val (collected, count) = pair | |
if (count > 0 && !collected) { | |
emit(SharingCommand.START) | |
hasCollected.value = true | |
} else { | |
delay(stopTimeout) | |
if (replayExpiration > 0) { | |
emit(SharingCommand.STOP) | |
delay(replayExpiration) | |
} | |
emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE) | |
} | |
} | |
.dropWhile { | |
it != SharingCommand.START | |
} // don't emit any STOP/RESET_BUFFER to start with, only START | |
.distinctUntilChanged() // just in case somebody forgets it, don't leak our multiple sending of START | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment