Skip to content

Instantly share code, notes, and snippets.

@sergio11
Created August 14, 2020 18:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sergio11/915426aaa5c04c044dc7877d38921b31 to your computer and use it in GitHub Desktop.
Save sergio11/915426aaa5c04c044dc7877d38921b31 to your computer and use it in GitHub Desktop.
@Slf4j
@RequiredArgsConstructor
public class TwitterMessageProducer extends MessageProducerSupport {
private final TwitterStream twitterStream;
private final MessageChannel outputChannel;
private final TweetEntityMapper tweetEntityMapper;
@Getter
@Setter
private List<String> terms;
/**
* Init Message Producer
*/
@Override
protected void onInit() {
super.onInit();
logger.info("TwitterMessageProducer - onInit CALLED ");
}
/**
* Start Message Producer
*/
@Override
protected void doStart() {
logger.info("TwitterMessageProducer - doStart CALLED ");
setOutputChannel(outputChannel);
twitterStream.addListener(new StatusListener());
twitterStream.filter(buildFilterQuery());
}
/**
* Stop Message Producer
*/
@Override
protected void doStop() {
logger.info("TwitterMessageProducer - doStop CALLED ");
twitterStream.cleanUp();
twitterStream.clearListeners();
}
/**
* Private Methods
*/
/**
* Build Filter Query
*
* @return
*/
private FilterQuery buildFilterQuery() {
String[] termsArray = null;
if (!CollectionUtils.isEmpty(terms)) {
termsArray = terms.toArray(new String[0]);
}
final FilterQuery filterQuery = new FilterQuery(termsArray);
filterQuery.language(new String[]{"en"});
return filterQuery;
}
class StatusListener extends StatusAdapter {
@Override
public void onStatus(Status status) {
logger.info("TwitterMessageProducer - onStatus sendMessage CALLED ");
sendMessage(MessageBuilder
.withPayload(tweetEntityMapper.entityToDTO(status))
.build());
}
@Override
public void onException(Exception ex) {
logger.info("TwitterMessageProducer - onException CALLED -> " + ex.getMessage());
log.error(ex.getMessage(), ex);
}
@Override
public void onStallWarning(StallWarning warning) {
logger.info("TwitterMessageProducer - onStallWarning CALLED -> " + warning.getMessage());
}
}
@PostConstruct
protected void onPostConstruct() {
logger.info("TwitterMessageProducer - onPostConstruct");
this.start();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment