Skip to content

Instantly share code, notes, and snippets.

@dmshvetsov
Created February 5, 2019 05:33
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dmshvetsov/fd1ea8cfc96a908ca0e6fae298c0a7e3 to your computer and use it in GitHub Desktop.
Save dmshvetsov/fd1ea8cfc96a908ca0e6fae298c0a7e3 to your computer and use it in GitHub Desktop.
Example of how we may use queue with threads to make efficient background worker.
class Worker
def self.start(num_threads:, queue_size:)
queue = SizedQueue.new(queue_size)
worker = new(num_threads: num_threads, queue: queue)
worker.spawn_threads
worker
end
def initialize(num_threads:, queue:)
@num_threads = num_threads
@queue = queue
@threads = []
end
attr_reader :num_threads, :threads
private :threads
def spawn_threads
num_threads.times do
threads << Thread.new do
while running? || actions?
action_proc, action_payload = wait_for_action
action_proc.call(action_payload) if action_proc
end
end
end
end
def enqueue(action, payload)
queue.push([action, payload])
end
def stop
queue.close
threads.each(&:exit)
threads.clear
true
end
private
attr_reader :queue, :threads
def actions?
!queue.empty?
end
def running?
!queue.closed?
end
def dequeue_action
queue.pop(true)
end
def wait_for_action
queue.pop(false)
end
end
@SathiyarajS
Copy link

Hi @iamdidev, dequeue_action method is declared as private here. I do not see the usage of this method inside this class. Could you please explain the purpose of this method?

@dmshvetsov
Copy link
Author

Hi @SathiyarajS good catch. This method is used in the "production" version of the worker, take a look https://github.com/yabeda-rb/yabeda-datadog/blob/8c6e40f24038d4420be500c97032c38096f541e2/lib/yabeda/datadog/worker.rb

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment