Last active
January 5, 2021 04:30
-
-
Save GustavoCaso/f6b14360ec1e4031f438c51045ee2d64 to your computer and use it in GitHub Desktop.
Simple implementation for a job scheduler using ruby Ractor primitive
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# How to install the latests version of ruby | |
# 1. clone ruby/ruby | |
# 2. autoconf | |
# 3. ./configure | |
# 4. cd ext/openssl && ruby extconf.rb --with-openssl-dir=<openssl_root>; make; make install | |
# 5. cd ../../ | |
# 6. make | |
# 7. make install-nodoc | |
# You should be ready ti run this script!!!! | |
require 'optparse' | |
require 'json' | |
require 'bundler/inline' | |
gemfile do | |
source 'https://rubygems.org' | |
gem 'redis' | |
gem 'oj' | |
end | |
require 'redis' | |
require 'oj' | |
options = {} | |
OptionParser.new do |opts| | |
opts.banner = "Usage: ractor.rb [options]" | |
opts.on("-s", "server", "Run Server") do |s| | |
options[:server] = true | |
end | |
opts.on("-e", "enqueue", "Enqueue work") do |s| | |
options[:enqueue] = true | |
end | |
end.parse! | |
class Worker | |
attr_reader :id | |
def initialize(id) | |
@id = id | |
end | |
def work(job) | |
puts "Worker[#{id}] working #{job}" | |
parsed_job = Oj.load(job) | |
Object.const_get(parsed_job['job_class']).new.perform(parsed_job['args']) | |
end | |
end | |
class DummyJob | |
def perform(*args) | |
puts "Performing" | |
puts args.inspect | |
end | |
end | |
def new_job | |
{'job_class' => DummyJob, 'args' => "DummyJob"}.to_json | |
end | |
QUEUE_NAME = "jobs:queue:low" | |
if options[:enqueue] | |
redis = Redis.new | |
puts "enqueue" | |
result = redis.rpush(QUEUE_NAME, new_job) | |
puts "Result from enqueue #{result}" | |
else | |
pipe = Ractor.new do | |
loop do | |
Ractor.yield(Ractor.recv, move: true) | |
end | |
end | |
workers = 4.times.map do |index| | |
worker = Worker.new(index) | |
Ractor.new(pipe, worker) do |pipe, worker| | |
loop do | |
job = pipe.take | |
puts "taken job from pipe by #{Ractor.current} and work it by worker #{worker.id}" | |
worker.work(job) | |
end | |
end | |
end | |
redis = Redis.new | |
loop do | |
puts "Waiting on work" | |
queue, job = redis.brpop(QUEUE_NAME) | |
puts "Pushing work from #{queue} to available workers" | |
pipe.send(job, move: true) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Running the server:
./ruby ~/Desktop/ractor.rb -s
On a separate tab run:
./ruby ~/Desktop/ractor.rb -e
You should be able to see work been done on the server tab