Skip to content

Instantly share code, notes, and snippets.

@romiras
Created May 27, 2022 18:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save romiras/70cd02ff186ad64f571dc991de0e4fc7 to your computer and use it in GitHub Desktop.
Save romiras/70cd02ff186ad64f571dc991de0e4fc7 to your computer and use it in GitHub Desktop.
Demo: asynchronous queue consumer with N concurrent workers in Ruby
require 'concurrent'
require 'benchmark'
require 'logger'
# Mixin that gives every including class access to one shared Logger.
# Credits: https://stackoverflow.com/a/6768164/10118318
module Logging
  class << self
    # Module-level logger writing to STDOUT; built on first use and
    # reused on every later call.
    def logger
      @logger ||= Logger.new(STDOUT)
    end
  end

  # Instance-level accessor mixed into including classes; it simply
  # delegates to the shared module-level logger.
  def logger
    Logging.logger
  end
end
# Demo consumer that drains a queue of sample jobs with several concurrent
# workers, each worker running inside a Concurrent::Promises future.
#
# A job is a two-element array [job_name, sleep_time]; both values are passed
# to the executor callable supplied by the caller.
class AsyncConsumer
  include Logging

  NUM_WORKERS = 5
  NUM_JOBS = 100
  # Number of short random jobs enqueued in addition to the two long ones.
  NUM_SAMPLE_JOBS = 10
  DEFAULT_OPTIONS = { n_workers: NUM_WORKERS, n_jobs: NUM_JOBS }.freeze

  # Per-worker counters: worker index => number of jobs that worker picked.
  attr_accessor :stats

  # @param executor [#call] invoked as executor.call(job_name, sleep_time)
  # @param options [Hash] :n_workers (number of worker futures) and :n_jobs
  #   (queue capacity); keys that are missing fall back to DEFAULT_OPTIONS
  # @raise [ArgumentError] if :n_jobs cannot hold all sample jobs
  def initialize(executor, options: DEFAULT_OPTIONS)
    # Merge so that a partial hash (e.g. only :n_jobs) keeps the other default.
    settings = DEFAULT_OPTIONS.merge(options || {})
    @n_workers = settings[:n_workers]
    @executor = executor
    @stats = Concurrent::Hash.new
    @jobs = SizedQueue.new(settings[:n_jobs])
    push_sample_jobs(NUM_SAMPLE_JOBS)
  end

  # Starts the workers, waits for all of them and logs the elapsed time.
  # Errors that escape a worker are logged here rather than re-raised.
  def run
    logger.info 'Starting workers'
    elapsed = Benchmark.realtime do
      futures = Array.new(@n_workers) do |worker|
        Concurrent::Promises.future { work_loop(worker) }
      end
      futures.each(&:value!) # wait for completion; re-raises a worker failure
    end
    logger.info "Elapsed %.3f s" % elapsed
  rescue => e
    logger.error "Terminated due to error: #{e.message}\n#{e.backtrace&.first}"
  ensure
    logger.info 'Finished workers'
  end

  # Prints the per-worker job counters to standard output.
  def print_performance
    puts "Performance: #{stats.inspect}"
  end

  private

  # Body of a single worker: keeps taking jobs until the queue is drained.
  def work_loop(worker)
    logger.info "Starting worker #{worker}"
    # The queue is closed after it is filled, so pop returns nil instead of
    # blocking once no jobs are left; no separate empty? check is needed.
    while (job = pop_job)
      process(worker, job)
    end
    logger.info "Stopping worker #{worker}"
  end

  # Runs one job through the executor. A failing job is logged and does not
  # stop the worker.
  def process(worker, job)
    logger.info "Worker #{worker} picked job: #{job.inspect}"
    incr_counter(worker)
    job_name, sleep_time = job
    begin
      @executor.call(job_name, sleep_time)
    rescue => e
      logger.error "Bad luck! Job #{job_name} has failed due to error: #{e.message}\n#{e.backtrace&.first}"
    end
    logger.info "Worker #{worker} finished job: #{job.inspect}"
  end

  # Enqueues two longer jobs plus n short random ones, then closes the queue.
  # Nothing consumes the queue while it is being filled, so a capacity that is
  # too small would block forever; fail fast instead.
  def push_sample_jobs(n)
    needed = n + 2
    if @jobs.max < needed
      raise ArgumentError, "n_jobs must be at least #{needed} to hold the sample jobs (got #{@jobs.max})"
    end

    @jobs.push(["L1", 3.0]) # a bit longer job
    @jobs.push(["L2", 2.0]) # a bit longer job
    n.times do
      @jobs.push([["A", "B", "C"].sample, rand * 0.1]) # shorter job
    end
    @jobs.close # closing is the termination signal for the workers
  end

  # Takes the next job, or nil when the closed queue is empty.
  def pop_job
    @jobs.pop
  end

  # Counts one more job for the given worker. Each worker is only ever passed
  # its own index, so no two workers update the same key.
  def incr_counter(worker)
    @stats[worker] = @stats.fetch(worker, 0) + 1
  end
end
# Demo job handler: logs the job, fails for a slow job whose name does not
# start with "L", and otherwise simulates work by sleeping.
executor = ->(job_name, sleep_time) do
  Logging.logger.info "job_name: #{job_name}, sleep_time: #{sleep_time}"
  short_job = !job_name.start_with?("L")
  raise "Boom!" if short_job && sleep_time > 0.8
  sleep(1e-3 + sleep_time)
end

# Build the consumer, run all workers, then print the per-worker counters.
consumer = AsyncConsumer.new(executor)
consumer.tap(&:run).print_performance
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment