Last active: December 17, 2015 15:10
Save schaitanya/5630161 to your computer and use it in GitHub Desktop.
Create a backup, then re-index Elasticsearch data using scan and scroll
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "name": "re-index",
  "version": "0.0.1",
  "description": "Create backup, and re-index elastic search data with scan and scroll",
  "main": "re-index.coffee",
  "author": "Chaitanya Surapaneni",
  "license": "MIT",
  "dependencies": {
    "request": "~2.21.0",
    "async": "~0.2.8",
    "underscore": "~1.4.4",
    "date-utils": "~1.2.12",
    "progress": "~0.1.0"
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
request = require 'request'
exec = require('child_process').exec
execFile = require('child_process').execFile
async = require 'async'
_ = require 'underscore'
dateUtil = require 'date-utils'
ProgressBar = require 'progress'
# Return the value of a `--name=value` command-line flag. The flag is looked up
# by name, so flags may be given in any order. Returns undefined when absent.
argValue = (name) ->
  prefix = "--#{name}="
  for arg in process.argv[2..] when arg.indexOf(prefix) is 0
    return arg[prefix.length..]
  undefined

# Name of the index to re-index.
index = argValue 'index'
# Working directory for the backup tar, which archives the entry named after
# the index from here. Example: /usr/local/elasticsearch/data/nodes/0
# NOTE(review): confirm this is the directory that directly contains the
# index's data folder for the Elasticsearch version in use.
ES_HOME = argValue 'es-home'
# Directory the .tar.gz backup is written to. Example: /data/es-backup
BACKUP_DIR = argValue 'backup-dir'

helper = """
  Usage:
    coffee re-index.coffee --index=[index-name] --es-home=[] --backup-dir=[]
"""

# Reject missing *and* empty values. An empty --index would make the later
# HEAD/DELETE requests on "#{server}/#{index}" target the server root.
unless index and ES_HOME and BACKUP_DIR
  console.log helper
  # Non-zero status: a usage error is a failed run.
  process.exit 1

server = "http://127.0.0.1:9200"
# Timestamp shared by the backup file name and the new index name
# (toFormat is added to Date by the date-utils require).
date = new Date().toFormat('YYYY-MM-DD-HH-MI-SS-PP')
# New index name: old name plus timestamp, spaces removed and lowercased.
new_index = "#{index}-#{date}".replace(/\ /g, '').toLowerCase()
type = 'document'
# Filled in by the alias-lookup step: extra aliases to move onto new_index.
aliases = []
# Re-index pipeline. Each step is a named function so the order is readable in
# the async.waterfall call at the bottom. A step that yields an error stops the
# pipeline. In particular, the old index is only deleted after every document
# was copied, accepted by _bulk and counted.

# Build an error string for a failed HTTP call: a transport error or a
# non-2xx status. Returns null when the call succeeded.
httpError = (what, e, r) ->
  return "#{what}: #{e.message ? e}" if e
  return "#{what}: HTTP #{r.statusCode}" unless 200 <= r.statusCode < 300
  null

# Step 1: archive the index's entry under ES_HOME into BACKUP_DIR.
backup = (cb) ->
  # To skip the backup, uncomment the next line:
  #return cb()
  file = "#{BACKUP_DIR}/Backup-#{index}-#{date}.tar.gz"
  # execFile runs without a shell, so the CLI-supplied paths and index name
  # cannot inject shell syntax. `cwd` replaces the old `cd #{ES_HOME} &&`.
  # `z` makes the archive really gzip-compressed, as the .tar.gz name says.
  # Dropping `v` keeps a large index from overflowing maxBuffer.
  args = ['-n', '19', 'tar', 'czf', file, index]
  execFile 'nice', args, { cwd: ES_HOME, maxBuffer: 5000 * 1024 }, (err) ->
    return cb err if err
    console.log "Backed up to #{file}"
    cb()

# Step 2: fail with "Index not found" unless HEAD /<index> answers 200.
checkIndexExists = (cb) ->
  request { uri: "#{server}/#{index}", method: 'HEAD' }, (e, r) ->
    cb(if e or r.statusCode isnt 200 then "Index not found" else null)

# Step 3: record the index's aliases (excluding the index name itself) in the
# top-level `aliases` variable, so they can be re-pointed at new_index.
loadAliases = (cb) ->
  console.log "Checking aliases"
  request { uri: "#{server}/#{index}/_aliases", json: true }, (e, r, body) ->
    err = httpError "Error reading aliases for index #{index}", e, r
    return cb err if err
    aliases = _.without _.flatten([_.first(_.keys body), _.keys _.values(body)?[0]?.aliases]), index
    if _.isEmpty aliases
      console.log 'No alias found.'
    else
      console.log "Aliases found: #{aliases}"
    cb()

# Step 4: open a scan search and pass its scroll id to the next step.
startScan = (cb) ->
  opts =
    uri: "#{server}/#{index}/#{type}/_search?search_type=scan&scroll=1m&size=500"
    json: true
  request opts, (e, r, b) ->
    err = httpError "Error starting scan on #{index}", e, r
    return cb err if err
    cb null, b._scroll_id

# Fetch the next batch for `scrollId`.
# Calls back with (err, hits, nextScrollId, total).
scroll = (scrollId, cb) ->
  opts =
    # Scroll ids are opaque tokens, so encode them before putting them in a URL.
    uri: "#{server}/_search/scroll?scroll=10m&scroll_id=#{encodeURIComponent scrollId}"
    json: true
  request opts, (e, r, body) ->
    err = httpError "Error scrolling #{index}", e, r
    return cb err if err
    cb null, body.hits.hits, body._scroll_id, body.hits.total

# Send one batch of hits to _bulk under new_index. Fails when the request fails
# or when the response reports any rejected document.
bulkIndex = (hits, cb) ->
  lines = []
  for hit in hits
    # JSON.stringify escapes ids containing quotes or backslashes.
    lines.push JSON.stringify(index: { _index: new_index, _type: type, _id: hit._id })
    lines.push JSON.stringify(hit._source)
  opts =
    uri: "#{server}/_bulk"
    method: 'POST'
    body: lines.join('\n') + '\n'
  request opts, (e, r, body) ->
    err = httpError "Error bulk-indexing into #{new_index}", e, r
    return cb err if err
    try
      result = JSON.parse body
    catch parseErr
      return cb "Error bulk-indexing into #{new_index}: unparseable response (#{parseErr.message})"
    failed = _.some result.items, (item) -> _.values(item)[0]?.error?
    if result.errors or failed
      return cb "Error bulk-indexing into #{new_index}: some documents were rejected"
    cb()

# Step 5: copy every document from the scan into new_index, with a progress bar.
copyDocuments = (scrollId, cb) ->
  console.log 'Processing data...'
  scroll scrollId, (err, items, nextId, total) ->
    return cb err if err
    console.log "Found: #{total} document(s)"
    bar = new ProgressBar "processing [:bar] :percent :elapseds elapsed",
      complete: '='
      incomplete: ' '
      width: 30
      total: total
    totalHits = items.length
    scrollId = nextId

    hasMore = -> items.length > 0

    # Bulk-index the current batch, then fetch the next one.
    copyBatch = (next) ->
      bulkIndex items, (err) ->
        return next err if err
        bar.tick items.length
        scroll scrollId, (err, batch, nextId) ->
          return next err if err
          totalHits += batch.length
          items = batch
          scrollId = nextId
          next()

    finish = (err) ->
      return cb err if err
      console.log "\nProcessed items: #{totalHits}"
      # The next step deletes the old index. Refuse to get there if fewer (or
      # more) documents were copied than the search reported.
      if totalHits isnt total
        return cb "Copied #{totalHits} of #{total} document(s); keeping index #{index}"
      cb()

    async.whilst hasMore, copyBatch, finish

# Step 6: delete the old index so its name can become an alias.
deleteOldIndex = (cb) ->
  console.log "Deleting old index"
  request { uri: "#{server}/#{index}", method: 'DELETE' }, (e, r) ->
    cb httpError("Error deleting index #{index}", e, r)

# Step 7: point the old index name, plus every alias it had, at new_index.
createAliases = (cb) ->
  console.log "Creating new alias and append existing aliases"
  actions = ({ add: { index: new_index, alias: name } } for name in [index].concat(aliases))
  opts =
    uri: "#{server}/_aliases"
    method: "POST"
    body: JSON.stringify(actions: actions)
  request opts, (e, r) ->
    cb httpError("Error creating aliases for index #{new_index}", e, r)

steps = [backup, checkIndexExists, loadAliases, startScan, copyDocuments, deleteOldIndex, createAliases]
async.waterfall steps, (e) ->
  if e
    console.log e
    # Non-zero status so callers (shell, cron) can tell the run failed.
    return process.exit 1
  console.log 'Finished re-indexing'
  process.exit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment