Last active
July 12, 2018 19:34
-
-
Save bsa7/c22995b0d313bf8036c4398b9991d6c0 to your computer and use it in GitHub Desktop.
Find and repair attributes in elasticsearch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# Find and replace invalid values of one field in Elasticsearch indexes.
# Run:
#   ruby ./bin/fix_elk_attribute_conflict elk_user=<name> elk_password=<password> index_type=<index_type> host=<host_ip>
#     port=<host_port> attribute_name=<attribute_name> attribute_check_regexp='\A\d+\z' ssl=true
# elk_user, elk_password - nginx basic auth credentials
# index_type=<index_type> - document (mapping) type used in the URL when writing corrected documents
# host=<host_ip> port=<host_port> - host and port of an Elasticsearch node
# attribute_name=<attribute_name> - name of the field to check and correct
# attribute_check_regexp - regexp a string value must match to be considered valid
#   (prefer \A...\z over ^...$: ^ and $ match at every line break, so "abc\n12" would pass '^\d+$')
# ssl - use https if true, http otherwise
# Invalid values are rewritten as: missing -> '0', non-string -> its string form, non-matching string -> '0'.
require 'net/http'
require 'uri'
require 'json'

# Arguments without which the script cannot run; elk_user, elk_password and
# ssl are optional.
REQUIRED_ARGUMENTS = %w[index_type host port attribute_name attribute_check_regexp].freeze

# Parse "key=value" command-line arguments into a Hash of String => String.
# Split on the FIRST '=' only, so values may themselves contain '='
# (passwords, or regexps such as 'a=\d+'). An argument without '=' maps to nil.
command_line_arguments = ARGV.each_with_object({}) do |argument, arguments|
  key, value = argument.split('=', 2)
  arguments[key] = value
end

missing_arguments = REQUIRED_ARGUMENTS.select { |name| command_line_arguments[name].to_s.empty? }
unless missing_arguments.empty?
  abort "Missing required argument(s): #{missing_arguments.join(', ')}\n" \
        "Usage: ruby #{$PROGRAM_NAME} elk_user=<name> elk_password=<password> index_type=<index_type> " \
        "host=<host_ip> port=<host_port> attribute_name=<attribute_name> attribute_check_regexp='<regexp>' ssl=true"
end

USER_NAME = command_line_arguments['elk_user']
USER_PASSWORD = command_line_arguments['elk_password']
INDEX_TYPE = command_line_arguments['index_type']
HOST = command_line_arguments['host']
PORT = command_line_arguments['port']
PROTOCOL = command_line_arguments['ssl'].to_s == 'true' ? 'https' : 'http'
ATTRIBUTE_NAME = command_line_arguments['attribute_name']
ATTRIBUTE_CHECK_REGEXP = Regexp.new(command_line_arguments['attribute_check_regexp'])
# Apply a partial update to one Elasticsearch document.
#
# Uses the Update API (POST /<index>/<type>/<id>/_update with {"doc": ...}),
# which merges request_body into the stored _source. A plain PUT to
# /<index>/<type>/<id> is the Index API and would REPLACE the whole document
# with request_body, wiping every other field.
#
# @param index_id [String] index that holds the document
# @param doc_id [String] the document's _id
# @param request_body [Hash, nil] fields to overwrite, e.g. { 'status' => '0' };
#   when nil the request is sent without a body
# @return [Hash, String, nil] parsed JSON response, or the raw body when it is not JSON
def update_document(index_id:, doc_id:, request_body:)
  path = [index_id, INDEX_TYPE, doc_id].map { |segment| escape_path_segment(segment) }.join('/')
  url = "#{PROTOCOL}://#{HOST}:#{PORT}/#{path}/_update"
  puts url: url
  puts request_body: request_body
  uri = URI.parse(url)
  request = Net::HTTP::Post.new(uri)
  request['Content-Type'] = 'application/json'
  # Credentials travel only in the Authorization header; putting the user in
  # the URL as well would just leak it into the log line above.
  request.basic_auth USER_NAME, USER_PASSWORD
  request.body = JSON.dump({ doc: request_body }) if request_body
  # Net::HTTP does not derive TLS from the URL scheme; enable it explicitly.
  response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
    http.request(request)
  end
  data = begin
    JSON.parse(response.body.to_s)
  rescue JSON::ParserError
    # e.g. an HTML error page from a proxy: report it rather than aborting
    # the remaining fixes.
    response.body
  end
  puts status: response.code, request_complete_with_message: data
  data
end

# Percent-encode one URL path segment (index, type or document id) so ids
# containing '/', '?', '#', spaces, etc. address the intended document.
def escape_path_segment(segment)
  URI.encode_www_form_component(segment.to_s).gsub('+', '%20')
end
# Run one search request against Elasticsearch and return that page of hits.
#
# @param command [String] request path plus query string, e.g. '/_search?pretty'
# @param request_body [Hash, nil] search body, sent as JSON with the GET request;
#   omitted when nil
# @return [Hash] { result: Array<Hash> of hits (possibly empty), total: Integer or nil }
# @raise [RuntimeError] when the server answers with a non-2xx status, so an error
#   (for example a page beyond index.max_result_window) is not mistaken for "no hits"
def find_all_documents(command:, request_body: nil)
  uri = URI.parse("#{PROTOCOL}://#{HOST}:#{PORT}#{command}")
  request = Net::HTTP::Get.new(uri)
  request['Content-Type'] = 'application/json'
  request.basic_auth USER_NAME, USER_PASSWORD
  request.body = JSON.dump(request_body) if request_body
  # Net::HTTP does not derive TLS from the URL scheme; enable it explicitly.
  response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
    http.request(request)
  end
  unless response.is_a?(Net::HTTPSuccess)
    raise "Search #{command} failed with HTTP #{response.code}: #{response.body}"
  end

  hits = JSON.parse(response.body)['hits'] || {}
  total = hits['total']
  # Newer Elasticsearch versions report the total as {"value" => n, "relation" => ...}.
  total = total['value'] if total.is_a?(Hash)
  { result: hits['hits'] || [], total: total }
end
# Scan, page by page, every document that has ATTRIBUTE_NAME and collect the
# ones whose value must be repaired into `errors`. Each entry holds
# :attribute_value, :doc (_id), :index (_index), :message and
# :correct_attribute_value.
doc_pointer = 0
page_size = 100
total_count = nil
errors = []
loop do
  documents = find_all_documents(command: '/_search?pretty', request_body: {
    from: doc_pointer,
    size: page_size,
    _source: [:_id, ATTRIBUTE_NAME],
    query: { exists: { field: ATTRIBUTE_NAME } }
  })
  if total_count.nil?
    total_count = documents[:total]
    puts "Found #{total_count} documents with attribute \"#{ATTRIBUTE_NAME}\"."
    puts
  end

  page = documents[:result]
  if page.empty?
    # An empty page before the reported total means the server stopped
    # returning hits (e.g. from + size beyond index.max_result_window);
    # say so instead of silently leaving the rest unchecked.
    if doc_pointer < total_count
      warn "\nNo documents returned at offset #{doc_pointer} of #{total_count}; " \
           'the remaining documents were NOT checked.'
    end
    break
  end

  print "\r Reading documents #{doc_pointer} - #{doc_pointer + page.size}"
  page.each do |doc|
    attribute_value = (doc['_source'] || {})[ATTRIBUTE_NAME]
    # Classify the value; problem stays nil when the value is acceptable.
    problem, correct_attribute_value =
      if !attribute_value
        ['have empty', '0']
      elsif !attribute_value.is_a?(String)
        ['have not string', attribute_value.to_s]
      elsif !attribute_value[ATTRIBUTE_CHECK_REGEXP]
        ['contains incorrect', '0']
      end
    next unless problem

    errors << {
      attribute_value: attribute_value,
      doc: doc['_id'],
      index: doc['_index'],
      message: "Document #{doc['_id']} #{problem} #{ATTRIBUTE_NAME} == '#{attribute_value}'",
      correct_attribute_value: correct_attribute_value
    }
  end

  doc_pointer += page_size
  break if doc_pointer >= total_count
end
# Report every invalid value found by the scan and overwrite it in
# Elasticsearch with its corrected value.
puts
if errors.empty?
  puts 'All values of attribute is ok.'
else
  puts "============ Errors found ============="
  # each_with_index: plain `each` yields a single value, so a second block
  # parameter would always be nil.
  errors.each_with_index do |error, index|
    puts "[#{index + 1}/#{errors.size}]"
    puts error[:message]
    puts doc: error[:doc]
    puts index: error[:index]
    puts ATTRIBUTE_NAME: ATTRIBUTE_NAME
    puts attribute_value: error[:attribute_value]
    fix_hash = { ATTRIBUTE_NAME => error[:correct_attribute_value] }
    update_document(index_id: error[:index], doc_id: error[:doc], request_body: fix_hash)
    puts 'Value corrected to': error[:correct_attribute_value]
    puts
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment