# Ractor worker pool
require 'kramdown' | |
require 'redcarpet' | |
require 'toml' | |
require 'yaml' | |
#
# pool
#
# A fixed-size pool of Ractor workers fed through a shared "channel" Ractor.
#
# A job is any object that responds to +name+ and +work+. Jobs are handed to
# the workers with Ractor messaging; each worker runs +work+ and yields the
# job back together with how long it took.
class WorkerPool
  # Starts +num_workers+ worker Ractors plus the two channel Ractors they
  # listen on (one for jobs, one for shutdown requests).
  #
  # @param num_workers [Integer] number of worker Ractors to start
  def initialize(num_workers)
    @close_channel = new_channel
    @num_jobs = 0
    @num_workers = num_workers
    @work_channel = new_channel
    @workers = num_workers.times.map { |i| new_worker(i) }
  end

  # Sends one close request per worker, then takes each worker's final value
  # so that this call only returns once every worker has left its loop.
  def close
    @num_workers.times { @close_channel.send([:close]) }
    @workers.each(&:take)
  end

  # Queues +job+ for execution and counts it as outstanding.
  #
  # @param job [#name, #work] the job to run on some worker
  def do_work(job)
    @num_jobs += 1
    @work_channel.send([:job, job])
  end

  # Blocks until every outstanding job has reported back, printing one line
  # per finished job.
  #
  # The outstanding counter is decremented as results arrive, so calling
  # +wait+ again without queueing new work returns immediately instead of
  # blocking on results that were already consumed.
  #
  # @return [Integer] the number of jobs that were waited for
  def wait
    pending = @num_jobs
    pending.times do
      _worker, (job, duration) = Ractor.select(*@workers)
      @num_jobs -= 1
      puts "job '#{job.name}' took #{duration}s"
    end
    pending
  end

  # Builds a Ractor that forwards every message it receives to whichever
  # Ractor takes from it next.
  private def new_channel
    Ractor.new do
      loop do
        Ractor.yield Ractor.receive
      end
    end
  end

  # Builds a worker Ractor that loops over messages from the work and close
  # channels until it is told to close.
  private def new_worker(worker_num)
    Ractor.new(@work_channel, @close_channel, name: "worker-#{worker_num}") do |work_channel, close_channel|
      puts "#{name}: started"
      loop do
        case Ractor.select(work_channel, close_channel)
        in _, [:job, job]
          # Monotonic clock: unaffected by wall-clock adjustments mid-job.
          start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
          job.work
          Ractor.yield([job, Process.clock_gettime(Process::CLOCK_MONOTONIC) - start])
        in _, [:close]
          puts "#{name}: closing"
          break
        in _, message
          # Bind the payload so it can be reported; the worker keeps running.
          puts "received unknown message: #{message}"
        end
      end
    end
  end
end
#
# jobs
#
# Job that reads a fragment file and parses its "+++"-delimited front matter
# as TOML.
class RenderFragmentJob
  # @param source [String] path of the fragment file to read
  def initialize(source)
    @source = source
  end

  # @return [String] label for this job, based on the file's base name
  def name
    "fragment: #{File.basename(@source)}"
  end

  # Reads the file, splits it into front matter and markdown, and parses the
  # front matter.
  #
  # @return the value of +TOML::Parser#parsed+ for the front matter
  # @raise [RuntimeError] when the front matter or markdown section is
  #   missing or empty
  def work
    data = File.read(@source)

    # Limit of 3: text before the first delimiter, front matter, and the
    # rest. This keeps any later "+++" inside the markdown, and keeps an
    # empty trailing section as "" rather than dropping it.
    _, frontmatter_data, markdown_data = data.split("+++", 3)

    # Either part is nil when the file has fewer than two delimiters.
    if [frontmatter_data, markdown_data].any? { |part| part.nil? || part.empty? }
      raise "malformatted fragment: #{File.basename(@source)}"
    end

    # content = Kramdown::Document.new(markdown_data).to_html
    # data = YAML.parse("a: 123")
    # renderer = Redcarpet::Render::HTML.new(render_options = {})
    # markdown = Redcarpet::Markdown.new(renderer, extensions = {})
    # markdown.render(markdown_data)
    TOML::Parser.new(frontmatter_data).parsed
  end
end
# Trivial job that just sleeps for one second when run.
class SleepJob
  # @param i the value shown in this job's name
  def initialize(i)
    @i = i
  end

  # @return [String] label for this job
  def name = "sleep: #{@i}"

  # Sleeps for one second.
  def work = sleep(1)
end
#
# main
#
# Start ten workers, queue one render job per fragment file, wait for every
# job to report back, then shut the workers down.
pool = WorkerPool.new(10)

# Disabled alternative that queues sleep jobs instead of fragments:
#
#   50.times.each do |i|
#     pool.do_work(SleepJob.new(i))
#   end

fragment_glob = "/Users/brandur/Documents/go/src/github.com/brandur/sorg/content/fragments/*.md"
Dir.glob(fragment_glob).each { |source| pool.do_work(RenderFragmentJob.new(source)) }

pool.wait
pool.close
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment