@cb372
Last active June 29, 2018 18:40
Using Elasticsearch as a Spark data source

Install the essentials.

$ brew update && brew install elasticsearch && brew install apache-spark

Start ES.

$ elasticsearch

Add some test data.

$ curl -X PUT localhost:9200/megacorp/employees/1 -H 'Content-Type: application/json' -d '
{
    "first_name" : "John",
    "last_name" :  "Smith",
    "age" :        25,
    "about" :      "I love to go rock climbing",
    "interests": [ "sports", "music" ]
}
'

$ curl -X PUT localhost:9200/megacorp/employees/2 -H 'Content-Type: application/json' -d '
{
    "first_name" :  "Jane",
    "last_name" :   "Smith",
    "age" :         32,
    "about" :       "I like to collect rock albums",
    "interests":  [ "music" ]
}
'

$ curl -X PUT localhost:9200/megacorp/employees/3 -H 'Content-Type: application/json' -d '
{
    "first_name" :  "Douglas",
    "last_name" :   "Fir",
    "age" :         35,
    "about":        "I like to build cabinets",
    "interests":  [ "forestry" ]
}
'

Start the Spark shell with ES integration.

$ spark-shell --packages org.elasticsearch:elasticsearch-spark_2.10:2.1.0.Beta4
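The `_2.10` suffix in the package coordinate is the Scala binary version the connector was built against, and it has to match the Scala version of your Spark build. The helper below is a minimal sketch that derives that suffix from a full Scala version string; `esSparkCoordinate` is a hypothetical name, not part of any library, and it assumes the `elasticsearch-spark_<scala-binary>` artifact naming used in the command above (later connector releases may name their artifacts differently).

```scala
object EsSparkCoordinate {
  /** Hypothetical helper: builds a --packages coordinate for the ES-Spark connector.
    * Assumes the "elasticsearch-spark_<scala-binary>" naming used in this walkthrough.
    */
  def esSparkCoordinate(scalaVersion: String, connectorVersion: String): String = {
    // The Scala binary version is the "major.minor" prefix, e.g. "2.10.4" -> "2.10".
    // split(Char) is used so the '.' is not interpreted as a regex.
    val scalaBinary = scalaVersion.split('.').take(2).mkString(".")
    s"org.elasticsearch:elasticsearch-spark_$scalaBinary:$connectorVersion"
  }

  def main(args: Array[String]): Unit = {
    // Spark 1.x builds were typically compiled against Scala 2.10, hence _2.10.
    println(esSparkCoordinate("2.10.4", "2.1.0.Beta4"))
    // Inside spark-shell this reports the Scala version the REPL is running on.
    println(esSparkCoordinate(scala.util.Properties.versionNumberString, "2.1.0.Beta4"))
  }
}
```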

Play around in the shell.

scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> val rdd = sc.esRDD("megacorp/employees", "")
rdd: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[1] at RDD at AbstractEsRDD.scala:17

scala> rdd.count
...
...
res1: Long = 3

Each RDD entry is a (document ID, field name -> value map) tuple.

scala> rdd.first
res2: (String, scala.collection.Map[String,AnyRef]) = (1,Map(first_name -> John, last_name -> Smith, age -> 25, about -> I love to go rock climbing, interests -> Buffer(sports, music)))
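Because the field map is typed `Map[String, AnyRef]`, reading a field means recovering its runtime type. The sketch below does that with pattern matching instead of `asInstanceOf`, so an unexpected or missing value yields `None` rather than an exception. It uses plain Scala collections as a stand-in for an RDD entry, so it runs without Spark or Elasticsearch; `EsEntry` and `describe` are illustrative names. It assumes numbers arrive as boxed `java.lang.Number` values and arrays as a Scala `Seq` (the shell output above shows a `Buffer`), which matches what `rdd.first` printed here but is an assumption, not a guarantee for every connector version.

```scala
import scala.collection.mutable.ArrayBuffer

object EntryShape {
  // Same shape as the RDD element type shown above: (document ID, field name -> value).
  type EsEntry = (String, scala.collection.Map[String, AnyRef])

  // Hypothetical helper: extracts typed fields via pattern matching; returns None
  // if first_name is not a String or age is not numeric (or either is missing).
  def describe(entry: EsEntry): Option[String] = {
    val (id, fields) = entry
    for {
      first <- fields.get("first_name").collect { case s: String => s }
      age   <- fields.get("age").collect { case n: java.lang.Number => n.longValue() }
    } yield {
      // Arrays such as "interests" are assumed to come back as a Scala Seq (e.g. Buffer).
      val interests = fields.get("interests") match {
        case Some(xs: scala.collection.Seq[_]) => xs.mkString(", ")
        case _                                 => ""
      }
      s"$id: $first ($age) likes $interests"
    }
  }

  def main(args: Array[String]): Unit = {
    // Local stand-in for rdd.first; values are boxed because the map holds AnyRef.
    val first: EsEntry = ("1", Map[String, AnyRef](
      "first_name" -> "John",
      "age"        -> java.lang.Long.valueOf(25L),
      "interests"  -> ArrayBuffer("sports", "music")))
    println(describe(first))
  }
}
```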

scala> rdd.filter(_._2("age").asInstanceOf[Long] > 30).map(_._2("first_name")).take(2)
...
...
res3: Array[AnyRef] = Array(Jane, Douglas)
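The cast to `Long` in the filter works here because Elasticsearch's dynamic mapping stores whole JSON numbers as `long`, but the expression throws if a document has no `age` field or the value comes back as a different boxed type. The sketch below expresses the same "first names of employees older than 30" query as a plain function over local entries, reading `age` as any `java.lang.Number` and skipping documents where it is missing. `OlderThan`, `longField`, and `namesOlderThan` are hypothetical names, and the three entries are a local stand-in for the documents indexed earlier.

```scala
object OlderThan {
  type EsEntry = (String, scala.collection.Map[String, AnyRef])

  // Reads a numeric field as a Long, accepting Integer, Long, etc.; None if absent or non-numeric.
  def longField(fields: scala.collection.Map[String, AnyRef], name: String): Option[Long] =
    fields.get(name).collect { case n: java.lang.Number => n.longValue() }

  // Same logic as the rdd.filter(...).map(...) chain above; entries without a
  // usable "age" are skipped instead of throwing.
  def namesOlderThan(entries: Seq[EsEntry], minAge: Long): Seq[String] =
    entries.collect {
      case (_, fields) if longField(fields, "age").exists(_ > minAge) =>
        fields.get("first_name").map(_.toString).getOrElse("")
    }

  def main(args: Array[String]): Unit = {
    // Local stand-in for the three megacorp/employees documents.
    val entries: Seq[EsEntry] = Seq(
      ("1", Map[String, AnyRef]("first_name" -> "John", "age" -> java.lang.Long.valueOf(25L))),
      ("2", Map[String, AnyRef]("first_name" -> "Jane", "age" -> java.lang.Long.valueOf(32L))),
      ("3", Map[String, AnyRef]("first_name" -> "Douglas", "age" -> java.lang.Long.valueOf(35L))))
    println(namesOlderThan(entries, 30))
  }
}
```

Since `filter` and `map` on an RDD take ordinary Scala functions, the same `case n: java.lang.Number` match should carry over to the shell version, though that variant is not tested here.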
@yuta-imai

I was running into a network issue, and setting spark.es.nodes.wan.only=true resolved it. Thank you!
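`es.nodes.wan.only=true` makes the connector talk only to the nodes declared in `es.nodes` instead of discovering the rest of the cluster, which helps when internal node addresses are unreachable (a cloud cluster, a proxy, or a container). When the setting is passed to `spark-shell` or `spark-submit` with `--conf`, it carries the `spark.` prefix, as in the comment above, because Spark ignores non-`spark.` properties on its command line. The sketch below builds those settings and the matching `--conf` arguments; `wanOnly` and `asSparkConfArgs` are hypothetical helpers, and `localhost`/`9200` are just the values used in this walkthrough.

```scala
object WanOnlySettings {
  // Hypothetical helper: ES-Hadoop settings for reaching a cluster through one
  // declared endpoint, with node discovery disabled.
  def wanOnly(host: String, port: Int): Map[String, String] = Map(
    "es.nodes"          -> host,
    "es.port"           -> port.toString,
    "es.nodes.wan.only" -> "true")

  // Spark's command line only keeps properties starting with "spark.", so each
  // key is prefixed; keys are sorted to give a stable argument order.
  def asSparkConfArgs(settings: Map[String, String]): Seq[String] =
    settings.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"spark.$k=$v") }

  def main(args: Array[String]): Unit = {
    val settings = wanOnly("localhost", 9200)
    println(asSparkConfArgs(settings).mkString(" "))
  }
}
```

Inside the shell, the unprefixed map could instead be passed per read, e.g. `sc.esRDD("megacorp/employees", settings)`, assuming the connector version in use provides the `esRDD(resource, cfg)` overload.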
