Bulk delete of documents from Elasticsearch in Ruby on Rails

When a user deletes their account, we need to delete their data from Elasticsearch as well. All account data is removed by a bulk SQL query, so elasticsearch-model is never notified and cannot destroy the related Elasticsearch documents. I had to find out how to do that myself, and it had to run in a background job because it could otherwise take too long.

After looking around in the source code of the Elasticsearch Ruby gems and the Elasticsearch API, and with a little help from Karmi (author of the Elasticsearch gems), I found a solution. It consists of the following three things.

Elasticsearch scroll-scan API

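The Elasticsearch scroll-scan API is designed for scrolling through a large number of documents from a single search request. It also disables sorting, so it scrolls through the data in the most efficient way. Note that the scan search type was deprecated in Elasticsearch 2.1 and removed in 5.0, so the code in this article targets older versions.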

With the Elasticsearch gem, using this API is very easy. You can find out how to use it in the API documentation of the elasticsearch-api gem, and I also show it below.

Elasticsearch bulk API

Elasticsearch provides a bulk API for manipulating stored documents. I needed to delete all documents belonging to the destroyed account. The bulk API makes this quite easy: the only thing I had to do was go through all the documents and create a payload for the bulk API in the following form:

{ "delete": { "_index": "index_name", "_type": "index_type", "_id": "1" } }
{ "delete": { "_index": "index_name", "_type": "index_type", "_id": "2" } }
...

I found out how to do it in the source code of the elasticsearch-rails gem.
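To make the payload shape concrete, here is a minimal sketch in plain Ruby that needs no Elasticsearch connection. The helper names bulk_delete_actions and to_ndjson and the sample hits are assumptions made for illustration. The job in this article hands an array of such hashes straight to client.bulk, so the newline-delimited serialization is shown only to illustrate the format above.

```ruby
require 'json'

# Turn search hits into bulk "delete" actions (one hash per document).
def bulk_delete_actions(hits)
  hits.map do |hit|
    { delete: { _index: hit['_index'], _type: hit['_type'], _id: hit['_id'] } }
  end
end

# Serialize the actions as one JSON object per line, ending with a newline.
def to_ndjson(actions)
  actions.map { |action| JSON.generate(action) }.join("\n") + "\n"
end

# Hypothetical hits, shaped like the entries of response['hits']['hits'].
sample_hits = [
  { '_index' => 'invoices', '_type' => 'invoice', '_id' => '1' },
  { '_index' => 'invoices', '_type' => 'invoice', '_id' => '2' }
]

puts to_ndjson(bulk_delete_actions(sample_hits))
```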

Let's look at some code

I created the following DelayedJob job (the description is below the code).

class ElasticsearchCleanUpJob < Struct.new(:options)
  BATCH_SIZE = 100

  def perform
    raise "Account id is required" if options[:account_id].blank?
    
    klass = constantize_klass(options[:klass_name])
    delete(klass)
  end
  
  def delete(klass = nil)
    client = Elasticsearch::Client.new
    response = client.search(search_query(klass))

    while (response = client.scroll(scroll_id: response['_scroll_id'], scroll: '5m')) && response['hits']['hits'].any?
      client.bulk body: bulkify_delete(response)
    end 
  end

private
  
  def search_query(klass)
    scan_search = {
      search_type: 'scan', 
      scroll:      '5m', 
      size:        BATCH_SIZE, 
      body:        search_data_body
    }
    scan_search.merge!(index: klass.index_name, type: klass.document_type) if klass.present?
    
    scan_search
  end
  
  def search_data_body
    { 
      filter: { 
        term: { 
          account_id: options[:account_id]
        } 
      }
    }
  end
  
  def bulkify_delete(response)
    response['hits']['hits'].map { |r| { delete: { _index: r['_index'], _type: r['_type'], _id: r['_id'] } } }
  end
  
  def constantize_klass(klass_name)
    return nil if klass_name.blank?
    
    klass_name.constantize
  end
end

For those who don't use DelayedJob: DelayedJob requires a job to implement a perform method, which is called when a worker takes the job from the queue and processes it.
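The contract can be illustrated with a toy queue in plain Ruby. This is only a sketch of the idea, not how DelayedJob is implemented: ToyQueue and PrintAccountJob are hypothetical names. The options are passed as an explicit hash in braces because, on Ruby 3.2 and later, a Struct created without keyword_init treats bare keyword arguments as member names.

```ruby
# A job is any object that responds to #perform. Struct.new(:options) gives
# the job a constructor that stores its single argument in `options`.
class PrintAccountJob < Struct.new(:options)
  def perform
    "cleaning account #{options[:account_id]}"
  end
end

# Hypothetical in-memory stand-in for a job queue plus worker.
class ToyQueue
  def initialize
    @jobs = []
  end

  def enqueue(job)
    raise ArgumentError, 'job must respond to #perform' unless job.respond_to?(:perform)

    @jobs << job
    job
  end

  # What a worker does: take each queued job in order and call #perform.
  def work_off
    results = []
    results << @jobs.shift.perform until @jobs.empty?
    results
  end
end

queue = ToyQueue.new
queue.enqueue(PrintAccountJob.new({ account_id: 42 }))
puts queue.work_off.inspect
```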

All the magic happens in the delete(klass = nil) method.

def delete(klass = nil)
  client = Elasticsearch::Client.new
  response = client.search(search_query(klass))

  while (response = client.scroll(scroll_id: response['_scroll_id'], scroll: '5m')) && response['hits']['hits'].any?
    client.bulk body: bulkify_delete(response)
  end 
end

Because I didn't want to delete data through Elasticsearch::Model, I had to create a custom Elasticsearch::Client instance. The delete method uses the Elasticsearch scroll-scan API to scroll through all the documents of the given account and deletes them.

The Elasticsearch scroll-scan API consists of two parts: searching and scrolling.

First, we have to find all the documents we want to delete. That's done by the client.search method.

response = client.search(search_query(klass))

The search_query(klass) method returns the following parameters for the client.search method:

  • search_type - set to scan to disable ordering of documents
  • scroll - how long Elasticsearch should keep the search context alive
  • size - how many records are returned per shard (with 5 shards and a size of 100, you get up to 500 results per scroll)
  • body - the search payload (defined in the search_data_body method)
  • index and type - used only if a class that includes Elasticsearch::Model is given

This search does not return any documents, but it returns a scroll ID (_scroll_id), which is used by the scroll API.
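The parameters listed above can be sketched as a standalone function, which makes them easy to inspect without a running cluster. The names scan_search_params and max_hits_per_scroll are assumptions for illustration. The hash mirrors what search_query builds in the job, and the second helper only spells out the per-shard arithmetic.

```ruby
# Build the parameters for the initial scan search.
# `index` and `type` are optional; when omitted, the search is not
# restricted to a single index or type.
def scan_search_params(account_id:, index: nil, type: nil, batch_size: 100)
  params = {
    search_type: 'scan',
    scroll: '5m',
    size: batch_size,
    body: { filter: { term: { account_id: account_id } } }
  }
  params[:index] = index if index
  params[:type] = type if type
  params
end

# With `scan`, size applies per shard, so one scroll page can hold up to
# batch_size * shard_count documents.
def max_hits_per_scroll(batch_size, shard_count)
  batch_size * shard_count
end

p scan_search_params(account_id: 1, index: 'invoices', type: 'invoice')
p max_hits_per_scroll(100, 5) # 500
```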

while (response = client.scroll(scroll_id: response['_scroll_id'], scroll: '5m')) && response['hits']['hits'].any?
  client.bulk body: bulkify_delete(response)
end 

client.scroll iterates through all the documents that match the search payload. It takes only two params:

  • scroll_id - identifies the search context
  • scroll - same as for search above

client.scroll returns the records for the given iteration and a new scroll_id. Only the most recent scroll_id is valid, so always pass the latest one to the next call.
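The loop can be tried out without a cluster by swapping in a stand-in client. In the sketch below, FakeScrollClient is a hypothetical in-memory object that only imitates the behaviour described above (an empty first search, then one page and a fresh scroll ID per call). each_scroll_page is an assumed helper name, not part of the elasticsearch gems.

```ruby
# Hypothetical stand-in for Elasticsearch::Client, just enough for the loop.
class FakeScrollClient
  def initialize(pages)
    @pages = pages
    @cursor = 0
  end

  # Scan-style search: no hits, only a scroll id.
  def search(_params)
    { '_scroll_id' => 'scroll-0', 'hits' => { 'hits' => [] } }
  end

  # Returns the next page and a new scroll id; rejects outdated ids.
  def scroll(scroll_id:, scroll:)
    raise ArgumentError, 'stale scroll_id' unless scroll_id == "scroll-#{@cursor}"

    page = @pages[@cursor] || []
    @cursor += 1
    { '_scroll_id' => "scroll-#{@cursor}", 'hits' => { 'hits' => page } }
  end
end

# Yield each non-empty page of hits, always passing on the newest scroll id.
def each_scroll_page(client, search_params, keep_alive: '5m')
  response = client.search(search_params)
  loop do
    response = client.scroll(scroll_id: response['_scroll_id'], scroll: keep_alive)
    hits = response['hits']['hits']
    break if hits.empty?

    yield hits
  end
end

client = FakeScrollClient.new([[{ '_id' => '1' }, { '_id' => '2' }], [{ '_id' => '3' }]])
each_scroll_page(client, {}) { |hits| puts hits.map { |h| h['_id'] }.join(',') }
```

Because each call reuses the response from the previous one, the loop never sends an outdated scroll ID, which is exactly what the stand-in client checks.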

Each set of returned documents is transformed by the bulkify_delete method into a bulk delete payload and sent to Elasticsearch, which deletes all of the required documents.

def bulkify_delete(response)
  response['hits']['hits'].map { |r| { delete: { _index: r['_index'], _type: r['_type'], _id: r['_id'] } } }
end
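The job above ignores what client.bulk returns. If you want to know whether every delete succeeded, one option is to inspect the response. The sketch below assumes the usual bulk response shape, an items array with one entry per action, each carrying a status code. The helper name failed_bulk_items and the sample response are made up for illustration, and treating 404 as harmless is a design choice, not something the original job does.

```ruby
# Return the bulk items whose status is not a 2xx success.
# A 404 on delete means the document was already gone, so it is
# ignored unless ignore_missing is set to false.
def failed_bulk_items(bulk_response, ignore_missing: true)
  bulk_response.fetch('items', []).select do |item|
    status = item.values.first['status']
    next false if ignore_missing && status == 404

    !(200..299).cover?(status)
  end
end

# Hypothetical response, shaped like the hash returned by client.bulk.
sample_response = {
  'errors' => true,
  'items' => [
    { 'delete' => { '_id' => '1', 'status' => 200 } },
    { 'delete' => { '_id' => '2', 'status' => 404 } },
    { 'delete' => { '_id' => '3', 'status' => 503, 'error' => 'unavailable' } }
  ]
}

failed = failed_bulk_items(sample_response)
puts failed.map { |item| item['delete']['_id'] }.inspect
```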

Usage

Deleting all account data with this job is quite easy. It requires only one line of code:

Delayed::Job.enqueue ElasticsearchCleanUpJob.new(account_id: account.id)

Or you can delete account data only for a particular index:

Delayed::Job.enqueue ElasticsearchCleanUpJob.new(account_id: account.id, klass_name: 'Invoice')

This code deletes all of the account's invoices from the Elasticsearch invoices index.
