Everything stays as it is, and we make sure each class of job runs in its own queue and is never executed concurrently.
- We will migrate to ActiveJob instead of running jobs directly on Delayed::Job.
- We are not going to merge jobs into larger multipurpose jobs, so things stay split up as they are right now.
- Handling failures while processing events is also out of scope for this proposal, but it will be mandatory to implement.
As we agreed for this option not to run the same job concurrently, we may also introduce a guard so that at most one job per queue is enqueued. It could look like this in every place we create and enqueue a job (note that the reader for the queue set via `queue_as` is `queue_name`):

```ruby
JobClass.perform_later if Delayed::Job.where(queue: JobClass.queue_name).count.zero?
```
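To keep every call site uniform, that guard could be extracted into a small helper. Below is a minimal plain-Ruby sketch of the idea; `pending_count` stands in for the `Delayed::Job.where(queue: ...).count` lookup, and `FakeJob` is a hypothetical stand-in for a real job class, not part of the codebase:

```ruby
# Hypothetical helper sketching the "at most one enqueued job per queue" rule.
# pending_count: stands in for Delayed::Job.where(queue: queue_name).count.
module EnqueueOnce
  def perform_later_unless_enqueued(pending_count:)
    return false unless pending_count.zero?

    perform_later
    true
  end
end

# Illustration-only stand-in for an ActiveJob class.
class FakeJob
  extend EnqueueOnce

  @enqueued = 0
  class << self
    attr_reader :enqueued

    def perform_later
      @enqueued += 1
    end
  end
end

FakeJob.perform_later_unless_enqueued(pending_count: 0) # enqueued
FakeJob.perform_later_unless_enqueued(pending_count: 1) # skipped: one already pending
FakeJob.enqueued # => 1
```

In the real application the helper would live in ApplicationJob, so every job class gets it for free.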
Besides that, this could be an implementation approach for this option:
Actually there is no real job class for this; just ::Event::NotifyBackends.trigger_delayed_sent is run from Clockwork. So we will delete the Event::NotifyBackends model, which doesn't make sense anymore, and create a job class that will look like this:
```ruby
# file src/app/api/jobs/notify_backend_job.rb
class NotifyBackendJob < ApplicationJob
  queue_as :notify_backend

  def perform
    Event::Base.not_in_queue.find_each(&:notify_backend)
  end
end
```
And in the Clockwork configuration file we will replace the old way of triggering it (`::Event::NotifyBackends.trigger_delayed_sent`) with ActiveJob, like this:
```ruby
# line 39
NotifyBackendJob.perform_later
```
Nowadays the ProjectLogRotate job is not using ActiveJob and is not a descendant of ApplicationJob, so we will change this job class to look like this:
```ruby
# file src/app/api/jobs/project_log_rotate_job.rb
class ProjectLogRotateJob < ApplicationJob
  queue_as :project_log_rotate

  def perform
    oldest_date = 10.days.ago
    cleanup_old_log_entries(oldest_date)
    cleanup_old_events(oldest_date)
    process_newer_events
    true
  end

  private

  def cleanup_old_log_entries(date)
    ProjectLogEntry.clean_older_than(date)
  end

  def cleanup_old_events(date)
    Event::Base.where(project_logged: false).where(["created_at < ?", date]).update_all(project_logged: true)
  end

  def process_newer_events
    # Instead of calculating event_types we can define true as default value for project_logged in Event::Base
    # and override it with false at Event::Package and Event::Project classes
    event_classes = [Event::Package, Event::Project]
    event_types = event_classes.flat_map(&:descendants).map(&:name)
    Event::Base.where(project_logged: false, eventtype: event_types).find_in_batches(batch_size: 10_000) do |events_batch|
      events_batch.each do |event|
        entry = ProjectLogEntry.create_from(event)
        event.update_attributes(project_logged: true) if entry.persisted?
      end
    end
  end
end
```
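The alternative mentioned in the comment inside process_newer_events can be made concrete: project_logged would default to true on Event::Base (meaning "nothing to log") and only the package and project subclasses would start with false, so the query could drop the eventtype calculation entirely. A plain-Ruby sketch with hypothetical stand-in classes (the real ones are ActiveRecord models, where the default would live in the schema or a callback):

```ruby
# Illustration-only stand-ins for the event models.
module Sketch
  class EventBase
    def self.default_project_logged
      true # by default an event needs no project log entry
    end

    attr_reader :project_logged

    def initialize
      @project_logged = self.class.default_project_logged
    end
  end

  class EventPackage < EventBase
    def self.default_project_logged
      false # package events still have to be logged
    end
  end

  class EventProject < EventBase
    def self.default_project_logged
      false # project events still have to be logged
    end
  end

  class EventCommit < EventBase; end # inherits true: skipped by the query
end

Sketch::EventPackage.new.project_logged # => false
Sketch::EventCommit.new.project_logged  # => true
```

With this in place, process_newer_events could simply query `Event::Base.where(project_logged: false)`.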
And in the Clockwork configuration file we will replace the old way of creating it (`ProjectLogRotate.new.delay(queue: 'project_log_rotate').perform`) with ActiveJob, like this:
```ruby
# line 61
ProjectLogRotateJob.perform_later
```
This is one of the multipurpose jobs we already have. Nowadays the SendEventEmails job is not using ActiveJob and is not a descendant of ApplicationJob, so we will change this job class to look like this:
```ruby
# file src/app/api/jobs/send_event_emails_job.rb
class SendEventEmailsJob < ApplicationJob
  queue_as :mailers

  # The rest will be the same as we have in the src/app/api/jobs/send_event_emails.rb file
  # ...
end
```
And in the Clockwork configuration file we will replace the old way of creating it (`SendEventEmails.new.delay(queue: 'mailers').perform`) with ActiveJob, like this:
```ruby
# line 40
SendEventEmailsJob.perform_later
```
Actually there is no real job class for this either; just UpdateNotificationEvents.new.perform is run from Clockwork, so it is not running asynchronously. We will move the UpdateNotificationEvents model file to be an ActiveJob class, and we can now also remove the semaphore it used (as this job is no longer concurrent). It will look like this:
```ruby
# file src/app/api/jobs/update_notification_events_job.rb
class UpdateNotificationEventsJob < ApplicationJob
  queue_as :update_notification_events

  def perform
    User.current ||= User.get_default_admin # Login as an admin so we can see all projects
    loop do
      nr = BackendInfo.lastnotification_nr
      nr = 1 if nr.zero? # 0 is a bad start
      begin
        @last = Xmlhash.parse(Backend::Connection.get("/lastnotifications?start=#{nr}&block=1").body)
      rescue Net::ReadTimeout, EOFError, ActiveXML::Transport::Error
        return
      end
      # It's not supposed to happen, but if more than one process consumes the backend data it could happen
      if @last['sync'] == 'lost'
        BackendInfo.lastnotification_nr = Integer(@last['next'])
        return
      end
      create_events
      break if !defined?(@last) || @last['limit_reached'].blank?
    end
  end

  private

  def create_events
    # This method stays the same but now is private
    # ...
  end
end
```
And in the Clockwork configuration file we will replace the old way of running it (`UpdateNotificationEvents.new.perform`) with an asynchronous ActiveJob job, like this:
```ruby
# lines 43-48
# 17 seconds is maybe not the best timing anymore, so it can be changed
every(17.seconds, 'fetch notifications') do
  UpdateNotificationEventsJob.perform_later
end
```
We will convert the CreateJob base class into an ActiveJob class, something like this:
```ruby
# file src/app/api/jobs/create_job.rb
class CreateJob < ApplicationJob
  attr_accessor :event

  def initialize(event)
    super
    self.event = event
  end

  def after(job)
    event = job.payload_object.event
    # in test suite the undone_jobs are 0 as the delayed jobs are not delayed
    event.with_lock do
      event.undone_jobs -= 1
      event.save!
    end
  end

  def error(job, exception)
    if Rails.env.test?
      # make debug output useful in test suite, not just showing backtrace to Airbrake
      Rails.logger.debug "ERROR: #{exception.inspect}: #{exception.backtrace}"
      puts exception.inspect, exception.backtrace
      return
    end
    Airbrake.notify(exception, { failed_job: job.inspect })
  end
end
```
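Note that `after` and `error` above are DelayedJob lifecycle hooks. Since the jobs will now run through ActiveJob, an alternative worth considering is ActiveJob's own `after_perform` and `rescue_from` callbacks. The sketch below mimics that callback API with a tiny toy base class, only to show how the undone_jobs bookkeeping maps onto it; `ToyJob` is not ActiveJob and `ToyEvent` is a plain Struct, not the real Event::Base model (so no `with_lock` here):

```ruby
# Toy stand-in for ActiveJob's callback API (after_perform / rescue_from),
# only to illustrate how CreateJob's bookkeeping could map onto it.
class ToyJob
  class << self
    attr_accessor :after_perform_block, :rescue_block

    def after_perform(&block)
      self.after_perform_block = block
    end

    def rescue_from(&block)
      self.rescue_block = block
    end
  end

  def perform_now(*args)
    perform(*args)
    self.class.after_perform_block&.call(self)
  rescue StandardError => e
    handler = self.class.rescue_block
    handler ? handler.call(e) : raise
  end
end

# Plain-Ruby stand-in for the Event::Base record.
ToyEvent = Struct.new(:undone_jobs)

class CreateJobSketch < ToyJob
  attr_reader :event

  # Equivalent of the old `after` hook: one job less pending for the event.
  after_perform do |job|
    job.event.undone_jobs -= 1
  end

  def perform(event)
    @event = event
  end
end

event = ToyEvent.new(2)
CreateJobSketch.new.perform_now(event)
event.undone_jobs # => 1
```

In real ActiveJob, `rescue_from` additionally takes the exception class to rescue, e.g. `rescue_from(StandardError) { |exception| ... }`, which could replace the `error` hook's Airbrake reporting.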
And its subclasses will also be changed accordingly:
```ruby
# file src/app/api/jobs/update_backend_infos_job.rb
class UpdateBackendInfosJob < CreateJob
  queue_as :update_backend_infos

  # The rest remains the same, without redefining the event attribute
  # ...
end
```
```ruby
# file src/app/api/jobs/update_released_binaries_job.rb
class UpdateReleasedBinariesJob < CreateJob
  queue_as :releasetracking

  def perform
    pl = event.payload
    repo = Repository.find_by_project_and_name(pl['project'], pl['repo'])
    return unless repo
    BinaryRelease.update_binary_releases(repo, pl['payload'], event.created_at)
  end
end
```
Each worker will attend one queue, so no default queue will be attended right now (until it is needed for other jobs).
```shell
# File dist/obsapidelayed lines 63-69 will be now
run_in_api script/delayed_job.api.rb --queue=quick start -n $NUM
run_in_api script/delayed_job.api.rb --queue=releasetracking start -i 1000
run_in_api script/delayed_job.api.rb --queue=issuetracking start -i 1010
run_in_api script/delayed_job.api.rb --queue=mailers start -i 1020
# The default queue used by ActiveJob (jobs scheduled with .perform_later)
run_in_api script/delayed_job.api.rb --queue=default start -i 1030
run_in_api script/delayed_job.api.rb --queue=project_log_rotate start -i 1040
run_in_api script/delayed_job.api.rb --queue=notify_backend start -i 1050
run_in_api script/delayed_job.api.rb --queue=update_notification_events start -i 1060
run_in_api script/delayed_job.api.rb --queue=update_backend_infos start -i 1070
run_in_api script/delayed_job.api.rb --queue=releasetracking start -i 1080
```