-
-
Save TvL2386/6c94504b4890dc502a1c to your computer and use it in GitHub Desktop.
Load testing Hash with TimedQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
require 'timeout' | |
# "borrowed from DUNNO" | |
# alternative: https://www.omniref.com/ruby/gems/girl_friday/0.9.4/symbols/TimedQueue | |
# alternative: http://spin.atomicobject.com/2014/07/07/ruby-queue-pop-timeout/ | |
class TimedQueue | |
def initialize(size = 0) | |
@que = Array.new(size) { yield } | |
@mutex = Mutex.new | |
@resource = ConditionVariable.new | |
end | |
def push(obj) | |
@mutex.synchronize do | |
@que.push obj | |
@resource.broadcast | |
end | |
end | |
alias_method :<<, :push | |
def pop_with_timeout(timeout=0.5) | |
deadline = Time.now + timeout | |
@mutex.synchronize do | |
loop do | |
return @que.pop unless @que.empty? | |
to_wait = deadline - Time.now | |
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0 | |
@resource.wait(@mutex, to_wait) | |
end | |
end | |
end | |
def empty? | |
@que.empty? | |
end | |
def clear | |
@que.clear | |
end | |
def length | |
@que.length | |
end | |
end | |
# class TimedQueue | |
# def initialize | |
# @mutex = Mutex.new | |
# @queue = [] | |
# @recieved = ConditionVariable.new | |
# end | |
# | |
# def push(x) | |
# @mutex.synchronize do | |
# @queue << x | |
# @recieved.signal | |
# end | |
# end | |
# | |
# def pop(non_block = false) | |
# pop_with_timeout(non_block ? 0 : nil) | |
# end | |
# | |
# def pop_with_timeout(timeout = nil) | |
# @mutex.synchronize do | |
# if @queue.empty? | |
# @recieved.wait(@mutex, timeout) if timeout != 0 | |
# #if we're still empty after the timeout, raise exception | |
# raise ThreadError, "queue empty" if @queue.empty? | |
# end | |
# @queue.shift | |
# end | |
# end | |
# end | |
if $0 == __FILE__ | |
puts "Testing timed_queue.rb" | |
iterations = 100_000 | |
array = [] | |
threads = 1 | |
results = Hash.new { |h,k| h[k] = TimedQueue.new } | |
threads.times do | |
array << Thread.start do | |
begin | |
iterations.times { |x| results[x.to_s].pop_with_timeout(30)} | |
rescue Exception => e | |
puts "Exception #{e.message} occurred in q.timed_pop() thread" | |
end | |
end | |
end | |
threads.times do | |
array << Thread.start do | |
begin | |
iterations.times { |x| results[x.to_s].push(x.to_s) } | |
rescue Exception => e | |
puts "Exception #{e.message} occurred in q.push() thread" | |
end | |
end | |
end | |
array.map(&:join) | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment