@huyphan
Created June 10, 2014 08:33
Spark 1.0 + Hadoop 2.0 + Elasticsearch
$ ADD_JARS=../elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT/dist/elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT.jar bin/spark-shell
### COMMAND ###
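// A single sample document, as a map of field name -> value
val tweet = Map("user" -> "kimchy", "post_date" -> "2009-11-15T14:12:12", "message" -> "trying out Elastic Search")
val tweets = sc.makeRDD(Seq(tweet))

import org.apache.hadoop.io.{MapWritable, Text, NullWritable}
import org.apache.spark.SparkContext._  // pair-RDD implicits (spark-shell 1.x already imports this)

// EsOutputFormat consumes Hadoop Writables, so convert each Map into a MapWritable
def toWritable(in: Map[String, String]) = {
  val m = new MapWritable
  for ((k, v) <- in)
    m.put(new Text(k), new Text(v))
  m
}
val writables = tweets.map(toWritable)

// Pair each document with a null key; EsOutputFormat ignores the key and indexes the value
val output = writables.map { v => (NullWritable.get : Object, v : Object) }

import org.elasticsearch.hadoop.mr.EsOutputFormat

// Target "index/type"; es.nodes is left at its default (localhost:9200)
sc.hadoopConfiguration.set("es.resource", "tweets/tweet")

// "/test" is a placeholder path required by the API; documents are written to Elasticsearch
output.saveAsNewAPIHadoopFile(
  "/test",
  classOf[NullWritable],
  classOf[MapWritable],
  classOf[EsOutputFormat]
)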
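elasticsearch-hadoop serializes each MapWritable value into a JSON document before sending it to Elasticsearch. To illustrate roughly what the sample tweet becomes, here is a minimal stdlib-only sketch. `TweetJson` and `toJson` are hypothetical helpers, not part of elasticsearch-hadoop or Spark, and the sketch assumes flat string fields with no control characters other than newline, carriage return and tab.

```scala
// Hypothetical helper, not part of elasticsearch-hadoop: renders a flat
// Map[String, String] roughly as the JSON document Elasticsearch would index.
object TweetJson {
  // Escape the characters JSON requires inside a string literal.
  // Assumption: inputs contain no control characters beyond \n, \r and \t.
  private def quote(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c    => sb.append(c)
    }
    sb.append('"').toString
  }

  // Render a flat field-name -> value map as a JSON object string.
  def toJson(in: Map[String, String]): String =
    in.iterator.map { case (k, v) => quote(k) + ":" + quote(v) }.mkString("{", ",", "}")

  def main(args: Array[String]): Unit = {
    val tweet = Map(
      "user" -> "kimchy",
      "post_date" -> "2009-11-15T14:12:12",
      "message" -> "trying out Elastic Search")
    println(toJson(tweet))
  }
}
```

Running `TweetJson.main` prints the sample tweet as a single JSON object; with the configuration above, Elasticsearch would store that document in the `tweets` index under the `tweet` type.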