Skip to content

Instantly share code, notes, and snippets.

@marzdgzmn
Created April 2, 2018 20:21
Show Gist options
  • Save marzdgzmn/f4e1a15608b9a23e41afb97197e7ba78 to your computer and use it in GitHub Desktop.
Save marzdgzmn/f4e1a15608b9a23e41afb97197e7ba78 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
require 'concurrent'
require 'beaneater'
require 'irb'
require 'pry'
require_relative '../lib/mirror_rsync'
require_relative '../lib/beaneater_config'
require_relative '../lib/mirror_observer'
require_relative '../lib/rise_mirror_app'
require_relative '../lib/test'
require_relative '../lib/test_observer'
#Handle interruptions, e.g. update mirror status to stopped
# Supervises background work on a fixed-size thread pool and shuts it down
# when the process receives SIGINT or SIGTERM.
#
# Signal handlers are installed per instance (from #run) and only record
# which signal arrived; the actual cleanup runs on the main thread. Handlers
# registered in the class body would execute with `self` bound to the class,
# where @pool and @beanstalk are nil and #shut_down is not defined.
class RsyncSupervisor
  include RiseMirrorApp

  # Seconds the main loop sleeps between checks for a pending signal.
  SIGNAL_POLL_INTERVAL = 2

  # Signals that trigger a shutdown.
  HANDLED_SIGNALS = %i[INT TERM].freeze

  # Process-wide settings, applied once when the class is loaded.
  Thread.abort_on_exception = true
  Concurrent.use_stdlib_logger(Logger::DEBUG)

  # Connects to beanstalkd, creates the worker pool and the observers.
  def initialize
    RiseMirrorApp::LOG.debug "initialize RsyncSupervisor"
    @beanstalk = Beaneater.new(Beaneater.configuration.beanstalkd_url)
    # Pool size is fixed at 10; see
    # http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ProcessorCounter.html#processor_count-instance_method
    @pool = Concurrent::FixedThreadPool.new(10)
    @mirror_observer = MirrorObserver.new
    @test_observer = TestObserver.new
    # Set by the trap handlers, read by the main loop in #run.
    @pending_signal = nil
    # TODO: define an instance variable shared by all threads for adding and
    # removing jobs so there are no duplicates -- or should this be handled
    # by beanstalkd?
  end

  # Starts the test future on the pool, then blocks until SIGINT or SIGTERM
  # arrives, performs the matching shutdown and exits the process.
  #
  # @return [void] does not return normally; ends with `exit`
  def run
    install_signal_handlers
    RiseMirrorApp::LOG.debug "RsyncSupervisor running . . ."
    future = Concurrent::Future.new(executor: @pool) do
      RiseMirrorApp::LOG.debug "inside future"
      @test_observer.start
      Test.run
    end
    RiseMirrorApp::LOG.debug "hmmm"
    future.execute

    # Disabled beanstalk worker loop, kept for reference:
    # @beanstalk.jobs.register('mirror') do |job| #Handle 'Failed parsing to JSON object error'
    # RiseMirrorApp::LOG.debug "Processing #{job}"
    # process_job(job)
    # end
    # @beanstalk.jobs.process!

    sleep(SIGNAL_POLL_INTERVAL) until @pending_signal
    handle_signal(@pending_signal)
  end

  # Asks the pool to shut down. Does not wait for the pool to terminate.
  def shut_down
    puts "shut_down"
    @pool.shutdown
    puts "pool shut_down"
  end

  private

  # Registers handlers that only remember the first signal received.
  # Locking a mutex is not allowed in trap context, and pool shutdown and
  # logging may take locks, so the real work is deferred to the main thread.
  def install_signal_handlers
    HANDLED_SIGNALS.each do |signal|
      trap(signal) { @pending_signal ||= signal }
    end
  end

  # Runs the shutdown that corresponds to +signal+ and exits the process.
  #
  # @param signal [Symbol] :INT for a graceful stop, :TERM for an immediate one
  def handle_signal(signal)
    # Restore default handling so a second signal can still stop a hung shutdown.
    HANDLED_SIGNALS.each { |name| trap(name, 'DEFAULT') }

    case signal
    when :INT then stop_gracefully
    when :TERM then stop_immediately
    end
    exit
  end

  # SIGINT path: request pool shutdown via #shut_down, then release resources.
  def stop_gracefully
    puts 'int terminating . . .'
    shut_down
    puts 'killed threads'
    release_resources
  end

  # SIGTERM path: kill the pool, then release resources.
  # Original note: SIGTERM > wait(90 seconds) > SIGKILL
  # NOTE(review): presumably describes the external process manager's
  # escalation; no timeout is applied here -- confirm.
  def stop_immediately
    puts 'terminating . . .'
    @pool.kill
    puts 'killed threads'
    release_resources
  end

  # Waits for the pool to terminate and closes the beanstalk connection.
  def release_resources
    @pool.wait_for_termination
    puts 'terminated pool'
    @beanstalk.close
    puts 'beanstalk closed'
    puts @beanstalk.connection
    puts 'hmmmm'
  end

  # Schedules one job on the pool, or deletes it when it has no body.
  # @mirror_observer is notified at start and, as a future observer, through
  # :mirror_rsync_end.
  #
  # @param job [#body, #delete] job handed over by the beanstalk worker
  def process_job(job)
    job_body = job.body
    if job_body.nil?
      job.delete
      return
    end

    future = Concurrent::Future.new(executor: @pool) do
      @mirror_observer.mirror_rsync_start(Time.now, job_body[:NAME])
      MirrorRsync.run(job_body)
    end
    future.add_observer(@mirror_observer, :mirror_rsync_end)
    future.execute
  end
end
# Script entry point: build the supervisor and start it.
supervisor = RsyncSupervisor.new
supervisor.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment