Skip to content

Instantly share code, notes, and snippets.

@romiras
Created September 23, 2022 10:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save romiras/3a4705dc61260a46f73afb4eb19a1aaa to your computer and use it in GitHub Desktop.
Save romiras/3a4705dc61260a46f73afb4eb19a1aaa to your computer and use it in GitHub Desktop.
Demo async workers
require 'logger'
module Logging # Credits: https://stackoverflow.com/a/6768164/10118318
# This is the magical bit that gets mixed into your classes
def logger
Logging.logger
end
# Global, memoized, lazy initialized instance of a logger
def self.logger
@logger ||= Logger.new($stdout)
end
end
class DemoAsyncConsumer
include Logging
NUM_THREADS = 2
def initialize
@jobs = Queue.new
@workers = []
@workers_asleep = true
end
def call
NUM_THREADS.times do |i|
@workers << Thread.new do
wid = i
logger.debug "worker #{wid}: started as asleep"
Thread.stop
until @jobs.empty?
logger.debug("worker #{wid}: pop wait")
# This will remove the first object from @jobs
job = @jobs.pop
logger.debug("worker #{wid}: pop done")
logger.debug("worker #{wid}: job start")
handle_job(job)
logger.debug("worker #{wid}: job done")
end
logger.debug("worker #{wid}: no more jobs")
end
end
5.times do
sleep 1
add_job(rand)
end
@jobs.close
@workers.each { |w| logger.debug("worker state: #{w.status}") }
@workers.each(&:join)
NUM_THREADS.times { |i| logger.debug("worker #{i}: done") }
logger.debug("Left #{@jobs.length} jobs")
end
private
def add_job(job)
logger.debug("==> pushing a job")
@jobs << job
if @workers_asleep
@workers.each(&:run)
@workers_asleep = false
end
logger.debug("==> job pushed")
logger.debug("Now #{@jobs.length} jobs are pending")
end
def handle_job(job)
p job
end
end
DemoAsyncConsumer.new.call
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment