import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.Authorization
import twitter4j.auth.AuthorizationFactory
import twitter4j.conf.ConfigurationContext

/**
 * Spark Streaming driver that subscribes to the Twitter stream, keeps the
 * statuses matching a keyword filter, and writes selected fields of each
 * status to text files, one output per 30-second batch.
 *
 * The OAuth credentials below are published as `twitter4j.oauth.*` system
 * properties before the stream is created. They are intentionally blank here;
 * fill them in locally and do not commit real secrets.
 */
object Main {
  val CONSUMER_KEY = ""
  val CONSUMER_SECRET = ""
  val ACCESS_TOKEN_KEY = ""
  val ACCESS_TOKEN_SECRET = ""

  /**
   * Program entry point: configures the environment, builds the streaming
   * pipeline, and blocks until the streaming job terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "D:/sw/hadoop/hadoop-common-2.2.0-bin-master/")
    System.setProperty("twitter4j.oauth.consumerKey", CONSUMER_KEY)
    System.setProperty("twitter4j.oauth.consumerSecret", CONSUMER_SECRET)
    System.setProperty("twitter4j.oauth.accessToken", ACCESS_TOKEN_KEY)
    System.setProperty("twitter4j.oauth.accessTokenSecret", ACCESS_TOKEN_SECRET)

    val cfg = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Spark Streaming Example in Scala")

    // Build the StreamingContext on top of the one SparkContext instead of
    // handing it the SparkConf again: the latter constructs a second
    // SparkContext in the same JVM, which is what previously forced the
    // "spark.driver.allowMultipleContexts" workaround.
    val ctx = new SparkContext(cfg)
    val ssc = new StreamingContext(ctx, Seconds(30))

    val filters = Seq("Brexit")

    // Passing None for the authorization lets TwitterUtils fall back to
    // twitter4j's default OAuth setup, driven by the system properties set above.
    val stream = TwitterUtils
      .createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_2)
      .repartition(10)

    // Each status is flattened into three separate records (user, text, id).
    // The element type is spelled out as Any because the fields have no more
    // specific common type.
    val tagEntities = stream.flatMap { status =>
      Seq[Any](status.getUser, status.getText, status.getId)
    }
    tagEntities.saveAsTextFiles("D:/keyur/tech/data/spark/twitter-stream/", "")
    //stream.filter { x => x.getText.contains("China") }.map { x => (x.getId,x.getText) }.saveAsTextFiles("D:/keyur/tech/data/spark/twitter-stream/", "")

    try {
      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Runs even when start/awaitTermination throws. Stopping the streaming
      // context with stopSparkContext = true also stops the shared
      // SparkContext, so no separate ctx.stop() is needed.
      ssc.stop(stopSparkContext = true)
    }
  }
}