@seki
Created January 9, 2013 10:05
Distributed/Persistent Queue using Drip#write_if_latest.
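require 'drip'

class DripQueue
  def initialize(drip=Drip.new(nil))
    @drip = drip
  end

  # Reset the queue: store the newest key as the cursor so that anything
  # written earlier is skipped. Falls back to 0 when the Drip is empty.
  def empty
    head = @drip.head[0][0] rescue 0
    @drip.write(head, 'cursor')
  end

  # Append a value under the 'queue' tag.
  def enq(value)
    @drip.write(value, 'queue')
  end

  # Take the next value. The new cursor is written only if the 'cursor'
  # entry read at the top of the loop is still the latest one; otherwise
  # another consumer got there first and the whole step is retried.
  def deq
    begin
      rev, cursor = @drip.head(1, 'cursor')[0]
      cursor, value = @drip.read_tag(cursor, 'queue', 1, 1)[0]
      sleep(0.01) # pause between the read and the conditional write
    end while @drip.write_if_latest([['cursor', rev]], cursor, 'cursor').nil?
    value
  end

  # Same as deq but with an unconditional write. Concurrent consumers can
  # read the same cursor here and return the same value twice.
  def deq0
    cursor = @drip.head(1, 'cursor')[0][1]
    cursor, value = @drip.read_tag(cursor, 'queue', 1, 1)[0]
    sleep(0.01)
    @drip.write(cursor, 'cursor')
    value
  end
end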
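
queue = DripQueue.new
queue.empty

# Ten consumers, each taking ten values.
th = (1..10).collect do |n|
  Thread.new(n) do |m|
    10.times do
      p [m, queue.deq]
      # Kernel#rand(0.1) truncates its argument to 0 and returns a float
      # in [0, 1), so scale the result to sleep for at most 0.1 s.
      sleep(rand * 0.1)
    end
  end
end

# Produce the 100 values the consumers are waiting for.
100.times do |n|
  queue.enq(n)
end

th.each {|t| t.join}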
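
The retry loop in `deq` is an optimistic compare-and-swap on the cursor. The sketch below shows that idea without the drip gem. `TinyLog` and its methods are hypothetical stand-ins written only for this illustration; they are not Drip's API, and unlike `read_tag` the lookup here returns nil instead of blocking when nothing is queued.

```ruby
# Hypothetical in-memory stand-in for a tagged, append-only log.
# This is NOT Drip; it only imitates the few operations the queue relies on.
class TinyLog
  def initialize
    @mutex = Mutex.new
    @entries = [] # each entry is [key, value, tag]
    @key = 0
  end

  # Append unconditionally and return the new key.
  def write(value, tag)
    @mutex.synchronize { append(value, tag) }
  end

  # Newest [key, value] written under tag, or nil.
  def latest(tag)
    @mutex.synchronize do
      entry = @entries.reverse.find { |_, _, t| t == tag }
      entry && entry[0, 2]
    end
  end

  # Oldest [key, value] under tag whose key is greater than key, or nil.
  def next_after(key, tag)
    @mutex.synchronize do
      entry = @entries.find { |k, _, t| t == tag && k > key }
      entry && entry[0, 2]
    end
  end

  # Append only if the newest entry under cond_tag still has key cond_key.
  # Returns the new key on success and nil when the condition is stale.
  def write_if_latest(cond_tag, cond_key, value, tag)
    @mutex.synchronize do
      newest = @entries.reverse.find { |_, _, t| t == cond_tag }
      return nil unless newest && newest[0] == cond_key
      append(value, tag)
    end
  end

  private

  def append(value, tag)
    @key += 1
    @entries << [@key, value, tag]
    @key
  end
end

# Read the cursor, find the next item, then advance the cursor only if
# nobody else has advanced it in the meantime. Retry on conflict.
def deq(log)
  loop do
    rev, cursor = log.latest('cursor')
    found = log.next_after(cursor, 'queue')
    return nil if found.nil? # nothing queued
    key, value = found
    return value if log.write_if_latest('cursor', rev, key, 'cursor')
  end
end

log = TinyLog.new
log.write(0, 'cursor')
20.times { |n| log.write(n, 'queue') }

threads = Array.new(4) { Thread.new { Array.new(5) { deq(log) } } }
taken = threads.flat_map(&:value)
p taken.sort == (0...20).to_a # true when each item was taken exactly once
```

Replacing `write_if_latest` with a plain `write` in this sketch gives the `deq0` behavior: two threads that read the same cursor both succeed and return the same item.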