Skip to content

Instantly share code, notes, and snippets.

@sergio11
Created August 14, 2020 18:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sergio11/edf5da41088544379bde6dbb762220a1 to your computer and use it in GitHub Desktop.
Save sergio11/edf5da41088544379bde6dbb762220a1 to your computer and use it in GitHub Desktop.
@Component
@Slf4j
@RequiredArgsConstructor
public class TweetsIngestHandler {
/**
* Text Analyzer Service
*/
private final ITextAnalyzerService textAnalyzerService;
/**
*
* @param newTweet
* @param acknowledgment
* @return
*/
@StreamListener(AppStreamsConfig.TWEET_INGEST_CHANNEL)
@SendTo(AppStreamsConfig.PROCESSED_TWEETS_CHANNEL)
public TweetDTO onNewTweet(
@Payload final TweetDTO newTweet,
@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
final TextAnalysisResult analysisResult = textAnalyzerService.analyze(newTweet.getText());
newTweet.setSentimentLabel(analysisResult.getSentiment().name());
newTweet.setSentimentValue(analysisResult.getSentiment().getValue());
newTweet.setEntityMentions(analysisResult.getEntityMentions());
newTweet.setTokensAndNERTags(analysisResult.getTokensAndNERTags());
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
log.debug("Sentiment For Current Tweet -> " + analysisResult.getSentiment().name());
return newTweet;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment