Skip to content

Instantly share code, notes, and snippets.

@astro
Created October 2, 2008 09:46
Show Gist options
  • Save astro/14329 to your computer and use it in GitHub Desktop.
Save astro/14329 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
# An attempt to create an efficient HTTP client using eventmachine, utilizing HTTP pipelining. I reconsidered and decided it would be more natural to hack this in Erlang. Please finish.
require 'singleton'
require 'uri'
require 'yaml'
require 'zlib'

require 'eventmachine'
require 'dnsruby'
# Configure Dnsruby before any resolver is created.
# NOTE(review): presumably the first call makes Dnsruby do its I/O through
# EventMachine, and the second stops it from starting a reactor of its own
# (this script runs the reactor itself via EM.run) -- confirm against the
# Dnsruby documentation.
Dnsruby::Resolver::use_eventmachine true
Dnsruby::Resolver::start_eventmachine_loop false
# Base class for process-wide services.
#
# Subclasses are singletons; any class-level call that the class itself does
# not understand is logged and forwarded to the singleton instance, so callers
# can write `Service.do_it(args)` instead of `Service.instance.do_it(args)`.
class GodObject
  include Singleton

  # Forwards an unknown class-level call to the singleton instance.
  #
  # m     - Symbol name of the method that was called on the class.
  # a     - positional arguments of the call.
  # block - optional block, handed to the instance method unchanged.
  #
  # Returns whatever the instance method returns.
  def self.method_missing(m, *a, &block)
    puts "Sending #{m}(#{a.inspect}) to #{inspect}"
    # `send` (not `public_send`) is kept on purpose: the original forwarded
    # to private instance methods as well.
    instance.send(m, *a, &block)
  end

  # Keeps `respond_to?` / `method` consistent with method_missing above.
  # The check is done on the class's method table so that merely asking
  # does not instantiate the singleton.
  def self.respond_to_missing?(m, include_private = false)
    public_method_defined?(m) ||
      (include_private && private_method_defined?(m)) ||
      super
  end
end
# One name lookup whose outcome is remembered and shared.
#
# The lookup is started once in the constructor. Every call to #defer returns
# a fresh deferrable that receives the stored outcome: immediately when it is
# already known, otherwise as soon as the resolver answers.
class DNSName
  # Parses +ip+ as a literal IPv4 or IPv6 address.
  #
  # Returns the parsed address object, or nil when +ip+ is neither (both
  # Dnsruby constructors raised ArgumentError).
  def self.parse_ip_address(ip)
    Dnsruby::IPv4.create(ip)
  rescue ArgumentError
    begin
      Dnsruby::IPv6.create(ip)
    rescue ArgumentError
      nil
    end
  end

  # dns  - resolver object responding to #send_async.
  # name - host name or literal IP address to resolve.
  def initialize(dns, name)
    # State must exist before the query is issued: if the resolver fires a
    # callback synchronously, on_success/on_fail already use both variables.
    @deferrables = []
    @result = nil

    address = DNSName.parse_ip_address(name)
    if address
      # Literal addresses need no lookup.
      @result = [:succeed, [address.to_s]]
    else
      df = dns.send_async(Dnsruby::Message.new(name))
      df.callback(&method(:on_success))
      df.errback(&method(:on_fail))
    end
  end

  # Returns a new EM::DefaultDeferrable that succeeds with an Array of address
  # strings or fails with the resolver's error.
  def defer
    d = EM::DefaultDeferrable.new
    if @result
      apply_result_to d
    else
      @deferrables << d
    end
    d
  end

  private

  # Delivers the stored [:succeed | :fail, payload] pair to +d+ exactly once.
  def apply_result_to(d)
    d.send(*@result)
  end

  # Delivers the stored outcome to everyone who asked before it was known.
  def apply_result_to_all
    waiting = @deferrables
    @deferrables = []
    waiting.each { |d| apply_result_to d }
  end

  # Resolver callback: keeps only A and AAAA records of the answer section.
  def on_success(msg)
    records = msg.answer.select do |rr|
      rr.kind_of?(Dnsruby::RR::IN::A) || rr.kind_of?(Dnsruby::RR::IN::AAAA)
    end
    @result = [:succeed, records.map { |rr| rr.address.to_s }]
    apply_result_to_all
  end

  # Resolver errback: remembers the error so later #defer calls fail too.
  def on_fail(msg, err)
    @result = [:fail, err]
    apply_result_to_all
  end
end
# Singleton cache of name lookups: each distinct name is resolved at most
# once, and every caller gets its own deferrable for the shared outcome.
class DNSCache < GodObject
  def initialize
    @dns = Dnsruby::Resolver.new
    @queries = {}
  end

  # Returns a deferrable for +name+, starting the lookup on first use.
  def resolve(name)
    @queries[name] = DNSName.new(@dns, name) unless @queries.key?(name)
    @queries[name].defer
  end
end
# TODO: SSL, connection reviving when was idle
# One pipelined HTTP/1.1 connection to a single host and port.
#
# Requests are queued with #request and written to the socket as soon as the
# connection is open. Responses are parsed in order and reported to the block
# of the oldest unanswered request as
#
#   :response, code, status_text, headers   - status line and headers parsed
#   :body, data                             - a piece of the (decoded) body
#   :end                                    - response complete
#
# When the connection drops while requests are still unanswered it is
# reopened and those requests are sent again.
class HTTPConnection
  # Mix-in for the EventMachine connection object. It frames the incoming
  # byte stream either as lines (:line mode) or as a body of known or
  # unknown length (:packet mode) and reports to the owning HTTPConnection.
  module LineConnection
    attr_accessor :handler
    attr_accessor :mode
    # Bytes still expected in :packet mode; nil means "until disconnect".
    attr_accessor :packet_length

    def connection_completed
      @mode = :line
      @line = ''
      @packet_length = nil
      @handler.opened!
    end

    # requests - Array of complete request texts, written back to back.
    def send_requests(requests)
      # Array#to_s is the inspect form on Ruby >= 1.9, so join explicitly.
      payload = requests.join
      send_data payload
      puts "sent requests: #{payload.inspect}"
    end

    def receive_data(data)
      # Each receive_* consumes a prefix and returns what is left; the mode
      # may change between iterations.
      while data.size > 0
        data = send "receive_#{@mode}", data
      end
    end

    def unbind
      @handler.handle_disconnected!
    end

    private

    # Accumulates data up to the next "\n" and hands complete lines over.
    def receive_line(data)
      head, rest = data.split("\n", 2)
      @line += head
      return '' unless rest # no newline yet: wait for more data

      @handler.handle_line @line
      @line = ''
      rest
    end

    # Passes on body bytes, never more than the announced length.
    def receive_packet(data)
      unless @packet_length
        # Unknown length: everything until disconnect belongs to the body.
        @handler.handle_packet_chunk data
        return ''
      end

      chunk = data[0, @packet_length]
      rest = data[@packet_length..-1].to_s
      @handler.handle_packet_chunk chunk unless chunk.empty?
      @packet_length -= chunk.size
      return '' if @packet_length > 0

      @handler.handle_packet_end
      rest
    end
  end

  def initialize(host, port)
    @requests = []
    @host = host
    @port = port
    open_connection
  end

  # (Re)connects and resets all per-connection state, so that a response cut
  # off by a disconnect cannot leak into the parsing of the next one.
  def open_connection
    @opened = false
    @sent = 0 # number of queued requests already written to this socket
    @state = :status
    @code, @status = nil, nil
    @headers = {}
    @chunked = false
    @dumb = false
    @content_decoder = nil
    @c = EM.connect @host, @port, LineConnection
    @c.handler = self
  end

  def opened!
    @opened = true
    may_send
  end

  # Queues one request.
  #
  # text  - complete request text (request line, headers, blank line).
  # block - receives the events described in the class comment.
  def request(text, &block)
    @requests << [text, block]
    if @c
      may_send
    else
      open_connection
    end
  end

  # Reports an event to the oldest unanswered request; :end retires it.
  def tell_requester(what, *msg)
    if @requests.empty?
      # Why is there more data?
      @c.close_connection if @c
      return
    end

    block = @requests.first[1]
    block.call what, *msg if block
    return unless what == :end

    @requests.shift
    @sent -= 1 if @sent > 0
    @c.close_connection if @requests.empty? && @c
  end

  # Called by the connection for every complete line in :line mode.
  def handle_line(line)
    line.strip!
    case @state
    when :status
      return if line.empty? # tolerate stray blank lines between responses

      _http_ver, code, @status = line.split(' ', 3)
      @code = code.to_i
      @headers = {} # fresh hash: earlier requesters keep theirs unchanged
      @state = :headers
    when :headers
      if line.empty?
        finish_headers
      else
        key, value = line.split(':', 2)
        @headers[key] = value && value.strip
      end
    when :chunk_length
      return if line.empty? # the CRLF that terminates the previous chunk

      size = line.to_i(16)
      if size == 0
        # Last chunk: only now is the body, and thus the decoder, complete.
        finish_body
        @state = :chunk_trailer
      else
        @c.packet_length = size
        @c.mode = :packet
        @state = :body
      end
    when :chunk_trailer
      @state = :status if line.empty?
    end
  end

  # Called by the connection with a piece of body data.
  def handle_packet_chunk(data)
    if @content_decoder
      puts "*** DECOMPRESSING ***"
      data = @content_decoder.call(data)
    end
    tell_requester :body, data
  end

  # Called by the connection when a length-delimited packet is complete:
  # either one chunk of a chunked body or a whole Content-Length body.
  def handle_packet_end
    @c.mode = :line
    if @chunked
      @state = :chunk_length
    else
      # The next thing on the wire is the status line of the next response.
      @state = :status
      finish_body
    end
  end

  # Called by the connection when the socket was closed.
  def handle_disconnected!
    @opened = false
    @c = nil
    # A body without length information ends when the server hangs up.
    finish_body if @dumb
    open_connection unless @requests.empty?
  end

  private

  # Writes the requests that have not yet been sent on this connection.
  def may_send
    return unless @opened

    pending = @requests[@sent..-1]
    return if pending.nil? || pending.empty?

    @c.send_requests(pending.map { |r| r[0] })
    @sent = @requests.size
  end

  # Headers are complete: announce the response and decide how its body is
  # framed (order of checks follows RFC 7230 section 3.3.3).
  def finish_headers
    tell_requester :response, @code, @status, @headers
    @chunked = false
    @dumb = false
    @content_decoder = nil

    chunked = @headers['Transfer-Encoding'] == 'chunked'
    length = @headers['Content-Length']

    # NOTE(review): 1xx responses are reported as complete responses, as in
    # the original; fine as long as no request asks for "100 Continue".
    if bodyless_status? || (!chunked && length && length.to_i < 1)
      tell_requester :end
      @state = :status
      return
    end

    @content_decoder = build_content_decoder
    if chunked
      @chunked = true
      @state = :chunk_length
    else
      # No length information at all: read until the server disconnects.
      @dumb = length.nil?
      @c.mode = :packet
      @c.packet_length = length && length.to_i
      @state = :body
    end
  end

  # True for status codes whose responses never carry a body.
  def bodyless_status?
    (@code >= 100 && @code <= 199) || @code == 204 || @code == 304
  end

  # Returns a callable that decodes body data, or nil for plain bodies.
  # Raises RuntimeError for content codings this client cannot decode.
  def build_content_decoder
    encoding = @headers['Content-Encoding']
    case encoding
    when nil, 'identity'
      nil
    when 'deflate'
      Zlib::Inflate.new(nil).method(:inflate)
    else
      raise "Unknown Content-Encoding: #{encoding}"
    end
  end

  # The whole body has arrived: flush the decoder and retire the request.
  def finish_body
    if @content_decoder
      # inflate(nil) finishes the stream and returns what is still buffered.
      tell_requester :body, @content_decoder.call(nil)
      @content_decoder = nil
    end
    @dumb = false
    tell_requester :end
  end
end
# Singleton registry that keeps one connection per (scheme, host, port)
# and routes requests to it.
class ConnectionPool < GodObject
  def initialize
    @connections = {}
  end

  # Sends +text+ over the connection for the given target, creating the
  # connection on first use. The block is passed on to the connection.
  def request(scheme, host, port, text, &block)
    key = [scheme, host, port]
    @connections[key] = new_connection(*key) unless @connections.key?(key)
    @connections[key].request(text, &block)
  end

  private

  # Builds a connection for +scheme+; only plain HTTP is available.
  def new_connection(scheme, host, port)
    raise "Unsupported URL scheme: #{scheme}" unless scheme == 'http'

    HTTPConnection.new(host, port)
  end
end
# One GET transfer of a URL, fanned out to any number of receivers.
#
# The host name is resolved as soon as the object is created; the request
# itself is issued once both the addresses are known and #go! was called.
# Receivers get every event through `notify(*msg)`: the connection's events
# or `:error, message` when resolution failed.
class Transfer
  # url - String URL to fetch.
  def initialize(url)
    @can_go = false
    @has_addresses = false
    @error = nil
    @addresses = []
    @receivers = []
    @uri = URI::parse(url)

    d = DNSCache.resolve(@uri.host)
    d.callback { |addresses|
      puts "dns for #{@uri.host}: #{addresses.inspect}"
      @addresses = addresses
      @has_addresses = true
      may_go
    }
    d.errback { |err|
      puts "dns for #{@uri.host}: #{err}"
      @error = err.to_s
      @has_addresses = true
      may_go
    }
  end

  # Registers an object responding to `notify` as a receiver of the events.
  def get(spawnable)
    @receivers << spawnable
  end

  # Allows the request to be sent (immediately if resolution is finished).
  def go!
    @can_go = true
    may_go
  end

  private

  def notify_receivers(*msg)
    @receivers.each { |r| r.notify(*msg) }
  end

  # Issues the request once both preconditions hold.
  def may_go
    return unless @can_go && @has_addresses
    return notify_receivers(:error, @error) if @error

    # TODO: RR-addresses
    address = @addresses && @addresses.first
    # A successful answer may still contain no A/AAAA record at all.
    return notify_receivers(:error, "no addresses found for #{@uri.host}") unless address

    ConnectionPool.request(@uri.scheme, address, @uri.port, request_text) { |*msg|
      notify_receivers(*msg)
    }
  end

  # Builds the complete GET request text.
  def request_text
    request_headers = {
      'Host' => host_header,
      'Connection' => 'Keep-Alive',
      'Accept-Encoding' => 'chunked, deflate, identity'
    }
    # join, not to_s: Array#to_s is the inspect form on Ruby >= 1.9.
    header_lines = request_headers.map { |k, v| "#{k}: #{v}\r\n" }.join
    "GET #{@uri.request_uri} HTTP/1.1\r\n#{header_lines}\r\n"
  end

  # Host header value; the port is included only when it is not the
  # scheme's default.
  def host_header
    return @uri.host if @uri.port == @uri.default_port

    "#{@uri.host}:#{@uri.port}"
  end
end
# Singleton registry of transfers, one per URL, so that several receivers
# asking for the same URL share a single download.
class TransferManager < GodObject
  def initialize
    @transfers = {}
  end

  # Registers +spawnable+ as a receiver for +url+, creating the transfer the
  # first time the URL is seen.
  def get(url, spawnable)
    @transfers[url] = Transfer.new(url) unless @transfers.key?(url)
    @transfers[url].get spawnable
  end

  ##
  # Call this after everybody has made their get request, so that nobody
  # starts receiving chunks from the middle of a stream just because they
  # asked too late and the network was too fast.
  #
  # TODO: this can be solved more elegantly
  def go!
    @transfers.each_value(&:go!)
  end
end
# Entry point: register a logging receiver for every configured URL, then
# start all transfers at once.
EM.run do
  # Spawned process that prints every event; body chunks are abbreviated to
  # their first bytes and their size.
  reader = EM.spawn do |w, *m|
    if w == :body
      b = m.first
      puts "reader: #{b[0..10].inspect} (#{b.size})"
    else
      puts "reader: #{w} #{m.inspect}"
    end
  end

  # URLs listed per category under 'collections' in config.yaml.
  config = YAML::load_file('config.yaml')
  config['collections'].each do |_cat, urls|
    urls.each do |url|
      TransferManager.get(url, reader)
    end
  end

  TransferManager.get("http://api.flickr.com/services/feeds/photos_public.gne?id=73915810@N00&format=atom", reader)
  TransferManager.get("http://api.flickr.com/services/feeds/photos_public.gne?id=58728439@N00&format=atom", reader)
  TransferManager.go!
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment