Skip to content

Instantly share code, notes, and snippets.

@ceteri
Last active March 28, 2019 03:38
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ceteri/835565935da932cb59a2 to your computer and use it in GitHub Desktop.
Save ceteri/835565935da932cb59a2 to your computer and use it in GitHub Desktop.
Twitter Streaming Classifier
// Collect tweets from the Twitter streaming API and persist each non-empty micro-batch
// as text files (one JSON string per tweet), exiting once enough tweets have been saved.
// Names used here but defined elsewhere (not visible in this snippet): intervalSecs, gson,
// Utils.getAuth, partitionsEachInterval, outputDirectory, numTweetsToCollect, and the
// mutable counter numTweetsCollected.
val sc = new SparkContext(new SparkConf())
val ssc = new StreamingContext(sc, Seconds(intervalSecs))

// Serialize each status to JSON text; presumably consumed later via sqlContext.jsonFile.
val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth)
  .map(gson.toJson(_))

tweetStream.foreachRDD { (rdd, time) =>
  val count = rdd.count()
  // Skip empty batches so no empty output directories are written.
  if (count > 0) {
    // One output directory per batch, named by the batch time in epoch milliseconds.
    val path = s"$outputDirectory/tweets_${time.milliseconds}"
    rdd.repartition(partitionsEachInterval).saveAsTextFile(path)
    numTweetsCollected += count
    // Exit as soon as at least numTweetsToCollect tweets have been saved; `>=` avoids
    // collecting one extra batch when the running total hits the target exactly.
    // NOTE(review): exits the JVM directly without calling ssc.stop().
    if (numTweetsCollected >= numTweetsToCollect) {
      System.exit(0)
    }
  }
}

ssc.start()
ssc.awaitTermination()
$ ls -lth
total 0
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:37 tweets_1427366240000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:37 tweets_1427366230000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:37 tweets_1427366220000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:36 tweets_1427366210000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:36 tweets_1427366200000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:36 tweets_1427366190000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:36 tweets_1427366180000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:36 tweets_1427366170000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:36 tweets_1427366160000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:35 tweets_1427366150000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:35 tweets_1427366140000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:35 tweets_1427366130000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:35 tweets_1427366120000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:35 tweets_1427366110000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:35 tweets_1427366100000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:34 tweets_1427366090000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:34 tweets_1427366080000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:34 tweets_1427366070000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:34 tweets_1427366060000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:34 tweets_1427366050000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:34 tweets_1427366040000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:33 tweets_1427366030000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:33 tweets_1427366020000
drwxr-xr-x 6 ceteri wheel 204B Mar 26 07:33 tweets_1427366010000
------Total count by languages Lang, count(*)---
[en,3856]
[ja,3029]
[ar,777]
[es,696]
[ru,344]
[id,258]
[pt,241]
[tr,207]
[th,201]
[fr,198]
[ko,193]
[en-gb,94]
[it,79]
[de,78]
[pl,47]
[nl,26]
[sv,13]
[el,12]
[zh-tw,8]
[en-GB,8]
[hi,6]
[da,6]
[uk,6]
[ca,6]
[no,5]
CLUSTER 1:
TLあんまり見ないけど
@くれたっら
いつでもくっるよ٩(δωδ)۶
そういえばディスガイアも今日か
CLUSTER 4:
قالوا العروبه روحت بعد صدام
واقول مع سلمان تحيى العروبه
RT @vip588: √ للمتواجدين الان √ زيادة متابعين √ فولو مي vip588 √ فولو باك √ رتويت للتغريدة √ فولو للي عمل رتويت √ اللي ما يلتزم ما بيستفيد …
ن سورة
CLUSTER 9:
Narcoestaciones clausuradas: Cierran seis gasolinerías de 19 que se encuentran ligadas al crimen en Michoacán
RT @SoyBuenCatre: En ocasiones me toca sacar plumas o decir DI-VI-NO para no desbordar tanta masculinidad.
RT @1DirectionCo: se me partió lo que me de quedaba de corazón
root
|-- contributorsIDs: array (nullable = true)
| |-- element: string (containsNull = false)
|-- createdAt: string (nullable = true)
|-- currentUserRetweetId: integer (nullable = true)
|-- hashtagEntities: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- end: integer (nullable = true)
| | |-- start: integer (nullable = true)
| | |-- text: string (nullable = true)
|-- id: long (nullable = true)
|-- inReplyToScreenName: string (nullable = true)
|-- inReplyToStatusId: long (nullable = true)
|-- inReplyToUserId: long (nullable = true)
|-- isFavorited: boolean (nullable = true)
|-- isPossiblySensitive: boolean (nullable = true)
|-- isTruncated: boolean (nullable = true)
|-- mediaEntities: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- displayURL: string (nullable = true)
| | |-- end: integer (nullable = true)
| | |-- expandedURL: string (nullable = true)
| | |-- id: long (nullable = true)
| | |-- mediaURL: string (nullable = true)
| | |-- mediaURLHttps: string (nullable = true)
| | |-- sizes: struct (nullable = true)
| | | |-- 0: struct (nullable = true)
| | | | |-- height: integer (nullable = true)
| | | | |-- resize: integer (nullable = true)
| | | | |-- width: integer (nullable = true)
| | | |-- 1: struct (nullable = true)
| | | | |-- height: integer (nullable = true)
| | | | |-- resize: integer (nullable = true)
| | | | |-- width: integer (nullable = true)
| | | |-- 2: struct (nullable = true)
| | | | |-- height: integer (nullable = true)
| | | | |-- resize: integer (nullable = true)
| | | | |-- width: integer (nullable = true)
| | | |-- 3: struct (nullable = true)
| | | | |-- height: integer (nullable = true)
| | | | |-- resize: integer (nullable = true)
| | | | |-- width: integer (nullable = true)
| | |-- start: integer (nullable = true)
| | |-- type: string (nullable = true)
| | |-- url: string (nullable = true)
|-- retweetCount: integer (nullable = true)
|-- retweetedStatus: struct (nullable = true)
| |-- contributorsIDs: array (nullable = true)
| | |-- element: string (containsNull = false)
| |-- createdAt: string (nullable = true)
| |-- currentUserRetweetId: integer (nullable = true)
| |-- hashtagEntities: array (nullable = true)
| | |-- element: struct (containsNull = false)
| | | |-- end: integer (nullable = true)
| | | |-- start: integer (nullable = true)
| | | |-- text: string (nullable = true)
| |-- id: long (nullable = true)
| |-- inReplyToScreenName: string (nullable = true)
| |-- inReplyToStatusId: long (nullable = true)
| |-- inReplyToUserId: long (nullable = true)
| |-- isFavorited: boolean (nullable = true)
| |-- isPossiblySensitive: boolean (nullable = true)
| |-- isTruncated: boolean (nullable = true)
| |-- mediaEntities: array (nullable = true)
| | |-- element: struct (containsNull = false)
| | | |-- displayURL: string (nullable = true)
| | | |-- end: integer (nullable = true)
| | | |-- expandedURL: string (nullable = true)
| | | |-- id: long (nullable = true)
| | | |-- mediaURL: string (nullable = true)
| | | |-- mediaURLHttps: string (nullable = true)
| | | |-- sizes: struct (nullable = true)
| | | | |-- 0: struct (nullable = true)
| | | | | |-- height: integer (nullable = true)
| | | | | |-- resize: integer (nullable = true)
| | | | | |-- width: integer (nullable = true)
| | | | |-- 1: struct (nullable = true)
| | | | | |-- height: integer (nullable = true)
| | | | | |-- resize: integer (nullable = true)
| | | | | |-- width: integer (nullable = true)
| | | | |-- 2: struct (nullable = true)
| | | | | |-- height: integer (nullable = true)
| | | | | |-- resize: integer (nullable = true)
| | | | | |-- width: integer (nullable = true)
| | | | |-- 3: struct (nullable = true)
| | | | | |-- height: integer (nullable = true)
| | | | | |-- resize: integer (nullable = true)
| | | | | |-- width: integer (nullable = true)
| | | |-- start: integer (nullable = true)
| | | |-- type: string (nullable = true)
| | | |-- url: string (nullable = true)
| |-- retweetCount: integer (nullable = true)
| |-- source: string (nullable = true)
| |-- text: string (nullable = true)
| |-- urlEntities: array (nullable = true)
| | |-- element: struct (containsNull = false)
| | | |-- displayURL: string (nullable = true)
| | | |-- end: integer (nullable = true)
| | | |-- expandedURL: string (nullable = true)
| | | |-- start: integer (nullable = true)
| | | |-- url: string (nullable = true)
| |-- user: struct (nullable = true)
| | |-- createdAt: string (nullable = true)
| | |-- description: string (nullable = true)
| | |-- descriptionURLEntities: array (nullable = true)
| | | |-- element: string (containsNull = false)
| | |-- favouritesCount: integer (nullable = true)
| | |-- followersCount: integer (nullable = true)
| | |-- friendsCount: integer (nullable = true)
| | |-- id: long (nullable = true)
| | |-- isContributorsEnabled: boolean (nullable = true)
| | |-- isFollowRequestSent: boolean (nullable = true)
| | |-- isGeoEnabled: boolean (nullable = true)
| | |-- isProtected: boolean (nullable = true)
| | |-- isVerified: boolean (nullable = true)
| | |-- lang: string (nullable = true)
| | |-- listedCount: integer (nullable = true)
| | |-- location: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- profileBackgroundColor: string (nullable = true)
| | |-- profileBackgroundImageUrl: string (nullable = true)
| | |-- profileBackgroundImageUrlHttps: string (nullable = true)
| | |-- profileBackgroundTiled: boolean (nullable = true)
| | |-- profileBannerImageUrl: string (nullable = true)
| | |-- profileImageUrl: string (nullable = true)
| | |-- profileImageUrlHttps: string (nullable = true)
| | |-- profileLinkColor: string (nullable = true)
| | |-- profileSidebarBorderColor: string (nullable = true)
| | |-- profileSidebarFillColor: string (nullable = true)
| | |-- profileTextColor: string (nullable = true)
| | |-- profileUseBackgroundImage: boolean (nullable = true)
| | |-- screenName: string (nullable = true)
| | |-- showAllInlineMedia: boolean (nullable = true)
| | |-- statusesCount: integer (nullable = true)
| | |-- timeZone: string (nullable = true)
| | |-- translator: boolean (nullable = true)
| | |-- url: string (nullable = true)
| | |-- utcOffset: integer (nullable = true)
| |-- userMentionEntities: array (nullable = true)
| | |-- element: struct (containsNull = false)
| | | |-- end: integer (nullable = true)
| | | |-- id: long (nullable = true)
| | | |-- name: string (nullable = true)
| | | |-- screenName: string (nullable = true)
| | | |-- start: integer (nullable = true)
|-- source: string (nullable = true)
|-- text: string (nullable = true)
|-- urlEntities: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- displayURL: string (nullable = true)
| | |-- end: integer (nullable = true)
| | |-- expandedURL: string (nullable = true)
| | |-- start: integer (nullable = true)
| | |-- url: string (nullable = true)
|-- user: struct (nullable = true)
| |-- createdAt: string (nullable = true)
| |-- description: string (nullable = true)
| |-- descriptionURLEntities: array (nullable = true)
| | |-- element: string (containsNull = false)
| |-- favouritesCount: integer (nullable = true)
| |-- followersCount: integer (nullable = true)
| |-- friendsCount: integer (nullable = true)
| |-- id: long (nullable = true)
| |-- isContributorsEnabled: boolean (nullable = true)
| |-- isFollowRequestSent: boolean (nullable = true)
| |-- isGeoEnabled: boolean (nullable = true)
| |-- isProtected: boolean (nullable = true)
| |-- isVerified: boolean (nullable = true)
| |-- lang: string (nullable = true)
| |-- listedCount: integer (nullable = true)
| |-- location: string (nullable = true)
| |-- name: string (nullable = true)
| |-- profileBackgroundColor: string (nullable = true)
| |-- profileBackgroundImageUrl: string (nullable = true)
| |-- profileBackgroundImageUrlHttps: string (nullable = true)
| |-- profileBackgroundTiled: boolean (nullable = true)
| |-- profileBannerImageUrl: string (nullable = true)
| |-- profileImageUrl: string (nullable = true)
| |-- profileImageUrlHttps: string (nullable = true)
| |-- profileLinkColor: string (nullable = true)
| |-- profileSidebarBorderColor: string (nullable = true)
| |-- profileSidebarFillColor: string (nullable = true)
| |-- profileTextColor: string (nullable = true)
| |-- profileUseBackgroundImage: boolean (nullable = true)
| |-- screenName: string (nullable = true)
| |-- showAllInlineMedia: boolean (nullable = true)
| |-- statusesCount: integer (nullable = true)
| |-- timeZone: string (nullable = true)
| |-- translator: boolean (nullable = true)
| |-- url: string (nullable = true)
| |-- utcOffset: integer (nullable = true)
|-- userMentionEntities: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- end: integer (nullable = true)
| | |-- id: long (nullable = true)
| | |-- name: string (nullable = true)
| | |-- screenName: string (nullable = true)
| | |-- start: integer (nullable = true)
// Batch job: load the collected tweets, print the top languages, train a k-means model on
// the tweet texts, save the cluster centers, then print a sample of tweets grouped by cluster.
// Names used here but defined elsewhere (not visible in this snippet): tweetInput,
// numClusters, numIterations, outputModelDir, Utils.featurize.
val sc = new SparkContext(new SparkConf())
val sqlContext = new SQLContext(sc)

// Infer the schema from the JSON text files; the table is cached because it is queried twice.
val tweetTable = sqlContext.jsonFile(tweetInput).cache()
tweetTable.registerTempTable("tweetTable")
tweetTable.printSchema()

val query = "SELECT user.lang, COUNT(*) as cnt FROM tweetTable GROUP BY user.lang ORDER BY cnt DESC LIMIT 25"
sqlContext.sql(query).collect().foreach(println)

// NOTE(review): `_.head.toString` will throw on a row whose text column is null — confirm inputs.
val texts = sqlContext.sql("SELECT text from tweetTable").map(_.head.toString)
// Cached because KMeans.train runs up to numIterations passes over the vectors.
val vectors = texts.map(Utils.featurize).cache()
val model = KMeans.train(vectors, numClusters, numIterations)
sc.makeRDD(model.clusterCenters, numClusters).saveAsObjectFile(outputModelDir)

// Predict each sample tweet's cluster once up front, instead of re-featurizing and
// re-predicting every tweet for every cluster.
val someTweets = texts.take(100)
val labeledTweets = someTweets.map(t => (model.predict(Utils.featurize(t)), t))
for (i <- 0 until numClusters) {
  println(s"\nCLUSTER $i:")
  labeledTweets.foreach { case (cluster, t) =>
    if (cluster == i) println(t)
  }
}
// Streaming job: load the saved k-means cluster centers and print live tweets whose
// featurized text is assigned to the chosen cluster.
// Names used here but defined elsewhere (not visible in this snippet): modelFile,
// clusterNumber, Utils.getAuth, Utils.featurize.
val sc = new SparkContext(new SparkConf())
// Build the streaming context on the SparkContext created above. Passing a SparkConf here
// (the undefined `conf` in this snippet) would try to start a second SparkContext in the JVM.
val ssc = new StreamingContext(sc, Seconds(5))
val tweets = TwitterUtils.createStream(ssc, Utils.getAuth)
val statuses = tweets.map(_.getText)

// Rebuild the model from the cluster centers written by the batch training job.
val model = new KMeansModel(sc.objectFile[Vector](modelFile.toString).collect())

val filteredTweets = statuses
  .filter(t => model.predict(Utils.featurize(t)) == clusterNumber)
filteredTweets.print()

ssc.start()
ssc.awaitTermination()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment