Created
October 16, 2017 21:26
-
-
Save zgohr/a393d9186da05731a81973670d3dbb80 to your computer and use it in GitHub Desktop.
Zero downtime elasticsearch reindex that uses elasticsearch-dsl doctypes and sql alchemy entities
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def reindex(self, entities):
    """Perform a zero-downtime reindex into a freshly created index.

    Steps:
      1. Create a new index with a unique name, applying all index settings
         and initializing every doctype mapping on it.
      2. Copy all documents from the currently aliased index into it using
         the Elasticsearch reindex helper.
      3. Repoint the alias (``self.index``) at the new index and drop the
         old index.
      4. Reload all documents onto the new index so any newly mapped fields
         get indexed (left as a stub below).

    NOTE: while both indexes coexist this roughly doubles the memory usage
    of the ES cluster. If that is a problem, the ES reindex function can
    take source/destination clusters, so the entire old cluster can be
    thrown away after everything is migrated. This assumes API calls go
    through an alias (shown as ``self.index`` here).
    """
    # Resolve the concrete index currently sitting behind the alias.
    current_index = list(self.es.indices.get_alias(self.index).keys())[0]
    # Unique name of the underlying replacement index.
    replacement = self._generate_actual_index()
    # Create it with any index and analyzer settings.
    self.create_index(replacement)

    # Initialize the DocType mappings on the new index.
    # All of the entities in ES implement `doc_type()`.
    for entity in entities:
        entity.doc_type().init(replacement, using=self.es)

    # Copy every existing document over to the new index.
    helpers.reindex(self.es, current_index, replacement)

    # Attach the alias to the new index, then swap it off the old index in
    # a single atomic aliases update before deleting the old index.
    self.es.indices.put_alias(replacement, self.index)
    self.es.indices.update_aliases({
        "actions": [
            {"remove": {"index": current_index, "alias": self.index}},
            {"add": {"index": replacement, "alias": self.index}}
        ]
    })
    self.es.indices.delete(current_index)

    # Reload all documents to pick up newly mapped fields: some function to
    # pull batches out of the database (limit/offset), then use the
    # `streaming_bulk` ES helper to push them.
    for entity in entities:
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.