When someone deletes their account, we need to delete their data from Elasticsearch. All account data is removed by a bulk SQL query, so elasticsearch-model cannot be notified to destroy the related Elasticsearch documents. I had to find out how to do it, and it had to be done in a background job, otherwise it could take too long.
After looking around in the source code of the elasticsearch Ruby gems and the Elasticsearch API, and with a little help from Karmi (the author of the elasticsearch gems), I found a solution. It consists of the following three things.
- Elasticsearch scroll-scan API (thanks Karmi for suggesting it)
- Elasticsearch bulk API
- Elasticsearch gem
The Elasticsearch scroll-scan API is designed for scrolling through a large number of documents from a single search request. It also disables sorting and scrolls through the data in the most efficient way.
With the Elasticsearch gem, using this API is very easy; you can find how to use it in the API documentation of the elasticsearch-api gem, and I also show it below.
Elasticsearch provides a bulk API for manipulating stored documents. I needed to delete all documents of a destroyed account. The bulk API makes it quite easy; the only thing I had to do was go through all the documents and create a payload for the bulk API in the following form:
{ "delete": { "_index": "index_name", "_type": "index_type", "_id": "1" } }
{ "delete": { "_index": "index_name", "_type": "index_type", "_id": "2" } }
...
I found how to do it in the source code of the elasticsearch-rails gem.
I created the following DelayedJob job (description is below the code).
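As a minimal sketch of that idea (the helper name `bulk_delete_payload` is mine, not from the gems), building the newline-delimited payload in plain Ruby amounts to mapping each document's coordinates to a delete action and serializing it as a JSON line:

```ruby
require 'json'

# Hypothetical helper: turns [index, type, id] triples into the
# newline-delimited payload the bulk API expects for deletes.
def bulk_delete_payload(docs)
  docs.map { |index, type, id|
    JSON.generate(delete: { _index: index, _type: type, _id: id })
  }.join("\n")
end

puts bulk_delete_payload([['invoices', 'invoice', '1'], ['invoices', 'invoice', '2']])
```

Note that the Ruby client also accepts an array of hashes for the bulk body and serializes it for you, which is what the job below relies on.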
class ElasticsearchCleanUpJob < Struct.new(:options)
  BATCH_SIZE = 100

  def perform
    raise "Account id is required" if options[:account_id].blank?
    klass = constantize_klass(options[:klass_name])
    delete(klass)
  end

  def delete(klass = nil)
    client = Elasticsearch::Client.new
    response = client.search(search_query(klass))

    while (response = client.scroll(scroll_id: response['_scroll_id'], scroll: '5m')) && response['hits']['hits'].any?
      client.bulk body: bulkify_delete(response)
    end
  end

  private

  def search_query(klass)
    scan_search = {
      search_type: 'scan',
      scroll: '5m',
      size: BATCH_SIZE,
      body: search_data_body
    }
    scan_search.merge!(index: klass.index_name, type: klass.document_type) if klass.present?
    scan_search
  end

  def search_data_body
    {
      filter: {
        term: {
          account_id: options[:account_id]
        }
      }
    }
  end

  def bulkify_delete(response)
    response['hits']['hits'].map { |r| { delete: { _index: r['_index'], _type: r['_type'], _id: r['_id'] } } }
  end

  def constantize_klass(klass_name)
    return nil if klass_name.blank?
    klass_name.constantize
  end
end
For those who don't use DelayedJob: DelayedJob requires an implementation of a perform method, which is executed when the job is taken from the queue and processed by a worker.
All the magic happens in the delete(klass = nil) method.
def delete(klass = nil)
  client = Elasticsearch::Client.new
  response = client.search(search_query(klass))

  while (response = client.scroll(scroll_id: response['_scroll_id'], scroll: '5m')) && response['hits']['hits'].any?
    client.bulk body: bulkify_delete(response)
  end
end
Because I didn't want to delete data through Elasticsearch::Model, I had to create a custom Elasticsearch::Client instance.
The delete method uses the Elasticsearch scroll-scan API to scroll through all documents of the given account and delete them.
The Elasticsearch scroll-scan API consists of two parts: searching and scrolling.
First, we have to find all documents we want to delete. That's done by the client.search method.
response = client.search(search_query(klass))
The search_query(klass) method passes the following parameters to the client.search method:
- search_type - set to scan, to disable ordering of documents
- scroll - takes a duration: how long Elasticsearch should keep the search context alive
- size - how many records are returned per shard (if you have 5 shards and size is 100, you will get 500 results per scroll)
- body - the search payload (defined in the search_data_body method)
- index and type - used only if a class which includes Elasticsearch::Model is given
This search does not return any documents, but it does return a scroll_id, which is used by the scroll API.
while (response = client.scroll(scroll_id: response['_scroll_id'], scroll: '5m')) && response['hits']['hits'].any?
  client.bulk body: bulkify_delete(response)
end
client.scroll iterates through all documents matching the search payload. It takes only two params:
- scroll_id - defines the search context
- scroll - same as above in the case of search
client.scroll returns the records for the given iteration and a new scroll_id (only the latest one is valid).
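The shape of that loop can be seen with a stand-in client (FakeScrollClient below is a test double I made up, not part of the gem): each scroll call returns a page of hits plus a fresh scroll_id, and the loop stops as soon as a page comes back empty.

```ruby
# Test double mimicking the gem's scroll responses: two pages of
# hits, then an empty page that terminates the loop.
class FakeScrollClient
  def initialize
    @pages = [
      { '_scroll_id' => 's2', 'hits' => { 'hits' => [{ '_id' => '1' }] } },
      { '_scroll_id' => 's3', 'hits' => { 'hits' => [{ '_id' => '2' }] } },
      { '_scroll_id' => 's4', 'hits' => { 'hits' => [] } }
    ]
  end

  def scroll(scroll_id:, scroll:)
    @pages.shift
  end
end

client = FakeScrollClient.new
response = { '_scroll_id' => 's1' } # stands in for the initial scan search
seen = []
while (response = client.scroll(scroll_id: response['_scroll_id'], scroll: '5m')) && response['hits']['hits'].any?
  seen.concat(response['hits']['hits'].map { |r| r['_id'] })
end
seen # => ["1", "2"]
```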
Each set of returned documents is transformed by the bulkify_delete method into a bulk delete payload and sent to Elasticsearch, which deletes all the required documents.
def bulkify_delete(response)
  response['hits']['hits'].map { |r| { delete: { _index: r['_index'], _type: r['_type'], _id: r['_id'] } } }
end
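Fed a response shaped like Elasticsearch's search results (the sample hash below is fabricated for illustration), the transformation produces exactly the bulk-delete actions shown earlier:

```ruby
# Same transformation as in the job: map each hit to a bulk-delete action.
def bulkify_delete(response)
  response['hits']['hits'].map { |r| { delete: { _index: r['_index'], _type: r['_type'], _id: r['_id'] } } }
end

# Fabricated sample mirroring the structure of a real scroll response.
response = {
  'hits' => {
    'hits' => [
      { '_index' => 'invoices', '_type' => 'invoice', '_id' => '7' },
      { '_index' => 'invoices', '_type' => 'invoice', '_id' => '8' }
    ]
  }
}

bulkify_delete(response)
# => [{ delete: { _index: 'invoices', _type: 'invoice', _id: '7' } },
#     { delete: { _index: 'invoices', _type: 'invoice', _id: '8' } }]
```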
Deleting all account data with this job is quite easy. It requires only one line of code:
Delayed::Job.enqueue ElasticsearchCleanUpJob.new(account_id: account.id)
Or you can delete account data only for a particular index:
Delayed::Job.enqueue ElasticsearchCleanUpJob.new(account_id: account.id, klass_name: 'Invoice')
This code deletes all of the account's invoices from the Elasticsearch invoices index.
- READMEs and source code of the elasticsearch (v1.0.6) and elasticsearch-rails (v0.1.6) gems
- API documentation of the elasticsearch-api gem
- Elasticsearch documentation of the scroll API and bulk API