Skip to content

Instantly share code, notes, and snippets.

@sallum
Created June 16, 2014 15:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sallum/020f7847917fbadde123 to your computer and use it in GitHub Desktop.
Save sallum/020f7847917fbadde123 to your computer and use it in GitHub Desktop.
Example Spark code that writes documents to Elasticsearch.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.io.{ MapWritable, Text, NullWritable }
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd._
import org.elasticsearch.hadoop.mr.EsOutputFormat
/**
 * Example Spark job that indexes a single hard-coded tweet document into
 * Elasticsearch through the elasticsearch-hadoop `EsOutputFormat`.
 *
 * Usage: optionally pass the Elasticsearch node address ("host:port") as the
 * first program argument; defaults to "172.17.10.20:9200" for backward
 * compatibility with the original hard-coded value.
 */
object TweetsElasticSearch {

  /**
   * Converts a plain `Map[String, String]` into a Hadoop [[MapWritable]]
   * whose keys and values are [[Text]], the shape `EsOutputFormat` expects.
   *
   * @param in the field-name -> field-value map for one document
   * @return a freshly allocated MapWritable holding the same entries
   */
  private def toWritable(in: Map[String, String]): MapWritable = {
    val writable = new MapWritable
    for ((k, v) <- in)
      writable.put(new Text(k), new Text(v))
    writable
  }

  def main(args: Array[String]): Unit = {
    // Allow the ES endpoint to be supplied on the command line; fall back to
    // the original hard-coded address so existing invocations keep working.
    val esNodes = if (args.nonEmpty) args(0) else "172.17.10.20:9200"

    val sc = new SparkContext("local", "Elasticsearch-hadoop")
    try {
      // Target index/type and cluster entry point for elasticsearch-hadoop.
      sc.hadoopConfiguration.set("es.resource", "tweets/tweet")
      sc.hadoopConfiguration.set("es.nodes", esNodes)
      // Speculative task attempts would write duplicate documents to ES,
      // so they must be disabled for this (non-idempotent) output format.
      sc.hadoopConfiguration.setBoolean("mapred.map.tasks.speculative.execution", false)
      sc.hadoopConfiguration.setBoolean("mapred.reduce.tasks.speculative.execution", false)

      val tweet = Map(
        "user"      -> "example",
        "post_date" -> "2009-11-15T14:12:12",
        "message"   -> "trying out Elastic Search")

      // EsOutputFormat ignores the key, so a NullWritable placeholder is used;
      // both key and value are upcast to Object as the Hadoop API requires.
      val output = sc.makeRDD(Seq(tweet))
        .map(toWritable)
        .map(v => (NullWritable.get: Object, v: Object))

      // The path "/tweets" is a dummy: the real destination is es.resource.
      output.saveAsNewAPIHadoopFile("/tweets", classOf[NullWritable], classOf[MapWritable],
        classOf[EsOutputFormat], sc.hadoopConfiguration)
    } finally {
      // Always release the SparkContext, even if the save fails
      // (the original leaked it on any exception).
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment