Last active June 29, 2018 18:40
Using Elasticsearch as a Spark data source

Install the essentials.

$ brew update && brew install elasticsearch && brew install apache-spark

Start ES.

$ elasticsearch

Add some test data.

$ curl -X PUT localhost:9200/megacorp/employees/1 -d '
{
    "first_name" : "John",
    "last_name" :  "Smith",
    "age" :        25,
    "about" :      "I love to go rock climbing",
    "interests": [ "sports", "music" ]
}'

$ curl -X PUT localhost:9200/megacorp/employees/2 -d '
{
    "first_name" :  "Jane",
    "last_name" :   "Smith",
    "age" :         32,
    "about" :       "I like to collect rock albums",
    "interests":  [ "music" ]
}'

$ curl -X PUT localhost:9200/megacorp/employees/3 -d '
{
    "first_name" :  "Douglas",
    "last_name" :   "Fir",
    "age" :         35,
    "about":        "I like to build cabinets",
    "interests":  [ "forestry" ]
}'

Start the Spark shell with ES integration.

$ spark-shell --packages org.elasticsearch:elasticsearch-spark_2.10:2.1.0.Beta4

Play around in the shell.

scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> val rdd = sc.esRDD("megacorp/employees", "")
rdd: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[1] at RDD at AbstractEsRDD.scala:17

scala> rdd.count
res1: Long = 3

RDD entries are (ID, key->value map) tuples.

scala> rdd.first
res2: (String, scala.collection.Map[String,AnyRef]) = (1,Map(first_name -> John, last_name -> Smith, age -> 25, about -> I love to go rock climbing, interests -> Buffer(sports, music)))

scala> rdd.filter(_._2("age").asInstanceOf[Long] > 30).map(_._2("first_name")).take(2)
res3: Array[AnyRef] = Array(Jane, Douglas)
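
The cast in the filter above throws if a document lacks "age" or stores it as a different type. A minimal sketch of a more defensive way to read fields from the (ID, map) tuples, using a local Seq as a stand-in for the RDD so it runs without Spark or ES. The helper name longField is hypothetical and not part of the connector.

```scala
// Local stand-in for the (ID, field map) tuples that esRDD returns.
// The sample values mirror the test documents indexed above.
val docs: Seq[(String, Map[String, AnyRef])] = Seq(
  ("1", Map("first_name" -> "John", "age" -> Long.box(25L))),
  ("2", Map("first_name" -> "Jane", "age" -> Long.box(32L))),
  ("3", Map("first_name" -> "Douglas", "age" -> Long.box(35L)))
)

// Hypothetical helper: read a numeric field as Option[Long] instead of
// casting, so a missing or non-numeric value yields None.
def longField(fields: scala.collection.Map[String, AnyRef], key: String): Option[Long] =
  fields.get(key).collect { case n: java.lang.Number => n.longValue }

// Same question as the filter/map above: first names of people over 30.
val olderThan30: Seq[String] = docs.collect {
  case (_, fields) if longField(fields, "age").exists(_ > 30) =>
    fields("first_name").toString
}

println(olderThan30)
```

The same longField helper should work unchanged inside rdd.filter and rdd.map in the shell, since it only touches the map half of each tuple.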
Oct 10, 2016

@epugh Under the hood, the elasticsearch-hadoop driver fetches data through the regular Elasticsearch HTTP API. So even a simple count ends up fetching all the data from Elasticsearch, which takes a very long time on a large dataset.
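
One way to reduce how much data leaves Elasticsearch is to put the filter into the query passed to esRDD rather than filtering in Spark. A minimal sketch, assuming the connector accepts a URI-style query ("?q=...") as its second argument and that your ES version supports the age:>30 range syntax. olderThanQuery is a hypothetical helper, not part of the connector.

```scala
// Hypothetical helper: build a URI-style query ("?q=...") that asks
// Elasticsearch for documents whose age is greater than minAge.
def olderThanQuery(minAge: Int): String = s"?q=age:>$minAge"

val query = olderThanQuery(30)
println(query)

// In spark-shell with the connector loaded, the query would replace the
// empty string used in the gist, so only matching documents are read:
//   val older = sc.esRDD("megacorp/employees", query)
//   older.count
```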

Oct 10, 2016

Hi @epugh, maybe you have a network issue. By default, the elasticsearch-hadoop drivers try to connect to the ES nodes through their "local IP" addresses, which cannot work if you run ES in Docker, for instance. That's why the Elastic team added a configuration setting called "es.nodes.wan.only".

Running SPARK_LOCAL_IP="" ./bin/spark-shell --master spark://localhost:7077 --conf should help you.
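
For anyone who would rather set this from inside the shell, a minimal sketch of the same setting as a per-read config map. It assumes a connector version that supports es.nodes.wan.only (the 2.1.0.Beta4 build used in the gist may predate it) and a node reachable at localhost:9200. Both values are assumptions, so adjust them to your setup.

```scala
// Connector settings as a plain map. The host and port are assumptions
// for a single local or Docker-published node.
val esCfg: Map[String, String] = Map(
  "es.nodes"          -> "localhost",
  "es.port"           -> "9200",
  "es.nodes.wan.only" -> "true" // talk only to the listed node(s)
)

esCfg.foreach { case (k, v) => println(s"$k=$v") }

// In spark-shell with the connector loaded:
//   import org.elasticsearch.spark._
//   val rdd = sc.esRDD("megacorp/employees", "", esCfg)
```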

I was running into a network issue and have resolved it. Thank you!

