Skip to content

Instantly share code, notes, and snippets.

@serdarmumcu
Created March 28, 2024 09:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save serdarmumcu/5a9b8c2542f250994e875408fe07ddf2 to your computer and use it in GitHub Desktop.
Save serdarmumcu/5a9b8c2542f250994e875408fe07ddf2 to your computer and use it in GitHub Desktop.
@Service
public class EventPublisherService {
@Autowired
private OutboxRepository outboxRepository;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private EventProcessingService eventProcessingService;
@Scheduled(fixedDelay = 10000)
public void publishEvents() {
List<Outbox> events = outboxRepository.findAll();
List<Long> eventIdsForDeletion = new ArrayList<>();
CompletableFuture<?>[] futures = events.stream().map(event ->
kafkaTemplate.send("messages-topic", event.getPayload()).thenApply(sendResult -> {
synchronized (eventIdsForDeletion) {
eventIdsForDeletion.add(event.getId());
}
return null;
}).exceptionally(throwable -> {
// Log error or implement retry mechanism
System.out.println("Error sending event to Kafka: " + throwable.getMessage());
return null;
})
).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
eventProcessingService.deleteProcessedEvents(eventIdsForDeletion);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment