Last active June 29, 2018 18:40
Using Elasticsearch as a Spark data source

Install the essentials.

$ brew update && brew install elasticsearch && brew install apache-spark

Start ES.

$ elasticsearch

Add some test data.

$ curl -X PUT localhost:9200/megacorp/employees/1 -d '
    "first_name" : "John",
    "last_name" :  "Smith",
    "age" :        25,
    "about" :      "I love to go rock climbing",
    "interests": [ "sports", "music" ]

$ curl -X PUT localhost:9200/megacorp/employees/2 -d '
    "first_name" :  "Jane",
    "last_name" :   "Smith",
    "age" :         32,
    "about" :       "I like to collect rock albums",
    "interests":  [ "music" ]

$ curl -X PUT localhost:9200/megacorp/employees/3 -d '
    "first_name" :  "Douglas",
    "last_name" :   "Fir",
    "age" :         35,
    "about":        "I like to build cabinets",
    "interests":  [ "forestry" ]

Start the Spark shell with ES integration.

$ spark-shell --packages org.elasticsearch:elasticsearch-spark_2.10:2.1.0.Beta4

Play around in the shell.

scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> val rdd = sc.esRDD("megacorp/employees", "")
rdd: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[1] at RDD at AbstractEsRDD.scala:17

scala> rdd.count
res1: Long = 3

RDD entries are (ID, key->value map) tuples.

scala> rdd.first
res2: (String, scala.collection.Map[String,AnyRef]) = (1,Map(first_name -> John, last_name -> Smith, age -> 25, about -> I love to go rock climbing, interests -> Buffer(sports, music)))

scala> rdd.filter(_._2("age").asInstanceOf[Long] > 30).map(_._2("first_name")).take(2)
res3: Array[AnyRef] = Array(Jane, Douglas)
I was rushing into network issue and resolved the issue. Thank you!

