@ixti
Last active June 5, 2021 01:16
[POC] Sketch sidekiq bulk push enhancement
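module Sidekiq
  module Bulky
    # Records perform_* calls into the accumulator instead of enqueuing them.
    class Collector
      def initialize(accumulator)
        @accumulator = accumulator
      end

      # Jobs to run immediately are grouped under the nil key.
      def perform_async(*args)
        @accumulator[nil] << args
      end

      # Scheduled jobs are grouped by the interval (or timestamp) given.
      def perform_in(interval, *args)
        @accumulator[interval] << args
      end
      alias perform_at perform_in
    end

    # Pushes each group in slices of at most `size` jobs per push_bulk call.
    def self.call(worker, accumulated, size: 100)
      accumulated.each do |time, args|
        payload = { 'class' => worker }

        if time
          # 'at' must be an epoch timestamp; like Sidekiq's own perform_in,
          # treat small values as an offset from now.
          at = time.to_f
          payload['at'] = at < 1_000_000_000 ? Time.now.to_f + at : at
        end

        args.each_slice(size) do |slice|
          Sidekiq::Client.push_bulk(payload.merge('args' => slice))
        end
      end
    end
  end

  module Worker
    # Sidekiq::Worker extends including classes with ClassMethods, so
    # defining bulky here makes SomeWorker.bulky available.
    module ClassMethods
      def bulky(**options)
        accumulator = Hash.new { |h, k| h[k] = [] }
        collector = Bulky::Collector.new(accumulator)
        yield(collector).tap { Bulky.call(self, accumulator, **options) }
      end
    end
  end
end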
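
To see what the accumulator and the slicing do without a running Sidekiq or Redis, here is a minimal standalone sketch. `FakeClient` and the fixed `NOW` reference time are hypothetical stand-ins introduced only for this illustration; they are not part of Sidekiq or of the gist. The sketch assumes a relative interval is turned into an epoch timestamp before it is used as `'at'`.

```ruby
# Hypothetical stand-in for Sidekiq::Client: records payloads in memory
# instead of writing them to Redis.
class FakeClient
  attr_reader :pushed

  def initialize
    @pushed = []
  end

  def push_bulk(payload)
    @pushed << payload
  end
end

# Fixed reference time so the example is deterministic (assumption for the
# demo; real code would use the current time).
NOW = 1_700_000_000.0

# Same accumulator shape as the gist: schedule key => list of argument arrays.
accumulator = Hash.new { |hash, key| hash[key] = [] }

# Two immediate jobs (nil key) and three jobs delayed by 60 seconds.
accumulator[nil] << [1]
accumulator[nil] << [2]
accumulator[60] << [3]
accumulator[60] << [4]
accumulator[60] << [5]

client = FakeClient.new

# One push per slice of each group, with at most 2 jobs per push.
accumulator.each do |time, args|
  payload = { 'class' => 'SomeWorker' }
  payload['at'] = NOW + time if time
  args.each_slice(2) do |slice|
    client.push_bulk(payload.merge('args' => slice))
  end
end

client.pushed.each { |payload| puts payload.inspect }
```

Five jobs end up as three pushes: one for the immediate group and two for the delayed group, because that group holds three jobs and the slice size is two.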

ixti commented Jun 5, 2021

Usage

class SomeWorker
  include Sidekiq::Worker

  def perform(user_id); end
end

SomeWorker.bulky(size: 1000) do |worker|
  User.pluck(:id).each_with_index do |user_id, index|
    worker.perform_in((index % 7).days, user_id)
  end
end
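
A rough way to estimate how many `push_bulk` calls the usage above makes, sketched in plain Ruby without Rails or Sidekiq. The helper names `bucket_by_day_offset` and `push_count` are made up for this illustration, the ids are a plain range standing in for `User.pluck(:id)`, and day offsets are expressed in seconds instead of `ActiveSupport::Duration`.

```ruby
SECONDS_PER_DAY = 86_400

# Groups ids the same way the usage example does: index % 7 picks one of
# seven day offsets, and each job's arguments are stored as an array.
def bucket_by_day_offset(ids)
  buckets = Hash.new { |hash, key| hash[key] = [] }
  ids.each_with_index do |id, index|
    buckets[(index % 7) * SECONDS_PER_DAY] << [id]
  end
  buckets
end

# Each bucket is pushed in slices of at most `size` jobs, so the number of
# pushes per bucket is the bucket length divided by size, rounded up.
def push_count(buckets, size)
  buckets.values.sum { |args| (args.length.to_f / size).ceil }
end

buckets = bucket_by_day_offset((1..10_000).to_a)

puts "groups: #{buckets.keys.length}"
puts "pushes: #{push_count(buckets, 1000)}"
```

With 10,000 ids and `size: 1000`, each of the seven groups holds a little over 1,400 jobs and therefore needs two pushes, for 14 pushes in total instead of 10,000 individual `perform_in` calls.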

ixti commented Jun 5, 2021

NB

This is just a sketch; I have not tested it in any way, so it might contain typos. :D
