Skip to content

Instantly share code, notes, and snippets.

@dcode
Last active November 21, 2019 13:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dcode/c886badfd01af5eaeeaaf3550faa1b7a to your computer and use it in GitHub Desktop.
A super simple (i.e. no error handling) script to parse a list of CSVs and write them to Elasticsearch using the bulk API. Requires Python 3 and the Elasticsearch Python client (pip3 install elasticsearch).
#!/usr/bin/env python3
import argparse
from pathlib import Path
import csv
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import TransportError
from elasticsearch.helpers import bulk, streaming_bulk
# Build the command-line interface: an optional Elasticsearch URL, an
# optional target index, optional explicit CSV field names, and one or
# more input files (argparse opens them for reading itself).
parser = argparse.ArgumentParser(
    description='Simple upload of a CSV to Elasticsearch for analysis')
parser.add_argument(
    '-u', '--url',
    default='http://127.0.0.1:9200/',
    help='The URL to use for Elasticsearch. Defaults to http://127.0.0.1:9200/')
parser.add_argument(
    '-i', '--index',
    help='The index to write the data to. Defaults to the name of the '
         'first file without the extension.')
parser.add_argument(
    '--custom-headers',
    help='A comma separated list of custom field names')
parser.add_argument(
    'filename', type=argparse.FileType('r'), nargs='+',
    help='The file(s) to read. Note that they should all have the same '
         'field arrangement per invocation.')
args = parser.parse_args()

# Default the index name to the stem of the first input file,
# e.g. "data/users.csv" -> "users".
if args.index is None:
    args.index = Path(args.filename[0].name).stem
# Turn the comma-separated header string into the list DictReader expects;
# leave it as None so DictReader reads headers from the first CSV row.
if args.custom_headers is not None:
    args.custom_headers = args.custom_headers.split(',')
print(args)
# Connect once and stream every CSV into the target index via the bulk API.
client = Elasticsearch(args.url)

for handle in args.filename:
    # DictReader yields one dict per CSV row; with fieldnames=None it takes
    # the field names from the file's first row, otherwise it uses the
    # --custom-headers list verbatim.
    reader = csv.DictReader(handle, fieldnames=args.custom_headers)
    # streaming_bulk lazily consumes the reader, sending chunk_size docs
    # per bulk request and yielding (ok, result) per document.
    # NOTE(review): doc_type is deprecated in Elasticsearch 7+; kept for
    # compatibility with the clusters this script originally targeted.
    for ok, result in streaming_bulk(
            client,
            reader,
            index=args.index,
            doc_type='doc',
            chunk_size=500,
    ):
        # Each result is a single-entry dict of {action: details}.
        action, result = result.popitem()
        doc_id = '/%s/doc/%s' % (args.index, result['_id'])
        # Report per-document success or failure as Elasticsearch saw it.
        if not ok:
            print('Failed to %s document %s: %r' % (action, doc_id, result))
        else:
            print(doc_id)
    # argparse.FileType leaves the handles open; close each once consumed
    # so we don't leak file descriptors across many input files.
    handle.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment