@adrianva
Created March 19, 2017 00:04
Save RDD and/or DataFrame from Spark to Elasticsearch
# Elasticsearch configs
es_read_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "twitter/tweet"
}
es_write_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "twitter/tweet"
}
from pyspark.sql import Row

df = sc.parallelize([Row(message="This is a message", user="mocopera", post_date="2017-03-11T14:12:12")]).toDF()
# EsOutputFormat expects (key, value) pairs; the key is ignored (NullWritable)
row_rdd = df.rdd.map(lambda row: (None, row.asDict()))
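The shape of those pairs can be checked without a Spark cluster. A minimal plain-Python sketch (sample dict is illustrative, not from a real index) of what the `map` above produces:

```python
# Sketch of the (key, document) pairs saveAsNewAPIHadoopFile receives:
# the key slot is discarded by EsOutputFormat, the value dict becomes
# the Elasticsearch document.
rows = [
    {"message": "This is a message", "user": "mocopera",
     "post_date": "2017-03-11T14:12:12"},
]
pairs = [(None, d) for d in rows]

print(pairs[0][0])           # → None (ignored key)
print(pairs[0][1]["user"])   # → mocopera
```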
# Save RDD to Elastic
row_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf
)
# Save DF to Elastic
df.write.format("org.elasticsearch.spark.sql").option("es.resource", "twitter/tweet").save(mode="append")
# Read from Elastic
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf
)