Skip to content

Instantly share code, notes, and snippets.

@paddor
Forked from codekitchen/1brc_ractors.rb
Last active January 15, 2024 19:11
Show Gist options
  • Save paddor/471586839f69fe2d74f219731b861be4 to your computer and use it in GitHub Desktop.
Save paddor/471586839f69fe2d74f219731b861be4 to your computer and use it in GitHub Desktop.
Exploring the 1BRC in Ruby with Ractors
#!/usr/bin/env ruby --yjit
# Number of worker Ractors spawned to aggregate chunks.
WORKER_THREADS = 4
# Bytes requested per read of the input file (2**16 = 65,536); the reader then
# extends each chunk to the end of the current line.
CHUNK_SIZE = 2**16
# BUG: my city struct is being corrupted when moving from the worker to the main ractor.
# `#<struct City min=-11.3, tot=24088.30000000004, max=64.6, n=1164>` becomes
# `#<struct City min=-11.3, tot=24088.30000000004, max=64.6, n=nil>`, note that each `n` attribute becomes `nil`.
# https://bugs.ruby-lang.org/issues/20165
# I tried changing the Struct to a simple array and still using `move: true`,
# but I ran into similar data corruption issues! So I'm going to switch to `move: false`,
# it's not a huge deal in this case since it's just one hash per worker copied at the end.
# Running statistics for one station: smallest value, sum of values,
# largest value, and number of samples.
City = Struct.new(:min, :tot, :max, :n)

# Builds a Hash that creates and stores an empty City the first time a key is
# looked up. Min/max start at +/-Infinity so any real measurement replaces them.
def city_hash
  Hash.new do |table, station|
    table[station] = City.new(Float::INFINITY, 0, -Float::INFINITY, 0)
  end
end
# Spawn the worker Ractors. Each one keeps a private per-station table, folds
# every line of every chunk it receives into that table, and returns the table
# as its result once a nil message ends the receive loop.
workers = Array.new(WORKER_THREADS) do
  Ractor.new do
    stats = city_hash
    while (text = Ractor.receive)
      text.each_line do |row|
        station, reading = row.split(";")
        entry = stats[station]
        entry.n += 1
        reading = reading.to_f
        entry.min = reading if reading < entry.min
        entry.max = reading if reading > entry.max
        entry.tot += reading
      end
    end
    stats
  end
end
# map step
# Read the input in CHUNK_SIZE pieces, extend each piece to the end of the
# current line, and hand the pieces to the workers in round-robin order.
nchunks = 0
round = workers.cycle
# Block form of File.open closes the handle when the block exits, even on an exception.
File.open('measurements.txt', 'rb') do |infile|
  until infile.eof?
    chunk = infile.read(CHUNK_SIZE)
    # ensure we're on a line boundary; append in place instead of building a second string
    chunk << infile.readline unless infile.eof?
    round.next << chunk.freeze # needs backpressure
    nchunks += 1
    # break if nchunks > 5_000
  end
end
# reduce step
# Tell each worker to finish, collect its table, and fold it into the combined totals.
results = city_hash
workers.each do |worker|
  worker << nil # signal done
  worker.take.each_pair do |station, partial|
    merged = results[station]
    merged.min = partial.min if merged.min > partial.min
    merged.max = partial.max if merged.max < partial.max
    merged.tot += partial.tot
    merged.n += partial.n
  end
end

# Print one "name=min/mean/max" line per station, sorted by name.
results.sort_by { |station, _| station }.each do |station, stats|
  puts format("%s=%.1f/%.1f/%.1f", station, stats.min, stats.tot / stats.n, stats.max)
end
#!/usr/bin/env ruby --yjit
require 'async'
require 'async/queue'
# Number of worker Ractors spawned to aggregate chunks.
WORKER_THREADS = 4
# Bytes requested per read of the input file (2**16 = 65,536); the reader then
# extends each chunk to the end of the current line.
CHUNK_SIZE = 2**16
# Size argument given to Async::LimitedQueue, the buffer of chunks between the
# file-reading task and the hand-off to the Ractors.
Q_SIZE = 10_000
# Per-station accumulator: minimum, running total, maximum, and sample count.
City = Struct.new(:min, :tot, :max, :n)

# Returns a Hash whose default block inserts a blank City for any station that
# is looked up for the first time (min = +Infinity, max = -Infinity, sums = 0).
def city_hash
  Hash.new do |table, station|
    table[station] = City.new(Float::INFINITY, 0, -Float::INFINITY, 0)
  end
end
# Spawn the worker Ractors. Each is handed the main Ractor and pulls chunks from
# it with `take` until it takes nil; it folds every line into a private
# per-station table and returns that table as its result.
workers = Array.new(WORKER_THREADS) do
  Ractor.new(Ractor.current) do |source|
    stats = city_hash
    while (text = source.take)
      text.each_line do |row|
        station, reading = row.split(";")
        entry = stats[station]
        entry.n += 1
        reading = reading.to_f
        entry.min = reading if reading < entry.min
        entry.max = reading if reading > entry.max
        entry.tot += reading
      end
    end
    stats
  end
end
# map step
# A nested task reads the file into a queue of size Q_SIZE while the outer task
# offers each queued chunk to whichever worker Ractor takes it next.
Async do
  chunks = Async::LimitedQueue.new(Q_SIZE)
  Async do
    nchunks = 0
    # Block form of File.open closes the handle when the block exits, even on an exception.
    File.open('measurements.txt', 'rb') do |infile|
      until infile.eof?
        chunk = infile.read(CHUNK_SIZE)
        # ensure we're on a line boundary; append in place instead of building a second string
        chunk << infile.readline unless infile.eof?
        chunks << chunk.freeze
        nchunks += 1
        # break if nchunks > 5_000
      end
    end
    # End-of-input marker. NOTE(review): presumably `chunks.each` below stops when
    # it dequeues this nil — confirm against the Async::Queue#each implementation.
    chunks << nil
  end
  chunks.each { |c| Ractor.yield c }
end
# reduce step
# Yield one nil per worker so that each worker's take loop ends, then collect
# every worker's table and fold it into the combined totals.
results = city_hash
workers.each { |_| Ractor.yield(nil) } # signal done
workers.each do |worker|
  worker.take.each_pair do |station, partial|
    merged = results[station]
    merged.min = partial.min if merged.min > partial.min
    merged.max = partial.max if merged.max < partial.max
    merged.tot += partial.tot
    merged.n += partial.n
  end
end

# Print one "name=min/mean/max" line per station, sorted by name.
results.sort_by { |station, _| station }.each do |station, stats|
  puts format("%s=%.1f/%.1f/%.1f", station, stats.min, stats.tot / stats.n, stats.max)
end
#!/usr/bin/env ruby --yjit
# Single-threaded baseline: aggregate min / total / max / count per station
# from measurements.txt and print "name=min/mean/max" sorted by station name.

# Per-station running statistics: minimum, sum, maximum, and number of samples.
City = Struct.new(:min, :tot, :max, :n)
# Looking up an unseen station creates and stores an empty City for it.
cities = Hash.new { |h, k| h[k] = City[Float::INFINITY, 0, -Float::INFINITY, 0] }
# Block form of File.open closes the handle when the block exits, even on an exception.
File.open('measurements.txt', 'rb') do |infile|
  infile.each_line do |line|
    name, val = line.split(";")
    city = cities[name]
    city.n += 1
    val = Float(val) # strict parse: raises on a malformed number
    city.min = val if val < city.min
    city.max = val if val > city.max
    city.tot += val
  end
end
cities.sort_by(&:first).each do |n, c|
  puts "%s=%.1f/%.1f/%.1f" % [n, c.min, c.tot / c.n, c.max]
end
#!/usr/bin/env ruby --yjit
require 'bundler/inline'
gemfile do
gem 'timeout'
gem 'async'
gem 'cztop'
end
# Number of worker processes forked.
WORKERS = 16
# Bytes requested per read of STDIN before the chunk is extended to a line boundary.
CHUNK_SIZE = 2**20 # 1 MiB
# IPC endpoints for the two socket pairs. `#$$` interpolates the current PID;
# both strings are built before any fork, so the workers inherit paths that
# contain the main process's PID.
# MAP_ENDPOINT: main binds a PUSH socket, workers connect PULL sockets (chunks out).
MAP_ENDPOINT = "ipc:///tmp/1brc.map.#$$"
# REDUCE_ENDPOINT: main binds a PULL socket, workers connect PUSH sockets (results back).
REDUCE_ENDPOINT = "ipc:///tmp/1brc.reduce.#$$"
# Statistics kept for one station: min, sum of readings, max, reading count.
City = Struct.new(:min, :tot, :max, :n)

# Creates a Hash that fills in a fresh, empty City on first access to a key.
# The +/-Infinity sentinels guarantee the first reading becomes both min and max.
def city_hash
  Hash.new do |table, station|
    table[station] = City.new(Float::INFINITY, 0, -Float::INFINITY, 0)
  end
end
warn "main: #$$"

# Fork WORKERS child processes and remember their PIDs. Each child connects a
# PULL socket to MAP_ENDPOINT and a PUSH socket to REDUCE_ENDPOINT, then loops
# forever: receive one chunk, aggregate it into a fresh table, send the
# Marshal-dumped table back. The loop has no exit of its own.
worker_pids = Array.new(WORKERS) do |worker_id|
  fork do
    warn "worker #{worker_id}: #$$"
    Process.setproctitle("#$0: worker #%d" % worker_id)

    source = CZTop::Socket::PULL.new
    sink = CZTop::Socket::PUSH.new
    source.connect(MAP_ENDPOINT)
    sink.connect(REDUCE_ENDPOINT)

    loop do
      text = source.receive.to_a.first
      stats = city_hash
      text.each_line do |row|
        station, reading = row.split(";", 2)
        entry = stats[station]
        entry.n += 1
        reading = Float(reading)
        entry.min = reading if reading < entry.min
        entry.max = reading if reading > entry.max
        entry.tot += reading
      end
      stats.default_proc = nil # make dumpable
      sink << Marshal.dump(stats)
    end
  end
end
# On exit, ask all workers to terminate and wait for them, giving up after one
# second. If that wait times out or is interrupted, SIGKILL each worker
# individually, skipping the ones that are already gone.
at_exit do
  begin
    Timeout.timeout(1) do
      warn "main: terminating workers: #{worker_pids}"
      Process.kill(:TERM, *worker_pids)
      Process.waitall
    end
  rescue Interrupt, Timeout::Error
    worker_pids.each do |pid|
      begin
        Process.kill(:KILL, pid)
      rescue SystemCallError
        next # already dead
      end
      warn "main: killed worker #{pid}"
    end
  end
end
# Combined per-station totals; filled in by the reduce task below.
results = city_hash
# Two child tasks share this block's locals: the first sends chunks of STDIN to
# the workers, the second merges the tables the workers send back.
Async do |task|
# map step
# Set by the sender once STDIN is exhausted.
all_chunks_sent = false
# Chunks sent whose result has not been merged yet: incremented by the sender,
# decremented by the reducer.
nchunks = 0
task.async do
push_socket = CZTop::Socket::PUSH.new
push_socket.bind MAP_ENDPOINT
# Buffer handed to STDIN.read as its output string.
chunk = ''
until STDIN.eof?
STDIN.read CHUNK_SIZE, chunk # ensures UTF-8
# ensure we're on a line boundary
# (`+=` rebinds `chunk` to a new String, so the next read fills that new String)
chunk += STDIN.readline unless STDIN.eof?
push_socket << chunk
nchunks += 1
end
all_chunks_sent = true
warn "main: all chunks sent to workers"
end
# reduce step
task.async do
pull_socket = CZTop::Socket::PULL.new
pull_socket.bind REDUCE_ENDPOINT
# Stop once the sender has finished and every sent chunk has been answered.
# NOTE(review): if the last outstanding result is merged before
# `all_chunks_sent` is set, this loop would call `receive` again with nothing
# left to arrive — confirm whether that ordering can occur.
until all_chunks_sent && nchunks.zero?
# Each message is a Marshal-dumped Hash of City structs produced by a worker.
# Marshal.load must only be given trusted data; here the sender is one of this
# script's own forked workers.
cities = Marshal.load pull_socket.receive.to_a.first
# The block runs only for stations present on both sides; stations new to
# `results` are inserted as received.
results.merge! cities do |name, left, right|
left.min = right.min if right.min < left.min
left.max = right.max if right.max > left.max
left.tot += right.tot
left.n += right.n
left
end
nchunks -= 1
end
end
end
warn "main: sorting and printing ..."
# Print one "name=min/mean/max" line per station, sorted by name.
results.sort_by { |station, _| station }.each do |station, stats|
  puts "%s=%.1f/%.1f/%.1f" % [station, stats.min, stats.tot / stats.n, stats.max]
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment