Skip to content

Instantly share code, notes, and snippets.

@tarcieri
Created January 3, 2012 22:49
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tarcieri/1557390 to your computer and use it in GitHub Desktop.
Save tarcieri/1557390 to your computer and use it in GitHub Desktop.
Playing around with @j_brisbin's Disruptor web server idea in JRuby
# Requires Java! OH NO!
require 'java'
require 'disruptor' # you'll need to snag disruptor.jar
module Warbird
class Worker
include com.lmax.disruptor.WorkHandler
java_import 'java.nio.ByteBuffer'
def initialize(ring_buffer)
@ring_buffer = ring_buffer
@msg = ByteBuffer.wrap(
"HTTP/1.1 200 OK\r\n" \
"Connection: Keep-Alive\r\n" \
"Content-Type: text/plain\r\n" \
"Content-Length: 12\r\n\r\n" \
"Hello World!".to_java_bytes
)
end
def on_event(ev)
unless ev.id
puts "zomgwtfbbq empty event!"
return
end
buffer = @ring_buffer.get(ev.buffer)
buffer.clear if buffer.position > 0
nbytes = read(ev.channel, buffer)
puts "read #{nbytes} bytes"
if nbytes < 1
ev.key.cancel
write ev.channel, @msg
ev.channel.close
end
ensure
@ring_buffer.publish(ev.buffer) if ev.buffer
end
def read(channel, buffer)
channel.read(buffer)
end
def write(channel, buffer)
channel.write(buffer)
end
end
class Event < Struct.new(:id, :channel, :selector, :key, :server, :buffer); end
class Server
java_import 'java.lang.Runtime'
java_import 'java.net.InetSocketAddress'
java_import 'java.util.concurrent.Executors'
java_import 'java.nio.ByteBuffer'
java_import 'java.nio.channels.ServerSocketChannel'
java_import 'java.nio.channels.Selector'
java_import 'java.nio.channels.SelectionKey'
java_import 'com.lmax.disruptor.dsl.Disruptor'
java_import 'com.lmax.disruptor.WorkerPool'
java_import 'com.lmax.disruptor.WorkHandler'
java_import 'com.lmax.disruptor.SingleThreadedClaimStrategy'
java_import 'com.lmax.disruptor.BlockingWaitStrategy'
java_import 'com.lmax.disruptor.FatalExceptionHandler'
# FFFFUUUUU Java, Y U SO VERBOSE?
SocketOpts = java.net.StandardSocketOptions
# How big of receive buffers should we use?
BUFFER_SIZE = 4096
def initialize(addr, port)
cores = Runtime.runtime.available_processors
ring_size = 2 ** (cores + 1)
buffer_factory = proc { ByteBuffer.allocate(BUFFER_SIZE) }
claim_strategy = SingleThreadedClaimStrategy.new(ring_size)
wait_strategy = BlockingWaitStrategy.new
@executor = Executors.new_fixed_thread_pool(cores)
buffer = Disruptor.new(buffer_factory, @executor, claim_strategy, wait_strategy)
@buffer_ring = buffer.start
workers = cores.times.map { Worker.new(@buffer_ring) }.to_java(WorkHandler)
event_factory = proc { Event.new }
exception_handler = FatalExceptionHandler.new
worker_pool = WorkerPool.new(event_factory, claim_strategy, wait_strategy, exception_handler, workers)
@worker_ring = worker_pool.start(@executor)
@server = ServerSocketChannel.open
@server.configure_blocking false
@server.bind InetSocketAddress.new(addr, port), 1024
@selector = Selector.open
@server.register @selector, SelectionKey::OP_ACCEPT
server_sock = @server.socket
server_sock.receive_buffer_size = BUFFER_SIZE
server_sock.reuse_address = true
end
def run
# Claim these eagerly to chop off a little bit of latency, or something
worker_id = @worker_ring.next
while true
puts "selecting!"
events = @selector.select
next if events < 1
@selector.selected_keys.each do |key|
if key.acceptable?
puts "got a connection"
while accept(worker_id)
worker_id = @worker_ring.next
end
elsif key.readable?
puts "socket is readable"
send_event worker_id, key.channel, key
worker_id = @worker_ring.next
end
end
end
end
def accept(worker_id)
channel = @server.accept
return unless channel
channel.configure_blocking false
channel.set_option SocketOpts::SO_KEEPALIVE, true
channel.set_option SocketOpts::TCP_NODELAY, true
buffer_size = java.lang.Integer.new(BUFFER_SIZE)
channel.set_option SocketOpts::SO_RCVBUF, buffer_size
channel.set_option SocketOpts::SO_SNDBUF, buffer_size
read_key = channel.register @selector, SelectionKey::OP_READ
send_event worker_id, channel, read_key
channel
end
def send_event(worker_id, channel, ready_key)
event = @worker_ring.get worker_id
event.id = worker_id
event.channel = channel
event.selector = @selector
event.key = ready_key
event.server = @server
event.buffer = @buffer_ring.next
@worker_ring.publish worker_id
end
end
end
if $0 == __FILE__
addr, port = "127.0.0.1", 3001
puts "Starting Warbird on #{addr}:#{port}"
server = Warbird::Server.new(addr, port)
p server
server.run
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment