Skip to content

Instantly share code, notes, and snippets.

@bensheldon
Last active October 13, 2021 14:01
Show Gist options
  • Save bensheldon/5d3add207b7c5e689682c29ebbc506ab to your computer and use it in GitHub Desktop.
Save bensheldon/5d3add207b7c5e689682c29ebbc506ab to your computer and use it in GitHub Desktop.
# frozen_string_literal: true
# Self-contained script: gem dependencies are declared inline through
# bundler/inline rather than in a separate Gemfile.
require "bundler/inline"
# The `true` argument asks Bundler to install any missing gems at run time.
gemfile(true) do
source "https://rubygems.org"
# Maps a `github: "owner/repo"` gem option to an HTTPS clone URL.
git_source(:github) { |repo| "https://github.com/#{repo}.git" }
gem "rails", '~>6.1'
end
# Active Job is the only Rails component this script requires explicitly.
require "active_job"
# Adds a minimal "batch" interface to Active Job classes: jobs can be enqueued
# together under a batch key, and a callback registered for that key receives
# the whole batch from the queue adapter.
#
# Including classes gain:
# * +after_batch_config+ - class-level registry mapping batch key => callback.
# * +batch_id+, +batch_key+, +error_message+ - per-job accessors meant to be
#   filled in by the queue adapter's +enqueue_batch+.
module BatchExtension
  extend ActiveSupport::Concern

  included do
    # Registry of batch callbacks. `cattr_accessor` backs this with a class
    # variable, so one Hash is shared by the including class and all of its
    # subclasses; `instance_accessor: false` keeps it off job instances.
    cattr_accessor :after_batch_config, instance_accessor: false, default: {}

    attr_accessor :batch_id
    attr_accessor :batch_key
    # Disappointing that jobs can't carry their full execution state
    attr_accessor :error_message
  end

  class_methods do
    # Registers +block+ as the callback for batches enqueued under +key+.
    # Registering again for the same key replaces the previous callback.
    #
    # @param key [Object] batch key used to look the callback up later
    # @yieldparam jobs [Array<ActiveJob::Base>] the jobs of the finished batch
    # @return [Proc, nil] the stored block
    def after_batch(key, &block)
      after_batch_config[key] = block
    end

    # Builds a job for every entry of +jobs_or_args+ and hands the whole set
    # to the queue adapter's +enqueue_batch+.
    #
    # Each entry may be:
    # * an ActiveJob::Base instance - used as is;
    # * an Array - splatted into +new+ as the job's argument list;
    # * +nil+ - builds a job with no arguments;
    # * any other object - passed to +new+ as one single argument.
    #
    # @param key [Object] batch key, forwarded to the adapter
    # @param jobs_or_args [Array] job instances and/or argument lists
    # @return [Object] whatever the adapter's +enqueue_batch+ returns
    def perform_later_in_batch(key, jobs_or_args = [])
      jobs = jobs_or_args.map do |job_or_args|
        case job_or_args
        when ActiveJob::Base then job_or_args
        # Splat only real argument lists. Splatting any other object calls
        # #to_a on it, so a lone Hash or Range argument would be exploded
        # into pairs/elements instead of reaching the job intact.
        when Array, nil then new(*job_or_args)
        else new(job_or_args)
        end
      end

      queue_adapter.enqueue_batch(key, jobs)
    end
  end
end
module ActiveJob
  module QueueAdapters
    # AsyncAdapter extended with a toy +enqueue_batch+ so the batch interface
    # can be exercised inside a single process.
    class AsyncBatchAdapter < AsyncAdapter
      # Tags every job with the batch key and a freshly generated batch id,
      # then schedules one background task that performs the jobs in order
      # and finally invokes the callback registered for +key+, if any.
      #
      # A job that raises does not stop the batch: the error's message is
      # stored on the job as +error_message+ and the next job runs.
      #
      # @param key [Object] batch key; also used to look up the callback
      # @param jobs [Array<ActiveJob::Base>] jobs forming the batch
      # @return [Array<ActiveJob::Base>] the same +jobs+ that were passed in
      def enqueue_batch(key, jobs)
        # Ignore this implementation; it's purely to make the interface work

        # The callback is looked up on the class of the last job, so an empty
        # batch has nowhere to look. Without this guard the background task
        # would fail with NoMethodError on NilClass and nothing would report it.
        return jobs if jobs.empty?

        batch_id = SecureRandom.uuid
        jobs.each do |job|
          job.batch_key = key
          job.batch_id = batch_id
        end

        # Zero delay: the task is handed off for execution right away.
        Concurrent::ScheduledTask.execute(0, args: [jobs]) do |batch|
          batch.each do |job|
            job.perform_now
          rescue => e
            job.error_message = e.to_s
          end

          batch.last.class.after_batch_config[key]&.call(batch)
        end

        jobs
      end
    end
  end
end
# Common base class for the jobs in this script: mixes in the batch interface
# and selects the :async_batch queue adapter.
class ApplicationJob < ActiveJob::Base
  include BatchExtension

  self.queue_adapter = :async_batch
end
### --- INTERFACE BELOW -- ###
# Prepares one kind of produce per job and raises for anything that is not on
# the accepted list.
class PrepareProduce < ApplicationJob
  # Batch callback for :prepare_fruit_salad. Jobs without an error message
  # count as prepared and their first argument is passed on to MakeSalad;
  # the first argument of every failed job is printed as discarded.
  after_batch(:prepare_fruit_salad) do |jobs|
    succeeded, failed = jobs.partition { |job| job.error_message.blank? }
    prepared_fruits = succeeded.map { |job| job.arguments.first }
    discarded_fruits = failed.map { |job| job.arguments.first }

    MakeSalad.perform_later(prepared_fruits)
    puts "Discarding #{discarded_fruits.join(', ')}"
  end

  # @param produce [String] name of the produce to prepare
  # @param count [Integer] how many pieces to prepare
  # @raise [RuntimeError] when +produce+ is not apple, banana or grape
  def perform(produce, count = 1)
    raise "Ewww, not a #{produce}!" unless %w[apple banana grape].include?(produce)

    puts "Preparing #{count} #{produce}"
  end
end
# Final step of the demo: prints which produce goes into the salad.
class MakeSalad < ApplicationJob
  # @param produces [Array<String>] names of the prepared produce
  def perform(produces)
    ingredients = produces.join(', ')
    puts "Making salad with #{ingredients}"
  end
end
# Demo run: a batch may mix plain argument lists with pre-built job instances.
# "shoe" is not accepted by PrepareProduce, so that job fails on purpose.
batch_objects = [
  ["apple", 4],
  ["shoe", 2],
  PrepareProduce.new("banana"),
  PrepareProduce.new("grape", 2)
]
jobs = PrepareProduce.perform_later_in_batch(:prepare_fruit_salad, batch_objects)
puts "Enqueued #{jobs.size} jobs with batch_id: #{jobs.first.batch_id}"
sleep 2 # wait for batch to finish in the background
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment