Skip to content

Instantly share code, notes, and snippets.

@bijukunjummen
Created July 26, 2020 23:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bijukunjummen/fba4ee855a55fb4b635858a87b4033fa to your computer and use it in GitHub Desktop.
Save bijukunjummen/fba4ee855a55fb4b635858a87b4033fa to your computer and use it in GitHub Desktop.
fun produce(targetRate: Int, upto: Long): Flux<Long> {
    val delayBetweenEmits: Long = 1000L / targetRate
 
    return Flux.generate(
        { 1L },
        { state: Long, sink: SynchronousSink<Long> ->
            sleep(delayBetweenEmits)
            val nextState: Long = state + 1
            if (state > upto) {
                sink.complete()
                nextState
            } else {
                LOGGER.info("Emitted {}", state)
                sink.next(state)
                nextState
            }
        }
    )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment