Skip to content

Instantly share code, notes, and snippets.

@authorNari
Created May 19, 2012 05:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save authorNari/2729482 to your computer and use it in GitHub Desktop.
Save authorNari/2729482 to your computer and use it in GitHub Desktop.
require 'msgpack/rpc'
# RPC handler that evaluates a Ruby expression string and returns its value.
#
# SECURITY: both methods pass their argument straight to `eval`. Anything a
# remote caller sends is executed with the full privileges of this process,
# so this handler must only ever be exposed to trusted callers.
class Add
  # Evaluates +str+ synchronously.
  #
  # @param str [String] Ruby source to evaluate
  # @return [Object] the value of the evaluated expression
  def add(str)
    # SECURITY: eval on caller-supplied input (see class comment).
    eval(str)
  end

  # Evaluates +str+ on a new thread and delivers the outcome through an
  # AsyncResult, so the calling thread is not blocked by the evaluation.
  #
  # @param str [String] Ruby source to evaluate
  # @return [MessagePack::RPC::AsyncResult] completed later with the value,
  #   or with the error message if evaluation fails
  def con_add(str)
    as = MessagePack::RPC::AsyncResult.new
    Thread.start do
      begin
        # SECURITY: eval on caller-supplied input (see class comment).
        as.result(eval(str))
      rescue StandardError, ScriptError => e
        # Without this the thread would die silently and the AsyncResult
        # would never be completed. ScriptError is listed explicitly because
        # SyntaxError from eval is not a StandardError.
        as.error(e.to_s)
      end
    end
    as
  end
end
# Entry point: when this file is executed directly, serve an Add instance
# over MessagePack-RPC on the port given as the first command-line argument.
# The event loop is kept in $sv_loop.
if __FILE__ == $PROGRAM_NAME
  $sv_loop = Cool.io::Loop.default
  rpc_server = MessagePack::RPC::Server.new($sv_loop)
  rpc_server.listen('0.0.0.0', ARGV.first, Add.new)
  rpc_server.run
end
# $ ruby client.rb concurrency method args..
require 'msgpack/rpc'
# Issues `concurrency` simultaneous RPC calls against port 10000, one
# connection per thread, and prints each reply.
# ARGV: concurrency, method name, then the arguments forwarded to the call.
concurrency = ARGV.shift.to_i
method = ARGV.shift
workers = (1..concurrency).map do
  Thread.start do
    MessagePack::RPC::Client.open('0.0.0.0', 10000) { |cli| p cli.call(method, *ARGV) }
  end
end
# Wait for every request to finish before the script exits.
workers.each(&:join)
# $ ruby client.rb concurrency method args..
require 'msgpack/rpc'
# Polling client: each thread starts a job via `method`, which returns a job
# id, then repeatedly calls `<method>_result` with that id, printing every
# reply, until the reply is nil.
# ARGV: concurrency, method name, then the arguments forwarded to the call.
concurrency = ARGV.shift.to_i
method = ARGV.shift
pollers = (1..concurrency).map do
  Thread.start do
    MessagePack::RPC::Client.open('0.0.0.0', 10000) do |cli|
      job_id = cli.call(method, *ARGV)
      loop do
        reply = cli.call(method + "_result", job_id)
        p reply
        break if reply.nil?
        # Pause between polls.
        sleep 0.1
      end
    end
  end
end
# Wait for every poller to finish before the script exits.
pollers.each(&:join)
require 'msgpack/rpc'
# RPC handler that sends one argument to each of three backend servers and
# adds the replies together. Every public method is a different way of doing
# that same fan-out: blocking calls, futures joined locally, a background
# thread, callbacks attached on the server's own event loop ($sv_loop), or a
# start/poll pair.
#
# Helpers below `private` are kept out of the public method set so they are
# not reachable as RPC methods (assuming the server only dispatches public
# methods — confirm against the msgpack-rpc version in use).
class MapReduce
  require "securerandom"

  ADD1 = {host: '0.0.0.0', port: 10001}
  ADD2 = {host: '0.0.0.0', port: 10002}
  ADD3 = {host: '0.0.0.0', port: 10003}
  # Backends in the order their arguments are given (add1, add2, add3).
  BACKENDS = [ADD1, ADD2, ADD3].freeze

  # Blocking fan-out with fresh clients.
  # As written in this file it closes its clients exactly like #mr_close.
  #
  # @return [Object] first reply + second reply + third reply
  def mr_leak(add1, add2, add3)
    blocking_sum([add1, add2, add3])
  end

  # Blocking fan-out with fresh clients that are always closed afterwards,
  # including when one of the calls raises.
  #
  # @return [Object] first reply + second reply + third reply
  def mr_close(add1, add2, add3)
    blocking_sum([add1, add2, add3])
  end

  # Blocking fan-out using the block form of Client.open, one backend at a
  # time, adding each reply onto 0.
  #
  # @return [Object] 0 + the three replies
  def mr_close_with_open(add1, add2, add3)
    r = 0
    BACKENDS.zip([add1, add2, add3]).each do |backend, arg|
      MessagePack::RPC::Client.open(backend[:host], backend[:port]) do |cli|
        r += cli.call(:add, arg)
      end
    end
    r
  end

  # Issues all three calls up front, then joins the returned futures; each
  # callback adds its reply to the running total.
  #
  # @return [Object] 0 + the three replies
  def mr_async(add1, add2, add3)
    clients = open_clients
    begin
      r = 0
      futures = clients.zip([add1, add2, add3]).map do |cli, arg|
        cli.callback(:add, arg) { |res| r += res.get }
      end
      futures.each(&:join)
      r
    ensure
      close_all(clients)
    end
  end

  # Non-blocking fan-out: clients are created on $sv_loop and the answer is
  # delivered through the returned AsyncResult once all replies arrived.
  # Each client is closed by its own callback.
  #
  # @return [MessagePack::RPC::AsyncResult]
  def mr_async_on_sv_loop(add1, add2, add3)
    clients = open_clients($sv_loop)
    uuid = start_job
    as = MessagePack::RPC::AsyncResult.new
    clients.zip([add1, add2, add3]).each do |cli, arg|
      cli.call_async(:con_add, arg).attach_callback(collector(uuid, as, cli))
    end
    as
  end

  # Runs the future-based fan-out on a background thread and delivers the
  # total (or the error message) through the returned AsyncResult.
  #
  # @return [MessagePack::RPC::AsyncResult]
  def mr_async_on_thread(add1, add2, add3)
    clients = open_clients
    as = MessagePack::RPC::AsyncResult.new
    Thread.start(clients, [add1, add2, add3]) do |clis, values|
      begin
        r = 0
        futures = clis.zip(values).map do |cli, arg|
          cli.callback(:con_add, arg) { |res| r += res.get }
        end
        futures.each(&:join)
        as.result(r)
      rescue StandardError => e
        # Report the failure; otherwise the AsyncResult is never completed.
        as.error(e.to_s)
      ensure
        close_all(clis)
      end
    end
    as
  end

  # Like #mr_async_on_thread but with blocking calls on clients that are
  # created once and reused by every request (they are never closed here).
  # NOTE(review): the same clients may be used by several threads at once —
  # confirm the client tolerates that.
  #
  # @return [MessagePack::RPC::AsyncResult]
  def mr_on_thread_with_shared_client(add1, add2, add3)
    @clis ||= open_clients
    as = MessagePack::RPC::AsyncResult.new
    Thread.start(@clis, [add1, add2, add3]) do |clis, values|
      begin
        r = 0
        clis.zip(values).each { |cli, arg| r += cli.call(:con_add, arg) }
        as.result(r)
      rescue StandardError => e
        # Report the failure; otherwise the AsyncResult is never completed.
        as.error(e.to_s)
      end
    end
    as
  end

  # Like #mr_async_on_sv_loop but with clients that are created once on
  # $sv_loop and reused by every request (they are never closed here).
  #
  # @return [MessagePack::RPC::AsyncResult]
  def mr_async_on_sv_loop_with_shared_client(add1, add2, add3)
    @sv_loop_clis ||= open_clients($sv_loop)
    uuid = start_job
    as = MessagePack::RPC::AsyncResult.new
    @sv_loop_clis.zip([add1, add2, add3]).each do |cli, arg|
      cli.call_async(:con_add, arg).attach_callback(collector(uuid, as))
    end
    as
  end

  # Starts the fan-out on a background thread and returns immediately with
  # a job id; progress is read back with #mr_for_polling_result.
  #
  # @return [String] job id
  def mr_for_polling(add1, add2, add3)
    clients = open_clients
    uuid = start_job
    Thread.start(clients, [add1, add2, add3]) do |clis, values|
      begin
        entry = @res[uuid]
        clis.zip(values).each do |cli, arg|
          entry[:res] += cli.call(:con_add, arg)
          entry[:finished] += 1
        end
      rescue StandardError
        # Drop the job so pollers get nil instead of waiting forever for a
        # count that can no longer reach completion.
        @res.delete(uuid)
        raise
      ensure
        close_all(clis)
      end
    end
    uuid
  end

  # Returns the current state of a polling job. Once all replies are in, the
  # state is handed out one final time and then forgotten, so the next poll
  # for the same id returns nil.
  #
  # @param uuid [String] id returned by #mr_for_polling
  # @return [Hash, nil] {res:, finished:} or nil for an unknown id
  def mr_for_polling_result(uuid)
    @res ||= {} # polling may happen before any job was ever started
    res = @res[uuid]
    @res.delete(uuid) if res && res[:finished] == BACKENDS.size
    res
  end

  # Blocking fan-out folded with inject, on clients that use their own loop
  # rather than $sv_loop.
  # NOTE(review): the name suggests this variant misbehaves when invoked on
  # the server loop; that is not evident from this code alone.
  #
  # @return [Object] 0 + the three replies
  def mr_fail_on_sv_loop(add1, add2, add3)
    clients = open_clients
    begin
      [add1, add2, add3].zip(clients).inject(0) do |r, (a, c)|
        r + c.call(:add, a)
      end
    ensure
      close_all(clients)
    end
  end

  private

  # Creates one client per backend, in BACKENDS order. An event loop may be
  # passed through as the third constructor argument. If a constructor
  # raises, the clients created so far are closed again.
  def open_clients(*loop_arg)
    clients = []
    BACKENDS.each do |backend|
      clients << MessagePack::RPC::Client.new(backend[:host], backend[:port], *loop_arg)
    end
    clients
  rescue StandardError
    close_all(clients)
    raise
  end

  # Closes every client in the list.
  def close_all(clients)
    clients.each(&:close)
  end

  # Calls :add on fresh clients, one argument per backend, closes the
  # clients, and returns the replies combined with +.
  def blocking_sum(values)
    clients = open_clients
    begin
      clients.zip(values).map { |cli, arg| cli.call(:add, arg) }.reduce(:+)
    ensure
      close_all(clients)
    end
  end

  # Registers a fresh accumulator in @res and returns its id.
  def start_job
    uuid = SecureRandom.uuid
    @res ||= {}
    @res[uuid] = {res: 0, finished: 0}
    uuid
  end

  # Builds the callback attached to each async call of one job. It adds the
  # reply to the job's accumulator and, on the last reply, completes +as+
  # and forgets the job. If a reply is an error, +as+ is completed with the
  # error message and the job is forgotten; later replies are then ignored.
  # +owned_client+, when given, is closed after the callback ran.
  def collector(uuid, as, owned_client = nil)
    lambda do |future|
      begin
        entry = @res[uuid]
        if entry # nil once an earlier reply of this job failed
          entry[:res] += future.get
          entry[:finished] += 1
          if entry[:finished] == BACKENDS.size
            as.result(entry[:res])
            @res.delete(uuid)
          end
        end
      rescue StandardError => e
        @res.delete(uuid)
        as.error(e.to_s)
      ensure
        owned_client.close if owned_client
      end
    end
  end
end
# Entry point: when this file is executed directly, serve a MapReduce
# instance over MessagePack-RPC on the port given as the first command-line
# argument. The event loop is kept in $sv_loop, which the *_on_sv_loop
# methods of MapReduce read.
if __FILE__ == $PROGRAM_NAME
  $sv_loop = Cool.io::Loop.default
  rpc_server = MessagePack::RPC::Server.new($sv_loop)
  rpc_server.listen('0.0.0.0', ARGV.first, MapReduce.new)
  rpc_server.run
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment