Skip to content

Instantly share code, notes, and snippets.

Created June 2, 2014 09:22
Show Gist options
  • Save cotdp/b3512dd1328f10ee9257 to your computer and use it in GitHub Desktop.
Save cotdp/b3512dd1328f10ee9257 to your computer and use it in GitHub Desktop.
package spark.examples
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonAST._
/**
 * Example Spark job: reads lines of JSON text, extracts the `screen_name` of
 * every entry under `entities` -> `user_mentions`, pairs each name with a
 * count of 1, and deletes a fixed HDFS output path.
 *
 * NOTE(review): this listing looks truncated. Several expressions are left
 * open, and the closing braces of `main` and of this object are not visible,
 * so the aggregation, the write to `output`, and the printing of results are
 * presumably on lines that are missing here. Confirm against the original.
 */
object HDFSDeleteExample {
// Entry point; `args` is not read anywhere in the visible code.
def main(args: Array[String]): Unit = {
// "local[8]" is the Spark master URL (local mode, 8 worker threads); the second argument is the application name.
val sc = new SparkContext("local[8]", "HDFSDeleteExample")
// This is our HDFS output path
val output = "hdfs://localhost:9000/tmp/wimbledon_top_mentions"
// Setup HDFS, you can manipulate the config used by your application to override the defaults
val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.setInt( "dfs.block.size", 1073741824 ) // 1073741824 = 1024^3 bytes, i.e. a 1 GiB block size
// NOTE(review): `new"hdfs://localhost:9000")` is not valid Scala; a constructor name and its opening
// parenthesis appear to be missing. Presumably `new java.net.URI("hdfs://localhost:9000")`, which would
// match Hadoop's `FileSystem.get(URI, Configuration)` overload. TODO confirm.
val hdfs = org.apache.hadoop.fs.FileSystem.get(new"hdfs://localhost:9000"), hadoopConf)
// Produces a RDD[String] of JSON messages from Twitter
val lines = sc.textFile("wimbledon/*.json.gz")
// Extract the number of 'mentions' for each user in the dataset
// Step 1: parse each line; a parsed line becomes a one-element Seq, a failing line becomes an empty Seq,
// so the flatMap drops lines that cannot be parsed.
val top_mentions = lines.flatMap(line => {
try {
Seq( parse(line) )
// NOTE(review): catching Throwable also swallows fatal JVM errors; `scala.util.control.NonFatal` is the usual narrower choice.
} catch { case _ : Throwable => { Seq() } } // Ignore any exceptions (broken lines etc.)
// Step 2: navigate to the `user_mentions` value nested under `entities`.
}).flatMap(json => {
val user_mentions = json \ "entities" \ "user_mentions"
// NOTE(review): no result expression follows the `val` above, so this flatMap yields nothing usable as written.
// A line returning the individual mention entries appears to be missing. TODO confirm.
// Step 3: turn each mention into a (screen_name, 1) pair.
}).map(user_mention => {
// Renders the `screen_name` field of the mention as compact JSON text.
val screen_name = compact(render(user_mention \ "screen_name"))
( screen_name, 1 )
// NOTE(review): the closing `})` of the map above, and any step that sums the 1s per screen name,
// are not visible in this listing. TODO confirm.
// Delete the existing path, ignore any exceptions thrown if the path doesn't exist
// The second argument `true` requests recursive deletion; any Throwable raised by the call is silently discarded.
try { hdfs.delete(new org.apache.hadoop.fs.Path(output), true) } catch { case _ : Throwable => { } }
// Print the top-100 mentioned users
// NOTE(review): the code that performs this printing is not visible in this listing.
/* Result:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment