Skip to content

Instantly share code, notes, and snippets.

@bbalakriz
Created May 8, 2020 04:06
Show Gist options
  • Save bbalakriz/a37ea8a0e6b72a615091e628625e274f to your computer and use it in GitHub Desktop.
Save bbalakriz/a37ea8a0e6b72a615091e628625e274f to your computer and use it in GitHub Desktop.
CamelResource.java
/**
 * REST resource that polls a Camel {@code twitter-search} endpoint for a topic and publishes
 * every hashtag found in the received tweets to the {@code twitter-feeds} channel.
 */
@Path("/twitter")
@ApplicationScoped
public class CamelResource {

    /** Placeholder inside {@link #twitterSearchUri} that is replaced by the requested topic. */
    private static final String TOPIC_PLACEHOLDER = "$$TOPIC";

    /**
     * A '#' followed by zero or more letters, digits, hyphens or underscores. Compiled once
     * because {@link Pattern} instances are immutable and compiling per tweet is wasted work.
     * A bare "#" also matches and is filtered out by the caller.
     */
    private static final Pattern HASHTAG_PATTERN = Pattern.compile("#[a-zA-Z0-9-_]*");

    final Logger LOG = Logger.getLogger(CamelResource.class);

    /** Endpoint URI template; {@code $$TOPIC} is substituted per request. */
    String twitterSearchUri = "twitter-search://#" + TOPIC_PLACEHOLDER + "?count=10&lang=en-us";

    @Inject
    @Channel("twitter-feeds")
    Emitter<Feed> emitter;

    @Inject
    ConsumerTemplate consumerTemplate;

    /**
     * Polls the Twitter search endpoint up to {@code sample} times for the given topic and
     * publishes each hashtag found in the received tweets.
     *
     * @param topic  search topic substituted into the endpoint URI; must not be null or blank
     * @param sample number of polling attempts; {@code null}, zero or negative values mean 1
     * @return the name of {@link Response.Status#OK} when polling ran, or the name of
     *         {@link Response.Status#BAD_REQUEST} when {@code topic} is missing or blank
     */
    @Path("/timeline")
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String getTimeline(@QueryParam("topic") String topic,
            @QueryParam("sample") Integer sample) {
        // A missing topic used to surface as a NullPointerException from String.replace.
        if (topic == null || topic.trim().isEmpty()) {
            LOG.warn("Missing or blank 'topic' query parameter");
            return Response.Status.BAD_REQUEST.name();
        }

        // Plain replacement only: the previous String.format call treated '%' in the
        // user-supplied topic as a format specifier and could throw or mangle the URI.
        // NOTE(review): the topic is still inserted into the endpoint URI unescaped, so
        // characters such as '?' or '&' can alter endpoint options - confirm the desired
        // escaping against the Camel URI rules before exposing this publicly.
        final String searchQuery = twitterSearchUri.replace(TOPIC_PLACEHOLDER, topic);

        final int attempts = (sample == null || sample <= 0) ? 1 : sample;
        final List<String> tags = new ArrayList<>();

        for (int i = 0; i < attempts; i++) {
            final String tweet = consumerTemplate.receiveBodyNoWait(searchQuery, String.class);
            if (tweet == null) {
                // Nothing available right now; the attempt is still counted.
                continue;
            }

            // The text before the first '(' is used as the tweet timestamp.
            final int parenIndex = tweet.indexOf('(');
            if (parenIndex < 0) {
                // Previously this case threw StringIndexOutOfBoundsException and aborted
                // the whole request; skip the unparsable tweet instead.
                LOG.warn("Skipping tweet without a '(' delimiter after the timestamp");
                continue;
            }
            final String tweetTimeStamp = tweet.substring(0, parenIndex).trim();

            final Matcher matcher = HASHTAG_PATTERN.matcher(tweet);
            while (matcher.find()) {
                final String hashTag = matcher.group();
                // Ignore a lone '#', which the pattern also matches.
                if (hashTag.length() > 1) {
                    tags.add(hashTag);
                    eventTag(hashTag, tweetTimeStamp);
                }
            }
        }

        LOG.info(tags);
        return Response.Status.OK.name();
    }

    /**
     * Publishes a hashtag to the {@code twitter-feeds} channel (a Kafka topic, per the
     * original author's note).
     *
     * @param tag            raw hashtag including the leading '#'
     * @param tweetTimeStamp timestamp text extracted from the tweet the tag came from
     */
    public void eventTag(String tag, String tweetTimeStamp) {
        LOG.info("Hashtag: " + tag);
        emitter.send(new Feed(formatTag(tag), tweetTimeStamp));
    }

    /**
     * Normalizes a hashtag: strips surrounding whitespace, removes a single leading '#'
     * when present, and converts the result to lowercase.
     *
     * @param tag the hashtag to normalize; must not be {@code null}
     * @return the normalized tag, or an empty string when nothing remains
     * @throws NullPointerException if {@code tag} is {@code null}
     */
    public static String formatTag(String tag) {
        java.util.Objects.requireNonNull(tag, "tag");
        String normalized = tag.trim();
        // Only drop the first character when it really is '#'; this also makes the
        // empty string safe (it used to throw StringIndexOutOfBoundsException).
        if (normalized.startsWith("#")) {
            normalized = normalized.substring(1).trim();
        }
        // Locale.ROOT keeps the result independent of the JVM default locale.
        return normalized.toLowerCase(java.util.Locale.ROOT);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment