Created
May 3, 2012 01:09
-
-
Save ender672/2582334 to your computer and use it in GitHub Desktop.
Multithreaded rack handler. Proof-of-concept. Use Puma instead.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'socket' | |
require 'uri' | |
require 'rack/handler' # gem install rack | |
require 'rack/rewindable_input' | |
require 'http/parser' # gem install http_parser.rb | |
require 'stringio' | |
module Rack::Handler | |
module Qute | |
def self.run(app, options={}) | |
opts = { :Host => '0.0.0.0', :Port => '3000' }.merge(options) | |
if opts[:Host][0] == ?/ | |
Socket.unix_server_loop(opts[:Host]){ |c| handle(c, app) } | |
else | |
Socket.tcp_server_loop(opts[:Host], opts[:Port]){ |c| handle(c, app) } | |
end | |
end | |
private | |
def self.handle(client, app) | |
Thread.new do | |
begin | |
req = Req.new | |
until req.done | |
req.parser << client.readpartial(4096) | |
end | |
send_response(client, *app.call(req.parser.headers)) | |
rescue | |
ensure | |
client.close | |
end | |
end | |
end | |
def self.send_response(client, status, headers, body) | |
client.write "HTTP/1.1 #{status}\r\n" | |
headers.each do |k, vs| | |
vs.split("\n").each{ |v| client.write "#{k}: #{v}\r\n" } | |
end | |
client.write "\r\n" | |
body.each{ |part| client.write part } | |
rescue Errno::EPIPE, Errno::ECONNRESET # client closed connection | |
ensure | |
body.close if body.respond_to?(:close) | |
end | |
class Req | |
PROTO = { | |
'rack.version' => [1, 1], | |
'rack.errors' => $stderr, | |
'rack.multithread' => true, | |
'rack.multiprocess' => false, | |
'rack.run_once' => false, | |
'rack.url_scheme' => 'http', | |
'SCRIPT_NAME' => '' | |
} | |
attr_reader :done, :parser | |
def initialize | |
@parser = Http::Parser.new(self) | |
@body = '' | |
@done = false | |
end | |
def on_headers_complete(h) | |
h.merge! PROTO | |
h['PATH_INFO'] = h['REQUEST_PATH'] = @parser.request_path | |
h['REQUEST_METHOD'] = @parser.http_method | |
h['QUERY_STRING'] = @parser.query_string | |
return unless h['Host'] | |
u = URI.parse "http://#{h['Host']}" | |
h['SERVER_NAME'] = u.host | |
h['SERVER_PORT'] = u.port.to_s | |
end | |
def on_body(chunk) | |
@body << chunk | |
end | |
def on_message_complete | |
body = Rack::RewindableInput.new(StringIO.new(@body)) | |
@parser.headers['rack.input'] = body | |
@done = true | |
end | |
end | |
end | |
register :qute, Qute | |
end | |
if $0 == __FILE__ | |
case ARGV[0] | |
when 'lobster' | |
require 'rack/lobster' | |
Rack::Handler::Qute.run Rack::Lobster.new | |
when 'sleeper' | |
app = lambda do |*| | |
sleep 1 | |
[200, {}, ['hello world']] | |
end | |
Rack::Handler::Qute.run app | |
else | |
require 'test/unit' | |
require 'open-uri' | |
require 'tempfile' | |
require 'rack/lobster' | |
class TestQute < Test::Unit::TestCase | |
include Rack | |
def setup | |
@sock_path = File.expand_path('../test.socket', __FILE__) | |
@default_url = 'http://localhost:3000' | |
end | |
def test_tcp_server | |
lobster_server do | |
res = open(@default_url) | |
assert_equal '200', res.status[0] | |
end | |
end | |
def test_unix_socket_server | |
lobster_server(:Host => @sock_path) do | |
Socket.unix(@sock_path) do |socket| | |
socket.write "GET / HTTP/1.1\r\n\r\n" | |
assert socket.read.start_with?('HTTP/1.1 200') | |
end | |
end | |
end | |
def test_crashing_server | |
server(lambda{ |*| raise 'hell' }) do | |
assert_raises(EOFError){ open(@default_url) } | |
end | |
end | |
def test_sleep_concurrency | |
app = lambda do |*| | |
sleep 4 | |
[200, {}, ['hello world']] | |
end | |
server(app) do | |
threads = (0..12).map do | |
Thread.new do | |
res = open(@default_url) | |
assert_equal '200', res.status[0] | |
assert_equal 'hello world', res.read | |
end | |
end | |
threads.each{ |t| t.join } | |
end | |
end | |
private | |
def server(app, options={}) | |
pid = Process.fork{ Handler::Qute.run(app, options) } | |
if options[:Host] && options[:Host][0] == ?/ | |
# If using a unix socket, loop until we can connect | |
loop do | |
Socket.unix(options[:Host]){} rescue next | |
break | |
end | |
end | |
yield | |
ensure | |
Process.kill('TERM', pid) | |
Process.wait | |
end | |
def lobster_server(options={}, &block) | |
server(Lobster.new, options, &block) | |
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
add to benchmark:
setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
BasicSocket.do_not_reverse_lookup = true