Skip to content

Instantly share code, notes, and snippets.

@Zapotek
Last active February 3, 2022 10:10
Show Gist options
  • Save Zapotek/238002f356344e5dbf1dcf9e8686282c to your computer and use it in GitHub Desktop.
Save Zapotek/238002f356344e5dbf1dcf9e8686282c to your computer and use it in GitHub Desktop.
Ruby Ractor job scheduler.
# https://github.com/ko1/ractor-tvar
#
# gem install ractor-tvar
require 'ractor/tvar'
# Many-producer / many-consumer channel built on a single relay Ractor.
#
# The relay does nothing but hand every message it receives to whichever
# caller takes from it next.
class Pipe
  # Starts the relay Ractor, which forwards messages until the process exits.
  def initialize
    @pipe = Ractor.new { loop { Ractor.yield(Ractor.receive) } }
  end

  # Pushes +obj+ into the pipe.
  #
  # The object is sent with +move: true+, so it is moved to the relay rather
  # than copied.
  #
  # NOTE: this intentionally shadows Object#send for Pipe instances.
  #
  # @param obj [Object] message to enqueue
  def send(obj)
    @pipe.send(obj, move: true)
  end

  # Blocks until the relay yields a message, then returns it.
  #
  # @return [Object] the next message taken from the relay
  def receive
    @pipe.take
  end
end
# Base class for units of work.
#
# Subclasses override #perform (and usually #initialize to capture their
# arguments).
class Job
  class << self
    # Instantiates the job with +args+ and runs it right away.
    #
    # @param args [Array] arguments forwarded to +new+
    # @return [Object] whatever the instance's #perform returns
    def perform(*args)
      new(*args).perform
    end

    # Builds a job description: the job class followed by its arguments.
    #
    # @param args [Array] arguments the job will later be built with
    # @return [Array] +[self, *args]+
    def for(*args)
      [self] + args
    end
  end

  # Runs the job. The base implementation always raises; subclasses must
  # provide their own.
  #
  # @raise [RuntimeError] always
  def perform
    raise 'Not implemented.'
  end
end
# Executes jobs on behalf of a scheduler; carries only an identifier.
class Worker
  # @return [Object] the identifier given at construction
  attr_reader :id

  # @param id [Object] identifier for this worker
  def initialize(id)
    @id = id
  end

  # Runs one job by delegating to the job class' +perform+.
  #
  # @param job_klass [#perform] object responding to +perform+
  # @param args [Array] arguments splatted into +job_klass.perform+
  # @return [Object] the result of +job_klass.perform+
  def work(job_klass, args)
    job_klass.perform(*args)
  end
end
# Runs queued jobs on a fixed pool of worker Ractors.
#
# Jobs travel through a shared Pipe; each worker Ractor pulls from it, runs
# the job and then decrements a shared pending-job counter (a Ractor::TVar)
# that #done? and #wait_until_done read.
class Scheduler
  # Number of worker Ractors started per scheduler.
  WORKERS = 5

  # @param options [Hash] accepted but currently not read
  def initialize(options = {})
    @pending_counter = Ractor::TVar.new(0)
    initialize_pipe
    initialize_workers
  end

  # Enqueues a job for execution by one of the workers.
  #
  # @param job_klass [Class] job class; workers call +job_klass.perform(*args)+
  # @param args [Array] arguments for the job
  def queue(job_klass, *args)
    # Bump the counter before publishing the job, so #done? cannot report
    # true while the job is still in flight.
    Ractor.atomically { @pending_counter.value += 1 }
    @pipe.send([job_klass, args])
  end

  # Blocks, polling every 0.1s, until every queued job has been processed.
  def wait_until_done
    sleep(0.1) until done?
  end

  # @return [Boolean] true when no queued job is waiting or running
  def done?
    Ractor.atomically { @pending_counter.value.zero? }
  end

  private

  def initialize_pipe
    @pipe = Pipe.new
  end

  # Spawns WORKERS Ractors that consume jobs from the pipe forever.
  def initialize_workers
    @workers = Array.new(WORKERS) do |index|
      Ractor.new(@pipe, Worker.new(index), @pending_counter) do |pipe, worker, pending_counter|
        loop do
          # Receive outside the protected region: the counter must only be
          # decremented for a job that was actually taken off the pipe.
          job_klass, args = pipe.receive

          begin
            worker.work(job_klass, args)
          rescue StandardError => e
            # A failing job must not kill the worker; otherwise the pool
            # shrinks with every failure.
            $stderr.puts "Worker #{worker.id}: #{job_klass} failed: #{e.class}: #{e.message}"
          ensure
            # Always account for the job, even on failure, so that
            # #wait_until_done cannot spin forever.
            Ractor.atomically { pending_counter.value -= 1 }
          end
        end
      end
    end
  end
end
# Job that prints its payload prefixed with the job's class name.
class EchoJob < Job
  # @param str [#to_s] payload; stored as a String
  def initialize(str)
    @str = str.to_s
  end

  # Prints "<class name>: <payload>" with +puts+.
  def perform
    label = self.class
    puts "#{label}: #{@str}"
  end
end
# Demo: describe 10,000 echo jobs, hand them to a scheduler and wait for the
# workers to finish.
jobs = Array.new(10_000) { |i| EchoJob.for(i) }

scheduler = Scheduler.new

# Jobs are taken from the tail of the list, so they are queued in descending
# index order; the list is empty afterwards.
scheduler.queue(*jobs.pop) until jobs.empty?

scheduler.wait_until_done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment