Created
November 7, 2016 13:55
-
-
Save remcowesterhoud/64422607e4d8a945a1771e2d81ea9878 to your computer and use it in GitHub Desktop.
Proof of concept: fetching the profiles of a Twitter account's followers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Twitter_streams | |
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService, Executors}

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.{Done, NotUsed}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import twitter4j.{ResponseList, Twitter, TwitterFactory, User}
import twitter4j.auth.AccessToken
import twitter4j.conf.ConfigurationBuilder

import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.collection.JavaConversions._
/**
 * Proof of concept: fetches the profiles of every follower of a single Twitter account.
 *
 * Follower ids are collected with `TwitterHelpers.getFollowers`, streamed in batches of 100 ids,
 * and each batch is resolved to full profiles with `lookupUsers` on a dedicated thread pool
 * (at most 100 lookups in flight). Fetched profiles are accumulated in `output`. The JVM exits
 * with status 0 on success and 1 if fetching the follower ids or any profile lookup fails.
 *
 * Created by RemcoW on 2-11-2016.
 */
object Main extends App {
  implicit val system: ActorSystem = ActorSystem("twitter-stream")
  // Dedicated pool that runs the blocking twitter4j lookups.
  val executorService: ExecutorService = Executors.newCachedThreadPool()
  val ec: ExecutionContext = ExecutionContext.fromExecutorService(executorService)
  val log: LoggingAdapter = Logging.getLogger(system, Main)
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
  val startTime = System.nanoTime()
  // Twitter User ID (followers from this account will be fetched)
  val userId = 33674079L
  val output = new ConcurrentLinkedQueue[User]()

  /**
   * Shuts down the lookup pool, asks the actor system to terminate, and exits the JVM.
   *
   * Used on every exit path (success and failure) so that an error does not leave the
   * process running.
   *
   * @param status JVM exit status
   */
  private def shutdown(status: Int): Unit = {
    executorService.shutdown()
    system.terminate()
    Runtime.getRuntime.exit(status)
  }

  Console.println(s"Fetching follower profiles for $userId")
  val client = TwitterClient()

  // Report a failure to fetch the ids and exit non-zero. The original `.get` threw here
  // instead, leaving the actor system and thread pool running.
  val followerIds: Set[Long] = TwitterHelpers.getFollowers(userId) match {
    case Success(ids) => ids
    case Failure(e) =>
      Console.err.println(s"Failed to fetch follower ids for $userId: $e")
      shutdown(1)
      Set.empty // not reached: shutdown exits the JVM
  }

  // Create Stream components
  val source: Source[Seq[Long], NotUsed] = Source(followerIds).grouped(100)
  val flow: Flow[Seq[Long], ResponseList[User], NotUsed] =
    Flow[Seq[Long]].mapAsyncUnordered(100) { (ids: Seq[Long]) =>
      Future { client.lookupUsers(ids.toArray) }(ec)
    }
  // ResponseList is a java.util.List, so it can be added to the queue in one call.
  val sink: Sink[ResponseList[User], Future[Done]] =
    Sink.foreach[ResponseList[User]] { users => output.addAll(users) }

  // Create Stream
  source.via(flow).runWith(sink).onComplete { result =>
    val elapsedSeconds = (System.nanoTime() - startTime) / 1000000000.00
    result match {
      case Success(_) =>
        Console.println(s"Fetched ${output.size()} profiles")
        Console.println(s"Time taken: ${elapsedSeconds}s")
        shutdown(0)
      case Failure(e) =>
        // The original discarded this failure and still reported success.
        Console.err.println(s"Fetching profiles failed after ${output.size()} profiles: $e")
        shutdown(1)
    }
  }(ec)
}
/**
 * Factory for authenticated twitter4j clients.
 *
 * Each credential is read from an environment variable (`TWITTER_APP_KEY`, `TWITTER_APP_SECRET`,
 * `TWITTER_ACCESS_TOKEN`, `TWITTER_ACCESS_TOKEN_SECRET`), so real secrets do not have to be
 * committed to source. If a variable is unset, the placeholder string shown below is used.
 */
object TwitterClient {
  val appKey: String = sys.env.getOrElse("TWITTER_APP_KEY", "Instert application key here")
  val appSecret: String = sys.env.getOrElse("TWITTER_APP_SECRET", "Insert application secret here")
  val accessToken: String = sys.env.getOrElse("TWITTER_ACCESS_TOKEN", "Insert access token here")
  val accessTokenSecret: String =
    sys.env.getOrElse("TWITTER_ACCESS_TOKEN_SECRET", "Insert access token secret here")

  /**
   * Creates a new client with the OAuth consumer key/secret and access token configured above.
   *
   * @return a fresh twitter4j `Twitter` instance
   */
  def apply(): Twitter = {
    val factory = new TwitterFactory(new ConfigurationBuilder().build())
    val t = factory.getInstance()
    t.setOAuthConsumer(appKey, appSecret)
    t.setOAuthAccessToken(new AccessToken(accessToken, accessTokenSecret))
    t
  }
}
/** Helpers for paging through Twitter API resources. */
object TwitterHelpers {
  // Number of ids requested per followers/ids page (the value used before this change).
  private val FollowersPageSize = 5000

  /**
   * Collects the ids of all followers of `userId` by following the API's cursors page by page.
   *
   * @param userId id of the account whose followers are fetched
   * @return the follower ids, or a `Failure` holding the first exception thrown while creating
   *         the client or fetching any page
   */
  def getFollowers(userId: Long): Try[Set[Long]] = Try {
    // A single client serves every page. Previously a new factory and client were built
    // for each request.
    val client = TwitterClient()

    @tailrec
    def collect(cursor: Long, acc: Set[Long]): Set[Long] = {
      val page = client.friendsFollowers().getFollowersIDs(userId, cursor, FollowersPageSize)
      val ids = acc ++ page.getIDs
      val next = page.getNextCursor
      // Continue only while another page exists and its cursor is positive.
      if (page.hasNext && next > 0) collect(next, ids) else ids
    }

    // -1 is the starting cursor value passed to the first request.
    collect(-1L, Set.empty)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment