Skip to content

Instantly share code, notes, and snippets.

@schaitanya
Last active December 17, 2015 15:10
Show Gist options
  • Save schaitanya/5630161 to your computer and use it in GitHub Desktop.
Save schaitanya/5630161 to your computer and use it in GitHub Desktop.
Create a backup and re-index Elasticsearch data with scan and scroll
{
"name": "re-index",
"version": "0.0.1",
"description": "Create backup, and re-index elastic search data with scan and scroll",
"main": "re-index.coffee",
"author": "Chaitanya Surapaneni",
"license": "MIT",
"dependencies": {
"request": "~2.21.0",
"async": "~0.2.8",
"underscore": "~1.4.4",
"date-utils": "~1.2.12",
"progress": "~0.1.0"
}
}
request = require 'request'
exec = require('child_process').exec
async = require 'async'
_ = require 'underscore'
dateUtil = require 'date-utils'
ProgressBar = require 'progress'
# Look up the value of a `--name=value` command-line flag.
# Returns the text after the `=`, or undefined when the flag was not passed.
# Flags are matched by name, so they may be given in any order.
flagValue = (name) ->
  prefix = "--#{name}="
  for arg in process.argv.slice(2)
    return arg.slice(prefix.length) if arg.indexOf(prefix) is 0
  undefined

# Name of the index (or alias) to re-index
index = flagValue 'index'
# Directory that contains the index data, e.g. /usr/local/elasticsearch/data/nodes/0
ES_HOME = flagValue 'es-home'
# Directory that receives the backup archive, e.g. /data/es-backup
BACKUP_DIR = flagValue 'backup-dir'

helper = """
Usage:
coffee re-index.coffee --index=[index-name] --es-home=[] --backup-dir=[]
"""

# All three flags are mandatory and must be non-empty; a usage error is
# reported on stderr with a non-zero exit status so calling scripts notice it.
unless index and ES_HOME and BACKUP_DIR
  console.error helper
  process.exit 1

server = "http://127.0.0.1:9200"
# Timestamp used to make the backup file and the new index name unique
# (toFormat is added to Date by the date-utils module required above)
date = new Date().toFormat('YYYY-MM-DD-HH-MI-SS-PP')
# Elasticsearch index names must be lowercase and must not contain spaces
new_index = "#{index}-#{date}".replace(/\ /g,'').toLowerCase()
# Only documents of this mapping type are copied
type = 'document'
# Filled in by the alias lookup step and re-applied to the new index
aliases = []
# Re-index pipeline. The steps run in sequence and the first error aborts the
# run, so the old index is only deleted after every document has been copied:
#   1. archive the index directory
#   2. verify the index exists
#   3. collect its aliases
#   4. open a scan/scroll search
#   5. copy every hit into the new index through the _bulk API
#   6. delete the old index
#   7. point the old name and all previous aliases at the new index
{ execFile } = require 'child_process'

# Return a message describing the first failed item of a _bulk response body
# (a JSON string), or null when every item was indexed successfully.
bulkFailure = (raw) ->
  try
    parsed = JSON.parse raw
  catch parseError
    return "Unable to parse bulk response"
  for item in parsed?.items or []
    failure = item.index?.error
    return "Bulk indexing failed for document #{item.index._id}: #{JSON.stringify failure}" if failure
  null

async.waterfall [
  # Backup data from all nodes
  (cb) ->
    # To skip the backup, uncomment the next line:
    # return cb()
    file = "#{BACKUP_DIR}/Backup-#{index}-#{date}.tar.gz"
    # execFile passes the arguments straight to the program (no shell), so
    # unusual characters in the command-line values cannot inject commands.
    # `z` makes the archive match its .tar.gz name; the verbose flag is left
    # out because its output was unused and could overflow maxBuffer.
    args = ['-n', '19', 'tar', 'czf', file, index]
    execFile 'nice', args, { cwd: ES_HOME, maxBuffer: 5000 * 1024 }, (err, stdout, stderr) ->
      return cb err if err
      console.log "Backed up to #{file}"
      cb()

  # Check if Index exists
  (cb) ->
    opts =
      uri: "#{server}/#{index}"
      method: "HEAD"
    request opts, (e, r) ->
      err = if e or r.statusCode isnt 200 then "Index not found" else null
      cb err

  # Check for aliases
  (cb) ->
    console.log "Checking aliases"
    opts =
      uri: "#{server}/#{index}/_aliases"
      json: true
    request opts, (err, response, body) ->
      return cb err if err
      unless _.isObject(body) and not _.isEmpty(body)
        return cb "Unable to read aliases for index #{index}"
      # The response is keyed by the concrete index name, which differs from
      # `index` when `index` is itself an alias; keep that name as an alias too.
      concreteName = _.first _.keys(body)
      aliasNames = _.keys(body[concreteName]?.aliases or {})
      aliases = _.without [concreteName].concat(aliasNames), index
      console.log "Aliases found: #{aliases}" unless _.isEmpty aliases
      console.log 'No alias found.' if _.isEmpty aliases
      cb()

  # Initialize search and use the _scroll_id for next queries
  (cb) ->
    opts =
      uri: "#{server}/#{index}/#{type}/_search?search_type=scan&scroll=1m&size=500"
      json: true
    request opts, (e, r, b) ->
      if e
        console.log e
        return cb e
      return cb "Unable to start scan on index #{index}" unless b?._scroll_id
      cb null, b

  # Copy all documents into the new index, one scroll page at a time
  (scan, callbacks) ->
    console.log 'Processing data...'
    scroll_id = scan._scroll_id
    items = []
    totalHits = 0
    bar = null

    # Fetch the next page of hits. The scroll id is refreshed from every
    # response so the following request always uses the most recent one.
    nextBatch = (done) ->
      opts =
        uri: "#{server}/_search/scroll?scroll=10m&scroll_id=#{encodeURIComponent(scroll_id)}"
        json: true
      request opts, (err, resp, body) ->
        return done err if err
        unless body?.hits?.hits?
          return done "Unexpected scroll response: #{JSON.stringify body}"
        scroll_id = body._scroll_id if body._scroll_id
        items = body.hits.hits
        totalHits += items.length
        done null, body

    # Loop condition: keep going until a page comes back empty
    hasItems = ->
      items.length > 0

    # Push the current page to the new index, then load the next page
    copyBatch = (callback) ->
      lines = []
      for hit in items
        # JSON.stringify keeps ids containing quotes or backslashes valid
        lines.push JSON.stringify { index: { _index: new_index, _type: type, _id: hit._id } }
        lines.push JSON.stringify hit._source
      opts =
        uri: "#{server}/_bulk"
        method: 'POST'
        body: lines.join('\n') + '\n'
      # Submit _bulk query
      request opts, (er, re, bo) ->
        return callback er if er
        unless re.statusCode is 200
          return callback "Bulk request failed with HTTP status #{re.statusCode}"
        failure = bulkFailure bo
        return callback failure if failure
        bar.tick items.length
        nextBatch (err) ->
          callback err

    # Propagate any failure so the old index is never deleted after a
    # partial copy
    finished = (err) ->
      return callbacks err if err
      console.log "\nProcessed items: #{totalHits}"
      callbacks()

    nextBatch (err, first) ->
      return callbacks err if err
      console.log "Found: #{first.hits.total} document(s)"
      bar = new ProgressBar "processing [:bar] :percent :elapseds elapsed",
        complete: '='
        incomplete: ' '
        width: 30
        total: first.hits.total
      async.whilst hasItems, copyBatch, finished

  # Delete old-index
  (cb) ->
    console.log "Deleting old index"
    opts =
      uri: "#{server}/#{index}"
      method: 'DELETE'
    request opts, (e, r, b) ->
      return cb "Error deleting index #{index}" if e or r.statusCode isnt 200
      cb()

  # Creating new alias and append existing aliases
  (cb) ->
    console.log "Creating new alias and append existing aliases"
    data =
      actions: [
        { add: { index: new_index, alias: index } }
      ]
    _.forEach aliases, (alias) ->
      data.actions.push { add: { index: new_index, alias: alias } }
    opts =
      uri: "#{server}/_aliases"
      method: "POST"
      body: JSON.stringify data
    request opts, (e, r, b) ->
      return cb "Error creating aliases for index #{new_index}" if e or r.statusCode isnt 200
      cb()
], (e) ->
  # Report failures on stderr with a non-zero exit status
  if e
    console.error e
    process.exit 1
  console.log 'Finished re-indexing'
  process.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment