Skip to content

Instantly share code, notes, and snippets.

@bsa7
Last active July 12, 2018 19:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bsa7/c22995b0d313bf8036c4398b9991d6c0 to your computer and use it in GitHub Desktop.
Save bsa7/c22995b0d313bf8036c4398b9991d6c0 to your computer and use it in GitHub Desktop.
Find and repair attributes in elasticsearch
#!/usr/bin/env ruby
# Find and replace incorrect field values in Elasticsearch indexes.
# Run:
# ruby ./bin/fix_elk_attribute_conflict elk_user=<name> elk_password=<password> index_type=<index_type> host=<host_ip>
# port=<host_port> attribute_name=<attribute_name> attribute_check_regexp='^\d+$' ssl=true
# elk_user=<name> elk_password=<password> - nginx basic auth credentials
# index_type=<index_type> - type of the Elasticsearch indexes
# host=<host_ip> port=<host_port> - host and port of the Elasticsearch node
# attribute_name=<attribute_name> - name of the field that needs to be corrected
# attribute_check_regexp - regexp that a String value must match to be considered correct
# ssl - use https if true, http otherwise
require 'net/http'
require 'uri'
require 'json'
# Parses "key=value" command line tokens into a Hash with String keys.
#
# Only the first "=" separates the key from the value, so values that contain
# "=" themselves (passwords, regexps) are kept intact. A token without "=" is
# stored with a nil value instead of breaking the whole parse.
#
# @param argv [Array<String>] raw command line tokens
# @return [Hash] option name => option value (nil when no "=" was given)
def parse_command_line_arguments(argv)
  argv.each_with_object({}) do |argument, options|
    name, value = argument.split('=', 2)
    options[name] = value
  end
end

command_line_arguments = parse_command_line_arguments(ARGV)
USER_NAME = command_line_arguments['elk_user']
USER_PASSWORD = command_line_arguments['elk_password']
INDEX_TYPE = command_line_arguments['index_type']
HOST = command_line_arguments['host']
PORT = command_line_arguments['port']
PROTOCOL = command_line_arguments['ssl'].to_s == 'true' ? 'https' : 'http'
ATTRIBUTE_NAME = command_line_arguments['attribute_name']
# `.to_s` keeps an omitted attribute_check_regexp from raising TypeError at
# startup: the resulting empty pattern matches every String, i.e. no format
# check is applied.
ATTRIBUTE_CHECK_REGEXP = Regexp.new(command_line_arguments['attribute_check_regexp'].to_s)
# Applies a partial update to a single Elasticsearch document and prints the
# request and the parsed response.
#
# The request goes to the document's `_update` endpoint with the fields wrapped
# in a "doc" object, so only the given fields are changed. A plain PUT to
# /index/type/id would re-index the document and drop every other field.
#
# @param index_id [String] name of the index that holds the document
# @param doc_id [String] id of the document to update
# @param request_body [Hash, nil] fields to merge into the document
# @return [nil]
# @raise [JSON::ParserError] if the response body is not valid JSON
def update_document(index_id:, doc_id:, request_body:)
  url = "#{PROTOCOL}://#{USER_NAME}@#{HOST}:#{PORT}/#{index_id}/#{INDEX_TYPE}/#{doc_id}/_update"
  puts url: url
  puts request_body: request_body
  uri = URI.parse(url)
  request = Net::HTTP::Post.new(uri)
  request['Content-Type'] = 'application/json'
  request.basic_auth USER_NAME, USER_PASSWORD
  request.body = JSON.dump(doc: request_body) if request_body
  # Without use_ssl the connection stays plain HTTP even for an https URL.
  response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
    http.request(request)
  end
  data = JSON.parse(response.body)
  puts request_complete_with_message: data
end
# Sends a GET request (optionally with a JSON body) to the Elasticsearch node
# and extracts the search hits from the response.
#
# @param command [String] request path including the leading "/" and any
#   query string, e.g. '/_search?pretty'
# @param request_body [Hash, nil] search body, sent as JSON when given
# @return [Hash] `:result` - the array under hits.hits ([] when absent),
#   `:total` - the value under hits.total (nil when absent)
# @raise [JSON::ParserError] if the response body is not valid JSON
def find_all_documents(command:, request_body: nil)
  uri = URI.parse("#{PROTOCOL}://#{USER_NAME}@#{HOST}:#{PORT}#{command}")
  request = Net::HTTP::Get.new(uri)
  request['Content-Type'] = 'application/json'
  request.basic_auth USER_NAME, USER_PASSWORD
  request.body = JSON.dump(request_body) if request_body
  # Without use_ssl the connection stays plain HTTP even for an https URL.
  response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
    http.request(request)
  end
  hits = JSON.parse(response.body)['hits'] || {}
  {
    result: hits['hits'] || [],
    total: hits['total']
  }
end
# Builds the error record for one search hit, or returns nil when the
# attribute value needs no correction.
#
# A value is reported when it is nil/false (corrected to '0'), when it is not
# a String (corrected to its `to_s` form), or when it is a String that does
# not match ATTRIBUTE_CHECK_REGEXP (corrected to '0').
#
# @param doc [Hash] one element of hits.hits from a search response
# @return [Hash, nil] error record with :attribute_value, :doc, :index,
#   :message and :correct_attribute_value, or nil
def attribute_error_for(doc)
  value = doc['_source'][ATTRIBUTE_NAME]
  problem, corrected =
    if !value
      ['have empty', '0']
    elsif !value.is_a?(String)
      ['have not string', value.to_s]
    elsif !value[ATTRIBUTE_CHECK_REGEXP]
      ['contains incorrect', '0']
    end
  return unless problem

  {
    attribute_value: value,
    doc: doc['_id'],
    index: doc['_index'],
    message: "Document #{doc['_id']} #{problem} #{ATTRIBUTE_NAME} == '#{value}'",
    correct_attribute_value: corrected
  }
end

# Page through every document that has the attribute and collect the ones
# whose value must be corrected.
# NOTE(review): from/size paging is limited by Elasticsearch's
# index.max_result_window setting; hits beyond that window come back as an
# error response and are silently skipped here - confirm the data set fits.
doc_pointer = 0
page_size = 100
total_count = nil # learned from the first response
errors = []
loop do
  documents = find_all_documents(command: '/_search?pretty', request_body: {
    from: doc_pointer,
    size: page_size,
    _source: [:_id, ATTRIBUTE_NAME.to_s],
    query: {
      exists: {
        field: ATTRIBUTE_NAME.to_s
      }
    }
  })
  if total_count.nil?
    total = documents[:total]
    # Newer Elasticsearch versions report hits.total as an object with a
    # "value" key instead of a bare number.
    total = total['value'] if total.is_a?(Hash)
    # An error response has no hits.total; stop with a clear message instead of
    # failing later on a comparison with nil.
    abort 'Search failed: the response contains no hits.total (check credentials, host and port).' unless total.is_a?(Integer)
    total_count = total
    puts "Found #{total_count} documents with attribute \"#{ATTRIBUTE_NAME}\"."
    puts
  end
  print "\r Reading documents #{doc_pointer} - #{doc_pointer + page_size}"
  documents[:result].each do |doc|
    error = attribute_error_for(doc)
    errors << error if error
  end
  doc_pointer += page_size
  break if doc_pointer >= total_count
end
# Report every collected error and push the corrected value to Elasticsearch.
puts
if errors.empty?
  puts 'All values of attribute is ok.'
else
  puts "============ Errors found ============="
  # each_with_index supplies the position; plain `each` only yields the error,
  # which left the printed index blank.
  errors.each_with_index do |error, index|
    puts index
    puts error[:message]
    puts doc: error[:doc]
    puts index: error[:index]
    puts ATTRIBUTE_NAME: ATTRIBUTE_NAME
    puts attribute_value: error[:attribute_value]
    fix_hash = { ATTRIBUTE_NAME => error[:correct_attribute_value] }
    update_document(index_id: error[:index], doc_id: error[:doc], request_body: fix_hash)
    puts 'Value corrected to': error[:correct_attribute_value]
    puts
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment