Skip to content

Instantly share code, notes, and snippets.

@ereshzealous
Created August 26, 2021 06:18
Show Gist options
  • Save ereshzealous/0ecc2f5b6e740acbc9ad0fccb92ecc58 to your computer and use it in GitHub Desktop.
Save ereshzealous/0ecc2f5b6e740acbc9ad0fccb92ecc58 to your computer and use it in GitHub Desktop.
package com.kafka.sample.kafka;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
 * Created on 25/August/2021 By Author Eresh, Gorantla
 *
 * <p>Declares three functional beans that together form a FizzBuzz pipeline:
 * a {@link Supplier} that produces random numbers, a {@link Function} that
 * converts each number to its FizzBuzz representation, and a {@link Consumer}
 * that logs the result.
 *
 * <p>NOTE(review): the beans are presumably bound to Kafka topics by the
 * messaging framework; the bindings are configured outside this file.
 */
@Configuration
@Slf4j
public class KafkaConfiguration {

    /** Delay between two consecutive numbers emitted by the producer. */
    private static final Duration EMIT_INTERVAL = Duration.ofSeconds(5);

    /** Exclusive upper bound of the generated numbers; values fall in 1..999. */
    private static final int UPPER_BOUND_EXCLUSIVE = 1000;

    private final Random random = new Random();

    /**
     * Produces a random integer between 1 and 999 (both inclusive) on every
     * tick of {@link #EMIT_INTERVAL}.
     *
     * @return a supplier of the periodic stream of random numbers
     */
    @Bean
    public Supplier<Flux<Integer>> fizzBuzzProducer() {
        return () -> Flux.interval(EMIT_INTERVAL)
                // The tick counter itself is ignored; each tick only triggers a new draw.
                // nextInt(n) yields 0..n-1, so shifting by one gives 1..999.
                .map(tick -> random.nextInt(UPPER_BOUND_EXCLUSIVE - 1) + 1)
                .log();
    }

    /**
     * Maps every incoming number to its FizzBuzz representation.
     *
     * @return a function transforming a stream of numbers into a stream of
     *         FizzBuzz strings
     */
    @Bean
    public Function<Flux<Integer>, Flux<String>> fizzBuzzProcessor() {
        return numbers -> numbers
                .map(KafkaConfiguration::evaluateFizzBuzz)
                .log();
    }

    /**
     * Logs every value it receives at INFO level.
     *
     * @return a consumer that writes the received value to the log
     */
    @Bean
    public Consumer<String> fizzBuzzConsumer() {
        // Parameterized logging: the message is only assembled when INFO is enabled.
        return value -> log.info("Consumer Received : {}", value);
    }

    /**
     * Applies the FizzBuzz rules to a single number.
     *
     * @param value the number to evaluate
     * @return {@code "FizzBuzz"} for multiples of 15, {@code "Buzz"} for
     *         multiples of 5, {@code "Fizz"} for multiples of 3, otherwise the
     *         decimal representation of {@code value}
     */
    private static String evaluateFizzBuzz(Integer value) {
        // The multiple-of-15 check must come first; otherwise the 5 or 3 branch would win.
        if (value % 15 == 0) {
            return "FizzBuzz";
        } else if (value % 5 == 0) {
            return "Buzz";
        } else if (value % 3 == 0) {
            return "Fizz";
        } else {
            return String.valueOf(value);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment