Skip to content

Instantly share code, notes, and snippets.

View razorcd's full-sized avatar

Cristian Dugacicu razorcd

View GitHub Profile
@razorcd
razorcd / code9_RedisPuller4.java
Created November 22, 2020 22:18
Scaling reactive APIs - www.razorcodes.com
@RestController
public class CustomerController {
private final RedisPuller redisPuller;
@GetMapping(value="/customer/{customerId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<CustomerUpdatedEvent> getCustomerData(@PathVariable String customerId) {
return redisPuller.getStreamEvents(customerId, Instant.now().toEpochMilli());
}
}
@razorcd
razorcd / code8_RedisPuller3.java
Created November 22, 2020 22:16
Scaling reactive APIs - www.razorcodes.com
private final Map<String,EventStream> streamsList = new ConcurrentHashMap<>();
public Flux<CustomerUpdatedEvent> getStreamEvents(String eventStreamId, long fromMs) {
//add new eventStream to Set
EventStream eventStream = new EventStream(eventStreamId, fromMs, 0, new HashSet());
streamsList.putIfAbsent(eventStreamId, eventStream);
//create a Flux and add the Sink to the EventStream to be used for publishing
Flux<Object> events = Flux.create(sink -> eventStream.addStream(sink));
@razorcd
razorcd / code7_RedisPuller2.java
Created November 22, 2020 22:12
Scaling reactive APIs - www.razorcodes.com
public class RedisPuller implements AutoCloseable {
private final RedisTemplate<String,String> redisTemplate;
private final Map<String,EventStream> streamsList = new ConcurrentHashMap<>();
private final Thread pullerThread = getPullerThread(this.streamsList);
public RedisPuller(RedisTemplate<String,String> redisTemplate) {
this.redisTemplate = redisTemplate;
this.pullerThread.start();
}
@razorcd
razorcd / code6_RedisPuller.java
Created November 22, 2020 22:08
Scaling reactive APIs - www.razorcodes.com
@RequiredArgsConstructor
public class RedisPuller {
private final RedisTemplate<String,String> redisTemplate;
private final Map<String,EventStream> streamsList = new ConcurrentHashMap<>();
public void runPuller() {
//create stream array of StreamOffset objects
Set<StreamOffset<String>> streams = buildStreamOffsets(this.streamsList.values().stream());
StreamOffset<String>[] streamArray = new StreamOffset[streams.size()];
@razorcd
razorcd / code5_EventStream.java
Created November 22, 2020 22:05
Scaling reactive APIs - www.razorcodes.com
@Value
public class EventStream {
private final String eventStreamId;
private final Long offsetTimeMs;
private final Long offsetCount;
private final Set<FluxSink<Object>> fluxSinks;
public void addStream(FluxSink<Object> newFluxSinkObject) {
synchronized(this) {
fluxSinks.add(newFluxSinkObject);
@razorcd
razorcd / code4_streamsList.java
Last active November 22, 2020 22:03
Scaling reactive APIs - www.razorcodes.com
Map<String,EventStream> streamsList = new ConcurrentHashMap<>();
@Repository
@RequiredArgsConstructor
public class CustomerRepository<E extends Identifiable> {
private final RedisTemplate<String,E> staticTemplate;
public void save(E event) {
staticTemplate.opsForStream().add(ObjectRecord.create(customer.getStringId(), event));
};
@razorcd
razorcd / code1_Savingtheevents.java
Last active November 22, 2020 21:56
Scaling reactive APIs - www.razorcodes.com
@Component
@RequiredArgsConstructor
public class CustomerListener {
private final CustomerRepository<CustomerUpdatedEvent> customerRepository;
@StreamListener("customer-events")
public void customerEventsConsumer(Flux<DataUpdatedEvent> event) {
event.map(this::parseEvent)
.doOnNext(MyService::doBusinessLogic) // returns CustomerUpdatedEvent
@razorcd
razorcd / diagram2_BestSolution3.md
Last active January 6, 2021 18:10
Scaling reactive APIs with Redis-Streams - www.razorcodes.com
                                                                    +-------------- +
Kafka Listener  --> Business Logic --> Repository(RedisClient) -->  |               |
                                                                    | Redis Streams |
API Controller  -->  Repository(Redis Puller) ------------------->  |               |
                                                                    +---------------+
@razorcd
razorcd / diagram2_SimpleSolution2.md
Created November 22, 2020 21:42
Scaling reactive APIs with Redis-Streams - www.razorcodes.com
Kafka Topic A, Partition 3 --> Service Instance 2 --> Reactive Mongo --> Service Instance 1 --> Event stream listener /customer1