CLI for elasticsearch-py helpers

Elasticsearch CLI

An experimental CLI for the helpers in the elasticsearch-py Python library.

Its main purpose is to expose the bulk helpers to enable rapid loading of data into an Elasticsearch cluster. Combined with the scan command, it can also be used to reindex data from Elasticsearch into a different index or cluster.

Installation

To use it, just install the elasticsearch and click packages:

pip install elasticsearch click
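
Then save the script below as escli somewhere on your PATH and make it executable (escli is just the name the examples in the built-in help assume):

chmod +x escli
escli --help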
#!/usr/bin/env python
import json
import time
import logging

import click
from elasticsearch import Elasticsearch, helpers

client = None

@click.group()
@click.option('--host', '-h', multiple=True, metavar='<host:port>',
              help='hostname:port of a running elasticsearch node, can be specified multiple times.')
@click.option('--sniff', is_flag=True,
              help='Inspect the cluster to get a list of currently active nodes for load balancing.')
@click.option('--timeout', type=int, default=10, metavar='<timeout>',
              help='Connection timeout in seconds, default 10.')
@click.option('--log-level', type=click.Choice(('DEBUG', 'INFO', 'WARNING', 'ERROR')),
              help='Turn on logging to stderr at the specified log level. Use INFO to see all http requests.')
def cli(host, timeout, sniff, log_level):
    """
    Set of commands to work with data in elasticsearch. To reindex "my-index"
    from localhost to otherhost, just run:

    \b
    escli --host localhost:9200 scan --index my-index --include-meta - | escli --host otherhost:9200 bulk -

    To reindex data to a different index:

    \b
    escli scan --index my-index --include-meta --meta-keys _type,_id - | escli bulk --index new-index -
    """
    global client
    if log_level:
        logging.basicConfig(level=getattr(logging, log_level))
    # the client is stored in a module-level global so all subcommands share it
    client = Elasticsearch(host or None, timeout=timeout, sniff_on_start=sniff)

@cli.command('bulk', short_help='load documents into elasticsearch')
@click.option('--ignore-errors', is_flag=True,
              help='Ignore errors, just report their number.')
@click.option('--no-meta', is_flag=True,
              help=(
                  'Do not try to parse the file as json, just pass it on as '
                  'strings. This means that the json docs cannot be used to extract the ID or '
                  'other metadata.'
              ))
@click.option('--threads', type=int, default=4, metavar='<thread_count>',
              help='Degree of parallelization, default 4.')
@click.option('--index', metavar='<index>',
              help='Default index, if not overridden by the _index key in the json docs.')
@click.option('--doc-type', metavar='<doc_type>',
              help='Default type, if not overridden by the _type key in the json docs.')
@click.option('--chunk-size', type=int, default=500, metavar='<chunk_size>',
              help='Maximum number of documents to be sent to es in one bulk request.')
@click.option('--max-chunk-bytes', type=int, default=100 * 1024 * 1024, metavar='<max_chunk_bytes>',
              help='Maximum size of the http request in bytes.')
@click.option('--notify-every', type=int, default=10000, metavar='<N>',
              help='Print a summary every <N> documents, default 10000.')
@click.argument('input', type=click.File('r'))
def bulk(input, threads, notify_every, no_meta, ignore_errors, **kwargs):
    """
    Loads data into elasticsearch using the python client's bulk helpers.

    The file should contain a json document per line in any format accepted by
    the python helpers:

    \b
    http://elasticsearch-py.readthedocs.org/en/master/helpers.html#bulk-helpers

    For maximum performance you can disable the parsing of the documents in
    python by specifying the --no-meta option and providing <index> and <doc_type>
    values. All documents will then be indexed with random IDs allocated by
    elasticsearch:

    \b
    for x in `seq 100000`
    do
        echo '{"title": "Document '$x'", "number": '$x'}'
    done | escli bulk --index my-index --doc-type my-doc --no-meta
    """
    bulk_helper = helpers.streaming_bulk
    bulk_kwargs = {}
    if threads > 1:
        # with more than one thread, switch to the parallel bulk helper
        bulk_helper = helpers.parallel_bulk
        bulk_kwargs['thread_count'] = threads

    if not no_meta:
        actions = map(json.loads, input)
    elif kwargs['doc_type'] is None or kwargs['index'] is None:
        # --no-meta means that we have to have index and doc_type set
        raise click.BadParameter(
            'We need index and doc-type defaults if we don\'t parse the json to extract the information.',
            param_hint='--no-meta')
    else:
        # no meta, just pass the raw strings through for more performance
        actions = input

    if ignore_errors:
        bulk_kwargs.update({
            'raise_on_exception': False,
            'raise_on_error': False,
        })

    cnt, err_cnt = 0, 0
    bulk_kwargs.update(kwargs)
    start = time.time()
    for ok, _ in bulk_helper(client, actions, **bulk_kwargs):
        cnt += 1
        if not ok:
            err_cnt += 1
        if notify_every and cnt % notify_every == 0:
            duration = time.time() - start
            click.echo('%d documents (%d failed) written in %f seconds (%f docs/sec)' % (
                cnt, err_cnt, duration, cnt / duration), err=True)

    duration = time.time() - start
    click.echo('DONE')
    click.echo('%d documents (%d failed) written in %f seconds (%f docs/sec)' % (
        cnt, err_cnt, duration, cnt / duration), err=True)

@cli.command('scan', short_help='dump data from elasticsearch')
@click.option('--index', metavar='<index>',
              help='Index to dump, use comma separated values to specify multiple indices.')
@click.option('--doc-type', metavar='<doc_type>',
              help='Doc type to dump, use comma separated values to specify multiple types.')
@click.option('--query', metavar='<query>',
              help='Query you want to run, as json.')
@click.option('--scroll', default='5m',
              help='Timeout for which to leave the scroll context alive, default 5m.')
@click.option('--ignore-errors', is_flag=True,
              help='Ignore errors.')
@click.option('--size', type=int, default=100,
              help='Number of documents (per shard) to retrieve at once.')
@click.option('--include-meta', is_flag=True,
              help='Include metadata (_index, _type, _id, ...) in the output.')
@click.option('--meta-keys',
              help=(
                  'When including metadata, limit the keys to be included.'
                  ' _source is always included. Example (only include _type and _id):'
                  ' --meta-keys _type,_id'
              ))
@click.option('--notify-every', type=int, default=10000, metavar='<N>',
              help='Print a summary every <N> documents, default 10000.')
@click.argument('output', type=click.File('w'))
def scan(output, query, scroll, ignore_errors, include_meta, meta_keys, notify_every, **kwargs):
    """
    Dump all documents from <index>/<doc_type> that match <query> into OUTPUT.
    """
    if meta_keys and not include_meta:
        raise click.BadParameter('You cannot specify meta keys if you are not including metadata.',
                                 param_hint='--meta-keys')

    # request all relevant metadata fields
    fields = ('_source', '_parent', '_routing', '_timestamp')
    if meta_keys:
        meta_keys = set(k.strip() for k in meta_keys.split(','))
        meta_keys.add('_source')
        fields = tuple(f for f in fields if f in meta_keys)

    cnt = 0
    start = time.time()
    for doc in helpers.scan(client, scroll=scroll, raise_on_error=not ignore_errors,
                            query=query, fields=fields, **kwargs):
        cnt += 1
        if not include_meta:
            doc = doc['_source']
        elif meta_keys:
            doc = dict((k, v) for (k, v) in doc.items() if k in meta_keys)
        output.write(json.dumps(doc))
        output.write('\n')
        if notify_every and cnt % notify_every == 0:
            duration = time.time() - start
            click.echo('%d documents read in %f seconds (%f docs/sec)' % (
                cnt, duration, cnt / duration), err=True)

    duration = time.time() - start
    click.echo('%d documents read in %f seconds (%f docs/sec)' % (
        cnt, duration, cnt / duration), err=True)

if __name__ == '__main__':
    cli()
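
For reference, the reindex pipeline from the built-in help boils down to the following direct use of the same helpers (a minimal sketch; the hosts and the index name are placeholders):

#!/usr/bin/env python
from elasticsearch import Elasticsearch, helpers

source = Elasticsearch(['localhost:9200'])
target = Elasticsearch(['otherhost:9200'])

# scan() yields hits carrying _index, _type, _id and _source; bulk() accepts
# such dicts directly, so each document lands in the target cluster under the
# same index, type and id.
helpers.bulk(target, helpers.scan(source, index='my-index'))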
@jkryanchou

pretty coooool~
