Created
January 3, 2012 22:49
-
-
Save tarcieri/1557390 to your computer and use it in GitHub Desktop.
Playing around with @j_brisbin's Disruptor web server idea in JRuby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Requires Java! OH NO! | |
require 'java' | |
require 'disruptor' # you'll need to snag disruptor.jar | |
module Warbird | |
# Disruptor work handler that services one socket event at a time: it reads
# whatever is pending on the event's channel into a buffer taken from the
# buffer ring and, when the read yields no data, replies with a canned HTTP
# response and closes the connection.
class Worker
  include com.lmax.disruptor.WorkHandler
  java_import 'java.nio.ByteBuffer'

  # ring_buffer - ring whose entries are the receive buffers; an event's
  #               +buffer+ field is the sequence number of its entry.
  def initialize(ring_buffer)
    @ring_buffer = ring_buffer
    # Pre-encoded response; treated as a read-only template (see on_event).
    @msg = ByteBuffer.wrap(
      "HTTP/1.1 200 OK\r\n" \
      "Connection: Keep-Alive\r\n" \
      "Content-Type: text/plain\r\n" \
      "Content-Length: 12\r\n\r\n" \
      "Hello World!".to_java_bytes
    )
  end

  # WorkHandler callback. Reads from ev.channel; if fewer than one byte was
  # read, cancels the selection key, sends the response and closes the
  # channel. The event's buffer sequence is always published afterwards.
  def on_event(ev)
    unless ev.id
      puts "zomgwtfbbq empty event!"
      return
    end

    buffer = @ring_buffer.get(ev.buffer)
    buffer.clear if buffer.position > 0

    nbytes = read(ev.channel, buffer)
    puts "read #{nbytes} bytes"

    if nbytes < 1
      ev.key.cancel
      begin
        # channel.write advances the position of the buffer it is given, so
        # writing @msg itself would leave it drained after the first
        # response. A duplicate shares the bytes but has its own position.
        write ev.channel, @msg.duplicate
      ensure
        # Close even if the write raised, so the socket is not leaked.
        ev.channel.close
      end
    end
  ensure
    @ring_buffer.publish(ev.buffer) if ev.buffer
  end

  # Reads from channel into buffer and returns the channel's byte count.
  def read(channel, buffer)
    channel.read(buffer)
  end

  # Writes buffer's remaining bytes to channel (a single write attempt).
  def write(channel, buffer)
    channel.write(buffer)
  end
end
# Mutable slot passed from the selector loop to a worker.
#   id       - sequence number of this slot in the worker ring
#   channel  - channel the event is about
#   selector - selector the channel is registered with
#   key      - selection key for the channel
#   server   - the listening server channel
#   buffer   - sequence number of the receive buffer claimed for this event
#
# Assigned rather than subclassed: inheriting from Struct.new creates an
# extra anonymous superclass and raises "superclass mismatch" on reload.
Event = Struct.new(:id, :channel, :selector, :key, :server, :buffer)
# Non-blocking NIO front end. One selector loop accepts connections and
# hands every ready channel to a pool of Worker handlers through a
# Disruptor worker ring; receive buffers come from a second ring.
class Server
  java_import 'java.lang.Runtime'
  java_import 'java.net.InetSocketAddress'
  java_import 'java.util.concurrent.Executors'
  java_import 'java.nio.ByteBuffer'
  java_import 'java.nio.channels.ServerSocketChannel'
  java_import 'java.nio.channels.Selector'
  java_import 'java.nio.channels.SelectionKey'
  java_import 'com.lmax.disruptor.dsl.Disruptor'
  java_import 'com.lmax.disruptor.WorkerPool'
  java_import 'com.lmax.disruptor.WorkHandler'
  java_import 'com.lmax.disruptor.SingleThreadedClaimStrategy'
  java_import 'com.lmax.disruptor.BlockingWaitStrategy'
  java_import 'com.lmax.disruptor.FatalExceptionHandler'

  # FFFFUUUUU Java, Y U SO VERBOSE?
  SocketOpts = java.net.StandardSocketOptions

  # How big of receive buffers should we use?
  BUFFER_SIZE = 4096

  # Builds the buffer ring and the worker pool (one Worker per available
  # processor), then opens a non-blocking listening socket on addr:port and
  # registers it with a fresh selector for accept events.
  def initialize(addr, port)
    cores = Runtime.runtime.available_processors
    ring_size = 2 ** (cores + 1)

    buffer_factory = proc { ByteBuffer.allocate(BUFFER_SIZE) }
    claim_strategy = SingleThreadedClaimStrategy.new(ring_size)
    wait_strategy = BlockingWaitStrategy.new
    @executor = Executors.new_fixed_thread_pool(cores)

    buffer = Disruptor.new(buffer_factory, @executor, claim_strategy, wait_strategy)
    @buffer_ring = buffer.start

    workers = cores.times.map { Worker.new(@buffer_ring) }.to_java(WorkHandler)
    event_factory = proc { Event.new }
    exception_handler = FatalExceptionHandler.new
    worker_pool = WorkerPool.new(event_factory, claim_strategy, wait_strategy, exception_handler, workers)
    @worker_ring = worker_pool.start(@executor)

    @server = ServerSocketChannel.open
    @server.configure_blocking false

    # Socket options go on before bind: SO_REUSEADDR has no defined effect
    # once the socket is bound, and the receive buffer size should be in
    # place before connections start being accepted.
    server_sock = @server.socket
    server_sock.reuse_address = true
    server_sock.receive_buffer_size = BUFFER_SIZE

    @server.bind InetSocketAddress.new(addr, port), 1024

    @selector = Selector.open
    @server.register @selector, SelectionKey::OP_ACCEPT
  end

  # Selector loop; never returns. Each ready key is turned into an event on
  # the worker ring.
  def run
    # Claim these eagerly to chop off a little bit of latency, or something
    worker_id = @worker_ring.next

    loop do
      puts "selecting!"
      events = @selector.select
      next if events < 1

      ready_keys = @selector.selected_keys
      ready_keys.each do |key|
        # Workers cancel keys from their own threads; querying readiness on
        # a cancelled key raises, so skip any key that is no longer valid.
        next unless key.valid?

        if key.acceptable?
          puts "got a connection"
          # Drain every pending connection, claiming a fresh slot per accept.
          while accept(worker_id)
            worker_id = @worker_ring.next
          end
        elsif key.readable?
          puts "socket is readable"
          send_event worker_id, key.channel, key
          worker_id = @worker_ring.next
        end
      end

      # The selector never removes keys from the selected set on its own.
      # Without this, handled keys would be re-dispatched on every pass and
      # new readiness on them would not be counted by select.
      ready_keys.clear
    end
  end

  # Accepts one pending connection, configures it as a non-blocking socket
  # registered for reads, and publishes an event for it in slot worker_id.
  # Returns the accepted channel, or nil when there was nothing to accept.
  def accept(worker_id)
    channel = @server.accept
    return unless channel

    channel.configure_blocking false
    channel.set_option SocketOpts::SO_KEEPALIVE, true
    channel.set_option SocketOpts::TCP_NODELAY, true

    buffer_size = java.lang.Integer.new(BUFFER_SIZE)
    channel.set_option SocketOpts::SO_RCVBUF, buffer_size
    channel.set_option SocketOpts::SO_SNDBUF, buffer_size

    read_key = channel.register @selector, SelectionKey::OP_READ
    send_event worker_id, channel, read_key
    channel
  end

  # Fills the worker-ring slot at sequence worker_id with the channel, its
  # key and a newly claimed buffer-ring sequence, then publishes the slot.
  def send_event(worker_id, channel, ready_key)
    event = @worker_ring.get worker_id
    event.id = worker_id
    event.channel = channel
    event.selector = @selector
    event.key = ready_key
    event.server = @server
    event.buffer = @buffer_ring.next
    @worker_ring.publish worker_id
  end
end
end | |
# Script entry point: when this file is executed directly (not required),
# start a server on the loopback interface and run its selector loop.
if __FILE__ == $PROGRAM_NAME
  host = "127.0.0.1"
  listen_port = 3001
  puts "Starting Warbird on #{host}:#{listen_port}"

  warbird = Warbird::Server.new(host, listen_port)
  p warbird
  warbird.run
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment