@therealadam
Created August 18, 2012 16:29
# Simple Queue
# ------------
#
# A riff on bitly's HTTP-based simple queue[1]
#
# [1] https://github.com/bitly/simplehttp/blob/master/simplequeue/simplequeue.c
#
# Is it any good?
# ===============
#
# As an exercise, yes!
#
# Usage
# =====
#
# ```
# $ ruby simplequeue.rb
# $ nc localhost 4242
# PUT 123
# OK
# PUT 456
# OK
# GET
# OK 123
#
# GET
# OK 456
#
# STATS
#
# puts: 2
# gets: 2
# OK
# ```
#
# What does it do?
# ================
#
# - Enqueue via `PUT <whatever>`
# - Dequeue via `GET`
# - Enqueue multiple via `MPUT <separator> <whatever> <sep> <whatever> ...`
# (TODO)
# - Dequeue multiple via `MGET <num> <separator>` (TODO)
# - Dump the entire queue via `DUMP` (TODO)
# - Exit on `EXIT` (TODO)
# - Fetch stats via `STATS`
# - Tracks put/get operations
# - Tracks queue depth and high water mark (TODO)
# - Tracks queue size in bytes (TODO)
# - Overflows the in-memory queue to an on-disk log (TODO)
# - Tracks overflow size (TODO)
# - Flushes the overflow log on SIGHUP (TODO)
#
# What's a good song to listen to right now?
# ==========================================
#
# "Good Vibrations". Did you realize it's the same song, structurally and
# thematically, as "Whole Lotta Love"? But the former is way better!
#
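require 'celluloid/io'

# Counts PUT and GET operations. Runs as its own Celluloid actor so the
# server can update the counters asynchronously.
class QueueStats
  include Celluloid

  def initialize
    @gets = 0
    @puts = 0
  end

  def incr_gets
    @gets += 1
  end

  def incr_puts
    @puts += 1
  end

  # Render the counters as the text block sent in response to STATS.
  # The literal is kept flush left so no indentation leaks into the reply.
  def dump
    %{
puts: #{@puts}
gets: #{@gets}
}
  end
end

# TCP front end for the queue: accepts connections and serves
# GET / PUT / STATS requests on each of them.
class SimpleQueue
  include Celluloid::IO

  def initialize(queue, host, port)
    puts "*** Starting queue server on #{host}:#{port}"
    @queue = queue
    @stats = QueueStats.new
    @server = TCPServer.new(host, port)
    # Bang-suffixed calls (run!, handle_connection!, incr_gets!, incr_puts!)
    # are the asynchronous-call syntax of older Celluloid releases.
    run!
  end

  def finalize
    @server.close if @server
  end

  # Accept loop: hand every new connection to handle_connection.
  def run
    loop { handle_connection!(@server.accept) }
  end

  def handle_connection(socket)
    _, port, host = socket.peeraddr
    puts "*** Received connection from #{host}:#{port}"
    loop do
      message = socket.readpartial(4096)
      puts "*** Message: #{message}"
      case message
      when /^GET/
        puts "*** GET"
        # Queue#pop blocks while the queue is empty.
        obj = @queue.pop
        socket.write("OK #{obj}\n")
        @stats.incr_gets!
      when /^PUT/
        # Everything after "PUT " is the body, trailing newline included.
        body = message[4..-1]
        puts "*** PUT #{body}"
        @queue.push(body)
        socket.write("OK\n")
        @stats.incr_puts!
      when /^STATS/
        puts "*** STATS"
        socket.write(@stats.dump)
        puts @stats.dump
        socket.write("OK\n")
      else
        puts "*** Unknown operation!"
      end
    end
  rescue EOFError
    # readpartial raises EOFError once the client closes the connection.
    puts "*** #{host}:#{port} disconnected"
  end
end

SimpleQueue.new(Queue.new, '0.0.0.0', 4242).join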
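
The header lists `MPUT <separator> <whatever> <sep> <whatever> ...` as a TODO. As a rough sketch of how the request lines could be split before dispatching, here is a small pure-Ruby parser. The module name `SimpleQueueProtocol`, the `parse` method, and the exact `MPUT` grammar (separator token first, items trimmed of surrounding whitespace) are assumptions for illustration, not part of the gist. Unlike the server above, it also strips the trailing newline from the payload.

```ruby
# Hypothetical helper, not part of the original gist: splits one request
# line into a verb symbol and a list of payloads.
module SimpleQueueProtocol
  module_function

  def parse(line)
    # "PUT 123\n" -> verb "PUT", rest "123"; "GET\n" -> verb "GET", rest nil
    verb, rest = line.chomp.split(' ', 2)

    case verb
    when 'PUT'   then [:put, [rest.to_s]]
    when 'GET'   then [:get, []]
    when 'STATS' then [:stats, []]
    when 'MPUT'
      # Assumed grammar: MPUT <separator> <item> <separator> <item> ...
      sep, items = rest.to_s.split(' ', 2)
      return [:unknown, []] if sep.nil? || items.nil?

      [:mput, items.split(sep).map(&:strip)]
    else
      [:unknown, []]
    end
  end
end
```

With something like this in place, `handle_connection` could call `SimpleQueueProtocol.parse(message)` and switch on the returned symbol instead of matching regular expressions against the raw message, pushing each element of the payload list for `:mput`.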