Last active
August 29, 2015 14:10
-
-
Save stevecj/f124d248758410620e1b to your computer and use it in GitHub Desktop.
ReversibleQueue: Works much like Ruby's native Queue, but provides an #undeq method, which Ruby's own Queue does not have
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Works much like Ruby's native Queue, but provides an #undeq
# method, which Ruby's own Queue does not have.
#
# Every public method takes the same internal mutex. Consumers that
# find the queue empty block on a condition variable which producers
# signal each time they add an item.
class ReversibleQueue
  def initialize
    @queue_array = []
    @mutex = Mutex.new
    @data_available = ConditionVariable.new
  end

  # Push an object onto the tail of the queue.
  #
  # Returns self so that calls can be chained (q << :a << :b).
  def enq(object)
    mutex.synchronize do
      queue_array.push(object)
      data_is_available
    end
    self
  end

  # #<< is an alias for #enq
  alias << enq

  # Shift the head item out of the queue. Block until data
  # is available if the queue is empty.
  #
  # Returns the object that was at the head of the queue.
  def deq
    mutex.synchronize do
      wait_for_data
      queue_array.shift
    end
  end

  # Insert an item into the head of the queue, so that it is the
  # next object returned by #deq.
  #
  # Returns self so that calls can be chained.
  def undeq(object)
    mutex.synchronize do
      queue_array.unshift(object)
      data_is_available
    end
    self
  end

  # Returns true when the queue currently holds no objects. The answer
  # may be out of date as soon as it is returned if other threads are
  # using the queue.
  def empty?
    mutex.synchronize { queue_array.empty? }
  end

  # Returns the sequence of objects currently in the queue as an
  # array. The array is a copy; changing it does not affect the queue.
  def snapshot
    mutex.synchronize { queue_array.dup }
  end

  private

  attr_reader :queue_array, :mutex, :data_available

  # Must be called with the mutex held. Returns once the queue is
  # non-empty.
  def wait_for_data
    # Re-check in a loop so we resume waiting after a spurious wakeup,
    # or if another thread beats us to the punch after data becomes
    # available. The condition variable releases the mutex while
    # waiting and re-acquires it before returning, and it removes the
    # waiter from its own list when the wait ends for any reason, so
    # no stale or dead threads are ever left registered.
    data_available.wait(mutex) while queue_array.empty?
  end

  # Must be called with the mutex held. Wakes at most one waiting
  # consumer; does nothing when no thread is waiting.
  def data_is_available
    data_available.signal
  end
end
# === Demo ===
# Exercises ReversibleQueue from two threads: a consumer thread that
# dequeues four items, and the main thread, which adds more items
# later and then drains whatever the consumer left behind.
rq = ReversibleQueue.new
rq << :a << :b
rq.undeq :aa

# Measure elapsed time on the monotonic clock so the printed offsets
# cannot be skewed by wall-clock adjustments.
time_ref = Process.clock_gettime(Process::CLOCK_MONOTONIC)
elapsed = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) - time_ref }

puts "Initial snapshot: #{rq.snapshot.inspect}"

consumer = Thread.new do
  4.times do
    puts "Thread deq: #{rq.deq.inspect} (t: #{elapsed.call})"
  end
  puts "Thread done"
end

# Give the consumer time to drain the three initial items and block
# on its fourth #deq before more data is added.
sleep 0.1
rq << :c << :d << :e
sleep 0.1

# The sleep above is intended to let the consumer finish its four
# dequeues first, so that only the main thread removes items here and
# the empty?-then-deq sequence cannot block.
until rq.empty?
  puts "Main deq: #{rq.deq.inspect} (t: #{elapsed.call})"
end

# Make sure the consumer has finished before the script exits, rather
# than relying on the sleeps above.
consumer.join

# === Demo Output ===
# Initial snapshot: [:aa, :a, :b]
# Thread deq: :aa (t: 0.000198)
# Thread deq: :a (t: 0.000263)
# Thread deq: :b (t: 0.000295)
# Thread deq: :c (t: 0.101016)
# Thread done
# Main deq: :d (t: 0.202011)
# Main deq: :e (t: 0.202084)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment