Skip to content

Instantly share code, notes, and snippets.

@fumin
Created December 12, 2012 02:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save fumin/4264293 to your computer and use it in GitHub Desktop.
Save fumin/4264293 to your computer and use it in GitHub Desktop.
futures implementation
require 'fiber'
require 'eventmachine'
require 'rest-core'
class BlockingClient
def initialize *args
@client = RC::Universal.new(*args)
end
def get path, params={}, opts={}
@client.get(path, params, opts).tap{}
end
end
class CallbackClient
def initialize *args
@client = RC::Universal.new(*args)
end
def get path, params={}, opts={}
@client.get(path, params, opts){ |res|
yield res
}
end
end
class CallbackFuture
class Result
attr_accessor :res
def initialize res; @res = res; end
end
def initialize
@fiber = Fiber.new{
# The block of `yield` here should
# always resume this current fiber
# Ex: CallbackFuture.new{ |future_fiber|
# future_fiber.resume :abcd
# }
yield Fiber.current
# Block here and wait until reentrance...
res = Fiber.yield
# There are two possible scenarios and outcomes:
# 1. `tap_me` is called first, and we'll wait for
# ourselves being resumed. We then pass the result
# to `tap_me`
# 2. The block passed in is called first, and thus
# `res` is already the result. In this case, we'll
# wait for `tap_me` to be called later.
if res.is_a?(Fiber)
res.transfer Result.new(Fiber.yield)
else
Fiber.yield.transfer Result.new(res)
end
}
@fiber.resume
end
def tap_me
return @res if @res
# As discussed in `initialize`, depending on `res`:
# 1. If `tap_me` is called later and we get `res` immediately, return it.
# 2. If `tap_me` is called first, we try again to get the result of the
# callback of `initialize`
res = @fiber.resume Fiber.current
@res = if res.is_a?(Result)
res
else
Fiber.yield
end.res
end
end
class Future
def initialize
@thread = Thread.new{yield}
end
def tap_me
@thread.value
end
end
class FutureClient
def initialize *args
@client = if fiber_possible?
CallbackClient.new *args
else
BlockingClient.new *args
end
end
def get path, params={}, opts={}
if fiber_possible?
CallbackFuture.new{ |future_fiber|
# According to the design of CallbackFuture,
# the final action of this block should always be
# `future_fiber.transfer`.
# Also, since the callback {|resp| ...} is in the root fiber,
# we need to wrap everything in a new Fiber
# in order to `future_fiber.transfer`
@client.get(path, params, opts){ |resp|
Fiber.new{
final_resp = if block_given?
yield resp
else
resp
end
future_fiber.transfer final_resp
}.resume
}
}
else
job = lambda{ @client.get(path, params, opts) }
if block_given?
Future.new{ yield(job.call) }
else
Future.new(&job)
end
end
end
def fiber_possible?
# We should also check that we aren't in the RootFiber here.
# However, since this is impossible now, comment it
EM.reactor_running? # and RootFiber != Fiber.current
end
end
def q str, m=nil
p = lambda{puts "\e[33m=> #{str.inspect}\e[0m"}
if m
m.synchronize(&p)
else
p.call
end
end
def github_use_case
c = FutureClient.new(json_response: true,
site: 'https://api.github.com',
timeout: 120,
log_method: lambda{|s| puts s})
futures = []
%w[rubytaiwan godfat fumin].each{ |user|
futures << c.get("/users/#{user}/repos", per_page: 100){ |repos|
rs = repos.reject{|r| r['fork']}
most_watched = rs.max_by{|r| r['watchers']}['name']
most_size = rs.max_by{|r| r['size']}['name']
watch_contri = c.get("/repos/#{user}/#{most_watched}/contributors")
size_contri = c.get("/repos/#{user}/#{most_size}/contributors")
most_watched_most_contri = watch_contri.tap_me.max_by{|c| c['contributions']}
most_size_most_contri = size_contri.tap_me.max_by{|c| c['contributions']}
q "Most contributed user for most watched: #{user}/#{most_watched}:"
q most_watched_most_contri['login']
q "Most contributed user for most size: #{user}/#{most_size}:"
q most_size_most_contri['login']
repos
}
}
futures.map{|f| f.tap_me}
end
def google_use_case
c = FutureClient.new(site: 'https://www.google.com',
log_method: lambda{|s| puts s})
futures = []
%w[rubytaiwan godfat].each{ |user|
futures << c.get("/search", q: user){ |repos|
watch_contri = c.get("/search", q: "#{user}r")
size_contri = c.get("/search", q: "#{user}t")
most_watched_most_contri = watch_contri.tap_me.to_s[0..10]
most_size_most_contri = size_contri.tap_me.to_s[0..10]
q "Most contributed user for most watched: #{user}:"
q most_watched_most_contri
q "Most contributed user for most size: #{user}:"
q most_size_most_contri
repos
}
}
futures.map{|f| f.tap_me; f.tap_me; f.tap_me}
end
class EM_USE; end
%w[github google].each do |g|
EM_USE.module_eval <<-RUBY
def em_#{g}_use_case
EM.run do
Fiber.new{
resp = #{g}_use_case
puts resp.map(&:size)
EM.stop
}.resume
end
end
RUBY
end
EM_USE.new.em_github_use_case
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment