Skip to content

Instantly share code, notes, and snippets.

@danielmitterdorfer
Created August 23, 2018 07:29
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielmitterdorfer/69359c521a85bfeb7f8cbc46cfb8072c to your computer and use it in GitHub Desktop.
Save danielmitterdorfer/69359c521a85bfeb7f8cbc46cfb8072c to your computer and use it in GitHub Desktop.
Replication Benchmark

Requirements

To run this benchmark you need to install Rally.

Usage

Store the files of this gist in ~/tracks/replication and run with:

esrally --car=4gheap --track-path=~/tracks/replication --distribution-version=6.3.0 --target-host=IP_node_0:39200,IP_node_1:39200,IP_node_2:39200

Note that we benchmark against a remote cluster here, so be sure to follow the instructions on how to benchmark a remote cluster. Alternatively, set up the cluster yourself and follow the instructions on how to benchmark an existing cluster.

{
"version": 2,
"description": "Taxi rides in New York in 2015",
"indices": [
{
"name": "nyc_taxis",
"body": "index.json",
"types": ["type"]
}
],
"corpora": [
{
"name": "nyc_taxis",
"base-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/nyc_taxis",
"documents": [
{
"source-file": "documents.json.bz2",
"document-count": 165346692,
"compressed-bytes": 4812721501,
"uncompressed-bytes": 79802445255
}
]
}
],
"schedule": [
{
"operation": {
"name": "configure-unthrottled-recovery",
"include-in-reporting": false,
"operation-type": "raw-request",
"method": "PUT",
"path": "/_cluster/settings",
"body": {
"transient" : {
"indices.recovery.max_bytes_per_sec" : "20000mb"
}
}
}
},
{
"operation": "delete-index"
},
{
"operation": {
"operation-type": "create-index",
"settings": {%- if index_settings is defined %} {{index_settings | tojson}} {%- else %} {
}{%- endif %}
}
},
{
"name": "check-cluster-health",
"operation": {
"operation-type": "cluster-health",
"index": "nyc_taxis",
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
}
}
},
{
"operation": {
"name": "index",
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(10000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
"warmup-time-period": 240,
"clients": {{bulk_indexing_clients | default(8)}}
},
{
"operation": {
"name": "start-replication",
"include-in-reporting": false,
"operation-type": "raw-request",
"method": "PUT",
"path": "/nyc_taxis/_settings",
"body": {
"index" : {
"number_of_replicas" : 1
}
}
}
},
{
"operation": {
"name": "replicate",
"operation-type": "indices-recovery"
},
"clients": 1,
"warmup-time-period": 120,
"#COMMENT": "Check once a second",
"target-throughput": 1
}
]
}
import json
import logging
class IndicesRecovery:
    """Rally runner that polls the indices recovery API and reports recovery progress.

    Each invocation queries only active recoveries. While recoveries are running it
    returns the number of bytes recovered since the previous invocation, as a
    ``(weight, "bytes")`` tuple, so Rally can derive a throughput. Once the API returns
    an empty response the runner marks itself as completed.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._completed = False
        self._percent_completed = 0.0
        # Recovered bytes per shard recovery as seen in the previous invocation.
        # Keyed by the value of ``_shard_key``.
        self._last_recovered = {}

    @property
    def completed(self):
        """True once a poll returned no active recoveries."""
        return self._completed

    @property
    def percent_completed(self):
        """Fraction (0.0-1.0) of bytes recovered across the recoveries seen in the last poll."""
        return self._percent_completed

    @staticmethod
    def _shard_key(index_name, shard):
        """Build a key that identifies one shard recovery across successive polls."""
        # NOTE(review): assumes each shard entry carries "id" and "primary" (as documented
        # for the Elasticsearch recovery API). Entries lacking them map to None and are
        # merged with each other, which degrades gracefully to per-index tracking.
        return index_name, shard.get("id"), shard.get("primary")

    def __call__(self, es, params):
        """Poll active recoveries once.

        :param es: Elasticsearch client; ``es.indices.recovery(active_only=True)`` is called.
        :param params: task parameters (unused).
        :return: tuple ``(newly_recovered_bytes, "bytes")``.
        """
        response = es.indices.recovery(active_only=True)
        self.logger.info(json.dumps(response, indent=2))
        if not response:
            # An empty response means no recovery is active any more.
            self._completed = True
            self._percent_completed = 1.0
            self._last_recovered = {}
            return 0, "bytes"

        recovered_by_shard = {}
        recovered = 0
        total_size = 0
        for index_name, idx_data in response.items():
            for shard_data in idx_data.values():
                for shard in shard_data:
                    idx_size = shard["index"]["size"]
                    shard_recovered = idx_size["recovered_in_bytes"]
                    key = self._shard_key(index_name, shard)
                    recovered_by_shard[key] = recovered_by_shard.get(key, 0) + shard_recovered
                    recovered += shard_recovered
                    total_size += idx_size["total_in_bytes"]
                    # Translog stats are not part of "size" (they are absolute counts),
                    # so they are ignored for progress reporting.

        # We only consider recovery completed once we get an empty response.
        self._completed = False
        # Recoveries may not know their size yet (total 0); avoid dividing by zero.
        self._percent_completed = recovered / total_size if total_size else 0.0

        # "recovered_in_bytes" is cumulative per shard, and finished recoveries drop out of
        # an active-only response. Diffing the grand total would then go negative, so the
        # delta is computed per shard recovery. max(0, ...) covers merged keys where one
        # of the merged entries disappeared.
        newly_recovered = sum(
            max(0, now - self._last_recovered.get(key, 0))
            for key, now in recovered_by_shard.items()
        )
        self._last_recovered = recovered_by_shard
        return newly_recovered, "bytes"
def register(registry):
    """Register the custom runner backing the ``indices-recovery`` operation type."""
    recovery_runner = IndicesRecovery()
    registry.register_runner("indices-recovery", recovery_runner)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment