Skip to content

Instantly share code, notes, and snippets.

@sergio11
Created August 15, 2020 10:34
Show Gist options
  • Save sergio11/2202580387f8623f08500dd32d77a47c to your computer and use it in GitHub Desktop.
Save sergio11/2202580387f8623f08500dd32d77a47c to your computer and use it in GitHub Desktop.
@Slf4j
@RequiredArgsConstructor
@Component
public class TweetsProcessedHandler {
private final ITweetsService tweetService;
/**
*
* @param newProcessedTweet
* @param topic
* @param partition
* @param offset
* @param acknowledgment
* @param deliveryAttempt
*/
@StreamListener(AppStreamsConfig.PROCESSED_TWEETS_CHANNEL)
public void onNewProcessedTweet(
@Payload final TweetDTO newProcessedTweet,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
@Header(KafkaHeaders.OFFSET) Long offset,
@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment,
@Header(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT) Integer deliveryAttempt) {
log.info("NewsProcessedTweet with id '{}' and text '{}' received from bus. topic: {}, partition: {}, offset: {}, deliveryAttempt: {}",
newProcessedTweet.getId(), newProcessedTweet.getText(), topic, partition, offset, deliveryAttempt);
try {
tweetService.save(newProcessedTweet);
// commit offset
if (acknowledgment != null) {
acknowledgment.acknowledge();
log.info("Commit Offsets ...");
}
} catch (final Exception ex) {
log.error("Collect Tweet Exception -> " + ex.getMessage());
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment