Skip to content

Instantly share code, notes, and snippets.

@kbrock
Last active June 8, 2017 02:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kbrock/8683ea37bb610956c75d51f7007a0a9b to your computer and use it in GitHub Desktop.
Save kbrock/8683ea37bb610956c75d51f7007a0a9b to your computer and use it in GitHub Desktop.
Multi-threaded example
#!/usr/bin/env ruby
require 'thread'
# Number of collector threads started by the script section of this file.
# While this constant is defined, collectors run in their own threads; comment
# the line out to have the coordinator run a single collector inline instead.
COLLECTOR_COUNT = 2 # define to run in separate threads. comment to run inline
# Tiny in-memory key/value store. Every access to the backing hash goes
# through a single Mutex.
class Db
  def initialize
    @mutex = Mutex.new
    @data = {}
  end

  # Stores v under key n.
  def []=(n, v)
    @mutex.synchronize do
      @data[n] = v
    end
  end

  # Returns the value stored under key n (nil when absent).
  def [](n)
    @mutex.synchronize do
      @data[n]
    end
  end

  # Iterates over the backing hash while holding the mutex; arguments and
  # the block are forwarded unchanged to Hash#each.
  def each(*args, &block)
    @mutex.synchronize do
      @data.each(*args, &block)
    end
  end
end
# Worker that repeatedly pulls tasks from a queue-like object and refreshes
# the matching record in the store.
class Collector
  # my_n - number identifying this collector (stored; not read in this class)
  # n_b  - flag handed to q.get_task on every fetch
  def initialize(my_n, n_b)
    @my_n = my_n
    @n_b = n_b
  end

  # Keeps fetching tasks from q until get_task returns nil/false, processing
  # each task and then reporting it through q.finished_task.
  def run(q, db)
    job = q.get_task(@n_b)
    while job
      process(job, db)
      q.finished_task(job)
      job = q.get_task(@n_b)
    end
  end

  # Loads the record named by task[:id], simulates work with a short sleep,
  # stamps :last_updated with the current time and writes the record back.
  def process(task, db)
    key = task[:id]
    entry = db[key] # find
    sleep 0.1 # rand(0) # work TODO: hang or throw exceptions?
    entry[:last_updated] = Time.now
    db[key] = entry # save
  end
end
# Narrow facade over a coordinator: exposes only the two calls a collector
# needs and forwards them unchanged.
class CoordinatorAPI
  # c - the coordinator every call is delegated to
  def initialize(c)
    @coordinator = c
  end

  # Forwards to the coordinator's get_task and returns its result.
  def get_task(n_b)
    @coordinator.get_task(n_b)
  end

  # Forwards to the coordinator's finished_task and returns its result.
  def finished_task(task)
    @coordinator.finished_task(task)
  end
end
# Periodically scans the record store, queues a task for every record that is
# due for a refresh, and waits for the queue to drain before the next cycle.
class Coordinator
  INTERVAL = 2 # seconds between the start of consecutive scheduling cycles
  ALERT_MAX_AGE = 3 # seconds after which a record flagged :alert is due again
  DEFAULT_MAX_AGE = 10 # seconds after which any record is due again

  # db             - record store; must respond to #each yielding (id, record)
  # inline_workers - truthy to run one collector inside #run on every cycle
  def initialize(db, inline_workers)
    @db = db
    @inline_workers = inline_workers
    @data = Queue.new
  end

  # Prints a cycle header (with a warning when tasks are left over from the
  # previous cycle), then queues a {:id => ...} task for every due record.
  # One status character is printed per due record; see #due_code.
  def schedule
    puts "", "W (#{@data.size}) #{@data.empty? ? "" : "WARNING non empty queue"}"
    @db.each do |_id, task|
      code = due_code(task)
      @data.push(:id => task[:id]) if code
      print code # prints nothing when the record is not due
    end
  end

  # Polls once per second until the task queue is empty, printing the
  # remaining size each time. An empty queue only means every task has been
  # handed out; tasks still being processed are not tracked (see the TODO on
  # #finished_task).
  def block_until_done
    until @data.empty? # could use peek
      puts "", "Q (#{@data.size})"
      sleep(1)
    end
  end

  # Pops the next task from the queue. n_b is passed to Queue#pop as its
  # non_block flag. Returns nil when a non-blocking pop finds the queue empty;
  # only that ThreadError is rescued, so unrelated failures are not hidden.
  def get_task(n_b)
    @data.pop(n_b)
  rescue ThreadError
    nil
  end

  # Called by a collector once it is done with task; currently only prints a
  # progress dot.
  def finished_task(task) ; print "." ; end # TODO: do something in @data to mark done

  # Main loop: schedule due records, optionally drain them inline, wait for
  # the queue to empty, then sleep for whatever is left of INTERVAL (rounded
  # to whole seconds).
  def run
    loop do
      start = Time.now
      schedule
      # Pass the store held by this coordinator; a bare `db` is not defined
      # inside this method and raised NameError in inline mode.
      Collector.new(1, @inline_workers).run(api, @db) if @inline_workers
      block_until_done
      sleep([INTERVAL - (Time.now - start), 0].max.round)
    end
  end

  # Returns a fresh CoordinatorAPI wrapping this coordinator.
  def api ; CoordinatorAPI.new(self) ; end

  private

  # Returns the status character for a due record, or nil when it is not due:
  # "N" never updated, "A" alert record older than ALERT_MAX_AGE seconds,
  # "+" record older than DEFAULT_MAX_AGE seconds.
  def due_code(task)
    updated_at = task[:last_updated]
    if updated_at.nil?
      "N"
    elsif updated_at < (Time.now - ALERT_MAX_AGE) && task[:alert]
      "A"
    elsif updated_at < (Time.now - DEFAULT_MAX_AGE)
      "+"
    end
  end
end
# Seed the store with 100 fake VM records; every third one is flagged :alert
# and none has been updated yet.
db = Db.new
(0...100).each do |n|
  vm_id = "vm#{n}"
  db[vm_id] = {:id => vm_id, :alert => (n % 3 == 0), :last_updated => nil }
end

# Without COLLECTOR_COUNT the coordinator runs a collector inline; with it,
# that many collector threads are started and numbered from 1.
run_inline = !defined?(COLLECTOR_COUNT)
coordinator = Coordinator.new(db, run_inline)
unless run_inline
  collectors = (1..COLLECTOR_COUNT).map do |worker_no|
    Thread.new(worker_no) do |my_n|
      Collector.new(my_n, false).run(coordinator.api, db)
    end
  end
end

coordinator.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment