Skip to content

Instantly share code, notes, and snippets.

@marzdgzmn
Created April 2, 2018 05:28
Show Gist options
  • Save marzdgzmn/44b8efbbfb6101881a3571d21c59a0b9 to your computer and use it in GitHub Desktop.
Save marzdgzmn/44b8efbbfb6101881a3571d21c59a0b9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
require 'concurrent'
require 'beaneater'
require 'irb'
require 'pry'
require_relative '../lib/mirror_rsync'
require_relative '../lib/beaneater_config'
require_relative '../lib/mirror_observer'
require_relative '../lib/rise_mirror_app'
#Handle interruptions, e.g. update mirror status to stopped
# Consumes jobs from the beanstalkd 'mirror' tube and runs each one through
# MirrorRsync on a fixed-size thread pool. A MirrorObserver is told when a
# sync starts and is attached as an observer of the future running it.
#
# SIGINT drains the pool (running syncs are allowed to finish); SIGTERM kills
# the pool threads. Both then close the beanstalkd connection and exit.
class RsyncSupervisor
  include RiseMirrorApp

  Thread.abort_on_exception = true
  #Concurrent.use_stdlib_logger(Logger::DEBUG)

  # Opens the beanstalkd connection and creates the worker pool and observer.
  def initialize
    @beanstalk = Beaneater.new(Beaneater.configuration.beanstalkd_url)
    # Pool size is fixed at 10; see also
    # http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ProcessorCounter.html#processor_count-instance_method
    @pool = Concurrent::FixedThreadPool.new(10, auto_terminate: false)
    @mirror_observer = MirrorObserver.new
    # Open question from the original author: define an instance variable
    # shared by all threads that tracks added/removed jobs so there is no
    # duplicate -- or should this be handled by beanstalkd?
  end

  # Registers the 'mirror' job handler and starts Beaneater job processing.
  #
  # Signal handlers are installed here (per instance) rather than in the class
  # body: a block written in the class body sees the class as `self`, so
  # @pool and @beanstalk would be nil inside it. The handlers only raise; the
  # actual cleanup happens in the rescue clauses below, outside trap context.
  #
  # @return [void] does not return normally after a handled signal (calls exit)
  def run
    install_signal_handlers
    @beanstalk.jobs.register('mirror') do |job| #Handle 'Failed parsing to JSON object error'
      RiseMirrorApp::LOG.debug "Processing #{job}"
      process_job(job)
    end
    @beanstalk.jobs.process!
  rescue Interrupt
    # Must come before SignalException: Interrupt is a subclass of it.
    puts 'int terminating . . .'
    @pool.shutdown
    finish_shutdown
  rescue SignalException => e
    # Only SIGTERM is handled here; anything else keeps its default meaning.
    raise unless e.signo == Signal.list['TERM']

    puts 'terminating . . .'
    @pool.kill
    finish_shutdown
  end

  private

  # Schedules one job on the pool. Jobs with a nil body are deleted and
  # skipped.
  #
  # @param job [Object] job yielded by Beaneater; must respond to #body and #delete
  def process_job(job)
    job_body = job.body
    if job_body.nil?
      job.delete
      return
    end

    future = Concurrent::Future.new(executor: @pool) do
      @mirror_observer.mirror_rsync_start(Time.now, job_body[:NAME])
      MirrorRsync.run(job_body)
    end
    future.add_observer(@mirror_observer, :mirror_rsync_end)
    future.execute
  end

  # Installs INT/TERM handlers that re-raise the signal as an exception on the
  # main thread so #run can clean up in normal (non-trap) context. Pool and
  # connection calls are deliberately kept out of the handlers because Ruby
  # rejects lock acquisition inside a trap handler.
  #
  # Original author's note: SIGTERM > wait(90 seconds) > SIGKILL
  def install_signal_handlers
    trap(:INT)  { raise Interrupt }
    trap(:TERM) { raise SignalException, 'SIGTERM' }
  end

  # Common tail of both shutdown paths: wait for the pool to finish, close the
  # beanstalkd connection, then exit the process.
  def finish_shutdown
    puts 'killed threads'
    @pool.wait_for_termination
    puts 'terminated pool'
    @beanstalk.close
    puts 'beanstalk closed'
    puts @beanstalk.connection
    puts 'hmmmm'
    exit
  end
end
# Script entry point: build a supervisor and call #run, which registers the
# 'mirror' job handler and starts Beaneater job processing.
RsyncSupervisor.new.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment