Skip to content

Instantly share code, notes, and snippets.

@rmihael
Created October 17, 2017 13:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rmihael/4f37bce239c9265ec69f2dc695ffd405 to your computer and use it in GitHub Desktop.
Save rmihael/4f37bce239c9265ec69f2dc695ffd405 to your computer and use it in GitHub Desktop.
ES latency check
from __future__ import unicode_literals, absolute_import, division, print_function
from gevent import monkey
monkey.patch_socket()
import random
import string
from time import time
import traceback
import click
import sys
from elasticsearch import Elasticsearch
from hdrh.histogram import HdrHistogram
from gevent.pool import Pool
from django.conf import settings
click.disable_unicode_literals_warning = True
def build_document(_):
    """Build a throwaway test document keyed by a random 20-letter string.

    The argument (a loop index) is ignored; it exists only so the function
    can be mapped over a sequence of item numbers.
    """
    # 20 distinct lowercase letters: effectively collision-free for a probe.
    letters = random.sample(string.ascii_lowercase, 20)
    return {"unique_field": "".join(letters)}
def measure_single_document(cluster, document_type, i, index_name):
    """Index one random document and poll until it is visible to search.

    Returns the observed indexing-to-searchable latency in whole
    milliseconds, or None if the document is still not searchable after
    20 seconds (a "timeout" notice is printed to stderr in that case).
    """
    doc = build_document(i)
    cluster.index(index=index_name, doc_type=document_type, body=doc)
    started = time()
    # The query never changes between polls, so build it once up front.
    term_query = {
        "query": {
            "term": {"unique_field": doc["unique_field"]}
        }
    }
    while True:
        response = cluster.search(index=index_name, doc_type=document_type,
                                  body=term_query)
        checked_at = time()
        # NOTE(review): assumes hits.total is a plain number (pre-ES7
        # response shape) -- confirm against the target cluster version.
        if response["hits"]["total"] > 0:
            return int((checked_at - started) * 1000)  # milliseconds
        if checked_at - started > 20:
            click.echo("timeout", err=True)
            return None
def run_latency_test(cluster, workers, index_name, document_type, count):
    """Probe search-visibility latency for ``count`` documents.

    Fans the per-document probes out over a gevent pool of ``workers``
    greenlets, records each latency (in ms) in an HDR histogram, and
    returns that histogram. Probes that timed out (None results) are
    skipped. A progress bar is rendered on stderr.
    """
    # 1 ms .. 1 hour trackable range; 1 significant figure is enough
    # for a coarse latency distribution.
    histogram = HdrHistogram(lowest_trackable_value=1,
                             highest_trackable_value=60 * 60 * 1000,
                             significant_figures=1)
    pool = Pool(size=workers)
    # Fix: use range, not xrange. The file opts into py2/py3 compatibility
    # via __future__ imports, and xrange does not exist on Python 3.
    with click.progressbar(range(count), length=count, file=sys.stderr) as items:
        latencies = pool.imap(
            lambda i: measure_single_document(cluster, document_type, i, index_name),
            items)
        for latency in latencies:
            if latency is not None:
                histogram.record_value(latency)
    return histogram
@click.command()
@click.argument("es_host")
@click.argument("count", type=click.INT)
@click.option("--workers", type=click.INT, default=10)
def main(es_host, count, workers):
    """CLI entry point: run the Elasticsearch search-visibility latency check.

    ES_HOST is the cluster host to probe (port 9200); COUNT is the number
    of probe documents to index. Prints an HDR percentile distribution to
    stdout and always attempts to clean up the probe documents afterwards.
    """
    document_type = "latency_check_document_type"
    es = Elasticsearch(
        [{'host': es_host, 'port': 9200}],
        sniff_on_start=True,
        sniff_on_connection_fail=True,
        sniff_timeout=3,
        sniffer_timeout=20,
        retry_on_timeout=False)
    # Target index is taken from Django settings; settings must be
    # configured before this script is invoked.
    target_index = settings.MIGRATION_CONTACT_INDEX_SEARCH_BY
    try:
        histogram = run_latency_test(es, workers, target_index,
                                     document_type, count)
    except Exception:
        traceback.print_exc()
    else:
        histogram.output_percentile_distribution(
            out_file=sys.stdout, output_value_unit_scaling_ratio=1)
    finally:
        # Best-effort cleanup of the probe documents, even after a failure.
        click.echo("Removing %s documents" % document_type, err=True)
        es.delete_by_query(target_index, doc_type=document_type,
                           body={"query": {"match_all": {}}})


if __name__ == "__main__":
    main()
@dadoonet
Copy link

I think you should just remove the index instead of running a delete-by-query. It won't change the measurements — it will just be faster to clean things up :)

@rambai
Copy link

rambai commented Mar 12, 2020

Hello — how do I run this script?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment