Skip to content

Instantly share code, notes, and snippets.

@joshk
Created March 6, 2011 18:05
Show Gist options
  • Save joshk/857465 to your computer and use it in GitHub Desktop.
Save joshk/857465 to your computer and use it in GitHub Desktop.
em deferrable faraday adapter
module Faraday
class Adapter
class EMHttpRequest < Faraday::Adapter
self.supports_parallel_requests = true
def self.setup_parallel_manager(options = {})
EMParallelManager.new
end
class EMParallelManager
def run; true; end
end
begin
require 'em'
require 'em-http'
rescue LoadError, NameError => e
self.load_error = e
end
class Header
include Net::HTTPHeader
def initialize response
@header = {}
response.response_header.each do |key, value|
case key
when "CONTENT_TYPE"; self.content_type = value
when "CONTENT_LENGTH"; self.content_length = value
else; self[key] = value
end
end
end
end
def call(env)
super
http_async = EventMachine::HttpRequest.new(URI::parse(env[:url].to_s))
options = setup_request_options(env)
http = http_async.setup_request(env[:method].to_s.downcase.to_sym, options)
http.callback do
env.update({
:body => http.response,
:status => http.response_header.status.to_i,
:response_headers => Header.new(http)
})
env[:response].finish(env)
end
http.errback do
raise Error::ConnectionFailed.new(Errno::ECONNREFUSED)
end
@app.call(env)
end
def setup_request_options(env)
options = { :head => env[:request_headers] }
options[:ssl] = env[:ssl] if env[:ssl]
if env[:body]
if env[:body].respond_to? :read
options[:body] = env[:body].read
else
options[:body] = env[:body]
end
end
if req = env[:request]
if proxy = req[:proxy]
uri = Addressable::URI.parse(proxy[:uri])
options[:proxy] = {
:host => uri.host,
:port => uri.port
}
if proxy[:username] && proxy[:password]
options[:proxy][:authorization] = [proxy[:username], proxy[:password]]
end
end
# only one timeout currently supported by em http request
if req[:timeout] or req[:open_timeout]
options[:timeout] = [req[:timeout] || 0, req[:open_timeout] || 0].max
end
end
options
end
end
end
end
require 'eventmachine'
require 'em-http'
require 'faraday'
require 'faraday/adapter/em_http_request'
parallel_manager = Faraday::Adapter::EMHttpRequest.setup_parallel_manager
c = Faraday.new(:url => 'http://google.com', :parallel => parallel_manager) do |builder|
builder.adapter :EMHttpRequest
end
EM.run do
c.get('/')
puts 'wait for it all to run'
EM.add_timer(3) { EM.stop }
end
@igrigorik
Copy link

Which version of em-http are you running this with? I'd recommend you use the pre-release (beta) gem, since that'll soon become the 1.0. Few quick observations:

  • :status => http.response_header.http_status.to_i # line 58, I believe should read .status.to_i instead
  • resp = c.get('/') # line 16, will need a :path => '/' with the new beta gem.

More generally, I'm not sure why you're creating the extra deferrables? (I'm probably missing something obvious). Have you looked at the multi interface that ships with em-http? Multi would be equivalent of Hydra under typhoeus.

Finally, if you're hitting the errback, can you print the http object? It's probably failing on DNS resolution, or invalid parameters.

@joshk
Copy link
Author

joshk commented Mar 7, 2011

Hey Ilya,

Thanks for the comments.

I saw the beta version of em-http before before I started and have been using it during my trail-and-many-errors. Regarding c.get, this is Faraday so no :path option needs to be passed in, unless I am misunderstanding you.

I updated http_status to status, thanks for noticing that).

I am pretty new to the EM way of thinking and took most of the Deferrable code from the Pusher gem. Although I now realise that the extra Deferrables weren't adding anything and have since removed the code and tidied it up a bit.

The idea of this adapter was to provide a fire and forget strategy, similar to Hydra, but true async. Hydra requires you to queue up the requests then hit run on the parallel manager, blocking until it finishes. EMDeferrable (although I think it might need a new name) allows you to make the request via Faraday and move on.

Although, that said, I am still no closer to finding out why the request reaches errback, even with a print of the http object. Very very confusing. I'm sure its a stupid mistake, like leaving your keys in the freezer, hmmmmmmmmm.

@igrigorik
Copy link

Hmm. Haven't had a chance to actually run the code, but looking at your example.. try taking out sleep(3) and EM.stop - I don't think that will do what you want it to do. Namely, sleep(3) will block the reactor for 3 seconds, and immediately thereafter exit. Without the EM.stop your reactor will run forever.. but your requests should fire and finish. To exit cleanly, you should in fact be specifying a callback within the request to indicate that the reactor should be stopped once the request is complete.

@joshk
Copy link
Author

joshk commented Mar 8, 2011

Hey Ilya,

Ahhhh, once I remove the sleep and EM.stop it runs the queries I want. So there is no way to 'sleep' for x many seconds and let other em fibers/code finish executing? My next step is to benchmark this against other Faraday adapters, looking at both the fire and forget time and the full execution time.

Thanks for all the help!

Josh

@igrigorik
Copy link

If you want to stop the reactor after a certain amount of time passes, the way to do that is: EM.add_timer(3) { EM.stop }

In practice though, you still want to add some logic to make sure everyone finishes their requests before you terminate the reactor. Take a look at Multi that ships with em-http, its designed to do exactly this.. It'll dispatch all the requests in parallel, but you can then define a callback which will fire once all requests are done.

@joshk
Copy link
Author

joshk commented Mar 8, 2011

Hey Ilya,

Thanks for all the help, the above script works now, and it does the fire and forget strategy I was aiming for, mimicking what the Pusher gem does in its trigger_async method (or is it async_trigger).

I'll have a look over Multi, thanks for the tip.

If you think my adapter is, well, stupid or poorly thought out, any and all advice, comments and criticism is much appreciated.

@igrigorik
Copy link

No, no.. Given what you're going for (fire and forget), I think it makes sense.

My only worry is that if at some point you will want to synchronize to see what happens to all the requests (or wait for them to finish), then I think you'll have to introduce some other deferrable into the mix (which is multi is all about - and its very simple). Take a look at that code and you'll see what I mean.

Glad you got it working!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment