Skip to content

Instantly share code, notes, and snippets.

@remcowesterhoud
Created November 7, 2016 13:55
Show Gist options
  • Save remcowesterhoud/64422607e4d8a945a1771e2d81ea9878 to your computer and use it in GitHub Desktop.
Save remcowesterhoud/64422607e4d8a945a1771e2d81ea9878 to your computer and use it in GitHub Desktop.
Proof of Concept - Fetching Twitter followers
package Twitter_streams
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService, Executors}
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.{Done, NotUsed}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import twitter4j.{ResponseList, Twitter, TwitterFactory, User}
import twitter4j.auth.AccessToken
import twitter4j.conf.ConfigurationBuilder
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.collection.JavaConversions._
/**
* Created by RemcoW on 2-11-2016.
*/
object Main extends App {
implicit val system: ActorSystem = ActorSystem("twitter-stream")
val executorService: ExecutorService = Executors.newCachedThreadPool()
val ec: ExecutionContext = ExecutionContext.fromExecutorService(executorService)
val log: LoggingAdapter = Logging.getLogger(system, Main)
implicit val materializer = ActorMaterializer()(system)
val startTime = System.nanoTime()
// Twitter User ID (followers from this account will be fetched)
val userId = 33674079L
val output = new ConcurrentLinkedQueue[User]()
Console.println(s"Fetching follower profiles for $userId")
val client = TwitterClient()
// Create Stream components
val source: Source[Seq[Long], NotUsed] = Source(TwitterHelpers.getFollowers(userId).get).grouped(100)
val flow: Flow[Seq[Long], ResponseList[User], NotUsed] = Flow[Seq[Long]].mapAsyncUnordered(100)((ids: Seq[Long]) => {
Future { client.lookupUsers(ids.toArray) }(ec)
})
val sink: Sink[ResponseList[User], Future[Done]] = Sink.foreach[ResponseList[User]] { users =>
for (user <- users.toIterable){
output.offer(user)
}
}
// Create Stream
source.via(flow).runWith(sink).onComplete {
_ =>
Console.println(s"Fetched ${output.size()} profiles")
val endTime = System.nanoTime()
Console.println(s"Time taken: ${(endTime - startTime) / 1000000000.00}s")
system.terminate()
Runtime.getRuntime.exit(0)
}(ec)
}
object TwitterClient {
val appKey: String = "Instert application key here"
val appSecret: String = "Insert application secret here"
val accessToken: String = "Insert access token here"
val accessTokenSecret: String = "Insert access token secret here"
def apply(): Twitter = {
val factory = new TwitterFactory(new ConfigurationBuilder().build())
val t = factory.getInstance()
t.setOAuthConsumer(appKey, appSecret)
t.setOAuthAccessToken(new AccessToken(accessToken, accessTokenSecret))
t
}
}
object TwitterHelpers {
def getFollowers(userId: Long): Try[Set[Long]] = {
Try({
val followerIds = mutable.Set[Long]()
var cursor = -1L
do {
val client = TwitterClient()
val res = client.friendsFollowers().getFollowersIDs(userId, cursor, 5000)
res.getIDs.toList.foreach(x => followerIds.add(x))
if (res.hasNext) {
cursor = res.getNextCursor
}
else {
cursor = -1
}
} while (cursor > 0)
followerIds.toSet
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment