Skip to content

Instantly share code, notes, and snippets.

@marzdgzmn
Created April 5, 2018 06:28
Show Gist options
  • Save marzdgzmn/9814f23f38918f8af446a26254b0e98d to your computer and use it in GitHub Desktop.
Save marzdgzmn/9814f23f38918f8af446a26254b0e98d to your computer and use it in GitHub Desktop.
module RiseMirrorExceptions
class RiseError < StandardError
end
class JobAlreadyRunningError < RiseError
end
end
******************************************************
def initialize
@beanstalk = Beaneater.new(Beaneater.configuration.beanstalkd_url)
@pool = Concurrent::FixedThreadPool.new(10) #http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ProcessorCounter.html#processor_count-instance_method
@mirror_observer = MirrorObserver.new
@test_observer = TestObserver.new
@running_jobs = Concurrent::Array.new
end
def run
RiseMirrorApp::LOG.debug "RsyncSupervisor running . . ."
begin
run_beanstalkd_job_processor
rescue RiseMirrorExceptions::JobAlreadyRunningError => e
RiseMirrorApp::LOG.debug "wakanda"
RiseMirrorApp::LOG.debug e
rescue Beaneater::AbortProcessingError
shutdown!
end
end
def shutdown!
puts "Shutting down . . ."
@beanstalk.close
puts 'beanstalk connection closed'
@pool.kill
puts "shutting down? #{@pool.shutdown?}"
@pool.wait_for_termination
puts "pool is running? #{@pool.running?}"
exit
end
private
def run_beanstalkd_job_processor
@beanstalk.jobs.register('mirror') do |job| #Handle 'Failed parsing to JSON object error'
process_job(job)
end
begin
@beanstalk.jobs.process!
rescue Interrupt
puts 'Interrupt received'
raise Beaneater::AbortProcessingError
end
end
def process_job(job)
RiseMirrorApp::LOG.debug "Processing job . . ."
job_body = job.body
if job.body.nil?
job.delete
return
end
if @running_jobs.include? job_body[:NAME].to_sym
RiseMirrorApp::LOG.debug "wakekeke"
raise RiseMirrorExceptions::JobAlreadyRunningError
RiseMirrorApp::LOG.debug "wakakaka"
end
@running_jobs << job_body[:NAME].to_sym
future = Concurrent::Future.new(executor: @pool) {
@mirror_observer.mirror_rsync_start(Time.now, job_body[:NAME])
MirrorRsync.run(job.body)
}
future.add_observer(@mirror_observer, :mirror_rsync_end)
future.execute
puts "rejected!" if future.rejected?
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment