Skip to content

Instantly share code, notes, and snippets.

@stevecj
Last active August 29, 2015 14:10
Show Gist options
  • Save stevecj/f124d248758410620e1b to your computer and use it in GitHub Desktop.
Save stevecj/f124d248758410620e1b to your computer and use it in GitHub Desktop.
ReversibleQueue: Works much like Ruby's native Queue, but provides an #undeq method, which Ruby's own Queue does not have
# Works much like Ruby's native Queue, but provides an #undeq
# method, which Ruby's own Queue does not have.
class ReversibleQueue
def initialize
@queue_array = []
@threads_waiting = []
@mutex = Mutex.new
end
# Push an object onto the tail of the queue
def enq(object)
mutex.synchronize do
queue_array.push object
data_is_available
end
self
end
# #<< is an alias for #enq
alias << enq
# Shift the head item out of the queue. Block until data
# is available if the queue is empty.
def deq
mutex.synchronize {
wait_for_data
queue_array.shift
}
end
# Insert an item into the head of the queue
def undeq( object )
mutex.synchronize do
queue_array.unshift object
data_is_available
end
self
end
def empty?
mutex.synchronize{ queue_array.empty? }
end
# Returns the sequence of objects currently in the queue as an
# array.
def snapshot
mutex.synchronize{ queue_array.dup }
end
private
attr_reader :queue_array, :threads_waiting, :mutex
def wait_for_data
# Use a while loop so we can resume waiting if another thread
# beats us to the punch after data becomes available.
while queue_array.empty?
threads_waiting << Thread.current
mutex.sleep
end
end
def data_is_available
next_waiting = threads_waiting.shift
next_waiting.wakeup if next_waiting
end
end
# === Demo ===
rq = ReversibleQueue.new
rq << :a << :b
rq.undeq :aa
time_ref = Time.now
puts "Initial snapshot: #{rq.snapshot.inspect}"
Thread.new do
4.times do
puts "Thread deq: #{rq.deq.inspect} (t: #{Time.now - time_ref})"
end
puts "Thread done"
end
sleep 0.1
rq << :c << :d << :e
sleep 0.1
until rq.empty?
puts "Main deq: #{rq.deq.inspect} (t: #{Time.now - time_ref})"
end
# === Demo Output ===
# Initial snapshot: [:aa, :a, :b]
# Thread deq: :aa (t: 0.000198)
# Thread deq: :a (t: 0.000263)
# Thread deq: :b (t: 0.000295)
# Thread deq: :c (t: 0.101016)
# Thread done
# Main deq: :d (t: 0.202011)
# Main deq: :e (t: 0.202084)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment