@peavers
Last active October 30, 2021 21:59
Utility class for working with Flux publishers.
import java.time.Duration;
import lombok.experimental.UtilityClass;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
@UtilityClass
public class FluxUtils {
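  /**
   * Creates the effect of rate limiting. The flux is split into windows of at most {@code size}
   * items, and each window is held back until the next tick of an interval that fires every
   * {@code delay}. The first window is released immediately.
   *
   * <p>Call example: {@code flux.transform(f -> FluxUtils.rateLimit(f, 20, Duration.ofMinutes(1)))}
   *
   * @param flux the flux whose throughput should be limited
   * @param size the maximum number of items to let through in each window
   * @param delay how long to wait between releasing consecutive windows
   * @param <T> the type of the items in the flux
   * @return a publisher that emits the items of {@code flux}, roughly {@code size} per {@code delay}
   */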
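  public static <T> Publisher<T> rateLimit(
      final Flux<T> flux, final int size, final Duration delay) {
    return flux.window(size)
        // Pair each window with one tick. Ticks that nobody has requested are dropped, so a
        // source slower than the interval does not make interval fail with an overflow error.
        .zipWith(Flux.interval(Duration.ZERO, delay).onBackpressureDrop())
        // Unwrap the window from the (window, tick) tuple and emit its items.
        .flatMap(Tuple2::getT1);
  }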
}
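
The release timing described in the Javadoc can be worked out by hand. The following is a minimal sketch in plain Java, without Reactor, of the schedule that the window-and-interval approach aims for. `ReleaseSchedule` and its methods are hypothetical names introduced here for illustration and are not part of the gist. It assumes the source always has items ready and the subscriber consumes them immediately, so the only thing delaying an item is the interval tick paired with its window.

```java
import java.time.Duration;

/** Hypothetical helper (not part of the gist) that models the intended release schedule. */
final class ReleaseSchedule {

  private ReleaseSchedule() {}

  /**
   * Earliest offset from subscription at which the item at the given 0-based position is
   * released, assuming the source is never the bottleneck.
   */
  static Duration releaseOffset(final long index, final int size, final Duration delay) {
    if (index < 0 || size <= 0) {
      throw new IllegalArgumentException("index must be >= 0 and size must be > 0");
    }
    // Items 0..size-1 belong to window 0, items size..2*size-1 to window 1, and so on.
    final long window = index / size;
    // Window 0 is paired with the tick at time zero, window n with the tick at n * delay.
    return delay.multipliedBy(window);
  }

  /** Number of items that can have been released once the given time has elapsed. */
  static long maxItemsReleased(final Duration elapsed, final int size, final Duration delay) {
    if (elapsed.isNegative() || size <= 0 || delay.isZero() || delay.isNegative()) {
      throw new IllegalArgumentException("elapsed must be >= 0, size and delay must be > 0");
    }
    // Ticks fire at 0, delay, 2 * delay, ... so the tick at time zero counts as well.
    final long ticks = elapsed.dividedBy(delay) + 1;
    return ticks * size;
  }
}
```

With `size = 20` and a one-minute delay, as in the call example, items 0 to 19 go out immediately, items 20 to 39 after one minute, and item 45 after two minutes. In the real pipeline a slow source or slow subscriber shifts these times later, so treat the numbers as a best case.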