Skip to content

Instantly share code, notes, and snippets.

@kyuden
Created March 21, 2018 16:46
Show Gist options
  • Save kyuden/de50e72a781ddf3b109b30bbe5bc5a62 to your computer and use it in GitHub Desktop.
Save kyuden/de50e72a781ddf3b109b30bbe5bc5a62 to your computer and use it in GitHub Desktop.
class ProducerConsumer
KILL = :kill
def initialize(produce:, consume:, producer_num: 1, consumer_num: 5, max_queue_size: 1000)
@produce = produce
@consume = consume
@consumer_num = consumer_num
@producer_num = producer_num
@ids_index = 0
@max_queue_size = max_queue_size
@ids_index_mutex = Mutex.new
end
def run
queue = SizedQueue.new(@max_queue_size)
producers = Array.new(@producer_num) { create_worker(method(:produce), queue) }
consumers = Array.new(@consumer_num) { create_worker(method(:consume), queue) }
producers.each(&:join)
consumers.size.times { queue.push :kill }
consumers.each(&:join)
end
private
def create_worker(func, *args)
Thread.new do
loop { break if KILL == func.call(*args) }
end
end
def produce(queue)
values = @produce.call(next_ids_index)
return KILL if kill_worker?(values)
queue.push(values)
end
def consume(queue)
values = queue.pop
return KILL if kill_worker?(values)
@consume.call(values)
end
def next_ids_index
i = nil
@ids_index_mutex.synchronize do
i = @ids_index
@ids_index += 1
end
i
end
def kill_worker?(v)
v == KILL
end
end
nested_ids = [*1..100].each_slice(10).to_a
fetch = proc do |i|
ids = nested_ids[i]
next :kill unless ids
ids.map { |id| id * 2 }
end
insert = proc do |values|
p values
end
ProducerConsumer.new(produce: fetch, consume: insert, producer_num: 5, consumer_num: 1).run
# $ ruby producer_consumer.rb 1:44
# [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
# [22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
# [42, 44, 46, 48, 50, 52, 54, 56, 58, 60]
# [62, 64, 66, 68, 70, 72, 74, 76, 78, 80]
# [82, 84, 86, 88, 90, 92, 94, 96, 98, 100]
# [102, 104, 106, 108, 110, 112, 114, 116, 118, 120]
# [122, 124, 126, 128, 130, 132, 134, 136, 138, 140]
# [142, 144, 146, 148, 150, 152, 154, 156, 158, 160]
# [162, 164, 166, 168, 170, 172, 174, 176, 178, 180]
# [182, 184, 186, 188, 190, 192, 194, 196, 198, 200]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment