Skip to content

Instantly share code, notes, and snippets.

@simonmorley
Last active January 19, 2017 21:36
Show Gist options
  • Save simonmorley/5901be947407ea427f0a to your computer and use it in GitHub Desktop.
Save simonmorley/5901be947407ea427f0a to your computer and use it in GitHub Desktop.
Import and Alias Mulitple Indexes To Elasticsearch Rails
# A collection of Rake tasks to facilitate importing data from yout models into Elasticsearch.
# This will import the index as an alias, update the alias and delete the old ones
# It will also allow the importation of indexes via arguments
# Add this e.g. into the `lib/tasks/elasticsearch.rake` file in your Rails application:
#
# require 'elasticsearch/rails/tasks/import'
#
# To import the records from your `Article` model, run:
#
# $ bundle exec rake environment elasticsearch:import:model CLASS='MyModel, MyOtherModel'
#
STDOUT.sync = true
STDERR.sync = true
begin; require 'ansi/progressbar'; rescue LoadError; end
namespace :elasticsearch do
task :import => 'import:model'
namespace :import do
task :model do
if ENV['CLASS'].to_s == ''
puts '='*90, 'USAGE', '='*90, import_model_desc, ""
exit(1)
end
@date ||= Time.now.strftime '%Y%m%d%H%M%S'
klasses = ENV['CLASS'].split(",")
klasses.each do |klass|
klass = eval(klass.to_s)
@client = klass.__elasticsearch__.client
@alias_name = "#{klass.index_name}_#{@date}"
unless ENV['DEBUG']
begin
klass.__elasticsearch__.client.transport.logger.level = Logger::WARN
rescue NoMethodError; end
begin
klass.__elasticsearch__.client.transport.tracer.level = Logger::WARN
rescue NoMethodError; end
end
def import_index(klass)
total = klass.count rescue nil
pbar = ANSI::Progressbar.new(klass.to_s, total) rescue nil
pbar.__send__ :show if pbar
total_errors = klass.import force: ENV.fetch('FORCE', true),
batch_size: ENV.fetch('BATCH', 1000).to_i,
index: @alias_name,
type: ENV.fetch('TYPE', nil) do |response|
pbar.inc response['items'].size if pbar
STDERR.flush
STDOUT.flush
end
pbar.finish if pbar
puts "[CREATED ALIASES] #{total_errors} errors occurred" unless total_errors.zero?
end
def delete_old_aliases(index_name)
aliases = @client.indices.get_aliases(index: index_name).keys
aliases.each do |alias_name|
begin
if Time.parse(alias_name.gsub(/locations_/, '')) < 1.weeks.ago
@client.indices.delete index: alias_name
puts "Deleted alias #{alias_name}"
end
rescue
## Needs rescue message ###
end
end
puts '[IMPORT] Done'
end
def create_alias(index_name)
@client.indices.delete index: index_name rescue false
@client.indices.update_aliases body: {
actions: [
{ add: { index: @alias_name, alias: index_name } },
]
}
puts "[UPDATED ALIAS]"
end
### Import to alias ###
import_index(klass)
### LINK THE ALIAS TO THE INDEX NAME ###
create_alias(klass.index_name)
### UPDATE OLD INDICES ###
delete_old_aliases(klass.index_name)
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment