Skip to content

Instantly share code, notes, and snippets.

@mananai
Last active March 30, 2023 10:51
Show Gist options
  • Save mananai/9038ab9533b9f56bcddd195aa504002c to your computer and use it in GitHub Desktop.
Save mananai/9038ab9533b9f56bcddd195aa504002c to your computer and use it in GitHub Desktop.
A callable (task) to collect tweets from sample stream
class SampleStreamCollector implements Callable<Integer> {
private static final int TWITTER_API_FORBIDDEN_ERROR = 403;
private static final int MAX_RECONNECT_COUNT = 20;
private static final int SAMPLE_STREAM_REQUEST_DELAY = 15*60*1000/50;
private final Predicate<Tweet> predicate;
private final String[] bearerTokenArray;
private final BlockingQueue<Tweet[]> tweetsQueue;
private final AtomicBoolean isTerminated;
private Logger logger = Logger.getLogger(this.getClass().getName());
SampleStreamCollector(Predicate<Tweet> predicate,
String[] bearerTokenArray, BlockingQueue<Tweet[]> tweetsQueue, AtomicBoolean isTerminated) {
super();
if ( tweetsQueue == null || predicate == null || bearerTokenArray == null || isTerminated == null)
throw new IllegalArgumentException();
this.predicate = predicate;
this.bearerTokenArray = bearerTokenArray;
this.tweetsQueue = tweetsQueue;
this.isTerminated = isTerminated;
}
@Override
public Integer call() throws Exception {
int tweetCount = 0;
BearerTokens bearerTokens = new BearerTokens( bearerTokenArray);
try {
Set<String> tweetFields = new HashSet<>(
Arrays.asList("created_at", "conversation_id", "lang", "public_metrics", "referenced_tweets")); // Set<String>
Set<String> expansions = new HashSet<>(Arrays.asList("author_id", "referenced_tweets.id")); // Set<String> |
int reconnectCount = 0;
do {//Reconnecting loop
BufferedReader tweetReader = null;
logger.fine((reconnectCount > 0 ? "Reconnecting": "Connecting") + " to Twitter...");
try {
TwitterCredentialsBearer bearer = new TwitterCredentialsBearer(bearerTokens.getNext());
TwitterApi apiInstance = new TwitterApi(bearer);
if ( reconnectCount > 0) {
try {
Thread.sleep(SAMPLE_STREAM_REQUEST_DELAY);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
InputStream result = apiInstance.tweets().sampleStream().tweetFields(tweetFields)
.expansions(expansions).execute(MAX_RECONNECT_COUNT);
logger.fine("Connected");
@SuppressWarnings("serial")
Type localVarReturnType = new TypeToken<StreamingTweetResponse>() {
}.getType();
tweetReader = new BufferedReader(new InputStreamReader(result));
logger.info("Streaming...");
while ( !isTerminated.get() ) {// Reading data loop
String line = tweetReader.readLine();
if (line == null) {
logger.warning("Line is null");
break;
}
else if (line.isEmpty())
continue;
Object jsonObject = JSON.getGson().fromJson(line, localVarReturnType);
StreamingTweetResponse response = (StreamingTweetResponse) jsonObject;
Tweet tweet = response.getData();
if (tweet == null ) {
logger.warning("Tweet is null");
continue;
}
if ( predicate.test(tweet) ) {//if ( lang.equals(tweet.getLang())) {
List<Tweet> includedTweets = response.getIncludes().getTweets();
Tweet retweeted = null;
Tweet quoted = null;
Tweet repliedTo = null;
tweetCount++;
if (tweet.getReferencedTweets() != null) {
for (var ref : tweet.getReferencedTweets()) {
for (Tweet includedTweet : includedTweets) {
if (includedTweet.getId().equals(ref.getId())) {
switch (ref.getType()) {
case RETWEETED:
retweeted = includedTweet;
tweetCount++;
break;
case QUOTED:
quoted = includedTweet;
tweetCount++;
break;
case REPLIED_TO:
repliedTo = includedTweet;
tweetCount++;
break;
}
break;
}
}
}
}
try {
tweetsQueue.put(new Tweet[] { tweet, retweeted, quoted, repliedTo });
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}//while (!isTerminated)
}
catch (IOException | ApiException e) { // To handle connection reset
if ( e instanceof ApiException) {
if (((ApiException)e).getCode()==TWITTER_API_FORBIDDEN_ERROR)
logger.severe("API Forbidden");
else
throw e;
}
else
logger.severe("Connection reset or socket timeout");
e.printStackTrace(System.err);
reconnectCount++;
}
finally {
if (tweetReader != null ) {
try {
tweetReader.close();
} catch (IOException e1) {
logger.warning("Exception while closing a reader");
e1.printStackTrace(System.err);
}
}
}
} while (!isTerminated.get() && reconnectCount <= MAX_RECONNECT_COUNT ); // do while not terminated
} catch (ApiException e) {
logger.severe("Exception when calling TweetsApi#sampleStream");
logger.severe("Status code: " + e.getCode());
logger.severe("Reason: " + e.getResponseBody());
logger.severe("Response headers: " + e.getResponseHeaders());
e.printStackTrace(System.err);
} catch (Exception e) {
e.printStackTrace(System.err);
}
finally {
try {
//Poison pill
tweetsQueue.put(new Tweet[] {null, null, null, null});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.printf("%s has processed %,d tweets\n", this.getClass().getSimpleName(), tweetCount);
}
return tweetCount;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment