+-------------- +
Kafka Listener --> Business Logic --> Repository(RedisClient) --> | |
| Redis Streams |
API Controller --> Repository(Redis Puller) -------------------> | |
+---------------+
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@RestController | |
public class CustomerController { | |
private final RedisPuller redisPuller; | |
@GetMapping(value="/customer/{customerId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE) | |
public Flux<CustomerUpdatedEvent> getCustomerData(@PathVariable String customerId) { | |
return redisPuller.getStreamEvents(customerId, Instant.now().toEpochMilli()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private final Map<String,EventStream> streamsList = new ConcurrentHashMap<>(); | |
public Flux<CustomerUpdatedEvent> getStreamEvents(String eventStreamId, long fromMs) { | |
//add new eventStream to Set | |
EventStream eventStream = new EventStream(eventStreamId, fromMs, 0, new HashSet()); | |
streamsList.putIfAbsent(eventStreamId, eventStream); | |
//create a Flux and add the Sink to the EventStream to be used for publishing | |
Flux<Object> events = Flux.create(sink -> eventStream.addStream(sink)); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RedisPuller implements AutoCloseable { | |
private final RedisTemplate<String,String> redisTemplate; | |
private final Map<String,EventStream> streamsList = new ConcurrentHashMap<>(); | |
private final Thread pullerThread = getPullerThread(this.streamsList); | |
public RedisPuller(RedisTemplate<String,String> redisTemplate) { | |
this.redisTemplate = redisTemplate; | |
this.pullerThread.start(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@RequiredArgsConstructor | |
public class RedisPuller { | |
private final RedisTemplate<String,String> redisTemplate; | |
private final Map<String,EventStream> streamsList = new ConcurrentHashMap<>(); | |
public void runPuller() { | |
//create stream array of StreamOffset objects | |
Set<StreamOffset<String>> streams = buildStreamOffsets(this.streamsList.values().stream()); | |
StreamOffset<String>[] streamArray = new StreamOffset[streams.size()]; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Value | |
public class EventStream { | |
private final String eventStreamId; | |
private final Long offsetTimeMs; | |
private final Long offsetCount; | |
private final Set<FluxSink<Object>> fluxSinks; | |
public void addStream(FluxSink<Object> newFluxSinkObject) { | |
synchronized(this) { | |
fluxSinks.add(newFluxSinkObject); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Map<String,EventStream> streamsList = new ConcurrentHashMap<>(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Repository | |
@RequiredArgsConstructor | |
public class CustomerRepository<E extends Identifiable> { | |
private final RedisTemplate<String,E> staticTemplate; | |
public void save(E event) { | |
staticTemplate.opsForStream().add(ObjectRecord.create(customer.getStringId(), event)); | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Component | |
@RequiredArgsConstructor | |
public class CustomerListener { | |
private final CustomerRepository<CustomerUpdatedEvent> customerRepository; | |
@StreamListener("customer-events") | |
public void customerEventsConsumer(Flux<DataUpdatedEvent> event) { | |
event.map(this::parseEvent) | |
.doOnNext(MyService::doBusinessLogic) // returns CustomerUpdatedEvent |
Kafka Topic A, Partition 3 --> Service Instance 2 --> Reactive Mongo --> Service Instance 1 --> Event stream listener /customer1
NewerOlder