
@codekitchen
Last active January 15, 2024 19:45
Exploring the 1BRC (One Billion Row Challenge) in Ruby with Ractors
#!/usr/bin/env -S ruby --yjit
WORKER_THREADS = 4
CHUNK_SIZE = 2**16
# BUG: my City struct is being corrupted when moving from the worker to the main ractor.
# `#<struct City min=-11.3, tot=24088.30000000004, max=64.6, n=1164>` becomes
# `#<struct City min=-11.3, tot=24088.30000000004, max=64.6, n=nil>`; note that every `n` attribute becomes `nil`.
# https://bugs.ruby-lang.org/issues/20165
# I tried changing the Struct to a simple array and still using `move: true`,
# but I ran into similar data corruption issues! So I'm switching to `move: false`.
# It's not a huge deal in this case, since it's just one hash per worker copied at the end.
City = Struct.new(:min, :tot, :max, :n)
def city_hash = Hash.new { |h,k| h[k] = City[Float::INFINITY,0,-Float::INFINITY,0] }
# Each worker aggregates every chunk it receives until it gets `nil`,
# then returns its hash, which the main ractor copies out with `take`.
workers = (1..WORKER_THREADS).map do
  Ractor.new do
    cities = city_hash
    while chunk = Ractor.receive
      chunk.each_line do |line|
        name,val = line.split(";")
        city = cities[name]
        city.n += 1
        val = val.to_f
        city.min = val if val < city.min
        city.max = val if val > city.max
        city.tot += val
      end
    end
    cities
  end
end
# map step
nchunks = 0
infile = File.open('measurements.txt', 'rb')
round = workers.cycle
until infile.eof?
  chunk = infile.read CHUNK_SIZE
  # ensure we're on a line boundary
  chunk += infile.readline unless infile.eof?
  ractor = round.next
  ractor << chunk.freeze # needs backpressure
  nchunks += 1
  # break if nchunks > 5_000
end
# reduce step
results = city_hash
workers.each do |ractor|
  ractor << nil # signal done
  cities = ractor.take
  cities.each do |name,o|
    city = results[name]
    city.min = o.min if o.min < city.min
    city.max = o.max if o.max > city.max
    city.tot += o.tot
    city.n += o.n
  end
end
results.sort_by(&:first).each{|n,c|
  puts "%s=%.1f/%.1f/%.1f" % [n,c.min,c.tot/c.n,c.max]
}
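The map step's `read` + `readline` trick is what keeps every chunk on a line boundary, so no worker ever sees half a record. A minimal sketch of just that splitting logic, assuming a `StringIO` stand-in for `measurements.txt` and a tiny 16-byte chunk size so the sample actually splits mid-line; `line_aligned_chunks` is a hypothetical helper name, not part of the gist.

```ruby
require 'stringio'

# Hypothetical helper: reads `chunk_size` bytes at a time, then finishes the
# current line with `readline`, so every chunk except possibly the last one
# ends on a newline (same pattern as the map step above).
def line_aligned_chunks(io, chunk_size)
  chunks = []
  until io.eof?
    chunk = io.read(chunk_size)
    # ensure we're on a line boundary
    chunk += io.readline unless io.eof?
    chunks << chunk.freeze
  end
  chunks
end

data = "Hamburg;12.0\nBulawayo;8.9\nPalembang;38.8\nHamburg;34.2\n"
# `data.b` gives a binary copy, like opening measurements.txt with 'rb'.
# 16 bytes is deliberately tiny so the first read stops mid-line.
chunks = line_aligned_chunks(StringIO.new(data.b), 16)
chunks.each { |c| p c }
```

Each chunk can overshoot `CHUNK_SIZE` by at most one line, which is why the real script can hand chunks to workers without any further coordination.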
#!/usr/bin/env -S ruby --yjit
require 'async'
require 'async/queue'
WORKER_THREADS = 4
CHUNK_SIZE = 2**16
Q_SIZE = 10_000
City = Struct.new(:min, :tot, :max, :n)
def city_hash = Hash.new { |h,k| h[k] = City[Float::INFINITY,0,-Float::INFINITY,0] }
# Workers pull chunks from the main ractor with `take` instead of having them pushed.
workers = (1..WORKER_THREADS).map do
  Ractor.new(Ractor.current) do |main_ractor|
    cities = city_hash
    while chunk = main_ractor.take
      chunk.each_line do |line|
        name,val = line.split(";")
        city = cities[name]
        city.n += 1
        val = val.to_f
        city.min = val if val < city.min
        city.max = val if val > city.max
        city.tot += val
      end
    end
    cities
  end
end
# map step
Async do
  # bounded queue: the reader fiber waits once Q_SIZE chunks are buffered
  chunks = Async::LimitedQueue.new(Q_SIZE)
  reader = Async do
    nchunks = 0
    infile = File.open('measurements.txt', 'rb')
    until infile.eof?
      chunk = infile.read CHUNK_SIZE
      # ensure we're on a line boundary
      chunk += infile.readline unless infile.eof?
      chunks << chunk.freeze
      nchunks += 1
      # break if nchunks > 5_000
    end
    chunks << nil
  end
  # Ractor.yield blocks until some worker takes the chunk
  chunks.each { |c| Ractor.yield c }
end
# reduce step
results = city_hash
workers.size.times { Ractor.yield nil } # signal done
workers.each do |ractor|
  cities = ractor.take
  cities.each do |name,o|
    city = results[name]
    city.min = o.min if o.min < city.min
    city.max = o.max if o.max > city.max
    city.tot += o.tot
    city.n += o.n
  end
end
results.sort_by(&:first).each{|n,c|
  puts "%s=%.1f/%.1f/%.1f" % [n,c.min,c.tot/c.n,c.max]
}
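Both Ractor versions end with the same reduce step: each worker's partial `City` hash is folded into one result by taking the min of the mins, the max of the maxes, and summing `tot` and `n`. A minimal sketch of that merge on its own, assuming two hand-written partial hashes in place of real worker output; `merge_cities!` is a hypothetical helper name.

```ruby
City = Struct.new(:min, :tot, :max, :n)
def city_hash = Hash.new { |h, k| h[k] = City[Float::INFINITY, 0, -Float::INFINITY, 0] }

# Hypothetical helper: folds one worker's partial results into `results`,
# using the same min/max/sum rules as the reduce step above.
def merge_cities!(results, partial)
  partial.each do |name, o|
    city = results[name]
    city.min = o.min if o.min < city.min
    city.max = o.max if o.max > city.max
    city.tot += o.tot
    city.n += o.n
  end
  results
end

# Hand-written partial results standing in for two workers' output
# (fields are min, tot, max, n).
a = city_hash
a["Hamburg"] = City[10.0, 22.0, 12.0, 2]
b = city_hash
b["Hamburg"] = City[-3.5, 30.7, 34.2, 2]
b["Bulawayo"] = City[8.9, 8.9, 8.9, 1]

results = [a, b].reduce(city_hash) { |acc, part| merge_cities!(acc, part) }
results.sort_by(&:first).each do |name, c|
  puts format("%s=%.1f/%.1f/%.1f", name, c.min, c.tot / c.n, c.max)
end
```

Because min, max, sum and count all combine associatively, the order in which workers are `take`n does not change the result, apart from floating-point rounding in `tot`.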
#!/usr/bin/env -S ruby --yjit
City = Struct.new(:min, :tot, :max, :n)
cities = Hash.new { |h,k| h[k] = City[Float::INFINITY,0,-Float::INFINITY,0] }
File.open('measurements.txt', 'rb').each_line do |line|
  name,val = line.split(";")
  city = cities[name]
  city.n += 1
  val = Float(val)
  city.min = val if val < city.min
  city.max = val if val > city.max
  city.tot += val
end
cities.sort_by(&:first).each{|n,c|
  puts "%s=%.1f/%.1f/%.1f" % [n,c.min,c.tot/c.n,c.max]
}
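The single-threaded baseline is also the easiest version to sanity-check on a tiny input. A minimal sketch, assuming the same `name;value` line format and output format, that wraps the loop in a hypothetical `aggregate` method taking any IO (a `StringIO` here instead of `measurements.txt`) and returning the formatted lines instead of printing them.

```ruby
require 'stringio'

City = Struct.new(:min, :tot, :max, :n)

# Hypothetical wrapper around the baseline loop: reads `name;value` lines from
# any IO and returns "name=min/mean/max" strings sorted by station name.
def aggregate(io)
  cities = Hash.new { |h, k| h[k] = City[Float::INFINITY, 0, -Float::INFINITY, 0] }
  io.each_line do |line|
    name, val = line.split(";")
    city = cities[name]
    city.n += 1
    val = Float(val) # Float() tolerates the trailing newline
    city.min = val if val < city.min
    city.max = val if val > city.max
    city.tot += val
  end
  cities.sort_by(&:first).map do |n, c|
    "%s=%.1f/%.1f/%.1f" % [n, c.min, c.tot / c.n, c.max]
  end
end

sample = StringIO.new("Hamburg;12.0\nBulawayo;8.9\nHamburg;34.2\n")
puts aggregate(sample)
```

Running the Ractor versions and this baseline on the same small file and diffing the output is a cheap way to catch problems like the corrupted `n` field described in the first script.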
@simpl1g

simpl1g commented Jan 10, 2024

Do you also get better results for Ractors with fewer workers? 8 Ractors are almost twice as slow as 4 for me.

https://github.com/simpl1g/1brc/blob/main/README.md?plain=1#L59-L69

@paddor

paddor commented Jan 15, 2024

Hey! Cool gist. Ractors are exciting but unfortunately still experimental. I shamelessly took this as a base and updated it to use ZMQ. It forks workers and communicates with them over ZMQ sockets.

https://gist.github.com/paddor/471586839f69fe2d74f219731b861be4#file-1brc_zmq-rb

On my old 2019 MacBook Pro it completes in 162.58 sec:

$ time pv --rate --average-rate --progress ../measurements.txt | ./1brc_zmq.rb > agg.txt
main: 61827
worker 0: 61828
worker 1: 61829
worker 2: 61830
worker 3: 61831
worker 4: 61832
worker 5: 61833
worker 6: 61834
worker 7: 61835
worker 8: 61836
worker 9: 61837
worker 10: 61838
worker 11: 61839
worker 12: 61840
worker 13: 61841
worker 14: 61842
worker 15: 61843
[ 250MiB/s] [ 250MiB/s] [======================================================================>] 100%
main: all chunks sent to workers
main: sorting and printing ...
main: terminating workers: [61828, 61829, 61830, 61831, 61832, 61833, 61834, 61835, 61836, 61837, 61838, 61839, 61840, 61841, 61842, 61843]

________________________________________________________
Executed in  162.58 secs    fish           external
   usr time   29.64 mins    0.37 millis   29.64 mins
   sys time    1.46 mins    1.41 millis    1.46 mins
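The comment above describes forking worker processes and talking to them over ZMQ sockets. The same fork-and-merge shape can be sketched with only Ruby's standard library by swapping ZMQ for `IO.pipe` plus `Marshal`. This is a substitute technique, not paddor's code: it assumes a platform with `Process.fork` (so not Windows), splits an in-memory array of lines into contiguous slices rather than streaming chunks from a file, and `aggregate_lines` and `fork_aggregate` are hypothetical names.

```ruby
# Aggregates one slice of "name;value" lines into {name => [min, tot, max, n]}.
def aggregate_lines(lines)
  stats = Hash.new { |h, k| h[k] = [Float::INFINITY, 0.0, -Float::INFINITY, 0] }
  lines.each do |line|
    name, val = line.split(";")
    val = Float(val)
    s = stats[name]
    s[0] = val if val < s[0]
    s[1] += val
    s[2] = val if val > s[2]
    s[3] += 1
  end
  stats.default_proc = nil # Marshal can't dump a Hash that has a default proc
  stats
end

# Hypothetical fork-based map/reduce: each child process aggregates its own
# slice and sends the result back to the parent through a pipe with Marshal.
# Stdlib pipes stand in for the ZMQ sockets used in the comment above.
def fork_aggregate(lines, workers: 4)
  slice_size = [(lines.size / workers.to_f).ceil, 1].max
  children = lines.each_slice(slice_size).map do |slice|
    reader, writer = IO.pipe
    [reader, writer].each(&:binmode) # Marshal data is raw bytes
    pid = Process.fork do
      reader.close
      writer.write(Marshal.dump(aggregate_lines(slice)))
      writer.close
      exit!(0) # skip at_exit handlers inherited from the parent
    end
    writer.close # parent keeps only the read end
    [pid, reader]
  end

  # reduce step: merge each child's partial result
  results = {}
  children.each do |pid, reader|
    partial = Marshal.load(reader.read)
    reader.close
    Process.wait(pid)
    partial.each do |name, (min, tot, max, n)|
      r = results[name] ||= [Float::INFINITY, 0.0, -Float::INFINITY, 0]
      r[0] = min if min < r[0]
      r[1] += tot
      r[2] = max if max > r[2]
      r[3] += n
    end
  end
  results
end

lines = ["Hamburg;12.0\n", "Bulawayo;8.9\n", "Hamburg;34.2\n", "Palembang;38.8\n"]
merged = fork_aggregate(lines, workers: 2)
merged.sort.each do |name, (min, tot, max, n)|
  puts format("%s=%.1f/%.1f/%.1f", name, min, tot / n, max)
end
```

Each child gets its own pipe, and the parent closes its copy of every write end right after forking, so `reader.read` returns as soon as that child has written its result and exited.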

@codekitchen
Author

Nice! Current Ractors seem to have surprising overhead. There's some discussion in this Reddit thread comparing Ractor performance with a couple of other forking solutions, and yeah, current Ruby 3.3 Ractors are much slower than just forking. It doesn't seem related to getting the data into the Ractors. I haven't looked any further, but it feels like a locking or scheduling issue. Hopefully it's nothing fundamental and just a sign that Ractors need further development.
