Skip to content

Instantly share code, notes, and snippets.

@oinak
Created November 9, 2017 21:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oinak/6ed7eb0e42545ad7fc2439ff6ed27225 to your computer and use it in GitHub Desktop.
Save oinak/6ed7eb0e42545ad7fc2439ff6ed27225 to your computer and use it in GitHub Desktop.

Exercise:

Given this calls:

Taskman.queue(2.5) { print 'world!' }
Taskman.queue(1.0) { print 'hello ' }

When we do this one:

Taskman.work

You get the output:

hello world

class Taskman
class << self
def queue(delay, &block)
store << [delay, block]
end
def work(thread_provider: Thread)
workers = []
while (item = store.shift) do
delay,task = item
workers << thread_provider.new do
sleep delay
task.call
end
end
workers.each(&:join)
end
private
def store
@queue ||= []
end
end
end
if $PROGRAM_NAME == __FILE__
Taskman.queue(2.5) { print 'world!' }
Taskman.queue(1.0) { print 'hello ' }
Taskman.work # => hello world
end
require "minitest/autorun"
require "minitest/pride"
require_relative "taskman"
class TaskmanTest < Minitest::Spec
subject { Taskman }
describe ".queue" do
it "stores a job" do
job = -> { true }
list = subject.queue(0, &job)
value(list[0][0]).must_equal(0)
value(list[0][1]).must_be_instance_of(Proc)
end
end
describe ".work" do
it "creates a thread" do
class ThreadMock
@instances = []
class << self
attr_accessor :instances
end
def initialize
self.class.instances << self
end
attr_accessor :called_join # accessible for test
def join
@called_join = true # I was called!
end
end
subject.queue(1) { true }
subject.work(thread_provider: ThreadMock)
value(ThreadMock.instances.first.called_join).must_equal(true)
end
# This version suffers race conditions
# it "creates a thread" do
# job = -> { true }
# subject.queue(1, &job)
# thread_mock = Minitest::Mock.new
# r = Object.new
# def r.join;end
# thread_mock.expect(:new, r)
# subject.work(thread_provider: thread_mock)
# thread_mock.verify
# end
it "does the work" do
executed = false
job = -> { executed = true }
subject.queue(0, &job)
subject.work
value(executed).must_equal(true)
end
end
end
@oinak
Copy link
Author

oinak commented Feb 3, 2022

And now with Ruby 3:

# frozen_string_literal: true

require 'logger'

# pool
class WorkerPool
  def initialize
    @workers = []
    @tasks = []
  end

  def logger
    @logger ||= Logger.new($stdout).tap do |l|
      l.level = (ENV['DEBUG'] ? :debug : :error)
    end
  end

  def schedule(delay, message)
    log "scheduling #{message} for #{delay}s later"
    @tasks << [delay, message]
  end

  def worker_do(delay, message)
    @workers << Ractor.new(delay, message, self) do |delay, message, obj|
      obj.log "Initiating '#{delay}' wait"
      sleep(delay)
      obj.log 'Wake up and return message'
      message
    end
  end

  def work
    log 'INITIATE workerS'
    @tasks.map { |(d, m)| worker_do(d, m) }
    log 'RECOLLECT RESULTS'
    until @workers.empty?
      r, val = Ractor.select(*@workers)
      puts val
      @workers.delete(r)
      log "deleted '#{val}' ractor"
    end
    log 'END'
  end

  def log(text)
    logger.info "\033[1;35m#{text}\033[0m"
  end
end

wp = WorkerPool.new
wp.schedule 3, 'world!'
wp.schedule 1, 'Hello '
wp.work

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment