@cenkbircanoglu · Created February 9, 2015 12:32
ElasticSearch Querying with Spark
package net.egemsoft.rrd.elasticPaths;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;
import org.elasticsearch.hadoop.mr.EsInputFormat;

import scala.Tuple2;

import java.io.Serializable;

/**
 * Created by cenk on 09/02/15.
 *
 * Reads documents from Elasticsearch into a Spark pair RDD via the
 * elasticsearch-hadoop EsInputFormat and prints the results.
 */
public class ElasticSearch implements Serializable {

    private Configuration config;
    private SparkConf sparkConf;
    private JavaPairRDD<Text, MapWritable> esRDD;

    /** Hadoop configuration telling elasticsearch-hadoop what to read and from where. */
    public Configuration setConfig() {
        Configuration conf = new Configuration();
        conf.set("es.resource", "path");    // index/type to query
        conf.set("es.query", "?leaf=true"); // query filtering the documents to load
        conf.set("es.nodes", "host");       // Elasticsearch node(s) to connect to
        return conf;
    }

    public ElasticSearch() {
        this.config = setConfig();
        this.sparkConf = sparkConf();
    }

    /** Spark configuration; Kryo serialization handles the Hadoop Writable values. */
    public SparkConf sparkConf() {
        SparkConf sc = new SparkConf().setMaster("local").setAppName("Elastic");
        sc.set("spark.serializer", KryoSerializer.class.getName());
        return sc;
    }

    /** Loads the matching documents into a cached RDD of (document id, field map) pairs. */
    public void doSelect() {
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        esRDD = jsc.newAPIHadoopRDD(config, EsInputFormat.class, Text.class, MapWritable.class);
        esRDD.cache();
    }

    /** Prints the number of documents matched by the query. */
    public void displayCount() {
        long docCount = esRDD.count();
        System.out.println(docCount);
    }

    /** Collects the result set to the driver and prints each (id, fields) tuple. */
    public void displayData() {
        for (Tuple2<Text, MapWritable> row : esRDD.collect()) {
            System.out.println(row.toString());
        }
    }

    public JavaPairRDD<Text, MapWritable> getEsRDD() {
        return esRDD;
    }
}
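
The driver below is a minimal usage sketch, not part of the original gist: the ElasticSearchDemo class name and its main method are assumptions, and the "path"/"host" placeholders in setConfig() must point at a real index and Elasticsearch host before anything runs.

package net.egemsoft.rrd.elasticPaths;

/** Hypothetical entry point illustrating the intended call order of the class above. */
public class ElasticSearchDemo {
    public static void main(String[] args) {
        ElasticSearch es = new ElasticSearch(); // builds the Hadoop and Spark configs
        es.doSelect();                          // loads the matching documents into a cached RDD
        es.displayCount();                      // prints how many documents matched es.query
        es.displayData();                       // prints each (document id, fields) tuple
    }
}

Because sparkConf() uses setMaster("local"), this runs Spark entirely in-process, which is convenient for testing the query before submitting the job to a cluster.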