Skip to content

Instantly share code, notes, and snippets.

@cotdp
Created June 2, 2014 09:22
Show Gist options
  • Save cotdp/b3512dd1328f10ee9257 to your computer and use it in GitHub Desktop.
Save cotdp/b3512dd1328f10ee9257 to your computer and use it in GitHub Desktop.
package spark.examples
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonAST._
/**
 * Counts how often each Twitter user is mentioned in a set of gzipped JSON tweet
 * files, writes the per-user counts to an HDFS directory (deleting any previous
 * output there first) and prints the 100 most-mentioned users.
 */
object HDFSDeleteExample {
  import scala.util.Try
  import scala.util.control.NonFatal

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[8]", "HDFSDeleteExample")
    // Always release the SparkContext, even if the job below throws.
    try {
      // This is our HDFS output path
      val output = "hdfs://localhost:9000/tmp/wimbledon_top_mentions"
      val outputPath = new org.apache.hadoop.fs.Path(output)

      // Configure Hadoop through the SparkContext's own configuration: it is what
      // saveAsTextFile builds its job from, so overrides made here (like the block
      // size) actually apply to the files written, not just to the delete below.
      val hadoopConf = sc.hadoopConfiguration
      hadoopConf.setInt("dfs.block.size", 1073741824) // 2^30 bytes = 1 GiB blocks

      // Resolve the FileSystem from the output path itself, so the HDFS address is
      // written only once. Hadoop caches FileSystem instances (and Spark may share
      // this one), so it is deliberately not closed here.
      val fs = outputPath.getFileSystem(hadoopConf)

      // Produces a RDD[String] of JSON messages from Twitter
      val lines = sc.textFile("wimbledon/*.json.gz")

      // Count the 'mentions' of each user in the dataset.
      val topMentions = lines
        // Skip lines that fail to parse (broken/truncated JSON). Try only catches
        // non-fatal exceptions, so e.g. OutOfMemoryError still propagates.
        .flatMap(line => Try(parse(line)).toOption.toList)
        .flatMap(json => (json \ "entities" \ "user_mentions").children)
        // screen_name is rendered as JSON text, so keys keep their surrounding
        // double quotes (as in the sample result below).
        .map(mention => (compact(render(mention \ "screen_name")), 1))
        .reduceByKey(_ + _)
        .cache() // reused by both saveAsTextFile and top() below

      // Remove any previous output so the job writes into a fresh directory.
      // delete() signals a missing path through its Boolean result rather than an
      // exception; other non-fatal failures are reported and otherwise ignored,
      // keeping the original best-effort behaviour without hiding fatal errors.
      try {
        fs.delete(outputPath, true)
      } catch {
        case NonFatal(e) =>
          System.err.println(s"Could not delete existing output $output: ${e.getMessage}")
      }
      topMentions.saveAsTextFile(output)

      // Print the top-100 mentioned users
      topMentions.top(100)(Ordering.by(_._2)).foreach(println)
      /* Result:
      ("Wimbledon",291562)
      ("serenawilliams",42305)
      ("andy_murray",28684)
      ("frankieboyle",24653)
      ("BBCSport",16664)
      ("FedererNews",12880)
      ("Georgia_Ford",12235)
      ("ATPWorldTour",12177)
      ("espn",11494)
      ("TenisExtra",11338)
      ("DjokerNole",10815)
      ("RafaelNadal",9338)
      ("piersmorgan",8721)
      ("100porcientenis",8106)
      ("TeamRFederer",6837)
      ("SuperSportBlitz",6620)
      ("Charles_HRH",6540)
      */
    } finally {
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment