Skip to content

Instantly share code, notes, and snippets.

@wuputah
Created Sep 30, 2019
Embed
What would you like to do?
require 'thread'
def QPromise(*args, &block)
QPromise.new(*args, &block)
end
class QPromise < BasicObject
# Allows you to change the defaults, e.g.:
# QPromise.config[:thread_limit] = 100
# Changes to these settings will have an instant effect to all existing QPromises.
@@config = {
thread_limit: 10,
wait_time: 0.1
}
cattr_accessor :config
def initialize(*args, &block)
@item = (::Thread.current[:qpromise_queue] ||= ::QPromise::Queue.new).push(args, block)
end
def method_missing(*args, &block)
@item.value.send(*args, &block)
end
def respond_to?(method)
@item.value.respond_to?(method)
end
def ==(obj)
@item.value == obj
end
class Item
def initialize(args, block, queue)
@args = args
@block = block
@thread = nil
@queue = queue
end
def run!
@thread = ::Thread.new(*@args, &@block)
end
def run
@queue.run(self)
end
def value
run unless @thread
@thread.value
end
def running?
!@thread.nil?
end
def completed?
@thread && @thread.status == false
end
end
class Queue
def initialize
@work = []
@current = 0
end
def push(*args, &block)
item = ::QPromise::Item.new(args, block, self)
@work << item
check_queue!
item
end
def check_queue!(run_new_items = true)
sweep = []
@work.each do |item|
if item.completed?
@current -= 1
sweep << item
end
if run_new_items && !item.running? && @current < ::QPromise.config[:thread_limit]
item.run!
@current += 1
end
end
sweep.each do |item|
@work.delete(item)
end
end
def run(item)
loop do
check_queue!(false)
break if @current < ::QPromise.config[:thread_limit]
sleep(::QPromise.config[:wait_time])
end
item.run!
@current += 1
if @current < @limit
check_queue!
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment