Skip to content

Instantly share code, notes, and snippets.

@miniway
Created May 13, 2020 03:49
Show Gist options
  • Save miniway/9b05996cea406c05c8d23f3f24d84f85 to your computer and use it in GitHub Desktop.
Save miniway/9b05996cea406c05c8d23f3f24d84f85 to your computer and use it in GitHub Desktop.
require 'uri'
require 'json'
require 'faraday'
require 'presto-client'
require 'date'
# Issues an HTTP GET for +path+ on the server at +url+ and returns the
# response body decoded from JSON.
#
# @param url [String] base URL handed to Faraday.new
# @param path [String] request path set on the outgoing request
# @return [Object] the result of JSON.parse on the response body
# @raise [RuntimeError] carrying the response body when the status is not 200
def get(url, path)
  connection = Faraday.new(url: url) do |builder|
    builder.adapter Faraday.default_adapter
  end

  reply = connection.get do |request|
    request.url path
    request.headers['Accept'] = 'application/json'
  end

  raise reply.body unless reply.status == 200

  # max_nesting: 150 lets documents nested up to 150 levels deep parse.
  JSON.parse(reply.body, max_nesting: 150)
end
# Appends +stage+ and all of its descendants (reached through sub_stages)
# to +stages+, parents before children, and returns +stages+.
#
# @param stage [#sub_stages, nil] root of the subtree; nil/false adds nothing
# @param stages [Array] accumulator that is mutated in place
# @return [Array] the same +stages+ object that was passed in
def collect_stages(stage, stages)
  if stage
    stages.push(stage)
    stage.sub_stages.each do |child|
      collect_stages(child, stages)
    end
  end
  stages
end
# Matches a duration such as "1.50ms" or "3 s": a decimal number followed by
# an alphabetic unit, with optional surrounding whitespace. Anchored with
# \A and \z so the entire string has to match, not just one of its lines.
DURATION_PATTERN = /\A\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*\z/

# Converts a textual duration into milliseconds.
#
# @param duration [#to_s] text such as "250ms", "1.5s" or "2 h"
# @return [Float] the duration expressed in milliseconds
# @raise [ArgumentError] when +duration+ is not of the form "<number><unit>"
# @raise [RuntimeError] when the unit is not one of ns, us, ms, s, m, h, d
def to_millis(duration)
  m = DURATION_PATTERN.match(duration.to_s)
  # Report malformed input explicitly rather than failing on a nil match.
  raise ArgumentError, "invalid duration #{duration.inspect}" unless m

  value = m[1].to_f
  unit = m[2]
  case unit
  when 'ns' then value / 1000 / 1000
  when 'us' then value / 1000
  when 'ms' then value
  when 's'  then value * 1000
  when 'm'  then value * 1000 * 60
  when 'h'  then value * 1000 * 60 * 60
  when 'd'  then value * 1000 * 60 * 60 * 24
  else
    raise "unknown unit #{unit}"
  end
end
# Converts a date-time string into seconds since the Unix epoch.
#
# @param dt [String] text accepted by DateTime.parse
# @return [Float] epoch seconds, including any fractional part
def to_timestamp(dt)
  parsed = DateTime.parse(dt)
  parsed.to_time.to_f
end
# Reads the 'job_id' entry from the query's session system properties.
#
# @param query [#session] object whose session exposes system_properties
# @return [Object] the stored value, or 'unknown' when it is nil or false
def job_id(query)
  properties = query.session.system_properties
  id = properties['job_id']
  return 'unknown' unless id

  id
end
# Prints a "SLOW JOB" line for running tasks that have been running far
# longer than the finished tasks of the same stage.
#
# Every stage reachable from the query's output stage is inspected. A stage
# is considered only when it has finished tasks, no more than
# +max_running_ratio+ of its running + finished tasks are still running, and
# the finished tasks ran (elapsed minus queued time) for at least
# +min_avg_millis+ on average. A running task is reported when the time since
# its first start exceeds +slow_factor+ times that average.
#
# @param query [#output_stage] decoded query info
# @param max_running_ratio [Numeric] largest share of running tasks for a
#   stage to be examined
# @param min_avg_millis [Numeric] stages whose finished tasks averaged less
#   than this many milliseconds are ignored
# @param slow_factor [Numeric] multiple of the average run time beyond which
#   a running task is reported
# @return [Array] the stages that were visited
def check_query(query, max_running_ratio: 0.2, min_avg_millis: 120 * 1000, slow_factor: 5)
  collect_stages(query.output_stage, []).each do |stage|
    tasks = stage.tasks.group_by { |t| t.task_status.state }
    running = tasks[:running] || []
    finished = tasks[:finished] || []
    # Nothing to compare against yet, or too large a share is still running.
    next if finished.empty? || running.length > (running.length + finished.length) * max_running_ratio

    total_running_time = finished.sum { |t| to_millis(t.stats.elapsed_time) - to_millis(t.stats.queued_time) }
    avg_running_time = total_running_time / finished.length
    next if avg_running_time < min_avg_millis # ignore short living queries

    running.each do |t|
      started_at = t.stats.first_start_time
      # Without a start time there is no run time to compare, and parsing nil
      # would raise and abort the whole scan.
      # NOTE(review): first_start_time appears to be optional in Presto's
      # task stats -- confirm against the coordinator's JSON.
      next unless started_at

      running_time = (Time.now.to_f - to_timestamp(started_at)) * 1000
      next unless running_time > avg_running_time * slow_factor

      puts "SLOW JOB #{job_id(query)} at #{t.task_status.task_id} (#{t.task_status.node_id}) #{(running_time / 1000).to_i}s >> #{(avg_running_time / 1000).to_i}s"
    end
  end
end
# Script entry point: fetches the query list from the coordinator given as
# the first command-line argument and runs check_query on every query that is
# currently in the :running state.
coordinator_uri = ARGV[0]
# Fail fast with a usage hint instead of passing nil to the HTTP client.
abort "usage: #{$PROGRAM_NAME} <coordinator-uri>" if coordinator_uri.nil? || coordinator_uri.empty?

get(coordinator_uri, '/v1/query').each do |q|
  query_info = Presto::Client::Models::BasicQueryInfo.decode(q)
  next unless query_info.state == :running

  # The list entry is only a summary; fetch the full query info for analysis.
  query = Presto::Client::Models::QueryInfo.decode(get(coordinator_uri, "/v1/query/#{query_info.query_id}"))
  check_query(query)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment