Skip to content

Instantly share code, notes, and snippets.

@mananai
Created March 30, 2023 11:30
Show Gist options
  • Save mananai/1b1dc9665b390a844f86db6baa76b9e2 to your computer and use it in GitHub Desktop.
Save mananai/1b1dc9665b390a844f86db6baa76b9e2 to your computer and use it in GitHub Desktop.
A callable(task) to save tweets to an SQL database
class TwitterDBWriter implements Callable<Integer> {
private final static int TWEETS_BATCH_SIZE = 20;
private final BlockingQueue<Tweet[]> tweetsQueue;
private final Queue<Tweet[]> writingQueue = new LinkedList<>();
private final TwitterDAO twitterDAO ;
TwitterDBWriter(String jdbcURL, BlockingQueue<Tweet[]> tweetsQueue) throws SQLException {
super();
this.twitterDAO = TwitterDAOFactory.create(jdbcURL);
this.tweetsQueue = tweetsQueue;
}
@Override
public Integer call() {
int count = 0;
try {
Tweet[] tweets;
do {
tweets = tweetsQueue.take();
if (!isPoisonPill(tweets))
writingQueue.add(tweets);
if ((writingQueue.size() >= TWEETS_BATCH_SIZE) || (isPoisonPill(tweets) && !writingQueue.isEmpty())) {
count += twitterDAO.saveTweets(writingQueue);
}
} while (!isPoisonPill(tweets));
this.twitterDAO.close();
System.out.printf("%s has inserted or updated %,d tweets\n", this.getClass().getSimpleName(), count);
} catch (InterruptedException | SQLException e) {
e.printStackTrace(System.err);
}
return count;
}
private boolean isPoisonPill(Tweet[] tweetArray) {
return tweetArray[0] == null;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment