Skip to content

Instantly share code, notes, and snippets.

@tagomoris
Created June 1, 2012 07:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tagomoris/2849939 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
# Benchmark HDFS write throughput through HttpFS, the webhdfs gem, or raw
# WebHDFS requests that reuse ("cache") the datanode append location.
#
# Usage:
#   ruby <script> -t httpfs|webhdfs|cachedwebhdfs -h HOST -u USER
#                 [-p PORT] [-b BASE_PATH] [-c CHUNK_BYTES] [-s SECONDS] [-l LABEL]
require 'getoptlong'

# Parse +arg+ (the value passed for option +optname+) as a strictly positive
# decimal integer. String#to_i would silently turn a typo into 0, which later
# means port 0, an empty payload, or a division by zero in the rate report.
# Raises RuntimeError, like the other argument checks in this script.
def parse_positive_integer(optname, arg)
  value = begin
    Integer(arg, 10)
  rescue ArgumentError
    nil
  end
  unless value && value > 0
    raise RuntimeError, "#{optname} must be a positive integer: '#{arg}'"
  end
  value
end

optparser = GetoptLong.new(['--type', '-t', GetoptLong::REQUIRED_ARGUMENT],
                           ['--base-path', '-b', GetoptLong::REQUIRED_ARGUMENT],
                           ['--chunk-size', '-c', GetoptLong::REQUIRED_ARGUMENT],
                           ['--host', '-h', GetoptLong::REQUIRED_ARGUMENT],
                           ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT],
                           ['--user', '-u', GetoptLong::REQUIRED_ARGUMENT],
                           ['--seconds', '-s', GetoptLong::REQUIRED_ARGUMENT],
                           ['--label', '-l', GetoptLong::REQUIRED_ARGUMENT])
type, base_path, seconds, host, port, username, chunk_size = nil, '/tmp', 3600, nil, nil, nil, (10 * 1024 * 1024)
# --label defaults to the type; that default is applied after parsing,
# because +type+ is still nil at this point.
label = nil
optparser.each do |optname, arg|
  case optname
  when '--type' then type = arg
  when '--base-path' then base_path = arg
  when '--chunk-size' then chunk_size = parse_positive_integer(optname, arg)
  when '--host' then host = arg
  when '--port' then port = parse_positive_integer(optname, arg)
  when '--user' then username = arg
  when '--seconds' then seconds = parse_positive_integer(optname, arg)
  when '--label' then label = arg
  end
end
# include? is false for nil, so a missing --type is rejected here too.
unless ['httpfs', 'webhdfs', 'cachedwebhdfs'].include?(type)
  raise RuntimeError, "invalid type: '#{type}'"
end
label ||= type
raise RuntimeError, "host must be specified" unless host
# Default ports: 14000 for httpfs, 50070 for the WebHDFS-based types.
port ||= (type == 'httpfs' ? 14000 : 50070)
raise RuntimeError, "user must be specified" unless username

# Payload sent by every create/append request, marked binary (ASCII-8BIT).
chunk = ('a' * chunk_size).force_encoding('ASCII-8BIT')
# Benchmark HttpFS through its WebHDFS-compatible REST API using Net::HTTP.
#
# Creates +path+ with +chunk+ as its initial content, then appends +chunk+
# repeatedly until +lasting+ seconds (counted from entry) have passed. Every
# request passes data=true and sends an application/octet-stream body.
#
# Returns a Hash with :type ('httpfs'), :count (requests issued, including the
# CREATE) and :failed (appends answered with an HTTP status >= 400).
# Raises RuntimeError when the CREATE is answered with a status >= 400.
def try_httpfs(path, host, port, username, chunk, lasting)
  require 'net/http'
  http = Net::HTTP.new(host, port)
  deadline = Time.now + lasting
  endpoint = '/webhdfs/v1' + path
  user_query = '&user.name=' + username
  headers = { 'Content-Type' => 'application/octet-stream' }

  created = http.request_put(endpoint + '?op=CREATE&data=true' + user_query, chunk, headers)
  unless created.code.to_i < 400
    raise RuntimeError, "failed to put new file with code #{created.code}: #{host}:#{port} hdfs://#{path}\n#{created.message}"
  end

  sent = 1
  errors = 0
  until Time.now >= deadline
    reply = http.request_post(endpoint + '?op=append&data=true' + user_query, chunk, headers)
    sent += 1
    errors += 1 unless reply.code.to_i < 400
  end
  { :type => 'httpfs', :count => sent, :failed => errors }
end
# Benchmark HDFS appends through the webhdfs gem's WebHDFS::Client.
#
# Creates +path+ holding +chunk+, then calls #append with +chunk+ until
# +lasting+ seconds (counted from entry) have passed.
#
# Returns a Hash with :type ('webhdfs'), :count (client calls, including the
# create) and :failed (appends whose return value was falsy).
# Raises RuntimeError when #create returns a falsy value.
# NOTE(review): if the gem reports HTTP errors by raising rather than by
# returning false, such failures abort the run instead of being counted —
# confirm against the webhdfs gem version in use.
def try_webhdfs(path, host, port, username, chunk, lasting)
  require 'webhdfs'
  deadline = Time.now + lasting
  hdfs = WebHDFS::Client.new(host, port, username)
  created = hdfs.create(path, chunk)
  raise RuntimeError, "failed to create new file, #{host}:#{port}, hdfs://#{path}" unless created

  summary = { :type => 'webhdfs', :count => 1, :failed => 0 }
  until Time.now >= deadline
    summary[:failed] += 1 unless hdfs.append(path, chunk)
    summary[:count] += 1
  end
  summary
end
# Benchmark raw WebHDFS appends that reuse the datanode append location.
#
# The file is created through the webhdfs gem. Each append is then a POST of
# +chunk+ to a datanode URL obtained from the namenode's redirect response.
# That URL is cached and reused until a datanode POST fails, at which point
# the namenode is asked again. Runs until +lasting+ seconds (counted from
# entry) have passed.
#
# Returns a Hash with :type ('cachedwebhdfs'), :count (attempts, including the
# create), :failed (failed attempts) and :detail => {:namenode => lookups that
# did not yield a redirect with a Location header, :datanode => datanode POSTs
# answered with an HTTP status >= 300}.
# Raises RuntimeError when the create call returns a falsy value.
def try_cachedwebhdfs(path, host, port, username, chunk, lasting)
  require 'net/http'
  require 'uri'
  require 'webhdfs'
  actual_path = '/webhdfs/v1' + path
  end_at = Time.now + lasting
  webhdfs = WebHDFS::Client.new(host, port, username)
  unless webhdfs.create(path, chunk)
    raise RuntimeError, "failed to create new file, #{host}:#{port}, hdfs://#{path}"
  end
  counter, failed = 1, 0
  fail_detail = {:namenode => 0, :datanode => 0}
  cached = nil         # {:host, :port, :path} of the datanode append URL
  datanode_http = nil  # Net::HTTP object targeting +cached+
  while Time.now < end_at
    if cached.nil?
      # Ask the namenode for an append location; no data is sent in this step.
      response = Net::HTTP.start(host, port) do |http|
        http.request_post(actual_path + '?op=APPEND&user.name=' + username, '')
      end
      unless response.is_a?(Net::HTTPRedirection) && response['location']
        counter += 1
        failed += 1
        fail_detail[:namenode] += 1
        next
      end
      uri = URI.parse(response['location'])
      cached = {:host => uri.host, :port => uri.port,
                :path => (uri.query ? uri.path + '?' + uri.query : uri.path)}
      datanode_http = Net::HTTP.new(cached[:host], cached[:port])
    end
    response = datanode_http.request_post(cached[:path], chunk)
    counter += 1
    next if response.code.to_i < 300

    failed += 1
    fail_detail[:datanode] += 1
    # Forget the cached location so the next iteration asks the namenode again.
    cached = nil
    # +datanode_http+ is never #start-ed, so each #request_post opens and closes
    # its own connection. An unconditional #finish would raise IOError
    # ("HTTP session not yet started") and abort the benchmark here.
    datanode_http.finish if datanode_http.started?
    datanode_http = nil
  end
  {:type => 'cachedwebhdfs', :count => counter, :failed => failed, :detail => fail_detail}
end
# Run the selected benchmark against <base_path>/<type>.txt and report the
# outcome on stderr.
path = base_path + '/' + type + '.txt'
result = case type
         when 'httpfs'
           try_httpfs(path, host, port, username, chunk, seconds)
         when 'webhdfs'
           try_webhdfs(path, host, port, username, chunk, seconds)
         when 'cachedwebhdfs'
           try_cachedwebhdfs(path, host, port, username, chunk, seconds)
         end
warn "output type:#{result[:type]}, written:#{result[:count]}, failed:#{result[:failed]}"
if result[:detail] # for cachedwebhdfs
  warn "\t namenode fail:#{result[:detail][:namenode]}, datanode fail:#{result[:detail][:datanode]}"
end
# Throughput of the successful writes in decimal megabits per second, truncated
# to kbps resolution. Float math keeps sub-Mbps rates from printing as 0, and a
# zero duration is guarded. The divisor is the requested duration, not measured
# wall time; the last request may run past it.
succeeded = result[:count] - result[:failed]
bits = chunk.bytesize * 8 * succeeded
rate = seconds > 0 ? (bits / (1000.0 * seconds)).floor / 1000.0 : 0.0
warn "rate: #{rate} Mbps"
exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment