Skip to content

Instantly share code, notes, and snippets.

@jorisdevrede
Last active November 16, 2017 15:36
Show Gist options
  • Save jorisdevrede/7d61c374882ba2416ababfc63b29a30f to your computer and use it in GitHub Desktop.
Save jorisdevrede/7d61c374882ba2416ababfc63b29a30f to your computer and use it in GitHub Desktop.
Elasticsearch optimization
from argparse import ArgumentParser
from logbook import Logger, StreamHandler
import json
import requests
import sys
import uuid
StreamHandler(sys.stdout).push_application()
log = Logger(__name__)
class Connection:
"""Creates a connection to Elasticsearch"""
def __init__(self, baseurl):
"""Create a new connection to a master url
:param baseurl: url to an Elasticsearch master (e.g. http://esmaster:9200)"""
self._baseurl = baseurl
def alias_update(self, index, alias, operation='add'):
"""Add or remove an alias for an index
:param index: name of the index to create an alias for
:param alias: name of the alias
:param operation: either add or remove"""
data = {'actions': [
{operation: {'index': index, 'alias': alias}}
]}
response = requests.post(self._baseurl + '/_aliases', data=json.dumps(data))
log.debug('{} - {}'.format(response.status_code, response.text))
def cluster_get_health(self, params):
"""Retrieves the health of a cluster"""
response = requests.get(self._baseurl + '/_cluster/health', params=params)
log.debug('{} - {}'.format(response.status_code, response.text))
return response.json()
def cluster_get_stats(self):
"""Retrieves the statistics of a cluster"""
response = requests.get(self._baseurl + '/_cluster/stats')
log.debug('{} - {}'.format(response.status_code, response.text))
return response.json()
def index_create(self, index, mappings=None, settings=None):
"""Creates a new index"""
data = {}
if mappings is not None:
data['mappings'] = mappings
if settings is not None:
data['settings'] = settings
response = requests.post(self._baseurl + '/' + index, data=json.dumps(data))
log.debug('{} - {}'.format(response.status_code, response.text))
def index_delete(self, index):
"""Deletes an existing index"""
response = requests.delete(self._baseurl + '/' + index)
log.debug('{} - {}'.format(response.status_code, response.text))
def index_forcemerge(self, index):
"""Merges the segments of an index to 1"""
params = {'max_num_segments': 1}
response = requests.post(self._baseurl + '/' + index + '/_forcemerge', params=params)
log.debug('{} - {}'.format(response.status_code, response.text))
def index_get_config(self, index):
"""Retrieves the config of an index"""
response = requests.get(self._baseurl + '/' + index)
log.debug('{} - {}'.format(response.status_code, response.text))
return response.json()
def index_get_stats(self, index):
"""Retrieves the statistics of an index"""
response = requests.get(self._baseurl + '/' + index + '/_stats')
log.debug('{} - {}'.format(response.status_code, response.text))
return response.json()
def index_reindex(self, index, new_index, shards=1):
"""Reindexes an index
Creates a new index with the same mapping and settings as the old index
except for shards and replicas, and reindexes the the data from the old
index into the new.
:param index: old index to reindex from
:param new_index: new index
:param shards: number of shards for the new index"""
index_config = self.index_get_config(index)
if requests.head(self._baseurl + '/' + new_index).status_code == 404:
c_mapping = index_config[index]['mappings']
c_setting = {'index': {
'analysis': index_config[index]['settings']['index']['analysis']}}
self.index_create(new_index, c_mapping, c_setting)
data = {'source': {'index': index},
'dest': {'index': new_index}}
log.info('Reindexing index {} to new_index {} with {} shard(s)'.format(index, new_index, shards))
response = requests.post(self._baseurl + '/_reindex', data=json.dumps(data))
log.debug('{} - {}'.format(response.status_code, response.text))
class ESOptimization:
def __init__(self, baseurl):
self._connection = Connection(baseurl)
def index_replace(self, index, shards):
"""Creates a new index with another number of shards"""
# TODO: include an automatic snapshot
new_index = str(uuid.uuid4())
self._connection.index_reindex(index, new_index, shards)
# TODO: make the timeout dependent on the index size
self._connection.cluster_get_health({'wait_for_status': 'green', 'timeout': '1h'})
self._connection.index_delete(index)
self._connection.alias_update(new_index, index)
self._connection.index_forcemerge(new_index)
def index_optimize_shards(self, index):
index_config = self._connection.index_get_config(index)
index_shards = int(index_config[index]['settings']['index']['number_of_shards'])
index_stats = self._connection.index_get_stats(index)
index_primary_size = int(index_stats['_all']['primaries']['store']['size_in_bytes'])
size_2gb = 2147483648
size_30gb = 32212254720
size_45gb = 48318382080
if index_primary_size < size_2gb and index_shards > 1:
log.info("Reducing index {} to 1 shard".format(index))
self.index_replace(index, 1)
elif index_primary_size / index_shards > size_45gb:
number_of_shards = round(index_primary_size / size_30gb)
if index_shards < number_of_shards:
log.info("Increasing index {} to {} shards".format(index, number_of_shards))
self.index_replace(index, number_of_shards)
if __name__ == '__main__':
parser = ArgumentParser(description="Optimize the sharding of an Elasticsearch index")
parser.add_argument('-i', '--index', dest='index')
parser.add_argument('-u', '--url', dest='url')
args = parser.parse_args()
esop = ESOptimization(args.url)
esop.index_optimize_shards(args.index)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment