@GustavoCaso
Last active January 5, 2021 04:30
Simple implementation of a job scheduler using Ruby's Ractor primitive
# How to install the latest version of Ruby
# 1. clone ruby/ruby
# 2. autoconf
# 3. ./configure
# 4. cd ext/openssl && ruby extconf.rb --with-openssl-dir=<openssl_root>; make; make install
# 5. cd ../../
# 6. make
# 7. make install-nodoc
# You should be ready to run this script!
require 'optparse'
require 'json'
require 'bundler/inline'

gemfile do
  source 'https://rubygems.org'
  gem 'redis'
  gem 'oj'
end

require 'redis'
require 'oj'

options = {}
OptionParser.new do |opts|
  opts.banner = "Usage: ractor.rb [options]"
  # Long options need the leading "--"; without it OptionParser treats the
  # string as part of the description.
  opts.on("-s", "--server", "Run Server") do
    options[:server] = true
  end
  opts.on("-e", "--enqueue", "Enqueue work") do
    options[:enqueue] = true
  end
end.parse!

class Worker
  attr_reader :id

  def initialize(id)
    @id = id
  end

  # Parses the JSON payload and runs the job class it names.
  def work(job)
    puts "Worker[#{id}] working #{job}"
    parsed_job = Oj.load(job)
    Object.const_get(parsed_job['job_class']).new.perform(parsed_job['args'])
  end
end

class DummyJob
  def perform(*args)
    puts "Performing"
    puts args.inspect
  end
end

def new_job
  # Store the class name explicitly so the payload is plain JSON.
  {'job_class' => DummyJob.name, 'args' => "DummyJob"}.to_json
end

QUEUE_NAME = "jobs:queue:low"

if options[:enqueue]
  redis = Redis.new
  puts "enqueue"
  result = redis.rpush(QUEUE_NAME, new_job)
  puts "Result from enqueue #{result}"
else
  # The pipe Ractor forwards every message it receives to whichever
  # worker calls pipe.take next.
  pipe = Ractor.new do
    loop do
      Ractor.yield(Ractor.recv, move: true)
    end
  end

  workers = 4.times.map do |index|
    worker = Worker.new(index)
    Ractor.new(pipe, worker) do |pipe, worker|
      loop do
        job = pipe.take
        puts "taken job from pipe by #{Ractor.current} and work it by worker #{worker.id}"
        worker.work(job)
      end
    end
  end

  redis = Redis.new
  loop do
    puts "Waiting on work"
    # brpop blocks until a job is available and returns [queue_name, payload].
    # Note: rpush and brpop both operate on the tail of the list, so jobs are
    # consumed newest-first; use blpop instead if FIFO order matters.
    queue, job = redis.brpop(QUEUE_NAME)
    puts "Pushing work from #{queue} to available workers"
    pipe.send(job, move: true)
  end
end
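
The server branch above fans jobs out from one producer to four workers through a shared pipe. The sketch below illustrates the same fan-out shape, but it deliberately swaps in a `Thread::Queue` and plain threads for the Ractor pipe and Redis so it runs on any recent Ruby without external services. `run_pool` and its `worker_count` parameter are hypothetical names, not part of the gist.

```ruby
# Stand-in for the Ractor pipe: a thread-safe queue shared by all workers.
# Thread::Queue is an assumption made only so the sketch runs without
# Redis or Ractor; it is not what the gist itself uses.
def run_pool(jobs, worker_count: 4)
  pipe = Thread::Queue.new
  results = Thread::Queue.new

  workers = worker_count.times.map do |index|
    Thread.new do
      # Queue#pop returns nil once the queue is closed and drained,
      # which ends the loop.
      while (job = pipe.pop)
        results << [index, job]
      end
    end
  end

  # Producer side: push every job, then signal that no more work is coming.
  jobs.each { |job| pipe << job }
  pipe.close
  workers.each(&:join)

  # Drain the results queue into a plain array of [worker_index, job] pairs.
  handled = []
  handled << results.pop until results.empty?
  handled
end

handled = run_pool(%w[job-1 job-2 job-3 job-4 job-5])
handled.each { |index, job| puts "worker #{index} handled #{job}" }
```

Each job is handled by exactly one worker, but which worker picks up which job is not deterministic, just as with `pipe.take` in the Ractor version.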
@GustavoCaso
Author

Running the server:

./ruby ~/Desktop/ractor.rb -s

Waiting on work

In a separate tab, run:

./ruby ~/Desktop/ractor.rb -e

enqueue
Result from enqueue 1

You should be able to see work being done in the server tab:

Pushing work from jobs:queue:low to available workers
Waiting on work
taken job from pipe by #<Ractor:0x00007fe6608c33b0> and work it by worker 0
Worker[0] working {"job_class":"DummyJob","args":"DummyJob"}
Performing
["DummyJob"]
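
The last line of the output is `["DummyJob"]` rather than `"DummyJob"` because `perform(*args)` collects its arguments into an array. The following minimal sketch reproduces the encode and dispatch round trip, assuming the standard library's `JSON` in place of Oj; `encode_job` and `run_job` are hypothetical helpers that mirror `new_job` and `Worker#work`, and `perform` returns its arguments here instead of printing them.

```ruby
require 'json'

# Same shape as the gist's DummyJob, but returning the args so the result
# can be inspected instead of only printed.
class DummyJob
  def perform(*args)
    args
  end
end

# Hypothetical helper mirroring new_job: serialize the class name and args.
def encode_job(job_class, args)
  { 'job_class' => job_class.name, 'args' => args }.to_json
end

# Hypothetical helper mirroring Worker#work: look the class up by name
# and pass the decoded args as a single argument.
def run_job(payload)
  parsed = JSON.parse(payload)
  Object.const_get(parsed['job_class']).new.perform(parsed['args'])
end

payload = encode_job(DummyJob, 'DummyJob')
puts payload                             # {"job_class":"DummyJob","args":"DummyJob"}
p run_job(payload)                       # ["DummyJob"]
p run_job(encode_job(DummyJob, [1, 2]))  # [[1, 2]]
```

If a job should receive an array of arguments as separate parameters, the dispatch would need to splat them, for example `perform(*parsed['args'])`; the gist passes them as one value.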
