Skip to content

Instantly share code, notes, and snippets.

@RadoMark
Last active January 11, 2018 11:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RadoMark/5d7742876e088beee73970fd583e6135 to your computer and use it in GitHub Desktop.
Save RadoMark/5d7742876e088beee73970fd583e6135 to your computer and use it in GitHub Desktop.
OCRbot rate limit
# initializer
# Build a connection pool (size: 5, timeout: 5) that yields Redis clients and
# register it as the ActiveJob::TrafficControl client.
redis_pool = ConnectionPool.new(size: 5, timeout: 5) { Redis.new }
ActiveJob::TrafficControl.client = redis_pool
# app/workers/finishing_worker.rb
# Job (queue :ocr_finishing_workers) that flushes whatever ids are still
# waiting in SingleWorker::IDS_TO_OCR_QUEUE, even if they do not fill a batch.
class FinishingWorker
  @queue = :ocr_finishing_workers

  # Pops up to SingleWorker::OCR_BATCH_SIZE entries from the Redis list and
  # hands the non-blank ones to MultipleWorker. Does nothing when the list
  # yielded no usable ids.
  def self.perform
    connection = Redis.new
    popped = Array.new(SingleWorker::OCR_BATCH_SIZE) do
      connection.lpop(SingleWorker::IDS_TO_OCR_QUEUE)
    end
    # lpop returns nil once the list is exhausted; drop those placeholders.
    trailing_ids = popped.select { |value| !value.blank? }

    MultipleWorker.perform_later(trailing_ids) unless trailing_ids.empty?
  end
end
# Gemfile excerpt: dependencies used by the snippets in this gist.
gem "rails", "~> 5.1.4"
# Redis client (Redis.new) used by the workers and the initializer.
gem "redis"
# Job backend; its Rake tasks are loaded in lib/tasks/resque.rake.
gem "resque"
# Provides ConnectionPool, used to build the traffic-control client.
gem "connection_pool"
# Recurring jobs (Resque.schedule), used to run FinishingWorker periodically.
gem "resque-scheduler"
# Provides ActiveJob::TrafficControl, configured in the initializer.
gem "activejob-traffic_control"
# app/workers/multiple_worker.rb
# ActiveJob (queue :ocr_multiple_workers) that receives a batch of ids, logs
# them and appends them to the PROCESSED_IDS_QUEUE Redis list.
class MultipleWorker < ActiveJob::Base
  # Raised when the job is invoked without any ids.
  class EmptyIdsQueueError < StandardError; end

  PROCESSED_IDS_QUEUE = "processed_ids".freeze

  queue_as :ocr_multiple_workers

  # Run 1 task per 3 seconds
  # NOTE(review): `throttle` presumably comes from activejob-traffic_control;
  # confirm the Throttle module is mixed into ActiveJob::Base for this class.
  throttle threshold: 1, period: 3.second

  # @param ids [Array] ids of the batch to process
  # @raise [EmptyIdsQueueError] when ids is blank
  def perform(ids = [])
    raise EmptyIdsQueueError if ids.blank?

    id_list = ids.join(", ")
    puts "Processing IDs: [#{id_list}]"
    Redis.new.rpush(PROCESSED_IDS_QUEUE, ids)
  end
end
# lib/tasks/resque.rake
require "resque/tasks"
require "resque-scheduler"
require "resque/scheduler/tasks"
namespace :resque do
  # resque:setup depends on the :environment task.
  task setup: :environment

  # Loads the recurring-job schedule. The schedule only has to be a Hash;
  # keeping it in a YAML file is simply the most convenient option.
  task setup_schedule: :setup do
    schedule = YAML.load_file("resque_schedule.yml")
    Resque.schedule = schedule
  end

  # The scheduler task needs the schedule to be loaded first.
  task scheduler: :setup_schedule
end
# https://github.com/resque/resque-scheduler#scheduled-jobs-recurring-jobs
# Recurring job entry: `every` and `description` are options of the
# FinishingWorker job, so they must be nested under its key.
FinishingWorker:
  every: "5m"
  description: "This job ensures all trailing documents will be OCRed"
# app/workers/single_worker.rb
# ActiveJob (queue :ocr_single_workers) that collects individual ids in the
# IDS_TO_OCR_QUEUE Redis list and forwards them to MultipleWorker once a full
# batch of OCR_BATCH_SIZE ids is available.
class SingleWorker < ActiveJob::Base
  IDS_TO_OCR_QUEUE = "ids_to_ocr".freeze
  OCR_BATCH_SIZE = 10

  queue_as :ocr_single_workers

  # Appends +id+ to the pending list, then tries to take a full batch from it.
  # A full batch is enqueued for MultipleWorker; a partial one is pushed back.
  #
  # The push/pop/push-back sequence is not atomic, so concurrent workers may
  # interleave on the same list.
  #
  # @param id [Object] id to queue for OCR
  def perform(id)
    redis = Redis.new
    redis.rpush IDS_TO_OCR_QUEUE, id

    # lpop returns nil once the list is empty; keep only real ids.
    batch = Array.new(OCR_BATCH_SIZE) { redis.lpop(IDS_TO_OCR_QUEUE) }.reject(&:blank?)

    if batch.count == OCR_BATCH_SIZE
      MultipleWorker.perform_later batch
    elsif !batch.empty?
      # Another worker may already have popped everything (including the id
      # pushed above). RPUSH with no values is rejected by Redis, so only push
      # back when there is something to return.
      redis.rpush IDS_TO_OCR_QUEUE, batch
    end
  end
end
# Enqueue ids 1..300 from three concurrent threads, one thread per id range.
enqueuers = [(1..100).to_a, (101..200).to_a, (201..300).to_a].map do |ids|
  Thread.new do
    ids.each { |id| SingleWorker.perform_later(id) }
  end
end
# Wait until every id has been enqueued so no thread is cut short.
enqueuers.each(&:join)
# after some time...
# Every one of the 300 ids must appear in the processed list.
raise "error!" if Redis.new.lrange(MultipleWorker::PROCESSED_IDS_QUEUE, 0, -1).uniq.count != 300
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment