Interruptible jobs
require 'bundler/inline'

gemfile do
  source "https://rubygems.org"
  gem "resque"
  gem "pry"
  gem "activerecord"
  gem "mysql2"
end

require 'pry'
require 'csv'
require 'resque'
require 'active_record'
require 'mysql2'
require 'active_support/concern'
### Configure Resque
ENV["FORK_PER_JOB"] = 'false'
ENV['QUEUE'] = '*'
ENV['VERBOSE'] = 'true'
Resque.redis.namespace = "resque:kirs"
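
# Resque's fork hooks: disconnect ActiveRecord in the parent before a job is
# forked and reconnect in the forked child, so the two never share a DB socket.
# (FORK_PER_JOB is disabled above, so this script performs jobs in-process.)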
Resque.before_fork do
  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.connection.disconnect!
end

Resque.after_fork do
  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.establish_connection
end
# To simulate a heavy ActiveRecord relation, use the sample employees database from MySQL contrib:
# https://dev.mysql.com/doc/employee/en/
class Employee < ActiveRecord::Base
  self.primary_key = 'emp_no'
end

ActiveRecord::Base.establish_connection adapter: "mysql2", database: "employees"
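
# Raised from inside an iteration when the worker is shutting down. It carries
# the offset reached so far, so around_perform_interruptible can re-enqueue
# the job and resume it from that point.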
class SafeInterruptedError < StandardError
  attr_reader :offset

  def initialize(offset)
    @offset = offset
    super()
  end
end
module ResumableInterrupt
  extend ActiveSupport::Concern

  class_methods do
    def around_perform_interruptible(*job_args)
      yield
    rescue SafeInterruptedError => e
      # Hacky way to append an options hash on top of the job args
      retry_args = job_args.dup
      if retry_args.last.is_a?(Hash)
        retry_args.last["offset"] = e.offset
      else
        retry_args << { "offset" => e.offset }
      end

      # If interrupted, re-enqueue with the new offset
      puts "Pushing #{self} back with #{retry_args}"
      Resque.enqueue(self, *retry_args)
    end
  end
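
  # For illustration only (hypothetical path): a job originally enqueued as
  #   Resque.enqueue(LongCsvJob, "/tmp/huge.csv")
  # that gets interrupted after 150 rows is pushed back as
  #   Resque.enqueue(LongCsvJob, "/tmp/huge.csv", { "offset" => 150 })
  # so the next worker that picks it up resumes from row 150.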
  def perform(*job_args)
    # Hacky way to fetch job options on top of the original arguments
    options = job_args.last.is_a?(Hash) ? job_args.last : {}
    offset = options.fetch("offset", 0)
    puts "Starting with offset: #{offset}"

    enumerator = build_enumerator(*job_args)
    unless enumerator.is_a?(Enumerable)
      raise "#{self}: #build_enumerator returned non-enumerable object: #{enumerator}"
    end

    if enumerator.is_a?(ActiveRecord::Relation)
      batch_size = 50
      enumerator = enumerator.in_batches(of: batch_size, start: offset)
      enumerator.each do |batch|
        # Another great thing about providing people
        # with the iteration API is that we can automatically do
        # Podding::DatabaseStatus.await_healthy
        if WorkerSupervisor.instance.shutdown?
          raise SafeInterruptedError.new(offset)
        end

        puts "Processing with offset #{offset}"
        enumerate(batch)
        offset += batch_size
      end
    else
      # In case of a plain enumerable (e.g. parsed CSV rows),
      # simply skip the first N rows according to the offset
      enumerator.each_with_index do |r, i|
        next if i < offset

        if WorkerSupervisor.instance.shutdown?
          raise SafeInterruptedError.new(i)
        end

        puts "Processing #{i}"
        enumerate(r)
      end
    end
  end
end
class LongCsvJob
  @queue = :low
  include ResumableInterrupt

  # To use an instance #perform, as normal people do
  def self.perform(*args)
    new.perform(*args)
  end

  # The contract an interruptible job has to implement consists of two methods:
  # #build_enumerator and #enumerate
  #
  # #build_enumerator returns an enumerable object or an ActiveRecord relation.
  # #enumerate is what we call on every iteration.
  #
  # The new API lets developers abstract away the part of the job that does the iteration,
  # and for #pods that allows us to hook in code like `await_healthy` and check for the shutdown signal.
  #
  # When the worker receives a QUIT signal, we finish the current iteration,
  # store the current offset and re-enqueue the job with that offset.
  #
  # This works well with both CSV and AR relations.
  # In the CSV case we just skip the first N records depending on the offset,
  # and with ActiveRecord we can use a SQL offset.
  def build_enumerator(csv_path, options = {})
    csv = File.open(csv_path)
    CSV.parse(csv, headers: false)
  end

  def enumerate(record)
    puts "Processing #{record.first}"
    sleep 0.1
  end
end
class LongActiveRecordJob
  @queue = :low
  include ResumableInterrupt

  # To use an instance #perform, as normal people do
  def self.perform(*args)
    new.perform(*args)
  end

  def build_enumerator(options = {})
    Employee.where("hire_date > '1999-01-01'")
  end

  def enumerate(batch)
    puts "Got batch of size #{batch.size}. #{batch.first(10).map(&:first_name)}"
    sleep 0.1
  end
end
# Sample jobs
Resque.enqueue(LongCsvJob, "/Users/kir/Downloads/huge.csv")
Resque.enqueue(LongActiveRecordJob)
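
# Wraps a single memoized Resque::Worker so the jobs above can poll the same
# worker instance for its shutdown flag between iterations.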
class WorkerSupervisor
  def self.instance
    @instance ||= Resque::Worker.new
  end
end
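
# Run the supervised worker inline. Ctrl-C (or QUIT) asks it to shut down;
# the jobs notice via WorkerSupervisor.instance.shutdown?, re-enqueue
# themselves with the current offset, and let the worker exit cleanly,
# as the sample output below shows.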
worker = WorkerSupervisor.instance
worker.prepare
worker.log "Starting worker #{self}"
worker.work(ENV['INTERVAL'] || 5) # interval, will block
$ ruby interuptible.rb
*** Starting worker main
*** Running before_first_fork hooks
*** got: (Job{low} | LongActiveRecordJob | [])
Starting with offset: 0
Processing with offset 0
Got batch of size 50. ["Lillian", "Hironoby", "Dietrich", "Aimee", "Neven", "Samphel", "JoAnne", "Malu", "Chaosheng", "Kensei"]
Processing with offset 50
Got batch of size 50. ["Elvia", "Tianruo", "Herbert", "Uta", "Kenton", "Anneli", "Leaf", "Shaz", "Breannda", "Sadun"]
Processing with offset 100
Got batch of size 50. ["Weiru", "Magy", "Guiseppe", "Alejandro", "Leon", "Gudjon", "Fumitake", "Martial", "Irena", "Teunis"]
Processing with offset 150
Got batch of size 50. ["Mokhtar", "Along", "Geoff", "Mantis", "Mori", "Bogdan", "Jianhui", "Mart", "Kamakshi", "Gad"]
[...]
Processing with offset 900
Got batch of size 50. ["Karlis", "JiYoung", "Marit", "Raimond", "Kshitij", "Barna", "Oscal", "Gaetan", "Remmert", "Kirk"]
^C*** Exiting...
Pushing LongActiveRecordJob back with [{"offset"=>950}]
*** done: (Job{low} | LongActiveRecordJob | [])
$ ruby interuptible.rb
*** Starting worker main
*** Running before_first_fork hooks
*** got: (Job{low} | LongActiveRecordJob | [{"offset"=>950}])
Starting with offset: 950
Processing with offset 950
Got batch of size 50. ["Lillian", "Hironoby", "Dietrich", "Aimee", "Neven", "Samphel", "JoAnne", "Malu", "Chaosheng", "Kensei"]
Processing with offset 1000
Got batch of size 50. ["Elvia", "Tianruo", "Herbert", "Uta", "Kenton", "Anneli", "Leaf", "Shaz", "Breannda", "Sadun"]
Processing with offset 1050
Got batch of size 50. ["Weiru", "Magy", "Guiseppe", "Alejandro", "Leon", "Gudjon", "Fumitake", "Martial", "Irena", "Teunis"]
Processing with offset 1100
Got batch of size 50. ["Mokhtar", "Along", "Geoff", "Mantis", "Mori", "Bogdan", "Jianhui", "Mart", "Kamakshi", "Gad"]
Processing with offset 1150
Got batch of size 50. ["Christophe", "Pintsang", "Saniya", "Mansur", "Parke", "Sumali", "Vishv", "Kazuhisa", "Anwar", "Anoosh"]
Processing with offset 1200
Got batch of size 50. ["Hatsukazu", "Seshu", "Ung", "Sven", "Guoxiang", "Gill", "Bernd", "Leszek", "Alois", "Tzu"]
^C*** Exiting...
Pushing LongActiveRecordJob back with [{"offset"=>1250}]
*** done: (Job{low} | LongActiveRecordJob | [{"offset"=>1250}])