Created
June 22, 2011 17:37
-
-
Save rklemme/1040631 to your computer and use it in GitHub Desktop.
Prioritized work queue with weighted items
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# statistics | |
# | |
#<WorkQueue:0x10039da4 | |
@p=#<Proc:0x10032734>, | |
@q= | |
[#<Picker:0x10039d7c | |
@items=[#<struct SRV domain="server-1", port=1234, priority=1, weight=1>], | |
@total=1, | |
@weight=#<Proc:0x1003293c>>, | |
#<Picker:0x10039d54 | |
@items= | |
[#<struct SRV domain="server-2-A", port=1234, priority=2, weight=16>, | |
#<struct SRV domain="server-2-B", port=1234, priority=2, weight=4>, | |
#<struct SRV domain="server-2-C", port=1234, priority=2, weight=8>], | |
@total=28, | |
@weight=#<Proc:0x1003293c>>, | |
#<Picker:0x10039d2c | |
@items=[#<struct SRV domain="server-3", port=1234, priority=4, weight=50>], | |
@total=50, | |
@weight=#<Proc:0x1003293c>>], | |
@w=#<Proc:0x1003293c>> | |
server-1 : 100000 0 0 0 0 | |
server-2-A : 0 57137 32571 10292 0 | |
server-2-B : 0 14247 24788 60965 0 | |
server-2-C : 0 28616 42641 28743 0 | |
server-3 : 0 0 0 0 100000 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Picks any of the given items at random, based on weight.
#
# The weight of an item is obtained by calling the block handed to the
# constructor with that item. An item is chosen with a probability
# proportional to its weight. Items whose weight is zero are only chosen
# once no weighted item is left; they are then chosen uniformly.
class Picker
  # items  - optional enumerable of initial items
  # weight - block mapping an item to its numeric weight
  def initialize(items = nil, &weight)
    @total = 0 # sum of the weights of all items currently held
    @items = []
    @weight = weight
    items.each { |item| self << item } if items
  end

  # True when no item is left. This checks the item list rather than the
  # weight total so that zero-weight items are not silently dropped.
  def empty?
    @items.empty?
  end

  # Add an item. Returns self so calls can be chained.
  def <<(item)
    @total += @weight[item]
    @items << item
    self
  end

  # Pick an item and leave it in place. Returns nil when empty.
  def pick
    idx = pos
    idx && @items[idx]
  end

  # Pick an item and remove it. Returns nil when empty.
  def fetch
    idx = pos
    return nil unless idx

    item = @items.delete_at(idx)
    @total -= @weight[item]
    item
  end

  # Freeze the item list along with the picker itself.
  def freeze
    @items.freeze
    super
  end

  private

  # Give every copy (dup and clone) its own item list, so that fetching
  # from the copy leaves the source untouched.
  def initialize_copy(source)
    super
    @items = @items.dup
  end

  # Determine the position of the next item; nil when there is none.
  def pos
    return nil if @items.empty?
    # Only zero-weight items remain: choose uniformly among them.
    return rand(@items.size) if @total == 0

    threshold = rand(@total)
    running = 0
    @items.each_with_index do |item, idx|
      running += @weight[item]
      return idx if running > threshold
    end
    # Only reachable when the weights are inconsistent with @total.
    raise "Algorithm error"
  end
end
# Queue of items with priorities and weights.
#
# Items are grouped by priority, one Picker per distinct priority value.
# Groups are served in ascending priority order; within a group the next
# item is chosen at random according to its weight.
class WorkQueue
  # items  - enumerable of items to enqueue
  # weight - object responding to to_proc (e.g. a Symbol) that yields an
  #          item's weight
  # prio   - object responding to to_proc that yields an item's priority;
  #          the priorities must be mutually comparable
  def initialize(items, weight, prio)
    @w = weight.to_proc
    @p = prio.to_proc
    # Group via a hash instead of using the priority as an array index, so
    # negative or very large priorities neither misplace items nor blow up
    # the array.
    groups = items.group_by { |item| @p[item] }
    @q = groups.keys.sort.map { |key| Picker.new(groups[key], &@w) }
  end

  # True when no group holds an item any more.
  def empty?
    @q.all?(&:empty?)
  end

  # Remove and return the next item, or nil when the queue is exhausted.
  def deq
    # Discard exhausted priority groups from the front.
    @q.shift while !@q.empty? && @q.first.empty?
    head = @q.first
    head && head.fetch
  end

  # Freeze the queue together with all of its pickers.
  def freeze
    @q.each(&:freeze).freeze
    super
  end

  private

  # Give every copy (dup and clone) its own pickers, so a frozen template
  # can be copied and then consumed.
  def initialize_copy(source)
    super
    @q = @q.map(&:dup)
  end
end
require 'pp' | |
# Stand-in record type with the four fields the demo below fills in:
# domain, port, priority and weight.
SRV = Struct.new(
  :domain,
  :port,
  :priority,
  :weight
)
# Fake connection attempt that logs what it tried and how it went.
# Returns true when the simulated attempt succeeded, false otherwise.
def connect(it)
  # roughly one attempt in three succeeds
  ok = rand(3) == 0
  outcome = ok ? "OK" : "fail"
  printf("try %p... -> %s\n", it, outcome)
  ok
end
# Ten sample records with random priority (0..2) and weight (0..99).
records = Array.new(10) do |idx|
  n = idx + 1
  SRV.new("www#{n}.dom.com", 1000 + n, rand(3), rand(100))
end

# The frozen queue serves as a reusable template.
template = WorkQueue.new(records, :weight, :priority).freeze
pp template

# One connection attempt: work on a private copy of the template and keep
# dequeuing until a connect succeeds or the queue runs dry.
wk = template.dup
it = nil
loop do
  it = wk.deq
  break if it.nil? || connect(it)
end
pp it, wk
puts <<STR
#
# statistics
#
STR

# Fixed data set: three priority levels, the middle one with three
# differently weighted servers.
records = [
  SRV.new("server-1", 1234, 1, 1),
  SRV.new("server-2-A", 1234, 2, 16),
  SRV.new("server-2-B", 1234, 2, 4),
  SRV.new("server-2-C", 1234, 2, 8),
  SRV.new("server-3", 1234, 4, 50),
]
template = WorkQueue.new(records, :weight, :priority).freeze
pp template

# domain => how often the server was dequeued at each position
stats = Hash.new { |hash, domain| hash[domain] = Array.new(records.length, 0) }

100_000.times do
  queue = template.dup
  rank = 0
  until (srv = queue.deq).nil?
    stats[srv.domain][rank] += 1
    rank += 1
  end
end

# One row per server, one right-aligned column per dequeue position.
stats.sort.each do |domain, counts|
  columns = counts.map { |count| "%10d" % count }
  printf("%-15s: %s\n", domain, columns.join(' '))
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment