igorcosta, forked from scotthaleen/elastic_bulk_ingest.py (created January 6, 2017)
Bulk index JSON into Elasticsearch: a PySpark job (elastic_bulk_ingest.py) that pushes newline-delimited JSON into an index through the elasticsearch-hadoop connector, plus a bash driver script that creates the index and mapping and then submits the job.
# elastic_bulk_ingest.py
from pyspark import SparkContext, SparkConf
import json
import argparse


def fn_to_doc(line):
    # Wrap one line of JSON in a {"data": ...} envelope and re-serialize it;
    # lines that fail to parse are dropped instead of failing the job.
    try:
        doc = {}
        data = json.loads(line)
        doc['data'] = data
        return [json.dumps(doc)]
    except ValueError:
        return []


if __name__ == "__main__":
    desc = 'elastic search ingest'
    parser = argparse.ArgumentParser(
        description=desc,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=desc)
    parser.add_argument("input_path", help="lines of json to ingest")
    parser.add_argument("es_resource", help="index and doc_type (my-index/doc)")
    parser.add_argument("--es_nodes", default="127.0.0.1", help="es.nodes")
    parser.add_argument("--es_port", default="9200", help="es.port")
    args = parser.parse_args()

    conf = SparkConf().setAppName("Elastic Ingest")
    sc = SparkContext(conf=conf)

    # Connector settings. "es.input.json": "yes" tells elasticsearch-hadoop
    # that the RDD values are already serialized JSON strings.
    es_write_conf = {
        "es.nodes": args.es_nodes,
        "es.port": args.es_port,
        "es.resource": args.es_resource,
        # "es.nodes.client.only": "true",
        "es.input.json": "yes"
    }

    hdfs_path = args.input_path
    # Wrap each input line with fn_to_doc and pair it with a dummy key;
    # EsOutputFormat ignores the key when writing.
    d = sc.textFile(hdfs_path) \
          .flatMap(fn_to_doc) \
          .map(lambda doc: ("key", doc))
    d.saveAsNewAPIHadoopFile(
        path='-',  # required by the API but unused by EsOutputFormat
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.apache.hadoop.io.Text",
        # valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf)
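For reference, a quick sketch of what fn_to_doc emits for one input line; the sample record is made up, and the calls assume a REPL with fn_to_doc in scope:

line = '{"id": 1, "name": "example"}'
print(fn_to_doc(line))        # ['{"data": {"id": 1, "name": "example"}}']
print(fn_to_doc("not json"))  # [] -- the malformed line is silently skipped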
#!/usr/bin/env bash
set +x
set -e

INDEX=my_index
DOC_TYPE=my_doc_type
MAPPING_FILE=my_mapping

# Create the index if it does not exist yet (HEAD returns 404 when missing).
response=$(curl --head --silent --output /dev/null --write-out '%{http_code}' "localhost:9200/${INDEX}")
if [[ "$response" -eq 404 ]]; then
    printf 'create index %s\n' "${INDEX}"
    curl -s -XPOST "http://localhost:9200/${INDEX}" -d '{ "settings": { "index": { "mapping.allow_type_wrapper": true } } }'
fi

# Drop any existing mapping for the doc type so it can be recreated cleanly.
response=$(curl --head --silent --output /dev/null --write-out '%{http_code}' "localhost:9200/${INDEX}/${DOC_TYPE}")
if [[ "$response" -eq 200 ]]; then
    printf 'delete doc_type\n'
    curl -XDELETE "localhost:9200/${INDEX}/${DOC_TYPE}"
fi

printf 'create doc_type\n'
curl -s -XPUT "http://localhost:9200/${INDEX}/${DOC_TYPE}/_mapping" --data-binary "@${MAPPING_FILE}"

printf 'ingest documents\n'
spark-submit --master local[*] --driver-memory 8g \
    --jars lib/elasticsearch-hadoop-2.1.1.jar \
    --conf spark.storage.memoryFraction=.8 \
    spark/elastic_bulk_ingest.py "my/path/input/part-*" "${INDEX}/${DOC_TYPE}"
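The gist does not include the mapping file (my_mapping) that the _mapping call uploads. Assuming the {"data": ...} envelope produced by fn_to_doc and the Elasticsearch 1.x API this script targets, a minimal sketch might look like the following; the fields under "data" are invented placeholders:

{
  "my_doc_type": {
    "properties": {
      "data": {
        "properties": {
          "id":   { "type": "long" },
          "name": { "type": "string" }
        }
      }
    }
  }
}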