-
-
Save paddor/471586839f69fe2d74f219731b861be4 to your computer and use it in GitHub Desktop.
Exploring the 1BRC in Ruby with Ractors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby --yjit | |
# Number of worker Ractors spawned to parse chunks.
WORKER_THREADS = 4
# Number of bytes requested per read; each chunk is then extended to the next line end.
CHUNK_SIZE = 2**16

# BUG: my city struct is being corrupted when moving from the worker to the main ractor.
# `#<struct City min=-11.3, tot=24088.30000000004, max=64.6, n=1164>` becomes
# `#<struct City min=-11.3, tot=24088.30000000004, max=64.6, n=nil>`, note that each `n` attribute becomes `nil`.
# https://bugs.ruby-lang.org/issues/20165
# I tried changing the Struct to a simple array and still using `move: true`,
# but I ran into similar data corruption issues! So I'm going to switch to `move: false`,
# it's not a huge deal in this case since it's just one hash per worker copied at the end.

# Running statistics for one city: minimum, running total, maximum and sample count.
City = Struct.new(:min, :tot, :max, :n)

# Returns a new Hash that lazily creates (and stores) an empty City record the
# first time a key is looked up. min/max start at +/-Infinity so the first
# measurement always replaces them.
def city_hash = Hash.new { |h, k| h[k] = City[Float::INFINITY, 0, -Float::INFINITY, 0] }
# Spawn the worker Ractors. Each worker keeps its own per-city tally, consumes
# chunks from its inbox until it receives nil, and finally returns the tally as
# the Ractor's result value.
workers = (1..WORKER_THREADS).map do
  Ractor.new do
    cities = city_hash
    while (chunk = Ractor.receive) # nil is the "no more chunks" signal
      chunk.each_line do |line|
        name, val = line.split(";")
        city = cities[name]
        city.n += 1
        val = val.to_f # String#to_f ignores the trailing newline
        city.min = val if val < city.min
        city.max = val if val > city.max
        city.tot += val
      end
    end
    cities
  end
end
# map step: read the input in CHUNK_SIZE pieces, extend each piece to the next
# line boundary and hand the chunks to the workers round-robin.
nchunks = 0
round = workers.cycle
# Block form so the file handle is closed even if reading or sending raises.
File.open('measurements.txt', 'rb') do |infile|
  until infile.eof?
    chunk = infile.read(CHUNK_SIZE)
    # ensure we're on a line boundary
    chunk << infile.readline unless infile.eof?
    ractor = round.next
    ractor << chunk.freeze # needs backpressure
    nchunks += 1
    # break if nchunks > 5_000
  end
end
# reduce step: tell each worker to finish, collect its per-city tally and fold
# it into the global results.
results = city_hash
workers.each do |ractor|
  ractor << nil # signal done
  cities = ractor.take
  cities.each do |name, o|
    city = results[name]
    city.min = o.min if o.min < city.min
    city.max = o.max if o.max > city.max
    city.tot += o.tot
    city.n += o.n
  end
end

# Print one "name=min/mean/max" line per city, ordered by city name.
results.sort_by(&:first).each do |n, c|
  puts "%s=%.1f/%.1f/%.1f" % [n, c.min, c.tot / c.n, c.max]
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby --yjit | |
require 'async'
require 'async/queue'
# Number of worker Ractors spawned to parse chunks.
WORKER_THREADS = 4
# Number of bytes requested per read; each chunk is then extended to the next line end.
CHUNK_SIZE = 2**16
# Capacity passed to the Async::LimitedQueue that buffers chunks between reader and dispatcher.
Q_SIZE = 10_000

# Running statistics for one city: minimum, running total, maximum and sample count.
City = Struct.new(:min, :tot, :max, :n)

# Returns a new Hash that lazily creates (and stores) an empty City record the
# first time a key is looked up. min/max start at +/-Infinity so the first
# measurement always replaces them.
def city_hash = Hash.new { |h, k| h[k] = City[Float::INFINITY, 0, -Float::INFINITY, 0] }
# Spawn the worker Ractors. Each worker pulls chunks that the main Ractor
# yields (so an idle worker picks up the next chunk), tallies them per city,
# stops when it takes nil, and returns its tally as the Ractor's result value.
workers = (1..WORKER_THREADS).map do
  Ractor.new(Ractor.current) do |main_ractor|
    cities = city_hash
    while (chunk = main_ractor.take) # nil is the "no more chunks" signal
      chunk.each_line do |line|
        name, val = line.split(";")
        city = cities[name]
        city.n += 1
        val = val.to_f # String#to_f ignores the trailing newline
        city.min = val if val < city.min
        city.max = val if val > city.max
        city.tot += val
      end
    end
    cities
  end
end
# map step: a reader task fills a bounded queue with line-aligned chunks while
# the outer task drains the queue and yields each chunk to whichever worker
# Ractor takes it.
Async do
  chunks = Async::LimitedQueue.new(Q_SIZE)
  reader = Async do
    nchunks = 0
    # Block form so the file handle is closed even if reading raises.
    File.open('measurements.txt', 'rb') do |infile|
      until infile.eof?
        chunk = infile.read(CHUNK_SIZE)
        # ensure we're on a line boundary
        chunk << infile.readline unless infile.eof?
        chunks << chunk.freeze
        nchunks += 1
        # break if nchunks > 5_000
      end
    end
    # End-of-input marker; presumably what makes `chunks.each` below return
    # (verify against Async::Queue#each).
    chunks << nil
  end
  chunks.each { |c| Ractor.yield c }
end
# reduce step: yield one nil per worker so each of them leaves its loop, then
# collect every worker's per-city tally and fold it into the global results.
results = city_hash
workers.size.times { Ractor.yield nil } # signal done
workers.each do |ractor|
  cities = ractor.take
  cities.each do |name, o|
    city = results[name]
    city.min = o.min if o.min < city.min
    city.max = o.max if o.max > city.max
    city.tot += o.tot
    city.n += o.n
  end
end

# Print one "name=min/mean/max" line per city, ordered by city name.
results.sort_by(&:first).each do |n, c|
  puts "%s=%.1f/%.1f/%.1f" % [n, c.min, c.tot / c.n, c.max]
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby --yjit | |
# Single-threaded baseline: tally min/total/max/count per city line by line,
# then print "name=min/mean/max" sorted by city name.

# Running statistics for one city: minimum, running total, maximum and sample count.
City = Struct.new(:min, :tot, :max, :n)

# Unknown cities get a fresh record on first lookup; min/max start at
# +/-Infinity so the first measurement always replaces them.
cities = Hash.new { |h, k| h[k] = City[Float::INFINITY, 0, -Float::INFINITY, 0] }

# Block form so the file handle is closed once the whole file has been read.
File.open('measurements.txt', 'rb') do |infile|
  infile.each_line do |line|
    name, val = line.split(";")
    city = cities[name]
    city.n += 1
    val = Float(val) # raises ArgumentError on a malformed measurement
    city.min = val if val < city.min
    city.max = val if val > city.max
    city.tot += val
  end
end

cities.sort_by(&:first).each do |n, c|
  puts "%s=%.1f/%.1f/%.1f" % [n, c.min, c.tot / c.n, c.max]
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby --yjit | |
# Inline Gemfile: declares the gems this script uses without a separate Gemfile.
require 'bundler/inline'

gemfile do
  gem 'timeout'
  gem 'async'
  gem 'cztop'
end
# Number of worker processes forked to parse chunks.
WORKERS = 16
CHUNK_SIZE = 2**20 # 1 MiB
# IPC endpoints, made unique per run by embedding the main process id.
# MAP carries chunks main -> workers, REDUCE carries partial results workers -> main.
MAP_ENDPOINT = "ipc:///tmp/1brc.map.#{Process.pid}"
REDUCE_ENDPOINT = "ipc:///tmp/1brc.reduce.#{Process.pid}"

# Running statistics for one city: minimum, running total, maximum and sample count.
City = Struct.new(:min, :tot, :max, :n)

# Returns a new Hash that lazily creates (and stores) an empty City record the
# first time a key is looked up. min/max start at +/-Infinity so the first
# measurement always replaces them.
def city_hash = Hash.new { |h, k| h[k] = City[Float::INFINITY, 0, -Float::INFINITY, 0] }
warn "main: #{Process.pid}"

# Fork the worker processes. Each worker pulls a chunk from the map endpoint,
# tallies it into a fresh per-chunk hash and pushes the marshalled hash to the
# reduce endpoint. Workers loop forever; the main process terminates them.
worker_pids = WORKERS.times.map do |worker_id|
  fork do
    warn "worker #{worker_id}: #{Process.pid}"
    # Plain interpolation instead of a format string, so a `%` in the program
    # name cannot be misread as a format directive.
    Process.setproctitle("#{$PROGRAM_NAME}: worker ##{worker_id}")
    pull_socket = CZTop::Socket::PULL.new
    push_socket = CZTop::Socket::PUSH.new
    pull_socket.connect MAP_ENDPOINT
    push_socket.connect REDUCE_ENDPOINT
    loop do
      chunk = pull_socket.receive.to_a.first
      cities = city_hash
      chunk.each_line do |line|
        name, val = line.split(";", 2)
        city = cities[name]
        city.n += 1
        val = Float val # raises ArgumentError on a malformed measurement
        city.min = val if val < city.min
        city.max = val if val > city.max
        city.tot += val
      end
      cities.default_proc = nil # make dumpable
      push_socket << Marshal.dump(cities)
    end
  end
end
# On exit, ask all workers to terminate and wait up to one second for them.
# If that is interrupted, times out, or signalling fails (Process.kill raises
# as soon as one pid no longer exists, leaving later pids unsignalled), fall
# back to killing every worker individually.
at_exit do
  Timeout.timeout 1 do
    warn "main: terminating workers: #{worker_pids}"
    Process.kill(:TERM, *worker_pids)
    Process.waitall
  end
rescue Interrupt, Timeout::Error, SystemCallError
  worker_pids.each do |pid|
    Process.kill(:KILL, pid)
  rescue SystemCallError
    # already dead
  else
    warn "main: killed worker #{pid}"
  end
end
# Global per-city results, filled by the reduce task below.
results = city_hash

Async do |task|
  # State shared by the two tasks below.
  all_chunks_sent = false
  nchunks = 0 # chunks pushed to workers minus partial results merged so far

  # map step: read STDIN in CHUNK_SIZE pieces, extend each piece to the next
  # line boundary and push it to the workers.
  task.async do
    push_socket = CZTop::Socket::PUSH.new
    push_socket.bind MAP_ENDPOINT
    chunk = ''
    until STDIN.eof?
      STDIN.read CHUNK_SIZE, chunk # ensures UTF-8
      # ensure we're on a line boundary
      chunk << STDIN.readline unless STDIN.eof?
      # Publish "this is the final chunk" and count the chunk BEFORE pushing it.
      # Otherwise the reduce task could merge the last result, still see
      # all_chunks_sent == false, and wait for a message that never arrives.
      all_chunks_sent = STDIN.eof?
      nchunks += 1
      push_socket << chunk
    end
    # NOTE(review): with empty STDIN no chunk is ever pushed; if the reduce task
    # is already waiting in `receive` by the time we get here it will not wake up.
    all_chunks_sent = true
    warn "main: all chunks sent to workers"
  end

  # reduce step: merge each worker's partial result into `results` until every
  # pushed chunk has been accounted for.
  task.async do
    pull_socket = CZTop::Socket::PULL.new
    pull_socket.bind REDUCE_ENDPOINT
    until all_chunks_sent && nchunks.zero?
      # SECURITY: Marshal.load must only ever see trusted data. The payload is
      # expected to come from our own workers, but any local process able to
      # connect to REDUCE_ENDPOINT could inject one.
      cities = Marshal.load pull_socket.receive.to_a.first
      results.merge! cities do |_name, left, right|
        left.min = right.min if right.min < left.min
        left.max = right.max if right.max > left.max
        left.tot += right.tot
        left.n += right.n
        left
      end
      nchunks -= 1
    end
  end
end
warn "main: sorting and printing ..."

# Print one "name=min/mean/max" line per city, ordered by city name.
results.sort_by(&:first).each do |name, city|
  puts format("%s=%.1f/%.1f/%.1f", name, city.min, city.tot / city.n, city.max)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment