Skip to content

Instantly share code, notes, and snippets.

@marzdgzmn
Created April 23, 2018 09:37
Show Gist options
  • Save marzdgzmn/4266a16d78cd3baf8b3ba76fa0db3ae2 to your computer and use it in GitHub Desktop.
Save marzdgzmn/4266a16d78cd3baf8b3ba76fa0db3ae2 to your computer and use it in GitHub Desktop.
require 'thread'
require 'monitor'
require 'singleton'
require_relative 'mirror_sync'
require_relative 'mirror_observer'
module MirrorConcurrentUtil
THREAD_COUNT = 10
QUEUE_SIZE = 30
# Process-wide (Singleton) holder for the state shared between the job
# dispatcher and its worker threads:
#
# * thread_pool       - fixed-size Array of worker slots (nil until a worker
#                       is assigned), extended with MonitorMixin so it also
#                       serves as the lock guarding slot bookkeeping
# * work_queue        - SizedQueue of pending jobs
# * threads_available - condition variable created from the thread_pool monitor
class MirrorThreadPool
  include Singleton

  attr_accessor :thread_pool, :work_queue, :threads_available

  def initialize
    slots = Array.new(MirrorConcurrentUtil::THREAD_COUNT)
    # Extending the array turns it into its own monitor; the condition
    # variable below must be created from that same monitor.
    slots.extend(MonitorMixin)

    @thread_pool = slots
    @threads_available = slots.new_cond
    @work_queue = SizedQueue.new(MirrorConcurrentUtil::QUEUE_SIZE)
  end
end
# MirrorProducer is unnecessary for now: we have complete control over the jobs being queued.
# class MirrorProducer
# def self.run(jobs)
# thread_pool = MirrorThreadPool.instance.thread_pool
# threads_available = MirrorThreadPool.instance.threads_available
# work_queue = MirrorThreadPool.instance.work_queue
# Thread.new do
# jobs.each do |job|
# work_queue << job
# thread_pool.synchronize do
# threads_available.signal
# end
# end
# end
# end
# end
# Drains the shared work queue, running each job on a worker thread stored in
# one of the MirrorThreadPool slots.
class MirrorConsumer
  extend MirrorSync

  # Starts a dispatcher thread that, until the work queue is empty, waits for
  # a free pool slot, pops one job and runs it on a new worker thread placed
  # in that slot.
  #
  # Each worker reports to MirrorObserver before and after calling
  # MirrorSync::Strategies::MirrorRsync.sync with the job.
  #
  # @return [Thread] the dispatcher thread
  def self.run
    pool = MirrorThreadPool.instance
    thread_pool = pool.thread_pool
    threads_available = pool.threads_available
    work_queue = pool.work_queue

    Thread.new do
      until work_queue.empty?
        slot = nil
        thread_pool.synchronize do
          threads_available.wait_while do
            # The block's value decides whether to keep waiting, so the
            # predicate must be the last expression. Ending the block with the
            # diagnostic `puts` (which evaluates to nil) would never wait.
            saturated = thread_pool.none? { |thread| available?(thread) }
            puts 'none none none' if saturated
            saturated
          end
          slot = thread_pool.rindex { |thread| available?(thread) }
        end

        job = work_queue.pop
        thread_pool[slot] = Thread.new(job) do |current_job|
          begin
            MirrorObserver.sync_start(Time.now, current_job[:name])
            result = MirrorSync::Strategies::MirrorRsync.sync(current_job)
            MirrorObserver.sync_end(Time.now, result[:mirror_name], result[:rsync_result])
          ensure
            # Release the slot and wake the dispatcher even when the sync
            # raised; otherwise a failed job would leave the dispatcher
            # waiting for a signal that never arrives.
            Thread.current['finished'] = true
            thread_pool.synchronize { threads_available.signal }
          end
        end
      end
    end
  end

  # Tells whether a pool slot can take a new worker.
  #
  # @param thread [Thread, nil] current occupant of the slot
  # @return [Boolean] true when the slot is empty, its thread has terminated,
  #   or its thread has flagged itself as finished
  def self.available?(thread)
    return true if thread.nil?

    # Thread#status is false after a normal exit and nil after the thread
    # died with an exception; both mean the slot is free.
    !thread.status || !thread['finished'].nil?
  end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment