/*************************************Spark Basics************************************************
**************************************************************************************************/
val someSentences = List("The quick brown fox jumped over the wall", "Another sentence", "Some more")
// Drop sentences that start with "some" (case-insensitive)
someSentences.filter(sentence => !sentence.toLowerCase.startsWith("some"))
// map keeps one Array[String] per sentence; flatMap flattens everything into a single List of words
someSentences.map(x => x.split(" "))
someSentences.flatMap(x => x.split(" "))
Output:
someSentences: List[String] = List(The quick brown fox jumped over the wall, Another sentence, Some more)
res24: List[String] = List(The quick brown fox jumped over the wall, Another sentence)
res26: List[Array[String]] = List(Array(The, quick, brown, fox, jumped, over, the, wall), Array(Another, sentence), Array(Some, more))
res27: List[String] = List(The, quick, brown, fox, jumped, over, the, wall, Another, sentence, Some, more)
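Before moving to RDDs, the same split-and-flatten output can be rolled up into word counts with the plain Scala collections API; a minimal sketch using groupBy:
// Local word count: flatten to words, group identical words, count each group
val localCounts = someSentences
  .flatMap(_.split(" "))
  .groupBy(identity)
  .map { case (word, occurrences) => (word, occurrences.size) }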
// Read every file under the directory and split each line into words
val wordsRDD = sc.textFile("/Users/syedatifakhtar/Notes/wordcount/").flatMap(x => x.split(" "))
// Classic word count: pair each word with 1, then sum the 1s per word
val wordsByCountRDD = wordsRDD
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
Output:
wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[492] at flatMap at <console>:45
wordsByCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[494] at reduceByKey at <console>:49
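To eyeball the most frequent words, the pairs can be sorted by count before pulling them to the driver; a minimal sketch (the limit of 10 is arbitrary):
// Top 10 words: sort descending on the count, then take 10
wordsByCountRDD.sortBy(pair => -pair._2).take(10)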
val someWords = sc.textFile("/Users/syedatifakhtar/Notes/wordcount/").flatMap(x => x.split(" "))
// collect() is an action: it runs the job and pulls all words to the driver
val someOther = someWords.collect()
// Cache the RDD so the two filters below do not re-read and re-split the files
someWords.persist()
val filteredArticles = someWords.filter(x => !x.toLowerCase.equals("the") && !x.toLowerCase.equals("a"))
val filteredAndOr = someWords.filter(x => !x.toLowerCase.equals("and") && !x.toLowerCase.equals("or"))
filteredArticles.take(20)
filteredAndOr.take(30)
Output:
someWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[594] at flatMap at <console>:45
someOther: Array[String] = Array(The, Project, Gutenberg, EBook, of, The, King, James, Bible, "", This, eBook, is, for, the, use, of, anyone, anywhere, at, no, cost, and, with, almost, no, restrictions, whatsoever., "", You, may, copy, it,, give, it, away, or, re-use, it, under, the, terms, of, the, Project, Gutenberg, License, included, with, this, eBook, or, online, at, www.gutenberg.org, "", "", Title:, The, King, James, Bible, "", Release, Date:, March, 2,, 2011, [EBook, #10], [This, King, James, Bible, was, orginally, posted, by, Project, Gutenberg, in, late, 1989], "", Language:, English, "", "", ***, START, OF, THIS, PROJECT, GUTENBERG, EBOOK, THE, KING, JAMES, BIBLE, ***, "", "", "", "", "", "", "", "", "", "", "", "", "", "", The, Old, Testament, of, the, King, James, Version, ...
someOtherRDD: someWords.type = MapPartitionsRDD[594] at flatMap at <console>:45
filteredArticles: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[595] at filter at <console>:47
filteredAndOr: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[596] at filter at <console>:47
res51: Array[String] = Array(Project, Gutenberg, EBook, of, King, James, Bible, "", This, eBook, is, for, use, of, anyone, anywhere, at, no, cost, and)
res52: Array[String] = Array(The, Project, Gutenberg, EBook, of, The, King, James, Bible, "", This, eBook, is, for, the, use, of, anyone, anywhere, at, no, cost, with, almost, no, restrictions, whatsoever., "", You, may)
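A note on caching: persist() with no arguments stores at MEMORY_ONLY; an explicit storage level can be chosen instead, but only before a level has been assigned (Spark refuses to change the level of an already-persisted RDD). A minimal sketch:
import org.apache.spark.storage.StorageLevel
// Spill partitions to disk when they do not fit in memory, instead of recomputing them
val cachedWords = sc.textFile("/Users/syedatifakhtar/Notes/wordcount/")
  .flatMap(x => x.split(" "))
  .persist(StorageLevel.MEMORY_AND_DISK)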
/*************************************Twitter Streaming With Spark SQL****************************
**************************************************************************************************/
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
/** Configures the OAuth credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String): Unit = {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach { case (key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    // twitter4j expects e.g. twitter4j.oauth.consumerKey, so "api" is rewritten to "consumer"
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}
// Configure Twitter credentials
val apiKey = ""
val apiSecret = ""
val accessToken = ""
val accessTokenSecret = ""
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
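// Quick sanity check: configureTwitterCredentials maps apiKey/apiSecret onto the
// twitter4j consumerKey/consumerSecret system properties; print what actually got set
Seq("consumerKey", "consumerSecret", "accessToken", "accessTokenSecret").foreach { k =>
  println(s"twitter4j.oauth.$k -> " + sys.props.getOrElse(s"twitter4j.oauth.$k", "<not set>"))
}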
// One streaming batch every 60 seconds, with a sliding 5-minute window over the tweets
val ssc = new StreamingContext(sc, Seconds(60))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(300))
case class Tweet(createdAt: Long, text: String, screenName: String)
// Strip newlines so each tweet stays on one line when written out as text
val mappedTweets = twt.map(status =>
  Tweet(status.getCreatedAt().getTime() / 1000, status.getText().replace("\n", " ").replace("\r", " "), status.getUser().getScreenName())
)
mappedTweets.foreachRDD { rdd =>
  // saveAsTextfile is not an RDD method; saveAsTextFile needs an output path, and
  // each batch needs its own directory (the path below is an example placeholder)
  rdd.saveAsTextFile("/tmp/tweets-" + System.currentTimeMillis)
}
// The newlines were already stripped in mappedTweets, so only the case-folding matters here
val filteredTweets = mappedTweets.filter { tweet => tweet.text.toLowerCase.contains("happy") }
filteredTweets.foreachRDD { rdd =>
  // toDF() on an RDD of case classes needs the SQL implicits in scope (import sqlContext.implicits._)
  rdd.toDF().registerTempTable("filteredTweets")
}
ssc.start()
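// The context now runs until stopped. To shut the stream down later without killing
// the shared SparkContext (so the notebook keeps working), stop it gracefully --
// run this in a later paragraph, not immediately after start():
// ssc.stop(stopSparkContext = false, stopGracefully = true)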
%sql
select from_unixtime(createdAt) as created, screenName, text from filteredTweets
%sql
select minute, count(1) as cnt from (
  select substr(from_unixtime(createdAt), 0, 16) as minute, screenName, text from filteredTweets
) sub1 group by minute
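The same per-minute rollup can be run from Scala against the registered temp table; a minimal sketch, assuming the notebook's sqlContext is the one the table was registered on:
// Per-minute tweet counts via sqlContext instead of a %sql paragraph
val perMinute = sqlContext.sql(
  "select substr(from_unixtime(createdAt), 0, 16) as minute, count(1) as cnt " +
  "from filteredTweets group by substr(from_unixtime(createdAt), 0, 16)")
perMinute.show()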