
@mdeniz
Last active July 12, 2017 15:12

Option #3: batched events are processed in multipurpose jobs

Everything stays as it is, and we make sure that each job class runs in its own queue and never runs concurrently.

  • We will migrate to ActiveJob instead of running jobs directly on DelayedJob.
  • We are not going to merge jobs into more concentrated multipurpose jobs, so let's keep things as they are right now.
  • Code for handling failures while processing events is not part of this proposal either, but it will be mandatory to implement.

Since for this option we agreed not to run the same job concurrently, we may also introduce a rule of having at most one job of each class enqueued. It could look like this in every place where we create and enqueue a job:

JobClass.perform_later unless Delayed::Job.where(queue: JobClass.queue_name).exists?
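The check above reads the queue and then enqueues in two separate steps, so two processes could still both see an empty queue and both enqueue. Because DelayedJob keeps a job's row until the job finishes, the check also skips enqueuing while a job of that class is still running. The plain-Ruby sketch below models the same rule; `JobStore` and `enqueue_unless_pending` are hypothetical names standing in for the `delayed_jobs` table, not part of ActiveJob or DelayedJob. Its Mutex only makes check-and-insert atomic within one process; across processes the real table would need a database-level guarantee, which this sketch does not model.

```ruby
# Minimal sketch of "at most one pending job per queue".
# JobStore is a hypothetical in-memory stand-in for the delayed_jobs table;
# it is not part of ActiveJob or DelayedJob.
class JobStore
  def initialize
    @rows = []        # each row: { queue: String, job: String }
    @lock = Mutex.new # makes check-and-insert atomic within this process only
  end

  # Adds a row for job_name on queue unless that queue already has one.
  # Returns true when the job was enqueued, false when it was skipped.
  def enqueue_unless_pending(queue, job_name)
    @lock.synchronize do
      return false if @rows.any? { |row| row[:queue] == queue }

      @rows << { queue: queue, job: job_name }
      true
    end
  end

  # A worker finished the job; DelayedJob deletes the row on success.
  def finish(queue)
    @lock.synchronize { @rows.reject! { |row| row[:queue] == queue } }
  end

  def pending_count(queue)
    @lock.synchronize { @rows.count { |row| row[:queue] == queue } }
  end
end

store = JobStore.new
store.enqueue_unless_pending('notify_backend', 'NotifyBackendJob') # => true
store.enqueue_unless_pending('notify_backend', 'NotifyBackendJob') # => false, one is pending
```

Each queue is tracked independently, so a pending `notify_backend` job does not block enqueuing on `mailers`.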

Besides that, this could be an implementation approach for this option:

NotifyBackends

Currently there is no real job class for this; ::Event::NotifyBackends.trigger_delayed_sent is simply run from Clockwork. So we will delete the Event::NotifyBackends model, which does not really make sense, and create a job class that will look like this:

# file src/app/api/jobs/notify_backend_job.rb
class NotifyBackendJob < ApplicationJob
  queue_as :notify_backend
  
  def perform
    Event::Base.not_in_queue.find_each(&:notify_backend)
  end
end

In the Clockwork configuration file, we will replace the old way of running it (::Event::NotifyBackends.trigger_delayed_sent) with ActiveJob, like this:

# line 39
NotifyBackendJob.perform_later

ProjectLogRotate

Currently the job does not use ActiveJob and does not inherit from ApplicationJob, so we will change the job class to look like this:

# file src/app/api/jobs/project_log_rotate_job.rb
class ProjectLogRotateJob < ApplicationJob
  queue_as :project_log_rotate
  
  def perform
    oldest_date = 10.days.ago
    cleanup_old_log_entries(oldest_date)
    cleanup_old_events(oldest_date)
    process_newer_events
    true
  end
  
  private
  
  def cleanup_old_log_entries(date)
    ProjectLogEntry.clean_older_than(date)
  end
  
  def cleanup_old_events(date)
    Event::Base.where(project_logged: false).where(["created_at < ?", date]).update_all(project_logged: true)
  end
  
  def process_newer_events
    # Instead of calculating event_types we can define true as default value for project_logged in Event::Base
    # and override it with false at Event::Package and Event::Project classes
    event_classes = [Event::Package, Event::Project]
    event_types = event_classes.flat_map(&:descendants).map(&:name) 
    
    Event::Base.where(project_logged: false, eventtype: event_types).find_in_batches(batch_size: 10000) do |events_batch|
      events_batch.each do |event|
        entry = ProjectLogEntry.create_from(event)
        event.update_attributes(project_logged: true) if entry.persisted?
      end
    end
  end
end
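The comment in `process_newer_events` suggests letting each event class carry its own default for `project_logged`, so the query no longer needs a computed list of event types. The plain-Ruby sketch below illustrates that idea under stated assumptions: `BaseEvent`, `PackageEvent`, `ProjectEvent`, `OtherEvent`, and `LogEntry` are hypothetical stand-ins for `Event::Base`, its subclasses, and `ProjectLogEntry`, and a class method replaces whatever attribute or column default the real models would use. It also keeps the existing rule that an event is marked as logged only when its entry was persisted.

```ruby
# Hypothetical stand-in for Event::Base: each class decides its own default.
class BaseEvent
  attr_accessor :project_logged
  attr_reader :name

  # Most events never get a project log entry, so they start as already logged.
  def self.project_logged_default
    true
  end

  def initialize(name)
    @name = name
    @project_logged = self.class.project_logged_default
  end
end

# Stand-ins for Event::Package and Event::Project: they start as not logged.
class PackageEvent < BaseEvent
  def self.project_logged_default
    false
  end
end

class ProjectEvent < BaseEvent
  def self.project_logged_default
    false
  end
end

class OtherEvent < BaseEvent; end

# Stand-in for a ProjectLogEntry; persisted mimics entry.persisted?.
LogEntry = Struct.new(:event_name, :persisted)

# Only unlogged events are selected, with no list of event types needed.
# An event is marked as logged only if its entry was saved, so a failed
# entry is retried on the next run. create_entry must return a LogEntry.
def process_newer_events(events, create_entry)
  events.reject(&:project_logged).map do |event|
    entry = create_entry.call(event)
    event.project_logged = true if entry.persisted
    entry
  end
end
```

Passing the entry builder in as a callable keeps the sketch testable: a builder that fails for one event shows that event staying unlogged until a later run succeeds.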

In the Clockwork configuration file, we will replace the old way of enqueuing it (ProjectLogRotate.new.delay(queue: 'project_log_rotate').perform) with ActiveJob, like this:

# line 61
ProjectLogRotateJob.perform_later

SendEventEmails

This is one of the multipurpose jobs we already have. Currently it does not use ActiveJob and does not inherit from ApplicationJob, so we will change the job class to look like this:

# file src/app/api/jobs/send_event_emails_job.rb
class SendEventEmailsJob < ApplicationJob
  queue_as :mailers
  
  # The rest stays the same as in the src/app/api/jobs/send_event_emails.rb file
  # ...
end

In the Clockwork configuration file, we will replace the old way of enqueuing it (SendEventEmails.new.delay(queue: 'mailers').perform) with ActiveJob, like this:

# line 40
SendEventEmailsJob.perform_later

UpdateNotificationEvents

Currently there is no real job class for this: UpdateNotificationEvents.new.perform is run directly from Clockwork, so it does not run asynchronously either. We will turn the UpdateNotificationEvents model into an ActiveJob class and move its file accordingly. We can also remove the semaphore it uses, since this job will no longer run concurrently. It will look like this:

# file src/app/api/jobs/update_notification_events_job.rb
class UpdateNotificationEventsJob < ApplicationJob
  queue_as :update_notification_events
  
  def perform
    User.current ||= User.get_default_admin # Log in as an admin so we can see all projects

    loop do
      nr = BackendInfo.lastnotification_nr
      nr = 1 if nr.zero? # 0 is a bad start
        
      begin
        @last = Xmlhash.parse(Backend::Connection.get("/lastnotifications?start=#{nr}&block=1").body)
      rescue Net::ReadTimeout, EOFError, ActiveXML::Transport::Error
        return
      end
        
      # It's not supposed to happen, but it could if more than one process consumes the backend data
      if @last['sync'] == 'lost'
        BackendInfo.lastnotification_nr = Integer(@last['next'])
        return
      end
        
      create_events

      break if !defined?(@last) || @last['limit_reached'].blank?
    end
  end
  
  private
  
  def create_events
    # This method stays the same, but it is now private
    # ...
  end
end
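The loop in `perform` is cursor-based polling: it starts from the last stored notification number, handles one block, and keeps going only while the backend reports `limit_reached`. The plain-Ruby sketch below models only that control flow. `FakeBackend`, its response shape, `Cursor`, and `poll_notifications` are hypothetical, and advancing the cursor to `next` after each block is an assumption about what `create_events` does, since its body is elided above.

```ruby
# Hypothetical backend that serves notification numbers in fixed-size blocks.
class FakeBackend
  def initialize(total, block_size)
    @total = total
    @block_size = block_size
  end

  # Returns a hash loosely shaped like the parsed /lastnotifications response.
  def last_notifications(start)
    last = [start + @block_size - 1, @total].min
    {
      'notifications' => (start..last).to_a,
      'next' => last + 1,
      'limit_reached' => last < @total ? '1' : nil
    }
  end
end

# Stands in for BackendInfo.lastnotification_nr.
Cursor = Struct.new(:nr)

# Polls block after block until the backend stops reporting limit_reached.
def poll_notifications(backend, cursor)
  processed = []
  loop do
    nr = cursor.nr
    nr = 1 if nr.zero? # 0 is a bad start
    page = backend.last_notifications(nr)
    processed.concat(page['notifications']) # stands in for create_events
    cursor.nr = page['next']                # assumption: create_events stores 'next'
    break if page['limit_reached'].to_s.empty?
  end
  processed
end
```

Because the cursor is persisted after every block, a job that stops early (for example on a read timeout) resumes from the last processed block on its next scheduled run.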

In the Clockwork configuration file, we will replace the old way of running it (UpdateNotificationEvents.new.perform) with an asynchronous ActiveJob job, like this:

# lines 43-48
# 17 seconds may no longer be the best interval, so it can be changed
every(17.seconds, 'fetch notifications') do
  UpdateNotificationEventsJob.perform_later
end

CreateJob

We will convert this base class into an ActiveJob class, something like this:

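# file src/app/api/jobs/create_job.rb
class CreateJob < ApplicationJob
  # ActiveJob rebuilds jobs with a plain `new` (no arguments) on the worker, so the
  # event cannot be stored in initialize. It is enqueued with perform_later(event),
  # passed to perform, and available to callbacks as job.arguments.first.
  # Unlike DelayedJob's after hook, after_perform only runs when perform succeeds.
  after_perform do |job|
    event = job.arguments.first
    # in test suite the undone_jobs are 0 as the delayed jobs are not delayed
    event.with_lock do
      event.undone_jobs -= 1
      event.save!
    end
  end

  # DelayedJob's after/error hooks are not called for ActiveJob jobs, so errors
  # are reported through rescue_from instead.
  rescue_from(StandardError, with: :report_error)

  private

  def report_error(exception)
    if Rails.env.test?
      # make debug output useful in test suite, not just showing backtrace to Airbrake
      Rails.logger.debug "ERROR: #{exception.inspect}: #{exception.backtrace}"
      puts exception.inspect, exception.backtrace
    else
      Airbrake.notify(exception, failed_job: inspect)
    end
    # Re-raise so DelayedJob still marks the attempt as failed and retries it
    raise exception
  end
end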
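One detail matters for `CreateJob` and its subclasses: when a worker picks up an ActiveJob job, it rebuilds the job from its serialized data by calling the job class's `new` without arguments and then passing the stored arguments to `perform`, so a job that requires its event in `initialize` cannot be rebuilt. The plain-Ruby model below only imitates that round trip to show the consequence; `MiniJob`, `serialize_job`, `run_serialized`, `StatefulJob`, and `ArgumentJob` are hypothetical and much simpler than ActiveJob's real serialization (which, for example, turns Active Record objects into GlobalIDs).

```ruby
# Hypothetical, heavily simplified model of an ActiveJob enqueue/execute round trip.
class MiniJob
  attr_reader :arguments

  def initialize(*arguments)
    @arguments = arguments
  end

  # Enqueue side: only the class name and the arguments are stored.
  def serialize_job
    { 'job_class' => self.class.name, 'arguments' => arguments }
  end

  # Worker side: rebuild the job with a plain new, then pass the stored arguments to perform.
  def self.run_serialized(data)
    job = Object.const_get(data['job_class']).new
    job.perform(*data['arguments'])
  end
end

# Mirrors a job that requires the event in initialize: the worker-side
# argument-less new raises ArgumentError before perform is ever reached.
class StatefulJob < MiniJob
  def initialize(event)
    super
    @event = event
  end

  def perform
    @event
  end
end

# The event travels as a perform argument, so it survives the round trip.
class ArgumentJob < MiniJob
  def perform(event)
    event
  end
end

data = ArgumentJob.new('event-42').serialize_job
MiniJob.run_serialized(data) # => "event-42"
```

In ActiveJob terms, this is why the event is enqueued with `perform_later(event)` and received as the argument of `perform(event)`.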

Its subclasses will also be changed accordingly:

UpdateBackendInfos

# file src/app/api/jobs/update_backend_infos_job.rb
class UpdateBackendInfosJob < CreateJob
  queue_as :update_backend_infos

  # The rest remains the same, except that perform receives the event as its argument
  # ...
end

UpdateReleasedBinaries

# file src/app/api/jobs/update_released_binaries_job.rb
class UpdateReleasedBinariesJob < CreateJob
  queue_as :releasetracking

  # ActiveJob passes the enqueued arguments to perform, so the event arrives here
  def perform(event)
    pl = event.payload
    repo = Repository.find_by_project_and_name(pl['project'], pl['repo'])
    return unless repo
    BinaryRelease.update_binary_releases(repo, pl['payload'], event.created_at)
  end
end

Changing the DelayedJob control

Each worker will attend a single queue, so the default queue will not be attended for now (until it is needed for other jobs).

# File dist/obsapidelayed, lines 63-69 will now be:
    run_in_api script/delayed_job.api.rb --queue=quick start -n $NUM
    run_in_api script/delayed_job.api.rb --queue=releasetracking start -i 1000
    run_in_api script/delayed_job.api.rb --queue=issuetracking start -i 1010
    run_in_api script/delayed_job.api.rb --queue=mailers start -i 1020
    # The default queue used by ActiveJob (jobs enqueued with .perform_later that do not set queue_as)
    run_in_api script/delayed_job.api.rb --queue=default start -i 1030
    run_in_api script/delayed_job.api.rb --queue=project_log_rotate start -i 1040
    run_in_api script/delayed_job.api.rb --queue=notify_backend start -i 1050
    run_in_api script/delayed_job.api.rb --queue=update_notification_events start -i 1060
    run_in_api script/delayed_job.api.rb --queue=update_backend_infos start -i 1070
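Since this option relies on each job class having exactly one worker on its queue, the worker list can be cross-checked against the queues the job classes declare with `queue_as`. The plain-Ruby sketch below does that; `check_workers` and its input format (a hash of job class name to queue name, plus the worker command lines as strings) are hypothetical. It only parses the `--queue=` part of each line and ignores `-n`, so a multi-worker queue such as `quick` counts once and is only reported if a job class declares it.

```ruby
# Cross-checks the queues declared by job classes (queue_as) against the
# worker command lines. Each line with --queue=NAME counts as one worker;
# -n is ignored, so multi-worker queues such as quick are only counted once.
def check_workers(job_queues, worker_lines)
  counts = Hash.new(0)
  worker_lines.each do |line|
    queue = line[/--queue=(\S+)/, 1]
    counts[queue] += 1 if queue
  end

  declared = job_queues.values.uniq
  {
    # Declared queues that no worker attends: their jobs would never run.
    missing: declared.reject { |queue| counts.key?(queue) },
    # Declared queues with more than one worker: their jobs could run concurrently.
    duplicated: declared.select { |queue| counts[queue] > 1 }
  }
end
```

For example, passing `{ 'UpdateReleasedBinariesJob' => 'releasetracking' }` together with the worker lines from dist/obsapidelayed would report that queue under `missing` if no worker attends it, or under `duplicated` if more than one does.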