Elasticsearch test script that indexes MediaWiki dumps
#!/usr/bin/env ruby
require 'yaml'
require 'net/http'
require 'socket'
require 'uri'
require 'open-uri'
require 'pathname'
require 'fileutils'
require 'etc'
require 'json'
require 'logger'
require 'optparse'
require 'time'
require 'shellwords'
require 'zlib'
CONFFILE = "/etc/elasticsearch/elasticsearch.yml"
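# defaults; each key is also settable via a --<key> command line option (see
# the option parsing below) or through a YAML config file passed with -c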
OPTS = {
  statedir: "/var/local/elastictest",
  width: 4,
  elasticservice: {
    '2.4' => [%w[sudo -u elasticsearch], :elasticsearch,
              %w[-Des.default.path.home=/usr/share/elasticsearch
                 -Des.default.path.conf=/etc/elasticsearch]].flatten,
    '5.x' => [%w[sudo -u elasticsearch env ES_HOME=/usr/share/elasticsearch],
              :elasticsearch, "-Edefault.path.conf=/etc/elasticsearch"].flatten
  },
  elasticsearch: "/usr/share/elasticsearch/bin/elasticsearch",
  mediawiki: "en.wikiquote.org",
  datauri: nil, # uri of a dump of :mediawiki
  wikisettings: nil,
  chunklines: 500,
  debug: false,
  strace: false,
  straceopts: "-v -efile,desc -f -s500 -y -tt",
  tracefiles: [],
}
ELASTICUSER = "elasticsearch"
ELASTICPORT = 9200
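# output/log lines matching this pattern are treated as Elasticsearch errors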
ERRORPATTERN = /failed|Exception/
# Ruby 2.0 compat shim
unless [].respond_to? :to_h
  class Array
    def to_h
      h = {}
      each { |k,v| h[k] = v }
      h
    end
  end
end
# utility func for iterated field retrieval
def ipath a,*i
  i.flatten.each { |j| a = a[j] }
  a
end
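# e.g. ipath({"a" => {"b" => 1}}, %w[a b]) #=> 1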
# step the counter in pathname (eg. /foo/bar01 -> /foo/bar02)
# return [old counter, new pathname]
def nextpath path
  (bp = File.basename(path)) =~ /\d+/
  k = $&.to_i
  w = $&.size
  [k, File.join(File.dirname(path), bp.sub($&, "%0#{w}d" % (k+1)))]
end
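# compress the file at `path` (or read from `handle`) into targetdir with
# gzip, showing progress -- assumes the pv and gzip utilities are on PATH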
def savefile handle:nil, path:nil, targetdir:nil, blocksize: 1<<20, size:nil
  [handle, path].compact.empty? and raise ArgumentError,
    "one of handle and path should be specified"
  path ||= handle.path
  targetdir ||= File.dirname path
  base = File.basename path
  esc = proc {|*a| a.join.shellescape }
  puts "Saving #{path} ..."
  obj,sig = handle ? [handle, [:instance_eval]] : [Kernel, [:open, path]]
  obj.send(*sig) { |h|
    size ||= h.stat.size - h.pos
    open("|pv -c -N #{esc["from: ", base]} -s #{size} | gzip " \
         "| pv -c -N #{esc["to: ", base, ".gz"]} > " \
         "#{esc[File.join(targetdir, base), ".gz"]}", "w") { |z|
      while b = h.read(blocksize)
        z << b
      end
    }
  }
  puts "OK."
end
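# early exit hook: lets the script be loaded just for a syntax/smoke check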
exit if ENV["SYNTAX_TEST"] == "1"
############
# load confs
############
arrayopts = %i[straceopts]
conf = nil
op = OptionParser.new
optnormalize = Hash.new { |h,x| x }.merge! Hash => Array,
                                           NilClass => String,
                                           Fixnum => Integer
OPTS.each { |k,v|
  op.on("--#{k} arg", optnormalize[v.class]) { |x| OPTS[k] = x }
}
op.on("-c", "--conf=<yaml>", "config file") { |f| conf = f }
op.parse!
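# note: the conf file is merged last, so its values override command line flags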
OPTS.merge! case conf
when "-"
  YAML.load STDIN
when String
  YAML.load_file conf
when nil
  {}
else
  raise "wtf"
end
%i[strace debug].each { |o|
  ENV[o.to_s.upcase] == "1" and OPTS[o] = true
}
$elasticconf = YAML.load_file CONFFILE
arrayopts.each { |o|
  case OPTS[o]
  when String
    OPTS[o] = OPTS[o].shellsplit
  end
}
################################################
# create statedir and get entry of highest index
################################################
FileUtils.mkpath OPTS[:statedir]
sde = Dir.glob(File.join OPTS[:statedir], "*").grep(/\d/)
sde.empty? and sde << File.join(OPTS[:statedir], "0"*OPTS[:width])
sdh = sde.map { |d|
  File.basename(d) =~ /\d+/
  [$&.to_i, d]
}.to_h
statedir_hi = sdh[sdh.keys.max]
##################################################
# associate next paths to various params we manage
##################################################
$resource,genh = {},{}
(%w[cluster.name path.data path.logs].
  map {|k| [k, $elasticconf[k]] } << [:statedir, statedir_hi]
).each { |k,v| genh[k],$resource[k] = nextpath v }
###################################################################
# check if the generation numbers agree and define actual generation
###################################################################
unless genh.values.uniq.size == 1
  raise "divergence in parameter generations: #{genh.inspect}"
end
startmsg = "Run #{genh.values[0] + 1}."
puts startmsg
######################
# create the new paths
######################
$resource.each { |k,pa|
  # if not absolute, it's not really a path
  Pathname.new(pa).absolute? or next
  if File.exist? pa
    # XXX: transactional rollback would be neat
    raise "path for parameter #{k.inspect}: #{pa} already exists"
  end
  FileUtils.mkpath pa
  if String === k
    # paths that occur in conf file
    File.chown nil, Etc.getgrnam(ELASTICUSER).gid, pa
    File.chmod 0775, pa
  end
}
################
# set up logging
################
logfiles = []
logfiles << File.join($resource[:statedir], "test.log")
$logger = Logger.new logfiles.last
$logger.level = Logger::DEBUG
$logger.info startmsg
if OPTS[:debug]
logfiles << File.join($resource[:statedir], "debug.log")
$dbglogger = Logger.new logfiles.last
$dbglogger.level = Logger::DEBUG
else
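  # null logger: silently swallows every logging call when debugging is off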
  class Mock
    def method_missing *a, **kw, &b
    end
  end
  $dbglogger = Mock.new
end
if OPTS[:strace]
  $stracelog = File.join($resource[:statedir], "strace.log")
  logfiles << $stracelog
end
logfiles.each { |f|
  FileUtils.ln_sf File.join(*[$resource[:statedir], f].map { |d| File.basename d}),
    File.dirname($resource[:statedir])
}
begin
  #############
  # update conf
  #############
  $elasticconf.merge! $resource.select { |k,pa| String === k }
  unless File.exist? "#{CONFFILE}.orig"
    FileUtils.cp CONFFILE, "#{CONFFILE}.orig"
  end
  open("#{CONFFILE}.tmp", "w") { |f| f << $elasticconf.to_yaml }
  File.rename "#{CONFFILE}.tmp", CONFFILE
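  # (write to a temp file and rename, so the conf update is atomic)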
  ###############################################
  # check elastic version and set up service argv
  ###############################################
  versionstr = nil
  3.times { |i|
    $elasticversion and break
    puts "Checking for Elasticsearch version...#{i.zero? ? "" : " (#{i+1}. attempt)"}"
    IO.popen([OPTS[:elasticsearch], "--version"], &:read) =~ /Version:\s+(\S+)/
    versionstr = $1
    $elasticversion = case versionstr
    when /\A2\.4/
      '2.4'
    when /\A5\.\d/
      '5.x'
    end
  }
  unless $elasticversion
    raise "unknown elasticsearch version #{versionstr}"
  end
  case OPTS[:elasticservice]
  when Hash
    OPTS[:elasticservice] = OPTS[:elasticservice][$elasticversion]
  end
  "#{versionstr} belongs to #{$elasticversion} family.".instance_eval {
    puts self
    $logger.info "Elasticsearch version: #{self}"
  }
  if OPTS[:strace]
    OPTS[:elasticservice] = [%w[sudo strace], OPTS[:straceopts], "-o", $stracelog,
                             "--", OPTS[:elasticservice]].flatten
  end
  #####################
  # Opening trace files
  #####################
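  # user-given files to watch: seek to their current end so that later only
  # what was appended during this run gets saved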
  $tracing = OPTS[:tracefiles].map { |f|
    h = open f, "r+"
    h.seek 0, File::SEEK_END
    [f, h]
  }.to_h
  #####################
  # start elasticsearch
  #####################
  "Starting Elasticsearch service.".instance_eval {
    puts self
    $logger.info self
  }
  $elasticpidfile = File.join($resource['path.logs'], "elastic.pid")
  pip = IO.pipe
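  # symbols in the service argv (eg. :elasticsearch) are placeholders,
  # resolved to the respective OPTS values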
  elasticargs = OPTS[:elasticservice].map { |e| OPTS[e] || e } +
                ["-p", $elasticpidfile]
  $dbglogger.debug elasticargs
  $elasticdaemon = Process.spawn *elasticargs,
    out: pip[1], err: %i[child out], rlimit_nofile: 1<<16
  pip[1].close
  $elastic_out = pip[0]
  $elasticlogfile = File.join($resource[:statedir], "elastic.log")
  File.symlink File.join($resource['path.logs'], "#{$resource['cluster.name']}.log"), $elasticlogfile
  # XXX copy-paste
  FileUtils.ln_sf File.join(*[$resource[:statedir], $elasticlogfile].map { |d| File.basename d}),
    File.dirname($resource[:statedir])
  ##################################
  # connect to Elasticsearch service
  ##################################
  $elasticconn = Net::HTTP.new Socket.gethostname, ELASTICPORT
  puts "Trying to connect to Elasticsearch ..."
  health = loop do
    begin
      break $elasticconn.get("/_cat/health?v").body
    rescue Errno::ECONNREFUSED
      sleep 5
    end
  end
  puts "OK."
  $logger.info "Elasticsearch health: #{health}"
  #############################
  # create index for :mediawiki
  #############################
  $logger.info "Creating index for #{OPTS[:mediawiki]}."
  wikisettings = open(OPTS[:wikisettings]||"https://#{OPTS[:mediawiki]}/w/api.php?action=cirrus-settings-dump&format=json&formatversion=2") { |f| JSON.load f }
  $logger.info $elasticconn.request(Net::HTTP::Put.new("/#{OPTS[:mediawiki]}?pretty"),
    {'analysis' => ipath(wikisettings, %w[content page index analysis]),
     'number_of_shards' => 1, 'number_of_replicas' => 0}.to_json
  ).body
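  # the settings dump uses mapping parameter names that ES 2.x no longer
  # accepts; rename them to their 2.x equivalents before PUTting the mapping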
  case $elasticversion
  when '2.4'
    $logger.info $elasticconn.request(Net::HTTP::Put.new("/#{OPTS[:mediawiki]}/_mapping/page?pretty"),
      JSON.load(wikisettings.to_json.
        gsub('"index_analyzer"', '"analyzer"').
        gsub('"position_offset_gap"', '"position_increment_gap"'))['content'].to_json
    ).body
  end
  ######################
  # open the data stream
  ######################
  $datastream = Zlib::GzipReader.new(if URI.parse(OPTS[:datauri]).scheme
    # with Net::HTTP it's a pain to
    # 1) deal with https and redirects
    # 2) get an IO object for the response body
    # so instead of using it, we just call out to curl
    IO.popen ["curl", "-Ls", OPTS[:datauri]]
  else
    open OPTS[:datauri]
  end)
  ######################
  # data indexing thread
  ######################
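  # the dump is assumed to be in Elasticsearch bulk format already (as the
  # cirrussearch dumps are), so its lines can be forwarded verbatim to the
  # _bulk endpoint in chunks of :chunklines lines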
  $chan_r,chan_w = IO.pipe
  Thread.new {
    begin
      databuf,$databuf_idx = [],1
      data_committed = nil
      loop do
        l = $datastream.gets
        l and databuf << l
        data_committed = false
        if databuf.size == OPTS[:chunklines] or (not l and not databuf.empty?)
          begin
            rsp = $elasticconn.request_post("/#{OPTS[:mediawiki]}/_bulk", databuf.join)
          rescue => exc
            "POST failure: #{exc.class}: #{exc.message}".instance_eval {
              puts self
              $logger.warn self
            }
            break
          end
          rspdata = JSON.load rsp.body
          $logger.info "Chunk #{$databuf_idx}: took #{rspdata['took']}"
          databuf.clear
          $databuf_idx += 1
          data_committed = true
        end
        l or break
      end
      # If we hit EOF right after committing the last bits of data, take a
      # short rest so that Elasticsearch can react to them and the
      # error-checking loop below can observe that reaction before we
      # signal it to quit.
      data_committed and sleep(0.1)
    rescue Exception => x
      $logger.error "index thread got exception #{x.inspect}"
    ensure
      chan_w << "Q"
    end
  }
  ##########################################
  # checking Elasticsearch output for errors
  ##########################################
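  # multiplex between the service's stdout and the quit channel written by
  # the indexing thread upon completion or failure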
  $error = false
  turn = 0
  loop do
    turn += 1
    ra,_,_ = IO.select [$elastic_out, $chan_r]
    if ra.include? $chan_r
      break
    end
    if ra.include? $elastic_out
      l = $elastic_out.gets
      $dbglogger.debug { [turn,l].inspect }
      case l
      when ERRORPATTERN
        "Error at chunk #{$databuf_idx}: #{l.size > 500 ? "#{l[0...500]} ...(#{l.size})" : l}".instance_eval {
          puts self
          $logger.error self
        }
        $error = true
        break
      when nil
        break
      end
    end
  end
  #########
  # cleanup
  #########
  $logger.info "Cleaning up ..."
  tclean = Time.now
  $datastream.closed? or $datastream.close
  elasticpid = begin
    Integer IO.read($elasticpidfile).strip
  rescue => exc
    "Elastic pid file not available: #{exc.class}: #{exc.message}".instance_eval {
      puts self
      $logger.warn self
    }
    $elasticdaemon
  end
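  # terminate gracefully with TERM, and let a forked watchdog KILL the
  # daemon if it's still around a second later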
  Process.kill :TERM, elasticpid
  fork do
    sleep 1
    begin
      Process.kill :KILL, elasticpid
    rescue Errno::ESRCH
    end
  end
  Process.wait $elasticdaemon
  # if no error was captured on stdout, scrub the log file to check whether
  # some error flew under the radar there...
  unless $error
    errors = IO.readlines($elasticlogfile).grep(ERRORPATTERN)
    unless errors.empty?
      t = errors.map { |l|
        begin
          Time.parse l[0...32]
        rescue ArgumentError
        end
      }.compact.min || Time.at(0)
      if t <= tclean
        {warn: "Elastic log contains UNNOTICED ERROR!",
         error: "Error: #{errors.first}"}.each { |lv,msg|
          puts msg
          $logger.send lv, msg
        }
        $error = true
      else
        # We are not concerned if the error is later than the start of
        # cleanup, as then it's just the cleanup that caused it.
        # We merely log the top of the error trace (as the log file will be
        # deleted, this is a last chance to show something).
        errors[0..1].each { |e|
          "Cleanup inflicted error: #{e}".instance_eval {
            puts self
            $logger.info self
          }
        }
      end
    end
  end
  # ditch Elasticsearch data if run was successful (= uninteresting)
  if $error
    ["Retaining Elasticsearch data at #{$resource['path.data']}.",
     "Disk usage: #{`df -h #{$resource['path.data'].shellescape}`}"].each { |msg|
      puts msg
      $logger.info msg
    }
    OPTS[:strace] and savefile path: $stracelog
    $tracing.each { |f,h| savefile path: f, handle: h, targetdir: $resource[:statedir] }
  else
    FileUtils.rmtree $resource.values_at('path.data', 'path.logs')
  end
  OPTS[:strace] and File.delete $stracelog
  $tracing.each_value { |h| h.truncate 0 }
  $logger.info "Exiting."
  puts
rescue => exc
  $logger.fatal exc
  raise
end
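
Usage sketch (hypothetical invocation; the dump URL below is only a guess at the
cirrussearch dump naming scheme -- check https://dumps.wikimedia.org/other/cirrussearch/
for actual file names). The script rewrites /etc/elasticsearch/elasticsearch.yml,
so it needs to run as root:

  sudo ./elastictest.rb \
    --mediawiki en.wikiquote.org \
    --datauri https://dumps.wikimedia.org/other/cirrussearch/current/enwikiquote-cirrussearch-content.json.gz

The same options can be collected into a YAML hash (with the OPTS keys above)
and passed via -c; note that config file values override command line flags,
as the config is merged last.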