Skip to content

Instantly share code, notes, and snippets.

@ender672
Created May 3, 2012 01:09
Show Gist options
  • Save ender672/2582334 to your computer and use it in GitHub Desktop.
Save ender672/2582334 to your computer and use it in GitHub Desktop.
Multithreaded rack handler. Proof-of-concept. Use Puma instead.
require 'socket'
require 'uri'
require 'rack/handler' # gem install rack
require 'rack/rewindable_input'
require 'http/parser' # gem install http_parser.rb
require 'stringio'
module Rack::Handler
module Qute
def self.run(app, options={})
opts = { :Host => '0.0.0.0', :Port => '3000' }.merge(options)
if opts[:Host][0] == ?/
Socket.unix_server_loop(opts[:Host]){ |c| handle(c, app) }
else
Socket.tcp_server_loop(opts[:Host], opts[:Port]){ |c| handle(c, app) }
end
end
private
def self.handle(client, app)
Thread.new do
begin
req = Req.new
until req.done
req.parser << client.readpartial(4096)
end
send_response(client, *app.call(req.parser.headers))
rescue
ensure
client.close
end
end
end
def self.send_response(client, status, headers, body)
client.write "HTTP/1.1 #{status}\r\n"
headers.each do |k, vs|
vs.split("\n").each{ |v| client.write "#{k}: #{v}\r\n" }
end
client.write "\r\n"
body.each{ |part| client.write part }
rescue Errno::EPIPE, Errno::ECONNRESET # client closed connection
ensure
body.close if body.respond_to?(:close)
end
class Req
PROTO = {
'rack.version' => [1, 1],
'rack.errors' => $stderr,
'rack.multithread' => true,
'rack.multiprocess' => false,
'rack.run_once' => false,
'rack.url_scheme' => 'http',
'SCRIPT_NAME' => ''
}
attr_reader :done, :parser
def initialize
@parser = Http::Parser.new(self)
@body = ''
@done = false
end
def on_headers_complete(h)
h.merge! PROTO
h['PATH_INFO'] = h['REQUEST_PATH'] = @parser.request_path
h['REQUEST_METHOD'] = @parser.http_method
h['QUERY_STRING'] = @parser.query_string
return unless h['Host']
u = URI.parse "http://#{h['Host']}"
h['SERVER_NAME'] = u.host
h['SERVER_PORT'] = u.port.to_s
end
def on_body(chunk)
@body << chunk
end
def on_message_complete
body = Rack::RewindableInput.new(StringIO.new(@body))
@parser.headers['rack.input'] = body
@done = true
end
end
end
register :qute, Qute
end
if $0 == __FILE__
case ARGV[0]
when 'lobster'
require 'rack/lobster'
Rack::Handler::Qute.run Rack::Lobster.new
when 'sleeper'
app = lambda do |*|
sleep 1
[200, {}, ['hello world']]
end
Rack::Handler::Qute.run app
else
require 'test/unit'
require 'open-uri'
require 'tempfile'
require 'rack/lobster'
class TestQute < Test::Unit::TestCase
include Rack
def setup
@sock_path = File.expand_path('../test.socket', __FILE__)
@default_url = 'http://localhost:3000'
end
def test_tcp_server
lobster_server do
res = open(@default_url)
assert_equal '200', res.status[0]
end
end
def test_unix_socket_server
lobster_server(:Host => @sock_path) do
Socket.unix(@sock_path) do |socket|
socket.write "GET / HTTP/1.1\r\n\r\n"
assert socket.read.start_with?('HTTP/1.1 200')
end
end
end
def test_crashing_server
server(lambda{ |*| raise 'hell' }) do
assert_raises(EOFError){ open(@default_url) }
end
end
def test_sleep_concurrency
app = lambda do |*|
sleep 4
[200, {}, ['hello world']]
end
server(app) do
threads = (0..12).map do
Thread.new do
res = open(@default_url)
assert_equal '200', res.status[0]
assert_equal 'hello world', res.read
end
end
threads.each{ |t| t.join }
end
end
private
def server(app, options={})
pid = Process.fork{ Handler::Qute.run(app, options) }
if options[:Host] && options[:Host][0] == ?/
# If using a unix socket, loop until we can connect
loop do
Socket.unix(options[:Host]){} rescue next
break
end
end
yield
ensure
Process.kill('TERM', pid)
Process.wait
end
def lobster_server(options={}, &block)
server(Lobster.new, options, &block)
end
end
end
end
@ender672
Copy link
Author

ender672 commented May 8, 2012

add to benchmark:

setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
BasicSocket.do_not_reverse_lookup = true

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment