import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import twitter4j.conf.ConfigurationContext
import twitter4j.auth.AuthorizationFactory
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.Authorization

/**
 * Spark Streaming driver that reads tweets matching a fixed keyword filter and
 * writes selected fields of every tweet to text files, one output directory per
 * 30-second batch.
 */
object Main {
  // Twitter OAuth credentials. They are empty in source and are copied verbatim
  // into the `twitter4j.oauth.*` system properties by `main`.
  // NOTE(review): presumably these must be filled in (or supplied externally)
  // before the stream can authenticate - confirm, and avoid committing real keys.
  val CONSUMER_KEY: String = ""
  val CONSUMER_SECRET: String = ""
  val ACCESS_TOKEN_KEY: String = ""
  val ACCESS_TOKEN_SECRET: String = ""

  /**
   * Configures Hadoop/twitter4j system properties, starts the streaming job and
   * blocks until it terminates. Both the streaming context and the underlying
   * Spark context are stopped on the way out, including when the job fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Machine-specific local Hadoop directory.
    System.setProperty("hadoop.home.dir", "D:/sw/hadoop/hadoop-common-2.2.0-bin-master/")

    // TwitterUtils.createStream is called with `None` below, so the credentials
    // are supplied through twitter4j's system properties.
    System.setProperty("twitter4j.oauth.consumerKey", CONSUMER_KEY)
    System.setProperty("twitter4j.oauth.consumerSecret", CONSUMER_SECRET)
    System.setProperty("twitter4j.oauth.accessToken", ACCESS_TOKEN_KEY)
    System.setProperty("twitter4j.oauth.accessTokenSecret", ACCESS_TOKEN_SECRET)

    val cfg = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Spark Streaming Example in Scala")

    val ctx = new SparkContext(cfg)
    try {
      // Reuse the existing SparkContext. Building the StreamingContext from the
      // SparkConf would start a second SparkContext in this JVM, which is why the
      // `spark.driver.allowMultipleContexts` workaround used to be required.
      val ssc = new StreamingContext(ctx, Seconds(30))
      try {
        val filters = Seq("Brexit")
        val stream = TwitterUtils
          .createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_2)
          .repartition(10)

        // Each tweet is flattened into three separate records (user, text, id),
        // so every field ends up on its own line in the output files.
        // NOTE(review): a single (id, text) record per tweet, as in the
        // alternative below, may be what was intended - confirm before changing
        // the output format.
        val tagEntities = stream.flatMap { status =>
          Seq[Any](status.getUser, status.getText, status.getId)
        }

        tagEntities.saveAsTextFiles("D:/keyur/tech/data/spark/twitter-stream/", "")
        // Alternative pipeline kept for reference:
        // stream.filter { x => x.getText.contains("China") }.map { x => (x.getId, x.getText) }.saveAsTextFiles("D:/keyur/tech/data/spark/twitter-stream/", "")

        ssc.start()
        // Rethrows any error raised by the streaming job; the `finally` blocks
        // guarantee the contexts are still shut down in that case.
        ssc.awaitTermination()
      } finally {
        // The SparkContext is stopped by the outer `finally`, so only the
        // streaming side is stopped here.
        ssc.stop(stopSparkContext = false, stopGracefully = false)
      }
    } finally {
      ctx.stop()
    }
  }
}