@kirs · Last active October 21, 2022
Interruptible jobs
require 'bundler/inline'

gemfile do
  source "https://rubygems.org"
  gem "resque"
  gem "pry"
  gem "activerecord"
  gem "mysql2"
end
require 'pry'
require 'csv'
require 'resque'
require 'active_record'
require 'mysql2'
require 'active_support/concern'
### Configure Resque
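# FORK_PER_JOB=false runs each job inside the worker process rather than in a
# fork, which is what lets jobs observe the worker's shutdown flag below.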
ENV["FORK_PER_JOB"] = 'false'
ENV['QUEUE'] = '*'
ENV['VERBOSE'] = 'true'
Resque.redis.namespace = "resque:kirs"
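# Standard Resque + ActiveRecord fork hooks: drop the connection before
# forking and re-establish it in the child (likely unused here, since
# forking is disabled above).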
Resque.before_fork do
  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.connection.disconnect!
end

Resque.after_fork do
  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.establish_connection
end
# To simulate heavy ActiveRecord relation, use sample employees database from MySQL contrib
# https://dev.mysql.com/doc/employee/en/
class Employee < ActiveRecord::Base
  self.primary_key = 'emp_no'
end
ActiveRecord::Base.establish_connection adapter: "mysql2", database: "employees"
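# Raised from inside a job's iteration loop when the worker is shutting down;
# carries the offset at which it is safe to resume.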
class SafeInterruptedError < StandardError
  attr_reader :offset

  def initialize(offset)
    @offset = offset
    super()
  end
end
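# Mixin that makes a Resque job resumable: it wraps #perform via an
# around_perform hook, catches SafeInterruptedError, and re-enqueues the job
# with the offset it reached.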
module ResumableInterrupt
  extend ActiveSupport::Concern

  class_methods do
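    # Resque calls any class method whose name starts with "around_perform"
    # around the job's #perform, so this wraps every run of the including class.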
    def around_perform_interruptible(*job_args)
      yield
    rescue SafeInterruptedError => e
      # Hacky way to append an options hash on top of the job args
      retry_args = job_args.dup
      if retry_args.last.is_a?(Hash)
        retry_args.last["offset"] = e.offset
      else
        retry_args << { "offset" => e.offset }
      end
      # If interrupted, re-enqueue with the new offset
      puts "Pushing #{self} back with #{retry_args}"
      Resque.enqueue(self, *retry_args)
    end
  end
  def perform(*job_args)
    # Hacky way to fetch job options passed on top of the original arguments
    if job_args.last.is_a?(Hash)
      options = job_args.last
    else
      options = {}
    end
    offset = options.fetch("offset", 0)
    puts "Starting with offset: #{offset}"

    enumerator = build_enumerator(*job_args)
    unless enumerator.is_a?(Enumerable)
      raise "#{self}: #build_enumerator returned non-enumerable object: #{enumerator}"
    end

    if enumerator.is_a?(ActiveRecord::Relation)
      batch_size = 50
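      # NOTE: in_batches(start:) takes a primary key value, not a row count;
      # with the employees dataset (emp_no starts at 10001) a resumed offset
      # like 950 falls below every id, so iteration restarts from the first
      # batch, as the sample output below shows.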
      enumerator = enumerator.in_batches(of: batch_size, start: offset)
      enumerator.each do |batch|
        # Another great thing about providing people
        # with the iteration API is that we can automatically do
        # Podding::DatabaseStatus.await_healthy
        if WorkerSupervisor.instance.shutdown?
          raise SafeInterruptedError.new(offset)
        end
        puts "Processing with offset #{offset}"
        enumerate(batch)
        offset += batch_size
      end
    else
      # In case of a plain enumerable (CSV),
      # simply skip the first N rows according to the offset
      enumerator.each_with_index do |r, i|
        next if i < offset
        if WorkerSupervisor.instance.shutdown?
          raise SafeInterruptedError.new(i)
        end
        puts "Processing #{i}"
        enumerate(r)
      end
    end
  end
end
class LongCsvJob
  @queue = :low
  include ResumableInterrupt

  # Delegate to an instance-level #perform, as one normally would
  def self.perform(*args)
    new.perform(*args)
  end

  # The contract an interruptible job has to fulfill consists of two methods:
  # #build_enumerator and #enumerate.
  #
  # #build_enumerator returns an enumerable object or an ActiveRecord relation.
  # #enumerate is what we call on every iteration.
  #
  # The new API lets developers abstract away the part of the job that does the
  # iteration, and for pods that makes it possible to hook in code like
  # `await_healthy` and to check for the shutdown signal.
  #
  # When the worker receives a QUIT signal, we finish the current iteration,
  # store the current offset and re-enqueue the job with that offset.
  #
  # This works well with both CSV files and ActiveRecord relations.
  # With CSV we just skip the first N records according to the offset,
  # and with ActiveRecord we can use a SQL offset.
  def build_enumerator(csv_path, options = {})
    CSV.parse(File.read(csv_path), headers: false)
  end
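  # (note) CSV.parse loads the whole file into memory; for a genuinely huge
  # CSV, a lazy enumerator such as CSV.foreach(csv_path) would pair better
  # with the offset-skipping above.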
  def enumerate(record)
    puts "Processing #{record.first}"
    sleep 0.1
  end
end
class LongActiveRecordJob
  @queue = :low
  include ResumableInterrupt

  # Delegate to an instance-level #perform, as one normally would
  def self.perform(*args)
    new.perform(*args)
  end

  def build_enumerator(options = {})
    Employee.where("hire_date > '1999-01-01'")
  end

  def enumerate(batch)
    puts "Got batch of size #{batch.size}. #{batch.first(10).map(&:first_name)}"
    sleep 0.1
  end
end
# Sample jobs
Resque.enqueue(LongCsvJob, "/Users/kir/Downloads/huge.csv")
Resque.enqueue(LongActiveRecordJob)
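# A job can also be enqueued with an explicit starting offset, using the same
# options-hash convention that #perform reads, e.g. (hypothetical):
#   Resque.enqueue(LongCsvJob, "/Users/kir/Downloads/huge.csv", { "offset" => 100 })

# Because FORK_PER_JOB is off, the job runs inside the worker process, so the
# singleton worker below doubles as the supervisor the jobs poll via #shutdown?.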
class WorkerSupervisor
  def self.instance
    @instance ||= Resque::Worker.new
  end
end
worker = WorkerSupervisor.instance
worker.prepare
worker.log "Starting worker #{self}"
worker.work(ENV['INTERVAL'] || 5) # interval, will block
$ ruby interruptible.rb
*** Starting worker main
*** Running before_first_fork hooks
*** got: (Job{low} | LongActiveRecordJob | [])
Starting with offset: 0
Processing with offset 0
Got batch of size 50. ["Lillian", "Hironoby", "Dietrich", "Aimee", "Neven", "Samphel", "JoAnne", "Malu", "Chaosheng", "Kensei"]
Processing with offset 50
Got batch of size 50. ["Elvia", "Tianruo", "Herbert", "Uta", "Kenton", "Anneli", "Leaf", "Shaz", "Breannda", "Sadun"]
Processing with offset 100
Got batch of size 50. ["Weiru", "Magy", "Guiseppe", "Alejandro", "Leon", "Gudjon", "Fumitake", "Martial", "Irena", "Teunis"]
Processing with offset 150
Got batch of size 50. ["Mokhtar", "Along", "Geoff", "Mantis", "Mori", "Bogdan", "Jianhui", "Mart", "Kamakshi", "Gad"]
[...]
Processing with offset 900
Got batch of size 50. ["Karlis", "JiYoung", "Marit", "Raimond", "Kshitij", "Barna", "Oscal", "Gaetan", "Remmert", "Kirk"]
^C*** Exiting...
Pushing LongActiveRecordJob back with [{"offset"=>950}]
*** done: (Job{low} | LongActiveRecordJob | [])
$ ruby interruptible.rb
*** Starting worker main
*** Running before_first_fork hooks
*** got: (Job{low} | LongActiveRecordJob | [{"offset"=>950}])
Starting with offset: 950
Processing with offset 950
Got batch of size 50. ["Lillian", "Hironoby", "Dietrich", "Aimee", "Neven", "Samphel", "JoAnne", "Malu", "Chaosheng", "Kensei"]
Processing with offset 1000
Got batch of size 50. ["Elvia", "Tianruo", "Herbert", "Uta", "Kenton", "Anneli", "Leaf", "Shaz", "Breannda", "Sadun"]
Processing with offset 1050
Got batch of size 50. ["Weiru", "Magy", "Guiseppe", "Alejandro", "Leon", "Gudjon", "Fumitake", "Martial", "Irena", "Teunis"]
Processing with offset 1100
Got batch of size 50. ["Mokhtar", "Along", "Geoff", "Mantis", "Mori", "Bogdan", "Jianhui", "Mart", "Kamakshi", "Gad"]
Processing with offset 1150
Got batch of size 50. ["Christophe", "Pintsang", "Saniya", "Mansur", "Parke", "Sumali", "Vishv", "Kazuhisa", "Anwar", "Anoosh"]
Processing with offset 1200
Got batch of size 50. ["Hatsukazu", "Seshu", "Ung", "Sven", "Guoxiang", "Gill", "Bernd", "Leszek", "Alois", "Tzu"]
^C*** Exiting...
Pushing LongActiveRecordJob back with [{"offset"=>1250}]
*** done: (Job{low} | LongActiveRecordJob | [{"offset"=>1250}])