RubyのThread、Fiber、Kernel、forkで並列処理やプロセスの深淵へ若手エンジニア/初心者のためのRuby 2.1入門(11)(2/3 ページ)

» 2014年12月15日 18時00分 公開
[著:麻田優真、監修:山根剛司株式会社アジャイルウェア]

協調的なスレッドを実現する「軽量スレッド」Fiberクラス

 前ページまでで、Threadクラスを用いたスレッドの生成とその利用方法の基本を学びました。ここでは、いわゆる「軽量スレッド」と呼ばれる、Fiberクラスについて見ていきましょう。

Fiberクラスの特徴

 Fiberクラスを使うと、Threadクラスのようにスレッドを実現できるのですが、こちらは少し毛色が違います。FiberクラスではThreadクラスと違い、Fiberオブジェクトを生成した親のコンテキストと、Fiberオブジェクト内での子コンテキストの間を、協調的に互いに行ったり来たりすることができます。

Fiberクラスの使用例

 言葉では伝わりづらいので、fiber01.rbに利用例を示しましょう。

g = Fiber.new do |x|
  loop { Fiber.yield(x); x += 1 }
end
 
5.times { puts g.resume(0) }
fiber01.rb
$ ruby fiber01.rb
0 
1
2
3
4
fiber01.rbの実行結果

 では、処理の流れを追いましょう。まず1行目で「Fiber.new」によってFiberオブジェクトを生成し、変数gに代入しています。ファイバーの中で実行する処理は、`Fiber.new`にブロックとして渡します。

 2行目がファイバーの中で実行する処理の本体です。ここでは、loopを使って、「Fiber.yield(x)」を実行したのち、xに1を足し込む処理を無限ループとして記述しています。実は、このyieldがファイバーの動きを理解する鍵となります。

 5行目では、ファイバー「g」の「resume」メソッドに引数に0を与えてファイバーを実行しています。ファイバーはスレッドと異なり、明示的に「resume」によって動かさない限り、動くことはありません。

 「resume」メソッドに与えられた0はファイバーのブロック変数「x」に格納され、2行目に処理が移ります。ファイバー内で「Fiber.yield」メソッドが実行されると、「yield」に与えた引数を返り値として返して、ファイバー内での実行は一時的に止まります。つまり、2行目の「Fiber.yield(x)」に出会った時点で、5行目での1回目のループでは0を出力して次のループに移行します。

 5行目の2回目のループで再び「resume」メソッドが実行されると、ファイバー「g」は処理を再開し「x += 1」が実行されて変数「x」の中身が1になります。そのあと、2行目がloopによって囲まれているので、再び「Fiber.yield(x)」が実行され、5行目に処理が戻り、1が出力されます。このような動作を繰り返すことで、0から4の整数が生成されます。

関数型言語っぽい「ジェネレーター」とは

 この考え方は、一般的に「ジェネレーター」と呼ばれ、その名の通り、何がしかのデータ列を生成するようなものです。ジェネレーターの中の世界では、あくまで無限に1ずつ大きくした整数を生成し続けているだけです。

 似たような考え方は、関数型言語において、無限長のリストを生成するような関数を定義して、遅延評価によって一定数の要素を持つリストを得るような場合に相当します。

ThreadやFiberはそれだけで本が1冊できるほど複雑

 ここでは、スレッドやファイバーの基本的な使い方を紹介しました。実のところ、並列処理では、データの整合性を保つことが難しい、デバッグが困難であるなど、並列処理それ自身で1冊の本になってしまうような複雑さを持っています。

 ThreadクラスやFiberクラスは、ここでは紹介していないですが、もっと複雑なことを行うためのメソッドを持っています。詳細は、Rubyの公式ページなどを参照してください。

「グルー言語」でもあるRubyから“プロセス”を操る

 ここまでスレッド処理を行うためのThreadクラスやFiberクラスについて紹介しました。Rubyはスレッドをサポートする一方で、システムで利用できる別のプログラムを起動して、それらをつなげるような「グルー言語」としての側面も持ちます。

 ここからは、Rubyスクリプトから他のプログラムを起動し、それらを操る方法を学びます。

最も簡単なのは「`」(バッククオート)で囲む方法

 Rubyスクリプトから他のコマンドを起動する最も簡単な方法は、「`」(バッククオート)を使うやり方でしょう。pryで確認してみます。

[1] pry(main)> `date`
=> "Sat Nov 22 10:23:12 JST 2014\n"

 他のコマンドを起動して出力を得るだけなら簡単です。例のように、バッククオートに実行したいコマンドを囲むだけです。コマンドの出力は文字列として返ります。

標準出力か標準エラー出力に流れる「Kernel.#system」メソッド

 もう一つ、Kernelモジュールに定義されている「system」メソッドを使う方法もあります。pryで実行してみましょう。

[2] pry(main)> system("date")
Sat Nov 22 10:26:33 JST 2014
=> true

 「system」メソッドの場合は、出力はプログラムの呼び出し元と同じ標準出力か標準エラー出力に流れます。「system」メソッドの返り値はtrueかfalseで、コマンドの終了ステータスが0(正常)ならtrue、それ以外(たいていの場合はエラーで終了した場合)はfalseが返ります。

Rubyのプロセスと共に終了する「Kernel.#exec」メソッド

 他にも、「exec」メソッドを使う方法もあります。ただし、「exec」メソッドを使った場合は、「system」メソッドのように実行が返ってくることはなく、そのまま「exec」を実行したRubyのプロセスと共に終了します。

 pryで試してみましょう。「exec "date"」を実行して日付が出力された後、pryが終了することが分かるでしょう。

[8] pry(main)> exec "date"
Sat Nov 22 11:30:50 JST 2014

時間のかかるコマンドを非同期に実行する場合の「Process.spawn」「Kernel.#spawn」メソッド

 時間のかかるコマンドを非同期に実行したい場合は、「Process.spawn」メソッドが役に立ちます。Kernelモジュールにも同等な「spawn」メソッドが定義されているので、単に「spawn」だけでも動きます。

 利用例をprocess01.rbに示しましょう。

pid = Process.spawn "sleep 5; echo terminated" # spawn でも
puts "waiting..."
puts "waiting..."
puts "waiting..."
Process.waitpid(pid)
process01.rb
$ ruby process01.rb 
waiting...
waiting...
waiting...
terminated
process01.rbの実行結果

 1行目で新しくプロセスを立ち上げて、sleepコマンドとechoコマンドを実行しています。sleepは指定した秒数処理を止めるコマンドで、ここでは5秒スリープした後に、標準出力に「terminated」という文字を流すようなプロセスを立ち上げています。

 2〜4行目では、3回「puts」を実行して、標準出力に「waiting...」という文字列を流しています。

 「spawn」で立ち上げたプロセスは非同期的に実行されるので、ここまでの4行だけではRubyスクリプトが終了してしまいます。そこで、終了を待つ場合には5行目のように「Process.waitpid」メソッドを使って終了を待ちます。1行目でpidにプロセスIDが格納されるので、5行目の「waitpid」にそのプロセスIDを与えています。

 実行結果を見ると分かるように、非同期にsleepとechoが実行されているので、先に「waiting...」が表示されてから「terminated」が表示されています。「system」やバッククオートによる実行だと、5秒待って「terminated」と画面に表示されてから「waiting...」が3回表示されることになります。

他のプロセスの出力を処理する「IO.popen」メソッド

 バッククオートや「system」メソッドを使った場合は、呼び出したコマンドの処理が終わるまで出力を得ることができません。

 ですので、pingコマンドのように、標準出力にひたすら出力が行われるような場合にうまく扱うことができません。このような場合、「IO.popen」メソッドを使うと、次々に送られてくる出力を処理できます。process02.rbに例を示しましょう。

IO.popen("ping -c 10 localhost", "r") do |pipe|
  while line = pipe.gets
    line.match(/time=(\d\.\d+) ms/)
    puts line if $1.to_f < 0.1
  end
end
process02.rb
$ ruby process01.rb 
PING localhost (127.0.0.1): 56 data bytes
64 bytes from 127.0.0.1: icmp_seq=0 ttl=64 time=0.046 ms
64 bytes from 127.0.0.1: icmp_seq=1 ttl=64 time=0.066 ms
64 bytes from 127.0.0.1: icmp_seq=2 ttl=64 time=0.035 ms
64 bytes from 127.0.0.1: icmp_seq=4 ttl=64 time=0.038 ms
64 bytes from 127.0.0.1: icmp_seq=5 ttl=64 time=0.077 ms
64 bytes from 127.0.0.1: icmp_seq=9 ttl=64 time=0.035 ms
 
--- localhost ping statistics ---
10 packets transmitted, 10 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 0.035/0.099/0.246/0.073 ms
process02.rbの実行例

 1行目の「IO.popen」で、localhostに対してpingを10回打つようにpingコマンドを実行し、ブロック変数「pipe」にIOオブジェクトが入ります。pingコマンドの出力をパイプに流し、パイプから流れてきたデータを、「gets」メソッドで取り出すイメージです。

 「pipe」はIOオブジェクトですので、ファイルや標準入出力の読み書きと同じ要領でデータを取り出せます。2行目でpipeから1行出力を取り出し、変数lineに格納しています。前回紹介した「io05.rb」と全く同じようなパターンになっていることに注意してください。

 このように、他のコマンドの入出力ですらIOオブジェクトとして抽象化されていて、同じように扱えるようになっています。ここでは、簡単に説明するために出力を取り出しているだけですが、もちろん入力することもできます。これは素晴らしいことです!

 3行目は正規表現が使われていて少しややこしいですが、pingの出力の「time=0.046 ms」の数値の部分だけを抜き出すような正規表現になっています。マッチした小数の部分は、グローバル変数「$1」に格納されます。

 4行目は「$1」の値を実数に変換して、0.1より小さければ標準出力にpingの結果をそのまま流します。0.1より大きければ、そのまま捨ててしまいます。もし3行目でマッチしなかった場合(「PING localhost (127.0.0.1): 56 data bytes」の行など)は、「$1」にnilが格納され、「nil.to_f」の結果は0.0となるので、その行はうまい具合に出力されます。

 このように、他のプロセスの出力をフィルターするようなプログラムも、IOオブジェクトを利用することで簡単に作ることができます。

Copyright © ITmedia, Inc. All Rights Reserved.

RSSについて

アイティメディアIDについて

メールマガジン登録

@ITのメールマガジンは、 もちろん、すべて無料です。ぜひメールマガジンをご購読ください。