Skip to content

Instantly share code, notes, and snippets.

@apeiros
Created November 21, 2017 14:38
Show Gist options
  • Save apeiros/0478ccf738f3df9018c35b33ddce4118 to your computer and use it in GitHub Desktop.
Save apeiros/0478ccf738f3df9018c35b33ddce4118 to your computer and use it in GitHub Desktop.
require "time"
require "pry"
require "thread"
# Minimal stdout logger for job lifecycle events.
#
# Every line is prefixed with an ISO-8601 timestamp, the process id and the
# calling thread's object id (hex). All writes go through one mutex so lines
# emitted by different threads are not interleaved.
class Logger
  def initialize
    @mutex = Mutex.new
  end

  # Logs that a job was aborted by an exception.
  #
  # @param error [Exception] the exception that interrupted the job
  # @return [nil]
  def log_error(error)
    emit(Time.now, "Job interrupted through exception #{error.message}")
  end

  # Logs the start of a job, yields to run it, then logs its end.
  #
  # If the block raises, the exception propagates and no "Ending job" line
  # is written.
  #
  # @param name [Object] job name, rendered with #inspect
  # @param last_execution_ended_at [Time, nil] when the previous run of this
  #   job finished; nil means the job has not run before (delay is 0)
  # @yield runs the job
  # @return [nil]
  def log_job(name, last_execution_ended_at)
    start_time = Time.now
    delay = last_execution_ended_at ? (start_time - last_execution_ended_at).round : 0
    emit(start_time, "Starting job #{name.inspect}, delay: #{delay}s")

    # The job runs outside the mutex so other threads can keep logging.
    yield

    end_time = Time.now
    duration = (end_time - start_time).round
    emit(end_time, "Ending job #{name.inspect}, duration: #{duration}s")
  end

  private

  # Writes one prefixed log line to stdout while holding the mutex.
  def emit(time, message)
    pid = Process.pid
    tid = Thread.current.object_id.to_s(16)
    @mutex.synchronize do
      puts "[#{time.iso8601}|PID:#{pid}|TID:#{tid}] #{message}"
    end
  end
end
# Pulls jobs off a shared queue, runs them, and puts them back on the queue
# so they are executed again later.
#
# Note that construction does not return until the processing loop ends:
# #initialize calls #process_loop directly.
class Worker
  # @param queue [#shift, #push] source of jobs; finished jobs are pushed back
  # @param logger [#log_job, #log_error] receives lifecycle and error events
  def initialize(queue, logger)
    @active = true
    @logger = logger
    @queue = queue
    process_loop
  end

  # Asks the loop to finish; the flag is checked before the next job is
  # taken from the queue.
  def stop
    @active = false
  end

  # Runs jobs until #stop has been called or the queue yields a falsy value.
  # Each job is stamped with its finish time and re-queued after it ran.
  def process_loop
    while @active
      next_job = @queue.shift
      break unless next_job

      process(next_job)
      next_job.last_execution_ended_at = Time.now
      @queue.push(next_job) # re-queue job after done
    end
  end

  # Executes a single job inside the logger's start/end bracket.
  #
  # Fatal conditions (out of memory, signals, interrupts, exit requests) are
  # passed on untouched; any other exception is logged and swallowed so the
  # worker keeps running.
  def process(job)
    @logger.log_job(job.name, job.last_execution_ended_at) { job.call }
  rescue NoMemoryError, SignalException, Interrupt, SystemExit
    raise
  rescue Exception => failure
    @logger.log_error(failure)
  end
end
# A named unit of work wrapping a block, plus the time its most recent run
# finished (nil until something assigns it).
class Job
  attr_reader :name, :block
  attr_accessor :last_execution_ended_at

  # @param name [Object] label for the job
  # @yield the work to perform when the job is called
  # @raise [ArgumentError] when no block is given
  def initialize(name, &block)
    raise ArgumentError, "Block required" if block.nil?

    @last_execution_ended_at = nil
    @block = block
    @name = name
  end

  # Invokes the stored block, forwarding positional arguments and any block.
  #
  # @return [Object] whatever the stored block returns
  def call(*args, &block)
    @block.(*args, &block)
  end
end
# --- Script entry point: build the shared queue, start the workers, then
# --- drop into an interactive pry session.

# Queue handed to every worker thread below.
job_queue = Queue.new
# Number of worker threads to start.
pool_size = 2
# Single logger instance shared by all workers.
logger = Logger.new
# Fill the queue with all jobs before any worker starts.
job_queue.push(Job.new("demo") { puts "hi"; sleep 10 })
# End of queue fill.
# Set before the threads are created: an unhandled exception in any thread
# is then re-raised in the main thread instead of only ending that thread.
Thread.abort_on_exception = true
# One thread per worker. Worker.new runs the processing loop from its
# constructor, so each thread stays inside Worker.new while the loop runs.
workers = Array.new(pool_size) { Thread.new { Worker.new(job_queue, logger) } }
# Opens a pry session with the locals above in scope.
# NOTE(review): presumably this is also what keeps the main thread (and thus
# the worker threads) alive — confirm.
binding.pry
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment