Skip to content

Instantly share code, notes, and snippets.

@harai harai/gigasort.rb
Created Jun 20, 2015

Embed
What would you like to do?
10GBの数値データをソートするプログラム
require 'fileutils'
require 'securerandom'
# Maximum number of input lines read into memory and sorted as one initial chunk.
INIT_CHUNK_LINES = 10_000_000
# Number of values a Writer buffers before writing them to its output.
WRITE_CHUNK_LINES = 1_000_000
# Finds the two smallest entries (by byte size) directly inside +path+.
#
# @param path [String] directory whose entries are examined
# @return [Array<String>, nil] the two smallest paths, smallest first,
#   or nil when the directory holds fewer than two entries
def find_smallest_pair(path)
  by_size = Dir["#{path}/*"].sort_by { |entry| File.size(entry) }
  return nil if by_size.size < 2

  by_size.first(2)
end
# Creates a uniquely named ".txt" file inside +dir+, opens it for writing and
# yields the open handle. The file is closed when the block returns.
#
# File.open is used instead of Kernel#open so the generated name is always
# treated as a filesystem path (Kernel#open runs a command when the name
# starts with "|").
#
# @param dir [String] existing directory in which the file is created
# @yieldparam f [File] the freshly opened, writable file
# @return [Object] the value returned by the block
def new_file(dir)
  name = File.join(dir, "#{SecureRandom.uuid}.txt")
  File.open(name, 'w') do |f|
    yield f
  end
end
# Merges two ascending value streams into +writer+, then closes the writer.
#
# While both readers have data, the smaller head value is moved to the writer
# (on a tie the value from +reader2+ goes first). Whatever is left in the
# non-exhausted reader is appended afterwards.
#
# @param reader1 [#peek, #get, #eof?] first sorted source
# @param reader2 [#peek, #get, #eof?] second sorted source
# @param writer [#put, #close] destination for the merged values
def merge(reader1, reader2, writer)
  while !reader1.eof? && !reader2.eof?
    source = reader1.peek < reader2.peek ? reader1 : reader2
    writer.put(source.get)
  end

  leftover = reader1.eof? ? reader2 : reader1
  writer.put(leftover.get) until leftover.eof?

  writer.close
end
# Buffers values in memory and writes them to an output stream in batches,
# one value per line.
class Writer
  # @param output [#puts] destination stream; it is written to but never
  #   closed by this class
  def initialize(output)
    @output = output
    @buf_arr = []
  end

  # Queues +v+ and writes the buffer out once it holds WRITE_CHUNK_LINES values.
  #
  # @param v [Object] value to write (rendered via Array#join)
  def put(v)
    @buf_arr << v
    flush if WRITE_CHUNK_LINES <= @buf_arr.size
  end

  # Writes any values still buffered. Safe to call more than once: the buffer
  # is emptied after writing, so a repeated call emits nothing.
  #
  # @return [nil]
  def close
    flush
    nil
  end

  private

  # Writes the buffered values as newline-separated lines and empties the buffer.
  def flush
    return if @buf_arr.empty?

    @output.puts @buf_arr.join("\n")
    @buf_arr = []
  end
end
# Reads integers, one per line, from an input stream with one value of
# look-ahead.
#
# Blank lines are skipped rather than treated as end of data, and #peek / #get
# return nil once the input is exhausted instead of raising on a nil line.
class Reader
  # @param input [#gets] line-oriented source (e.g. a File or StringIO)
  def initialize(input)
    @input = input
    @buf = nil
  end

  # Returns the next value without consuming it.
  #
  # @return [Integer, nil] next value, or nil at end of input
  def peek
    fill_buffer
    @buf
  end

  # Returns the next value and consumes it.
  #
  # @return [Integer, nil] next value, or nil at end of input
  def get
    fill_buffer
    value = @buf
    @buf = nil
    value
  end

  # @return [Boolean] true when no further values are available
  def eof?
    fill_buffer
    @buf.nil?
  end

  private

  # Loads the next non-blank line into the look-ahead buffer as an Integer.
  # Leaves the buffer nil when the input has no more values.
  def fill_buffer
    return if @buf

    while (line = @input.gets)
      text = line.chomp
      next if text.empty? # tolerate blank lines anywhere in the input

      @buf = text.to_i
      return
    end
  end
end
# Splits +input+ into chunk files inside +out_dir+ by wrapping it in a Reader
# and calling create_chunk until the reader reports end of input.
#
# @param input [IO] stream handed to Reader.new
# @param out_dir [String] directory passed through to create_chunk
def initial_chunking(input, out_dir)
  source = Reader.new(input)
  create_chunk(source, out_dir) until source.eof?
end
# Pulls up to INIT_CHUNK_LINES values from +reader+, sorts them in memory and
# writes them, one per line, to a new file in +out_dir+. Does nothing when the
# reader yields no values. Logs a progress line to stderr after writing.
#
# @param reader [#get, #eof?] source of values
# @param out_dir [String] directory passed to new_file
def create_chunk(reader, out_dir)
  batch = []
  batch << reader.get until batch.size >= INIT_CHUNK_LINES || reader.eof?
  return if batch.empty?

  new_file(out_dir) { |chunk_file| chunk_file.puts batch.sort.join("\n") }
  $stderr.puts 'Created a chunk.'
end
# Merges two chunk files into one new file in +output_dir+, then deletes both
# inputs and logs the merged pair to stderr.
#
# File.open is used instead of Kernel#open so the paths are always treated as
# filesystem paths (Kernel#open runs a command when the name starts with "|").
# The inputs are deleted only after the merge block has finished, so an
# exception during merging leaves them in place.
#
# @param input_path1 [String] path of the first input file
# @param input_path2 [String] path of the second input file
# @param output_dir [String] directory in which the merged file is created
def merge_files(input_path1, input_path2, output_dir)
  File.open(input_path1, 'r') do |input1|
    File.open(input_path2, 'r') do |input2|
      new_file(output_dir) do |output|
        merge(Reader.new(input1), Reader.new(input2), Writer.new(output))
      end
    end
  end
  File.delete(input_path1)
  File.delete(input_path2)
  $stderr.puts "#{input_path1} >=< #{input_path2}"
end
# Repeatedly merges the pair returned by find_smallest_pair for +dir+ until it
# no longer returns a two-element pair.
#
# @param dir [String] working directory holding the chunk files
def merge_and_reduce(dir)
  candidates = find_smallest_pair(dir)
  while candidates && candidates.length == 2
    first_path, second_path = candidates
    merge_files(first_path, second_path, dir)
    candidates = find_smallest_pair(dir)
  end
end
# Sorts the integers in +input_file+ (one per line) using +work_dir+ as scratch
# space: the input is first split into sorted chunk files, then the chunk
# files are merged pairwise by merge_and_reduce.
#
# The work directory is created when missing, so the first chunk write does
# not fail with ENOENT. File.open is used instead of Kernel#open so a
# caller-supplied name is always treated as a path, never as a "|command".
#
# NOTE(review): files already present in +work_dir+ look like they would be
# picked up by the merge phase as well — confirm the directory is expected to
# start empty.
#
# @param input_file [String] path of the file to sort
# @param work_dir [String] directory for intermediate and final files
def gigasort(input_file, work_dir)
  FileUtils.mkdir_p(work_dir)
  File.open(input_file, 'r') do |f|
    initial_chunking(f, work_dir)
  end
  merge_and_reduce(work_dir)
end
# gigasort('10gb.txt', 'work')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.