Skip to content

Instantly share code, notes, and snippets.

@yuroyoro yuroyoro/sample.rb
Created Feb 15, 2016

Embed
What would you like to do?
青空文庫からテキスト形式で100本ダウンロード、zip解凍し、`OpenSSL::Cipher` でAES256bitで暗号化する処理を、ThreadとWorkerクラスでのマルチプロセス化とで比較するサンプルプログラム
#!/usr/bin/env ruby
require 'openssl'
require 'base64'
require './worker.rb'
# Zip archive URLs on www.aozora.gr.jp to download and encrypt: the 21
# entries listed below, repeated 5 times (105 URLs in total).
$urls = %w(
http://www.aozora.gr.jp/cards/001779/files/56647_ruby_58166.zip
http://www.aozora.gr.jp/cards/000148/files/752_ruby_2438.zip
http://www.aozora.gr.jp/cards/001383/files/56866_ruby_58168.zip
http://www.aozora.gr.jp/cards/000148/files/789_ruby_5639.zip
http://www.aozora.gr.jp/cards/000035/files/301_ruby_5915.zip
http://www.aozora.gr.jp/cards/000096/files/2093_ruby_28087.zip
http://www.aozora.gr.jp/cards/000081/files/456_ruby_145.zip
http://www.aozora.gr.jp/cards/000879/files/127_ruby_150.zip
http://www.aozora.gr.jp/cards/001562/files/52409_ruby_51058.zip
http://www.aozora.gr.jp/cards/001562/files/52410_ruby_51060.zip
http://www.aozora.gr.jp/cards/000156/files/1465_ruby_16804.zip
http://www.aozora.gr.jp/cards/000035/files/1565_ruby_8220.zip
http://www.aozora.gr.jp/cards/000879/files/92_ruby_164.zip
http://www.aozora.gr.jp/cards/000035/files/1567_ruby_4948.zip
http://www.aozora.gr.jp/cards/000148/files/794_ruby_4237.zip
http://www.aozora.gr.jp/cards/001566/files/52504_ruby_49666.zip
http://www.aozora.gr.jp/cards/000296/files/47061_ruby_28378.zip
http://www.aozora.gr.jp/cards/001235/files/49866_ruby_41853.zip
http://www.aozora.gr.jp/cards/001562/files/56758_ruby_57843.zip
http://www.aozora.gr.jp/cards/000074/files/424_ruby_19825.zip
http://www.aozora.gr.jp/cards/000052/files/5016_ruby_9746.zip
).cycle(5).to_a
# Passphrase handed to Cipher#pkcs5_keyivgen when deriving the key and IV.
$password = 'password'
# 8 random bytes used as the key-derivation salt; generated once at load time.
$salt = OpenSSL::Random.random_bytes(8)
# Downloads the zip archive at +url+ and returns its unzipped content.
#
# The work is delegated to the shell pipeline `curl -s URL | funzip -c`; the
# pipe is read with external encoding "SJIS".
#
# NOTE(review): +url+ is interpolated into the shell command without any
# escaping, so it must only come from a trusted source such as $urls.
#
# @param url [String] address of the zip archive to fetch
# @return [String] everything the pipeline wrote to its standard output
def crawl(url)
  puts "pid #{Process.pid} : crawling #{url}"
  # The block form closes the pipe and reaps the shell as soon as the read
  # finishes, instead of leaving the descriptor and the child process to be
  # cleaned up whenever the garbage collector gets around to it.
  IO.popen("curl -s #{url} | funzip -c", external_encoding: "SJIS", &:read)
end
# Encrypts +text+ with AES-256-CBC, repeating the whole operation +times+
# times.
#
# Every pass encrypts the same plaintext with a key and IV derived from
# $password and $salt, so all passes produce the same ciphertext and only the
# last one is kept (the repetition presumably exists to generate CPU load for
# the benchmark).
#
# @param url [String] only used in the progress message
# @param text [String] plaintext to encrypt
# @param times [Integer] how many times to run the encryption
# @return [String, nil] the ciphertext, or nil when +times+ is zero
def encrypt(url, text, times)
  puts "pid #{Process.pid} : encrypting #{url} #{times} times"
  result = nil
  times.times do
    # OpenSSL::Cipher::Cipher is only a deprecated compatibility alias;
    # OpenSSL::Cipher itself is the supported entry point.
    cipher = OpenSSL::Cipher.new("AES-256-CBC")
    cipher.encrypt
    cipher.pkcs5_keyivgen($password, $salt)
    result = cipher.update(text) + cipher.final
  end
  result
end
# Fetches every URL in turn, encrypts its text +times+ times and returns the
# Base64-encoded ciphertexts in the same order as +urls+.
#
# @param times [Integer] encryption repetitions passed on to encrypt
# @param urls [Array<String>] archives to download
# @return [Array<String>] one Base64 string per URL
def crawl_and_encrypt(times, *urls)
  urls.each_with_object([]) do |url, encoded|
    plaintext = crawl(url)
    ciphertext = encrypt(url, plaintext, times)
    encoded << Base64.encode64(ciphertext)
  end
end
# Spreads $urls round-robin over +num+ Worker child processes and collects
# the encrypted results.
#
# @param num [Integer] number of worker processes to fork
# @param times [Integer] encryption repetitions per URL
# @return [Array<String>] results grouped by worker, not in $urls order
def run_parallelly(num, times)
  workers = (0...num).map do
    Worker.new { |count, *urls| crawl_and_encrypt(count, *urls) }
  end
  puts "run parallelly #{num} processes"

  # Worker#run hands back a thread that finishes once the child reports ready.
  workers.map(&:run).each(&:join)

  assignments = $urls.zip(workers.cycle).group_by(&:last)
  pending = assignments.map do |worker, pairs|
    batch = pairs.map(&:first)
    puts "start worker pid #{worker.pid}"
    worker.execute(times, *batch)
  end

  pending.each(&:join)
  workers.each(&:stop)
  pending.flat_map(&:value)
end
# Spreads $urls round-robin over +num+ threads of the current process and
# collects the encrypted results.
#
# @param num [Integer] number of threads to start
# @param times [Integer] encryption repetitions per URL
# @return [Array<String>] results grouped by thread, not in $urls order
def run_concurrently(num, times)
  puts "run concurrently #{num} threads"
  buckets = $urls.zip(num.times.cycle).group_by(&:last).values
  pool = buckets.map do |pairs|
    Thread.new { crawl_and_encrypt(times, *pairs.map(&:first)) }
  end
  pool.each(&:join)
  pool.flat_map(&:value)
end
# Processes every entry of $urls one after another in the current process.
#
# @param times [Integer] encryption repetitions per URL
# @return [Array<String>] Base64 ciphertexts in $urls order
def run_serially(times)
  $stdout.puts "run serially"
  targets = $urls
  crawl_and_encrypt(times, *targets)
end
# Command-line entry point: MODE TIMES [NUM]
#   MODE  - "parallel" (NUM worker processes), "concurrent" (NUM threads)
#           or "serial"
#   TIMES - how many times each text is encrypted
mode, times, num = ARGV
results = case mode
          when "parallel" then run_parallelly(num.to_i, times.to_i)
          when "concurrent" then run_concurrently(num.to_i, times.to_i)
          when "serial" then run_serially(times.to_i)
          else
            # Without this branch an unknown or missing mode left results nil
            # and the script died with a NoMethodError further down.
            abort "usage: #{$0} (parallel|concurrent|serial) TIMES [NUM]"
          end
# Seeding the fold with 0 keeps the total numeric even when nothing was processed.
total_bytes = results.inject(0) { |sum, encoded| sum + encoded.bytesize }
puts "encrypted total size: #{total_bytes} bytes"
# Worker manages one forked child process and talks to it over two pipes.
#
# The block given to Worker.new runs inside the child, once per #execute
# call, with the arguments that were passed to #execute; its return value is
# marshalled back to the parent.
class Worker
  # Messages on the pipes are framed as an 8-byte big-endian length followed
  # by the Marshal payload, so payloads may contain any byte sequence.
  LENGTH_FORMAT = "Q>".freeze
  LENGTH_BYTES = 8

  # @return [Integer, nil] pid of the forked child; nil until #run is called
  attr_reader :pid

  # @yield [*args] work to perform in the child for every #execute request
  def initialize(&block)
    @child_read, @parent_write = create_pipe # pipe from parent to child
    @parent_read, @child_write = create_pipe # pipe from child to parent
    @block = block # work that runs in the forked child
  end

  # Creates a pipe with both ends set to ASCII-8BIT, because Marshal.dump
  # produces ASCII-8BIT data.
  #
  # @return [Array<IO>] the read end followed by the write end
  def create_pipe
    IO.pipe.each { |io| io.set_encoding("ASCII-8BIT", "ASCII-8BIT") }
  end

  # Forks the child process and starts its request loop.
  #
  # @return [Thread] in the parent, a thread that finishes once the child has
  #   reported that it is ready
  def run
    @pid = fork do
      # Close the pipe ends the child does not use.
      @parent_read.close
      @parent_write.close
      # Tell the parent that start-up has finished.
      write_to_parent(:ready)
      loop do
        # Wait for a request from the parent.
        args = read_from_parent
        # :stop leaves the loop so that the child process terminates.
        break if args == :stop
        # Do the work.
        result = @block.call(*args)
        # Writing the result to the pipe tells the parent we are done.
        write_object(result, @child_write)
      end
      @child_read.close
      @child_write.close
    end
    wait_after_fork if @pid
  end

  # Asks the child to run the block with +msg+ as its arguments.
  #
  # @return [Thread] thread whose value is the child's result
  def execute(*msg)
    write_to_child(msg)
    # A thread waits for the child to write its answer to the pipe.
    Thread.new { read_from_child }
  end

  # Stops the child, reaps it and closes the parent's pipe ends.
  #
  # @return [Integer, nil] the reaped pid, or nil when there was nothing to do
  def stop
    return unless alive?
    begin
      write_to_child(:stop)
    rescue Errno::EPIPE
      # The child has already exited (e.g. it crashed) but has not been
      # reaped yet, so fall through to the wait below.
    end
    # Reap the child process.
    Process.wait(@pid)
  rescue Errno::ECHILD
    nil
  ensure
    [@parent_read, @parent_write].each { |io| io.close unless io.closed? }
  end

  # @return [Boolean] true when the child process exists; false before #run
  #   and after the child has been reaped
  def alive?
    return false unless @pid
    Process.kill(0, @pid)
    true
  rescue Errno::ESRCH
    false
  end

  # Marshals +obj+ and writes it to +write+ as one length-prefixed message.
  def write_object(obj, write)
    payload = Marshal.dump(obj)
    write.write([payload.bytesize].pack(LENGTH_FORMAT) + payload)
  end

  # Reads one length-prefixed message from +read+ and restores the object.
  #
  # @raise [EOFError] when the other end closed the pipe before a complete
  #   message arrived
  def read_object(read)
    header = read.read(LENGTH_BYTES)
    if header.nil? || header.bytesize < LENGTH_BYTES
      raise EOFError, "pipe closed before a message arrived"
    end
    size = header.unpack(LENGTH_FORMAT).first
    payload = read.read(size)
    if payload.nil? || payload.bytesize < size
      raise EOFError, "pipe closed in the middle of a message"
    end
    # Only data written by our own parent/child process is read here.
    Marshal.load(payload)
  end

  def read_from_child
    read_object(@parent_read)
  end

  def write_to_child(obj)
    write_object(obj, @parent_write)
  end

  def read_from_parent
    read_object(@child_read)
  end

  def write_to_parent(obj)
    write_object(obj, @child_write)
  end

  # Parent-side set-up after the fork.
  #
  # @return [Thread] thread that waits for the child's :ready message and
  #   raises when something else arrives
  def wait_after_fork
    @child_read.close
    @child_write.close
    install_exit_handler
    install_signal_handler
    # Wait until the child reports that start-up has finished.
    Thread.new {
      result = read_from_child
      raise "Failed to start worker pid #{ @pid }" unless result == :ready
      result
    }
  end

  # Registers an at_exit hook that kills and reaps the child if it is still
  # running when the parent exits.
  def install_exit_handler
    owner = Process.pid
    at_exit do
      # Children forked later inherit this hook; only the process that
      # forked this worker may kill and reap it.
      next unless Process.pid == owner && alive?
      begin
        Process.kill("KILL", @pid)
        Process.wait(@pid)
      rescue Errno::ESRCH, Errno::ECHILD
        # noop: the child is already gone
      rescue => e
        puts "error at_exit: #{ e }"
        raise e
      end
    end
  end

  # Forwards the parent's SIGINT and SIGQUIT to the child, then hands the
  # signal over to whatever handler was installed before.
  def install_signal_handler
    owner = Process.pid
    [:INT, :QUIT].each do |signal|
      previous = Signal.trap(signal) { |signo|
        # Later-forked siblings inherit this handler; they must not signal
        # or wait on a process that is not their child.
        forward_signal(signal) if Process.pid == owner
        chain_signal(signal, signo, previous)
      }
    end
  end

  private

  # Sends +signal+ to the child and reaps it, tolerating a child that is
  # already gone.
  def forward_signal(signal)
    Process.kill(signal, @pid)
    Process.wait(@pid)
  rescue Errno::ESRCH, Errno::ECHILD
    # noop
  end

  # Invokes the previously installed disposition. Signal.trap returns either
  # a callable or a String command such as "DEFAULT", so both are handled.
  def chain_signal(signal, signo, previous)
    if previous.respond_to?(:call)
      previous.call(signo)
    elsif previous == "EXIT"
      exit
    elsif previous.nil? || %w[DEFAULT SYSTEM_DEFAULT].include?(previous)
      # Restore the default action and deliver the signal to ourselves again.
      Signal.trap(signal, "DEFAULT")
      Process.kill(signal, Process.pid)
    end
    # "IGNORE" and the like: nothing else to do.
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.