Skip to content

Instantly share code, notes, and snippets.

@edpaget
Created May 19, 2014 16:55
Show Gist options
  • Save edpaget/0477f18c6e574d1e937b to your computer and use it in GitHub Desktop.
// sbt build definition for the Spark/MongoDB word-count example below.
name := "Spark Test"
version := "0.1.0-SNAPSHOT"
// Scala 2.10.x is required by Spark 0.9.x (pre-2.11 Spark builds).
scalaVersion := "2.10.4"
// %% appends the Scala binary version to the artifact name (spark-core_2.10).
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"
// Spark 0.9.x pulls Akka artifacts that are only published on this repository.
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
// Hadoop client APIs used via newAPIHadoopRDD / saveAsNewAPIHadoopFile.
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.0.4"
// Plain Java driver plus the mongo-hadoop connector (Mongo{Input,Output}Format).
libraryDependencies += "org.mongodb" % "mongo-java-driver" % "2.11.4"
libraryDependencies += "org.mongodb" % "mongo-hadoop-core" % "1.0.0"
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import org.bson.BasicBSONObject
/**
 * Counts the documents in the `asteroid.asteroid_classifications` MongoDB
 * collection with Spark and writes the total back to `asteroid.count`.
 *
 * Every input document is mapped to the single key "Classification", so the
 * reduce step produces one (key, totalCount) pair.
 */
object ClassificationCount {

  /**
   * Entry point. Runs a local-mode Spark job end to end:
   * read from Mongo -> count -> write back to Mongo.
   *
   * @param args unused command-line arguments
   */
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "ClassificationCount")
    try {
      val config = new Configuration()
      // Input/output collections for the mongo-hadoop connector.
      config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/asteroid.asteroid_classifications")
      config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/asteroid.count")

      // One (ObjectId, BSONObject) record per MongoDB document.
      val mongoRDD = sc.newAPIHadoopRDD(
        config,
        classOf[com.mongodb.hadoop.MongoInputFormat],
        classOf[Object],
        classOf[BSONObject])

      // Every document maps to the same literal key, so reduceByKey yields a
      // single pair holding the total document count.
      val countsRDD = mongoRDD
        .map(_ => ("Classification", 1))
        .reduceByKey(_ + _)

      // Wrap each (label, count) pair in a BSON document. The null key is
      // fine: MongoOutputFormat ignores it and inserts only the value.
      val saveRDD = countsRDD.map { case (label, count) =>
        val bson = new BasicBSONObject() // val: the reference is never reassigned
        bson.put("classification", label)
        bson.put("count", count)
        (null, bson)
      }

      // The path argument is ignored by MongoOutputFormat; output is routed
      // to mongo.output.uri instead.
      saveRDD.saveAsNewAPIHadoopFile(
        "file:///bogus",
        classOf[Any],
        classOf[Any],
        classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
        config)
    } finally {
      // Fix: the original never stopped the context; release Spark resources
      // even when the job fails.
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment