Skip to content

Instantly share code, notes, and snippets.

@randsina
Created September 7, 2017 10:24
Show Gist options
  • Save randsina/1f84b2cfe7cbb0e7c6035a29235b8e12 to your computer and use it in GitHub Desktop.
Save randsina/1f84b2cfe7cbb0e7c6035a29235b8e12 to your computer and use it in GitHub Desktop.
require 'json'

module AdEngine
  module Elasticsearch
    module Updater
      # Rebuilds the TestDataIndex Elasticsearch index from all ::DataFeedItem rows,
      # sending one bulk request per database batch through Chewy's client.
      class ImporterEs
        # NOTE(review): CORE_COUNT and DEGREE_OF_PARALLELISM are no longer read by this
        # class (their only use was an unused local); kept because they are public
        # constants that code elsewhere may reference.
        CORE_COUNT = 2
        DEGREE_OF_PARALLELISM = 32
        # Rows loaded per find_in_batches batch; each batch becomes one bulk request.
        BATCH_SIZE = 256

        # Creates a new suffixed index and imports every DataFeedItem into it.
        #
        # @param suffix [#to_s] suffix for the new physical index name
        #   (defaults to the current epoch seconds)
        # @param swap_alias [Boolean] when true, after the import remove the index alias
        #   from the indexes that existed before this run, add it to the new index, and
        #   delete those old indexes. Defaults to false, which leaves existing indexes
        #   untouched (the previous behavior).
        # @return whatever the import's find_in_batches call returned (unchanged)
        def reset!(suffix = Time.zone.now.to_i, swap_alias: false)
          # Read before create! (as the earlier alias-swap code did) so the list only
          # holds indexes that existed before this run.
          old_indexes = swap_alias ? TestDataIndex.indexes : []
          TestDataIndex.create! suffix
          new_index = TestDataIndex.build_index_name(suffix: suffix)
          @pner = AdEngine::Elasticsearch::Pner.new
          result = import(new_index)
          swap_alias_to(new_index, old_indexes) if swap_alias
          result
        end

        private

        # Streams all DataFeedItem rows into +index_name+, one bulk request per batch,
        # logging per-batch throughput and any per-item bulk failures.
        #
        # @param index_name [String] full name of the target index (built by reset!)
        def import(index_name)
          logger = Logger.new($stdout)
          ::DataFeedItem.find_in_batches(batch_size: BATCH_SIZE) do |items|
            next if items.empty?

            started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
            # One NDJSON String per batch; every entry ends in "\n", so the body carries
            # the trailing newline the bulk API requires and has no blank lines.
            body = items.map { |item| bulk_entry(index_name, item.attributes) }.join
            report_bulk_errors(logger, index_name, Chewy.client.bulk(body: body))
            elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
            # Guard against a zero elapsed time on very fast batches.
            rate = (items.size / [elapsed, Float::EPSILON].max).round(1)
            logger.info "Items processed - #{items.size} (#{rate} items/s)"
          end
        end

        # Returns the action line and the document line (each newline-terminated)
        # for one row. JSON.generate escapes quotes, backslashes and control characters
        # in every field; the former hand-built string escaped only '"' in two fields.
        def bulk_entry(index_name, data)
          action = { index: { _index: index_name, _type: 'data_feed_item', _id: data['id'] } }
          "#{JSON.generate(action)}\n#{document_json(data)}\n"
        end

        # Serializes one row as the document JSON (same field order as before) and
        # splices the PNER fragment in just before the closing brace.
        def document_json(data)
          document = {
            manufacturer: data['manufacturer'].to_s,
            # Turn stored \' sequences back into plain apostrophes, as the old code did.
            offer_description: data['offer_description'].to_s.gsub(/\\'/, "'"),
            offer_id: data['offer_id'].to_s,
            offer_title: data['offer_title'].to_s,
            # Emitted as a bare JSON value like before (now null instead of invalid JSON
            # when nil); presumably an integer column — TODO confirm.
            source: data['source'],
            price: data['price'].to_s,
            category: data['category'].to_s,
            sub_category: data['sub_category'].to_s,
            shipping_rate: data['shipping_rate'].to_s,
            merchant: data['merchant'].to_s
          }
          # NOTE(review): Pner result's #format is inserted verbatim right before the
          # closing brace, exactly as before, so it is assumed to be "" or a
          # ',"key":value...' JSON fragment — confirm against Pner.
          fragment = @pner.call(title: data['offer_title']).format.to_s
          JSON.generate(document).chomp('}') + fragment + '}'
        end

        # Logs per-item failures, which the bulk API reports in the response body
        # ("errors" flag plus an "items" list) rather than as an exception.
        def report_bulk_errors(logger, index_name, response)
          return unless response.is_a?(Hash) && response['errors']

          failed = Array(response['items']).count do |item|
            result = item.is_a?(Hash) ? item.values.first : nil
            result.is_a?(Hash) && result.key?('error')
          end
          logger.error "Bulk import into #{index_name} reported #{failed} failed item(s)"
        end

        # Points the TestDataIndex alias at +new_index+ only, then deletes +old_indexes+.
        def swap_alias_to(new_index, old_indexes)
          alias_name = TestDataIndex.index_name
          actions = old_indexes.map { |index| { remove: { index: index, alias: alias_name } } }
          actions << { add: { index: new_index, alias: alias_name } }
          Chewy.client.indices.update_aliases(body: { actions: actions })
          Chewy.client.indices.delete(index: old_indexes) if old_indexes.any?
        end
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment