Created
June 1, 2012 07:33
-
-
Save tagomoris/2849939 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# Append-throughput benchmark driver.
#
# This section parses the command line, validates the options, and builds the
# payload ("chunk") that the try_* methods below write repeatedly.
#
# Options:
#   --type, -t        one of 'httpfs', 'webhdfs', 'cachedwebhdfs' (required)
#   --base-path, -b   directory for the output file (default '/tmp')
#   --chunk-size, -c  payload size in bytes (default 10 MiB)
#   --host, -h        server host (required)
#   --port, -p        server port (default 14000 for httpfs, 50070 otherwise)
#   --user, -u        user name sent with each request (required)
#   --seconds, -s     how long to keep appending (default 3600)
#   --label, -l       label for this run (defaults to the type)
require 'getoptlong'

# Strictly parses a positive decimal integer option value.
# String#to_i would silently turn garbage into 0, which later produces an
# empty chunk or a division by zero in the rate calculation.
def parse_positive_integer(optname, arg)
  value = Integer(arg, 10)
  raise ArgumentError if value <= 0
  value
rescue ArgumentError
  raise RuntimeError, "#{optname} must be a positive integer: '#{arg}'"
end

optparser = GetoptLong.new(
  ['--type', '-t', GetoptLong::REQUIRED_ARGUMENT],
  ['--base-path', '-b', GetoptLong::REQUIRED_ARGUMENT],
  ['--chunk-size', '-c', GetoptLong::REQUIRED_ARGUMENT],
  ['--host', '-h', GetoptLong::REQUIRED_ARGUMENT],
  ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT],
  ['--user', '-u', GetoptLong::REQUIRED_ARGUMENT],
  ['--seconds', '-s', GetoptLong::REQUIRED_ARGUMENT],
  ['--label', '-l', GetoptLong::REQUIRED_ARGUMENT]
)

# Defaults; nil means "not given on the command line".
type = nil
base_path = '/tmp'
seconds = 3600
host = nil
port = nil
username = nil
chunk_size = 10 * 1024 * 1024
label = nil

optparser.each do |optname, arg|
  case optname
  when '--type' then type = arg
  when '--base-path' then base_path = arg
  when '--chunk-size' then chunk_size = parse_positive_integer(optname, arg)
  when '--host' then host = arg
  when '--port' then port = parse_positive_integer(optname, arg)
  when '--user' then username = arg
  when '--seconds' then seconds = parse_positive_integer(optname, arg)
  when '--label' then label = arg
  end
end

unless type && ['httpfs', 'webhdfs', 'cachedwebhdfs'].include?(type)
  raise RuntimeError, "invalid type: '#{type}'"
end
unless host
  raise RuntimeError, "host must be specified"
end
unless port
  port = if type == 'httpfs' then 14000 else 50070 end
end
unless username
  raise RuntimeError, "user must be specified"
end

# The label falls back to the type only after parsing, so the fallback sees
# the real --type value rather than the initial nil.
label ||= type

# Payload written on every request; marked as binary data.
chunk = "a" * chunk_size
chunk.force_encoding("ASCII-8BIT")
# Benchmarks appends by sending the payload directly to host:port.
#
# Creates the file with one PUT (op=CREATE&data=true), then POSTs the same
# chunk (op=append&data=true) repeatedly until `lasting` seconds have passed.
#
# path     - file path, appended to the '/webhdfs/v1' URL prefix
# host     - server host
# port     - server port
# username - value of the user.name query parameter (form-encoded here so
#            characters such as '&' or '=' cannot corrupt the query string)
# chunk    - request body sent on every call
# lasting  - duration of the append loop in seconds
#
# Returns {:type => 'httpfs', :count => requests sent (including the create),
# :failed => appends answered with a status >= 400}.
# Raises RuntimeError when the initial create is answered with a status >= 400.
def try_httpfs(path, host, port, username, chunk, lasting)
  require 'net/http'
  require 'uri'

  client = Net::HTTP.new(host, port)
  end_at = Time.now + lasting
  header = {'Content-Type' => 'application/octet-stream'}
  resource = "/webhdfs/v1#{path}"
  user_param = "user.name=#{URI.encode_www_form_component(username)}"

  response = client.request_put("#{resource}?op=CREATE&data=true&#{user_param}", chunk, header)
  if response.code.to_i >= 400
    raise RuntimeError, "failed to put new file with code #{response.code}: #{host}:#{port} hdfs://#{path}\n#{response.message}"
  end

  # The create above counts as the first request.
  counter, failed = 1, 0
  while Time.now < end_at
    response = client.request_post("#{resource}?op=append&data=true&#{user_param}", chunk, header)
    counter += 1
    failed += 1 if response.code.to_i >= 400
  end
  {:type => 'httpfs', :count => counter, :failed => failed}
end
# Benchmarks appends through the WebHDFS::Client from the 'webhdfs' library.
#
# Creates the file once with client.create, then calls client.append with the
# same chunk until `lasting` seconds have passed.
#
# path     - file path passed to the client
# host     - server host
# port     - server port
# username - user name handed to WebHDFS::Client.new
# chunk    - data written on every call
# lasting  - duration of the append loop in seconds
#
# Returns {:type => 'webhdfs', :count => calls made (including the create),
# :failed => appends that returned a falsy value}.
# Raises RuntimeError when the initial create returns a falsy value.
def try_webhdfs(path, host, port, username, chunk, lasting)
  require 'webhdfs'

  end_at = Time.now + lasting
  client = WebHDFS::Client.new(host, port, username)
  unless client.create(path, chunk)
    raise RuntimeError, "failed to create new file, #{host}:#{port}, hdfs://#{path}"
  end

  # The create above counts as the first call.
  counter, failed = 1, 0
  while Time.now < end_at
    failed += 1 unless client.append(path, chunk)
    counter += 1
  end
  {:type => 'webhdfs', :count => counter, :failed => failed}
end
# Benchmarks appends while caching the redirect target of the append request.
#
# The file is created through WebHDFS::Client. For appends, an empty POST
# (op=APPEND) is sent to host:port; the 'location' header of the redirect
# response is parsed and remembered, and subsequent chunks are POSTed straight
# to that location until a request fails, at which point the cached location
# is dropped and looked up again.
#
# path     - file path, appended to the '/webhdfs/v1' URL prefix
# host     - server host for create and for the redirect lookup
# port     - server port for create and for the redirect lookup
# username - value of the user.name query parameter (form-encoded here)
# chunk    - request body sent on every append
# lasting  - duration of the append loop in seconds
#
# Returns {:type => 'cachedwebhdfs', :count => attempts (including the create),
# :failed => failed attempts, :detail => {:namenode => lookups that did not
# yield a redirect with a location, :datanode => data POSTs answered with a
# status >= 300}}.
# Raises RuntimeError when the initial create returns a falsy value.
def try_cachedwebhdfs(path, host, port, username, chunk, lasting)
  require 'net/http'
  require 'uri'
  require 'webhdfs'

  end_at = Time.now + lasting
  lookup_path = "/webhdfs/v1#{path}?op=APPEND&user.name=#{URI.encode_www_form_component(username)}"

  unless WebHDFS::Client.new(host, port, username).create(path, chunk)
    raise RuntimeError, "failed to create new file, #{host}:#{port}, hdfs://#{path}"
  end

  # The create above counts as the first attempt.
  counter, failed = 1, 0
  fail_detail = {:namenode => 0, :datanode => 0}
  cached = nil   # redirect target currently in use, or nil when it must be looked up
  datanode = nil # Net::HTTP object for the cached target
  while Time.now < end_at
    if cached.nil?
      response = Net::HTTP.start(host, port) do |http|
        http.request_post(lookup_path, '')
      end
      unless response.is_a?(Net::HTTPRedirection) && response['location']
        counter += 1
        failed += 1
        fail_detail[:namenode] += 1
        next
      end
      uri = URI.parse(response['location'])
      target_path = if uri.query then uri.path + '?' + uri.query else uri.path end
      cached = {:host => uri.host, :port => uri.port, :path => target_path}
      datanode = Net::HTTP.new(cached[:host], cached[:port])
    end

    response = datanode.request_post(cached[:path], chunk)
    counter += 1
    if response.code.to_i >= 300
      failed += 1
      fail_detail[:datanode] += 1
      cached = nil
      # The session is never explicitly started here, and Net::HTTP#finish
      # raises IOError on a session that is not started, so only finish
      # when one is actually open.
      datanode.finish if datanode.started?
      datanode = nil
    end
  end
  {:type => 'cachedwebhdfs', :count => counter, :failed => failed, :detail => fail_detail}
end
# Run the selected benchmark and report the results on stderr.
#
# NOTE(review): --label was parsed but never used; it presumably exists so
# that several runs of the same type can write to different files. The label
# now names the output file, falling back to the type when no label is set.
# Confirm this matches the intended meaning of --label.
path = "#{base_path}/#{label || type}.txt"

result = case type
         when 'httpfs'
           try_httpfs(path, host, port, username, chunk, seconds)
         when 'webhdfs'
           try_webhdfs(path, host, port, username, chunk, seconds)
         when 'cachedwebhdfs'
           try_cachedwebhdfs(path, host, port, username, chunk, seconds)
         end

warn "output type:#{result[:type]}, written:#{result[:count]}, failed:#{result[:failed]}"
if result[:detail] # for cachedwebhdfs
  warn "\t namenode fail:#{result[:detail][:namenode]}, datanode fail:#{result[:detail][:datanode]}"
end

# Rate is based on successful writes over the requested duration.
# Whole kbps first, then a float division so the Mbps figure keeps its
# fractional part (an integer division by 1000 truncated it to whole Mbps).
succeeded = result[:count] - result[:failed]
rate = if seconds > 0
         kbps = chunk.length * 8 * succeeded / (1000 * seconds)
         kbps / 1000.0
       else
         0.0 # no measurable duration; avoid dividing by zero
       end
warn "rate: #{rate} Mbps"
exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment