Skip to content

Instantly share code, notes, and snippets.

@leonid-shevtsov
Created March 13, 2024 21:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leonid-shevtsov/c99c05149a7fa03b54afcb1419e3d68c to your computer and use it in GitHub Desktop.
Save leonid-shevtsov/c99c05149a7fa03b54afcb1419e3d68c to your computer and use it in GitHub Desktop.
require 'json'
require 'strscan'
# Input file to process, taken from the first command-line argument.
FILENAME = ARGV[0]
# Number of worker processes to fork.
POOLSIZE = 6
# Number of bytes requested from the input file per read (128 MiB).
BUFSIZE = 128 * 1024 * 1024

# Worker pool: one hash per forked worker, holding its pid and the parent-side
# pipe ends used to talk to it:
#   :cmdpipe_in  - the parent writes the byte length of the next chunk, one per line
#   :inpipe_in   - the parent writes the chunk bytes themselves
#   :outpipe_out - the worker answers with one JSON line per chunk of the form
#                  { "<city>;" => [min, max, sum, count] }
# Chunks are parsed as "<city>;<temperature>\n" records. City keys keep the
# trailing ";" because scan_until returns the matched delimiter as well.
pool = []
POOLSIZE.times do # rubocop:disable Metrics/BlockLength
  outpipe_out, outpipe_in = IO.pipe(encoding: Encoding::BINARY)
  cmdpipe_out, cmdpipe_in = IO.pipe(encoding: Encoding::BINARY)
  inpipe_out, inpipe_in = IO.pipe(encoding: Encoding::BINARY)

  pid = fork do
    # The worker only uses the read side of its input pipes and the write side
    # of its output pipe.
    inpipe_in.close
    cmdpipe_in.close
    outpipe_out.close

    # A fork copies every descriptor the parent holds, including the parent-side
    # ends of workers created earlier. Drop those copies so an earlier worker
    # sees EOF as soon as the parent closes its pipes, instead of only after
    # every later worker has exited.
    pool.each do |earlier|
      earlier[:cmdpipe_in].close
      earlier[:inpipe_in].close
      earlier[:outpipe_out].close
    end

    until cmdpipe_out.eof?
      len = cmdpipe_out.gets.to_i
      part = inpipe_out.read(len)
      # read returns nil when the data pipe is already at EOF; nothing to parse.
      break if part.nil?

      # Per-city accumulator [min, max, sum, count]. The sentinels assume every
      # temperature lies strictly between -10_000 and 10_000.
      stats = Hash.new { |h, k| h[k] = [10_000, -10_000, 0, 0] }
      scanner = StringScanner.new(part)
      until scanner.eos?
        city = scanner.scan_until(/;/)
        # Trailing bytes without a ";" separator are not a record; stop here
        # rather than recording a nil city.
        break if city.nil?

        temp_s = scanner.scan_until(/\n/)
        if temp_s.nil?
          # Last record of a chunk that does not end in a newline: take the
          # rest of the chunk as the value and finish the scan.
          temp_s = scanner.rest
          scanner.terminate
        end
        temp = temp_s.to_f

        existing = stats[city]
        min, max, sum, count = existing
        existing[0] = temp if temp < min
        existing[1] = temp if temp > max
        existing[2] = sum + temp
        existing[3] = count + 1
      end
      outpipe_in.puts(JSON.generate(stats))
    end
    outpipe_in.close
  end

  # The parent keeps only the ends it needs to feed the worker and read results.
  outpipe_in.close
  inpipe_out.close
  cmdpipe_out.close

  pool << {
    pid:, cmdpipe_in:, inpipe_in:, outpipe_out:
  }
end
# Collector process: reads the per-chunk JSON results from every worker's
# output pipe, folds them into a single table and sends that table to the
# parent as one JSON line on a dedicated pipe.
collectpipe_out, collectpipe_in = IO.pipe(encoding: Encoding::BINARY)
fork do
  # The collector writes to the result pipe and never feeds the workers, so it
  # drops its copies of the read end and of the workers' input pipes.
  collectpipe_out.close
  pool.each do |worker|
    worker[:inpipe_in].close
    worker[:cmdpipe_in].close
  end

  partials = []
  pending = pool.map { |worker| worker[:outpipe_out] }
  loop do
    readable, = IO.select(pending)
    source = readable.first
    unless source.eof?
      partials << JSON.parse(source.gets)
      next
    end

    # This worker closed its output; stop watching it.
    pending.delete(source)
    break if pending.empty?
  end

  # Entries are [min, max, sum, count]; combine them key by key.
  merged = partials.each_with_object({}) do |partial, acc|
    acc.merge!(partial) do |_city, left, right|
      [
        left[0] < right[0] ? left[0] : right[0],
        left[1] > right[1] ? left[1] : right[1],
        left[2] + right[2],
        left[3] + right[3]
      ]
    end
  end
  collectpipe_in.puts JSON.generate(merged)
end
# Only the collector writes to the result pipe.
collectpipe_in.close
# Read the input in BUFSIZE-byte chunks and hand them to the workers in
# round-robin order. The cursor is advanced before each use, so the first
# chunk goes to pool[1].
workeridx = 0
File.open(FILENAME, encoding: Encoding::BINARY) do |f|
  loop do
    buf = String.new('', encoding: Encoding::BINARY, capacity: BUFSIZE + 1000)
    # read returns nil once the file is exhausted.
    break unless f.read(BUFSIZE, buf)

    # Extend the chunk up to and including the next newline so that no line is
    # split between two chunks; at end of file there is nothing to append.
    tail = f.gets("\n")
    buf << tail if tail

    workeridx = (workeridx + 1) % pool.length
    target = pool[workeridx]
    # puts "writing #{buf.length}"
    # Announce the chunk size first, then send the chunk itself.
    target[:cmdpipe_in].puts(buf.length.to_s)
    target[:inpipe_in].write(buf)
  end
rescue EOFError
  # finished processing the file
end
# All chunks have been sent: close the parent's write ends of every worker's
# command and data pipes to signal that no more input is coming.
pool.each do |worker|
  %i[cmdpipe_in inpipe_in].each { |pipe| worker[pipe].close }
  # Process.wait(worker[:pid])
end
# Read the merged table from the collector and print one "city=min/mean/max"
# string per city, sorted by city, as a pretty-printed JSON array.
# Setting the NOOUT environment variable skips the printing.
stats = JSON.parse(collectpipe_out.gets)
exit if ENV['NOOUT']

report = stats.sort.map do |city, (min, max, sum, count)|
  mean = format('%0.1f', (sum / count))
  # City keys carry a trailing ";" from the worker-side scan; drop it here.
  "#{city[...-1]}=#{min}/#{mean}/#{max}"
end
puts JSON.pretty_generate(report)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment