Skip to content

Instantly share code, notes, and snippets.

@makoto
Created August 6, 2011 18:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save makoto/1129607 to your computer and use it in GitHub Desktop.
Save makoto/1129607 to your computer and use it in GitHub Desktop.
MoreRinda

More Rinda

Introduction

この章ではRindaの拡張例を見ていきます。

以下のコマンドでインストールできます。

gem install more_rinda

ソースコードは https://github.com/seki/MoreRinda にあります。"test"や"sample"のディレクトリに今回取り上げるもの以外にもいろいろなサンプルが置いてあるので一度覗いてみて下さい。

rinda_eval

前章では「新たなプロセスを起動する eval 操作は良いアイデアがなく用意できなかった」といいましたが、UNIX環境に限って稼働するものを作成しました。ここではLindaのevalがどういうものかを確認した後、rinda_evalの用法や実装を見て行きましょう。

Lindaのeval操作

Lindaでタプルを生成する操作にはoutとevalの二つがあります。out操作はRinda::TupleSpaceのwriteに対応します。

0から9までの二乗根のタプルの生成は次のようにします。

/* C-Linda */
for (i = 0; i < 10; i++)
    out("sqrt", i, sqrt(i));
/* Rinda */
10.times do |n|
  ts.write([:sqrt, n, Math.sqrt(n)])
end

eval操作はout操作にそっくりに見えますが、プロセスが生成される点が違います。なんと新しいプロセス側で引数の評価を行い、その結果からタプルを生成します。次の疑似コードは10個のプロセスを生成して、それぞれが一つのタプルを生成するものです。sqrtは生成されたプロセスで計算されます。

  /* C-Linda */
  for (i = 0; i < 10; i++)
      eval("sqrt", i, sqrt(i));

ふつうに考えたら、引数の評価をした後にプロセスが生成されそうに見えますが、C-Lindaはライブラリではなく、プリプロセッサか言語の拡張らしいのでそんな芸当ができるようです。

Rinda::rinda_eval

eval操作をRubyでポータブルに実装するのはちょっと難しいので、Rinda::rinda_evalはfork()を持つUNIX属だけを対象とすることにしました。このモジュールメソッドは新しいプロセスを生成しブロックを実行します。また、引数のタプルスペースへの参照をブロックへの引数として与えます。ブロックが返したArrayをタプルスペースに追加します。

  10.times do |n|
    Rinda::rinda_eval($ts) do |ts|
      [:sqrt, n, Math.sqrt(n)]
    end
  end

C-Lindaのevalみたいな字面を提供できなかったのは残念ですが、実用的なAPIになったと思います。この例では計算結果のタプルを生成するものですが、ワーカプロセスの生成に使うことが多いと思います。10個のワーカプロセスを生成する例を示します。これは無限ループですけど適当な条件で終了させてもよいですね。

Rinda::rinda_eval vs Thread

「わざわざ新しいプロセスをたちあげてなにがうれしいんだ?」と思う皆さんのために、簡単な例を通して説明して行きましょう。

まずはそれぞれ3秒、2秒、1秒かかるタスクを順に実行していった場合にかかった時間を計測してみましょう。

  require 'benchmark'
  def task(sec)
    sleep sec 
    puts "Took #{sec} sec"
  end

  puts Benchmark.measure{
    [3,2,1].each{|x| task x}
  }


  Took 3 sec
  Took 2 sec
  Took 1 sec
    0.000000   0.010000   0.010000 (  6.000387)

これにかかった時間は3+2+1秒の6秒になります。ここまでは普通ですよね。

この処理を並行してすすめるためにスレッドを使った例に書き換えてみましょう。

  require 'benchmark'
  def task(sec)
    sleep sec 
    puts "Took #{sec} sec"
  end

  puts Benchmark.measure{
    [3,2,1].map{|x| Thread.new{task x}}.map{|y| y.join}
  }

  Took 1 sec
  Took 2 sec
  Took 3 sec

    user     system      total        real
    0.010000   0.000000   0.010000 (  3.000373)

実行時間は一番大きなタスクの実行時間である3秒以内に収まりました。すばらしい。 でも待って下さい。sleepというのはOSのシステムコールを呼んでいるだけなので、実際にCPUが作業をしている訳ではありません。 そこでsleepを以下の用なメソッドで上書きしてみました。

  def sleep sec
    (sec * 10000000).times.each{}
  end

私の手元のマシン(OSx DualCore)では1000満開ループするのに0.8秒ほどかかっているのでsleepの代替としてちょうど良いです。

  Took 1 sec
  Took 2 sec
  Took 3 sec
  user     system      total        real
    5.640000   0.030000   5.670000 (  6.267379)
  copy output
  Program exited with code #0 after 6.32 seconds.

処理時間が一気に最初に戻ってしまいましたね。これの理由ですがRuby 1.8の場合はグリーンスレッドという仮想的なスレッドなためシングルコアしか使えず、ネイティブスレッドが使われている1.9でもGIL(Global Interpreter Lock)があるためマルチコアの恩恵に授かれないようになっています。そこでrinda_evalを使ったバージョンに書き換えてみましょう。

  require 'rinda/tuplespace'
  require 'rinda/eval'
  require 'benchmark'

  def sleep sec 
    (sec * 10000000).times.each{}
  end

  def task(sec)
    sleep sec 
    print "Took #{sec} sec"
  end

  place = Rinda::TupleSpace.new
  DRb.start_service

  puts Benchmark.measure{
    [3,2,1].map{|x| 
      Rinda::rinda_eval(place) do |ts|
        [:result, x, task(x)]
      end
      x
    }.map{|x|
      p place.read([:result, x, nil])    
    }
  }

  [~]$  ruby rind_eval_task.rb 
  Took 1 secTook 2 secTook 3 sec[:result, 3, nil]
  [:result, 2, nil]
  [:result, 1, nil]
    1.430000   0.010000   6.830000 (  3.623393)

少しスレッドバージョンよりもコード量が増えてしまいましたが、タスクごとに新しいプロセスを起動し、複数CPUコアを利用できることを実感できたのではないでしょうか? 実行時間も一番大きなタスクの実行時間である3秒ちょっとに収まりました

ちなみにJava VM上で走るJRubyはスレッドでマルチコアが使えます。

  [~]$ jruby thread_task.rb
  Took 1 sec
  Took 2 sec
  Took 3 sec
  user     system      total        real
    1.587000   0.000000   1.587000 (  1.543000)

これはスレッドバージョンをjRubyで走らせた結果です。実際にはJVMを立ち上げる作業そのものに時間が取られるのですが、それでも2秒ちょっとです。jRubyを使える環境でしたら迷わずjRubyを使えば良いと思いますが、MRIに依存している環境の場合、お手軽に並列処理をするためのツールとしてrinda_evalを忍ばせておくのも良いのではないでしょうか。

Rinda::rinda_evalの中身を覗いてみよう

ではお手軽並列機構であるrinda_evalの中身を覗いてみましょう。

  require 'drb/drb'
  require 'rinda/rinda'

  module Rinda
    module_function
    def rinda_eval(ts)
      Thread.pass # FIXME
      ts = DRbObject.new(ts) unless DRbObject === ts
      pid = fork do
        Thread.current['DRb'] = nil
        DRb.stop_service
        DRb.start_service
        place = TupleSpaceProxy.new(ts)
        tuple = yield(place)
        place.write(tuple) rescue nil
      end
      Process.detach(pid)
    end
  end

実は20行にもみたないコードで実装されているのですね。 いろいろ難しそうに見えますが肝は2点です。forkを通して子プロセスを起動している点と、子プロセスから親プロセスへの値渡しの手段としてTupleSpaceを使っている点です。

fork

forkというのは親プロセスのメモリ空間をコピーして新たに子プロセスを作成するUnixシステムコールをRubyから使うためのメソッドです。これのおかげで、親やプロセスから子プロセスに自由に値を渡す事はできても、子プロセスから値を受け取る事はできません。

  result = nil
  pid = fork do
   result = "hello"
  end
  p "result: #{result}, pid : #{pid}"

上の結果は"result: , pid : 72287"となります。

そこでタプルスペースを利用して値をやりとりします。

  parent = Rinda::TupleSpace.new
  DRb.start_service

  pid = fork do
   child = DRbObject.new(parent)
   DRb.start_service
   child.write([:result, "hello"])
  end
  _, result = parent.read([:result, nil])
  p "result: #{result}, pid : #{pid}"

結果は"result: hello, pid : 72317"となります。

これの凄い事なんですが、タプルスペースには値渡ししかできないのですが親プロセスから子プロセスへはクロージャがつかえるので、子プロセスにProc, Lambda, Blockなど渡し、演算結果のみ返す事ができます。

  calc = Proc.new{|a| a * 2}
  pid = fork do
   child = DRbObject.new(parent)
   DRb.start_service
   child.write([:result, calc[3]])
  end
  _, result = parent.read([:result, nil])
  p "result: #{result}, pid : #{pid}"

結果は"result: 6, pid : 72401"です。

以前TupleSpaceで分散階乗サービスを作る例がありましたが、あれの場合はあらかじめサーバサイドが計算式(この例の場合は階乗)を定義していなければいけません。しかしながらforkとタプルスペースを使った場合、あらゆる式を親プロセスで定義し、子プロセスに分散処理させたあとで結果だけもらう事ができます。これを汎用的な仕組みにしたのが20行弱のrinda_evalになります。

逆にTupleSpace分散階乗サービスと比較しての短所ですが、forkコマンドに依存しているため、複数のマシンにまたがっての分散処理はできません。

ptuplespace

Rindaのタプルスペースは複雑なプロセス間通信を容易してくれますが、タプルスペースすべてが揮発性のメモリに収まっているため、システムがクラッシュした際に全てのデータを失ってしまいます。そこがネックで使ってくれない人もいるようなので、自分なりに永続化のしくみとその制約にかんして考えてみました。

TupleSpaceの永続化

この節ではRinda::TupleSpaceをおさらいしながら、TupleSpaceに永続化したPTupleSpaceの概要を紹介します。

PTupleSpaceはTupleSpaceのサブクラスです。タプルの状態の変化を逐次二次記憶にログして、次回の起動に備えます。PTupleSpaceを再起動すると最後の(最新の)タプルの状態のままに復元されます。

PTupleSpaceの使い方

PTuplespaceを利用するのはきわめて簡単です。以下に利用例を載せます。

  require 'rinda/ptuplespace'
  store = Rinda::TupleStoreLog.new('ts_log')
  Rinda::setup_tuple_store(store)

  DRb.install_id_conv(Rinda::TupleStoreIdConv.new)
  ts = Rinda::PTupleSpace.new
  DRb.start_service('druby://localhost:23456', ts)
  ts.restore

  ts.write(['Hello', 'World'])
  p ts.read_all(['Hello', nil])
  p ts.take(['Hello', nil])

  x = ts.write(['Hello', 'cancel'], 2)
  p ts.read_all(['Hello', nil])
  ref = DRbObject.new(x)
  ref.cancel
  p ts.read_all(['Hello', nil])
  x = ts.write(['Hello', 'World'])

  p DRbObject.new(x)

肝心なのは Rinda::TupleStoreLog.newにファイル名を指定し、それをRinda::setup_tuple_store(store)に渡すだけです。

  DRb.install_id_conv(Rinda::TupleStoreIdConv.new)

これはGC(Garbage Collection)を回避するための仕組みです。詳細に関してはGCの章で取り扱います。

(注:"restore" "cancel"の用法に関して説明をしていただけますか? => 咳さん)

PTupleSpaceの制約

この節ではPTupleSpaceをKVSと比較した際の考察、そして永続化機能として考えた時に必須のクラッシュ&リカバリーに関して考えていきます。

KVS(Key Value Store)として使うことは可能?

APIの視点からストレージとしてのTupleSpaceをおさらいします。

TupleSpaceはタプル群を扱う集合構造です。同じ情報を複数持つことができるので、Bagと言えるでしょう。

最近の流行言葉にKVSという言葉ありますね。キーと値で表現するなら、同じキーを持つ要素の重複を許すストレージです。キーしかなくて、キーが値、にも見えますが。

これに対してHashは一つのキーに一つの値が関連付けられる辞書です。

TupleSpaceで辞書を模倣するのはやっかいです。[キー, 値]というタプルで辞書を構成仕様とした場合を考えてみましょう。まずデータを読むのは次のように書けそうです。

  @ts.read([キー, nil])

では要素の追加はどうでしょう。

  @ts.write([キー, 値])

このような単純なwriteでは重複を防ぐことはできません。全体をロックして、そのキーのタプルを削除してからwriteする必要があります。

  def []=(key, value)
    lock = @ts.take([:global_lock])
    @ts.take([key, nil], 0) rescue nil
    @ts.write([key, value])
  ensure
    @ts.write(lock) if lock
  end

このグローバルなロックは実はデータを読むときにも必要です。なぜなら、そのキーの情報を別のスレッドが更新中かもしれないからです。

  def [](key)
    lock = @ts.take([:global_lock])
    _, value = @ts.read([key, nil], 0) rescue nil
    return value
  ensure
    @ts.write(lock) if lock
  end

要素の増減がないケースでは前章で示した通り、グローバルなロックは不要です。だれかが更新中はその要素は取り出せませんが、更新が終わればまた書き戻されるはずです。ですから、単に要素が読めるまでreadで待ってしまえば良いことになり、局所的なロックとなります。

eachはどのように実装したらよいでしょう。TupleSpace全体を順に走査するうまい方法はありません。read_allで全ての要素のArrayを生成して、その配列にeachを委譲することになります。

  def each(&blk)
    lock = @ts.take([:global_lock])
    @ts.read_all([nil, nil]).each(&blk)
  ensure
    @ts.write(lock) if lock
  end

要素数が少ないうちは気になりませんが、多くなると損している気がしますね。

分散ハッシュテーブルなどでもeachやkeysを低コストで実装するのは難しいかもしれません。

流行のストレージには、常にキーでソートされているシーケンスを持つものがあります。並んでいることを利用して、大きな空間をブラウズするのが得意です。キーを工夫することでバージョン付きの情報を蓄えることもできます。RindaのTupleSpaceには、タプルを順序付けて並べることはできませんから、これを低コストで模倣するのは難しいです。

ところであなたが欲しかった集合は本当にHashでしたか?

「カンバン」としてのタプルとクラッシュ&リカバリーに対する考察

タプルは実世界の「カンバン」によく似ています。タプルをプロセス間でリレーしながら仕事を進めていく様子は、「カンバン」を持ち回って仕事を行うのにそっくりです。Rindaの世界では「カンバン」はTupleSpaceを介してプロセスからプロセスへ渡り歩きます。

PTupleSpaceの提供する永続化は、TupleSpaceに蓄えられたカンバンの束にのみ作用します。プロセスが持っているカンバンをPTupleSpaceが知ることはできず、永続化されません。また、待合せている様子も永続化の対象ではありません。プロセスがあるカンバンを待っている、という状況までは再現できないのです。

TupleSpaceに期待する機能がカンバンの貯蔵庫であると考えた場合には、これで充分と言えるでしょう。PTupleSpaceにwriteした情報は再起動後もそのまま手に入ります。多くのアプリケーションではこれで間に合うかもしれません。ArrayやHashをそのままdRubyで公開する、あるいはログ付きで公開するのに比べて、TupleSpaceはどのくらい便利なのでしょうか。おそらく、RindaのTupleSpaceの強力なパターンマッチングにはある程度のアドバンテージがあるでしょう。そのパターンマッチングと引き換えに、あまり効率のよいデータ構造を使うことができませんでした。実装には線形探索が残っていて、要素数が増えたときに不安があります。

TupleSpaceの本来の役割であるプロセス間の協調についてはどうでしょうか。PTupleSpaceに異常が起きてクラッシュしてしまった、再起動が必要になった、といった状況を想像してみましょう。まず、PTupleSpaceプロセスが停止することにより、readやtakeなどの待合せのRMIを実行していたプロセスではdRubyの例外があがります。PTupleSpaceが再起動されるとタプル群の最後の状態に復元されます。待合せをしていたプロセスは再起動したことを(知るのは難しいのですが)知ったのち、例外が発生した操作をやり直すことになります。しかし、そのように再開するスクリプトを書くのは難しく面倒です。

また、RMIのために抱え込む厄介な問題もあります。writeやtakeなど、タプルの状態を変える操作を考えてみましょう。通常のメソッド呼び出しでは処理が終われば呼び出した側に直ちに制御がもどりますが、RMIではサーバ側のメソッドの終了と、RMIの終了の間にソケット通信が行われます。つまり、処理が終わる前に例外が発生したのか、結果を伝える間に例外が発生したのか知ることができません。PTupleSpaceが二次記憶にタプルの操作をログしたあとに、クライアントにその完了が届く前にクラッシュしてしまう可能性があります。(全てがうまくいってからログする実装を選んでも、クライアントにタプルが届いたのち、ログするまえにクラッシュする可能性があります)

異常終了といえば、プロセス側のクラッシュも考えられますね。PTupleSpaceの対象外ですがちょっと想像してみましょう。カンバンをプロセスが取り出したままクラッシュしてしまうと、復元する方法がありません。次の短いスクリプトを見てみましょう。

  def succ
    _, count = @ts.take([:count, nil])
    count += 1
    yield(count)
  ensure
    @ts.write([:count, count) if count
  end

これは[:count, 整数]のタプルを取り出し、一つ大きくしてまた書き込むスクリプトです。カンバンを取り出し、カウンタを一つ進め、最後にTupleSpaceに書き戻します。カンバンがプロセスにある間は、別のプロセスはカンバンをTupleSpaceから読んだり、取り出したりすることはできないので安全にカウンタを操作できます。さて、もしもカンバンがプロセスにある間にそのプロセスがクラッシュしたらどうなるでしょう。PTupleSpaceは自身の中にあるカンバンしか復元できませんから、そのカンバンは失われたままです。このカウンタを操作するプロセス群は全て停まってしまいます。こういった使い方(協調に使うケースの多くはそうなんだと思うのですが)をする場合、TupleSpaceだけでなく関係するプロセス群も再起動する必要があるだけでなく、TupleSpace内のタプルも初期状態にする必要があります。せっかくタプルの状態を復元できるようにしたというのに‥。

PTupleSpaceはTupleSpace自体の永続化を目的としたもので、それ自体はおそらく期待した通りに動作すると思います(そういうつもりで作ったので)。しかし、それだけでは協調するプロセス群をもとに戻すことはできません。ちょっとだまされた気分ですよね。

Moving Ahead

この章では以下の事について学びました。

  • 並列計算をするためのrinda_eval
  • 永続化のしくみを提供するためのptuplespace

永続化に関しては可能な事は可能なのですが、TupleSpaceにただ永続化を提供するだけでは十分でなく、それに即した独自なものが必要なのではないかと考えるようになりました。次章ではその考えの末たどりついたDripについて解説していきます。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment