Created
May 19, 2012 05:50
-
-
Save authorNari/2729482 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'msgpack/rpc' | |
class Add | |
def add(str) | |
return eval(str) | |
end | |
def con_add(str) | |
as = MessagePack::RPC::AsyncResult.new | |
Thread.start do | |
as.result(eval(str)) | |
end | |
return as | |
end | |
end | |
if $0 == __FILE__ then | |
$sv_loop = Cool.io::Loop.default | |
server = MessagePack::RPC::Server.new($sv_loop) | |
server.listen('0.0.0.0', ARGV.first, Add.new) | |
server.run | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# $ ruby client.rb method num args.. | |
require 'msgpack/rpc' | |
concurrency = ARGV.shift.to_i | |
method = ARGV.shift | |
th = [] | |
concurrency.times do | |
th << Thread.start do | |
MessagePack::RPC::Client.open('0.0.0.0', 10000) do |c| | |
p c.call(method, *ARGV) | |
end | |
end | |
end | |
th.map(&:join) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# $ ruby client.rb method num args.. | |
require 'msgpack/rpc' | |
concurrency = ARGV.shift.to_i | |
method = ARGV.shift | |
th = [] | |
concurrency.times do | |
th << Thread.start do | |
MessagePack::RPC::Client.open('0.0.0.0', 10000) do |c| | |
uuid = c.call(method, *ARGV) | |
loop do | |
r = c.call(method+"_result", uuid) | |
p r | |
break if r.nil? | |
sleep 0.1 | |
end | |
end | |
end | |
end | |
th.map(&:join) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'msgpack/rpc' | |
class MapReduce | |
ADD1 = {host: '0.0.0.0', port: 10001} | |
ADD2 = {host: '0.0.0.0', port: 10002} | |
ADD3 = {host: '0.0.0.0', port: 10003} | |
def mr_leak(add1, add2, add3) | |
clis = [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port])] | |
clis << MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port]) | |
clis << MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port]) | |
r = clis[0].call(:add, add1) | |
r += clis[1].call(:add, add2) | |
r += clis[2].call(:add, add3) | |
clis.map(&:close) | |
return r | |
end | |
def mr_close(add1, add2, add3) | |
clis = [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port])] | |
clis << MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port]) | |
clis << MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port]) | |
r = clis[0].call(:add, add1) | |
r += clis[1].call(:add, add2) | |
r += clis[2].call(:add, add3) | |
clis.map(&:close) | |
return r | |
end | |
def mr_close_with_open(add1, add2, add3) | |
r = 0 | |
MessagePack::RPC::Client.open(ADD1[:host], ADD1[:port]) do |cli| | |
r += cli.call(:add, add1) | |
end | |
MessagePack::RPC::Client.open(ADD2[:host], ADD2[:port]) do |cli| | |
r += cli.call(:add, add2) | |
end | |
MessagePack::RPC::Client.open(ADD3[:host], ADD3[:port]) do |cli| | |
r += cli.call(:add, add3) | |
end | |
return r | |
end | |
def mr_async(add1, add2, add3) | |
clis = [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port])] | |
clis << MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port]) | |
clis << MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port]) | |
r = 0 | |
fs = [clis[0].callback(:add, add1){|res| r += res.get}] | |
fs << clis[1].callback(:add, add2){|res| r += res.get} | |
fs << clis[2].callback(:add, add3){|res| r += res.get} | |
fs.map(&:join) | |
clis.map(&:close) | |
return r | |
end | |
require "securerandom" | |
def mr_async_on_sv_loop(add1, add2, add3) | |
uuid = SecureRandom.uuid | |
cli1 = MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port], $sv_loop) | |
cli2 = MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port], $sv_loop) | |
cli3 = MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port], $sv_loop) | |
@res ||= {} | |
@res[uuid] = {res: 0, finished: 0} | |
as = MessagePack::RPC::AsyncResult.new | |
cb = ->(c){ | |
->(res){ | |
begin | |
@res[uuid][:res] += res.get | |
@res[uuid][:finished] += 1 | |
if @res[uuid][:finished] == 3 | |
as.result(@res[uuid][:res]) | |
@res.delete(uuid) | |
end | |
ensure | |
c.close | |
end | |
} | |
} | |
cli1.call_async(:con_add, add1).attach_callback(cb.(cli1)) | |
cli2.call_async(:con_add, add2).attach_callback(cb.(cli2)) | |
cli3.call_async(:con_add, add3).attach_callback(cb.(cli3)) | |
return as | |
end | |
def mr_async_on_thread(add1, add2, add3) | |
clis = [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port])] | |
clis << MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port]) | |
clis << MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port]) | |
as = MessagePack::RPC::AsyncResult.new | |
Thread.start(0, clis, add1, add2, add3) do |r, clis, add1, add2, add3| | |
begin | |
fs = [clis[0].callback(:con_add, add1){|res| r += res.get}] | |
fs << clis[1].callback(:con_add, add2){|res| r += res.get} | |
fs << clis[2].callback(:con_add, add3){|res| r += res.get} | |
fs.map(&:join) | |
as.result(r) | |
ensure | |
clis.map(&:close) | |
end | |
end | |
return as | |
end | |
def mr_on_thread_with_shared_client(add1, add2, add3) | |
@clis ||= [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port]), | |
MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port]), | |
MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port])] | |
as = MessagePack::RPC::AsyncResult.new | |
Thread.start(0, @clis, add1, add2, add3) do |r, clis, add1, add2, add3| | |
begin | |
r += clis[0].call(:con_add, add1) | |
r += clis[1].call(:con_add, add2) | |
r += clis[2].call(:con_add, add3) | |
as.result(r) | |
end | |
end | |
return as | |
end | |
def mr_async_on_sv_loop_with_shared_client(add1, add2, add3) | |
uuid = SecureRandom.uuid | |
@cli1 ||= MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port], $sv_loop) | |
@cli2 ||= MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port], $sv_loop) | |
@cli3 ||= MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port], $sv_loop) | |
@res ||= {} | |
@res[uuid] = {res: 0, finished: 0} | |
as = MessagePack::RPC::AsyncResult.new | |
cb = ->(res){ | |
begin | |
@res[uuid][:res] += res.get | |
@res[uuid][:finished] += 1 | |
if @res[uuid][:finished] == 3 | |
as.result(@res[uuid][:res]) | |
@res.delete(uuid) | |
end | |
end | |
} | |
@cli1.call_async(:con_add, add1).attach_callback(cb) | |
@cli2.call_async(:con_add, add2).attach_callback(cb) | |
@cli3.call_async(:con_add, add3).attach_callback(cb) | |
return as | |
end | |
def mr_for_polling(add1, add2, add3) | |
uuid = SecureRandom.uuid | |
@res ||= {} | |
@res[uuid] = {res: 0, finished: 0} | |
clis = [MessagePack::RPC::Client.new(ADD1[:host], ADD1[:port])] | |
clis << MessagePack::RPC::Client.new(ADD2[:host], ADD2[:port]) | |
clis << MessagePack::RPC::Client.new(ADD3[:host], ADD3[:port]) | |
Thread.start(0, clis, add1, add2, add3) do |r, clis, add1, add2, add3| | |
begin | |
@res[uuid][:res] += clis[0].call(:con_add, add1) | |
@res[uuid][:finished] += 1 | |
@res[uuid][:res] += clis[1].call(:con_add, add2) | |
@res[uuid][:finished] += 1 | |
@res[uuid][:res] += clis[2].call(:con_add, add3) | |
@res[uuid][:finished] += 1 | |
as.result(r) | |
ensure | |
clis.map(&:close) | |
end | |
end | |
return uuid | |
end | |
def mr_for_polling_result(uuid) | |
res = @res[uuid] | |
if @res[uuid] && @res[uuid][:finished] == 3 | |
@res.delete(uuid) | |
end | |
return res | |
end | |
def mr_fail_on_sv_loop(add1, add2, add3) | |
clis = [ADD1, ADD2, ADD3].each_with_object([]) do |add, clis| | |
clis << MessagePack::RPC::Client.new(add[:host], add[:port]) | |
end | |
[add1, add2, add3].zip(clis).inject(0) do |r, (a, c)| | |
r += c.call(:add, a) | |
end | |
end | |
end | |
if $0 == __FILE__ then | |
$sv_loop = Cool.io::Loop.default | |
server = MessagePack::RPC::Server.new($sv_loop) | |
server.listen('0.0.0.0', ARGV.first, MapReduce.new) | |
server.run | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment