Scalaの並行処理とアクター、並列コレクション:スケーラブルで関数型でオブジェクト指向なScala入門(10)(2/3 ページ)
Scalaの特徴を紹介し、基本構文や関数、クラスなど、Scalaの基本的な機能について解説する入門連載
アクターを用いた並行処理
アクターとは、簡単に言えば「メールボックスを持つスレッド」です。アクターを使用する際には、利用者がメールボックスへメッセージを送信し、メールボックスからメッセージを取り出して使用します。
補足 今後のアクター
今年リリースされる予定のScalaの新バージョン「2.10」では、ここで使用している「scala.actors」が削除され、代わりにAkkaが取り込まれる予定になっています。
Akkaとは、Actorモデルによるプログラミングを行うためのScala/Java用フレームワークです(公式サイト)。
Scala標準のscala.actorsと比較すると、高パフォーマンス、さらに分散環境で負荷分散や耐障害性に強いという特性を持っていますが、基本的な使い方は共通する部分が多いため、本連載では標準のscala.actorsを紹介しています。
シンプルなアクターの実行
まずはシンプルなアクターを作成してみましょう。次のコードをREPLで入力してください。
import scala.actors.Actor import scala.actors.Actor._ class MyActor extends Actor { def act = { println("Hello MyActor") } }
Scala標準のアクターを使用するには、scala.actors.Actorクラスを継承し、actメソッドを実装します。では定義したアクターを開始してみましょう。MyActorをインスタンス化してstartメソッドを実行します。
scala> val ma = new MyActor ma: MyActor = MyActor@d394424 scala> ma.start res0: scala.actors.Actor = MyActor@d394424 Hello MyActor
actメソッドが実行されて文字列が表示されました。また、scala.actors.Actorsのファクトリメソッドを使用してアクターを直接生成する方法もあります。
scala> val ma2 = actor { println("Hello MyActor for factory") } Hello MyActor for factory ma2: scala.actors.Actor = scala.actors.Actor$$anon$1@3f894e9c
この場合、startメソッドを呼ばなくてもそのままアクターが開始されます。
メッセージパッシング
先ほど「アクターは利用者がメールボックスへメッセージを送信し、メールボックスからメッセージを取り出して使用する」と説明しましたが、これは「メッセージパッシング」と呼ばれる、アクターが他スレッドとデータをやりとりするための機能です。
メッセージパッシングを使用すれば、アクターはテキストデータのようなシンプルなデータから自分で作成したクラスまで、どんなオブジェクトでも受け取れます。
アクターがメッセージを受け取ったとき、処理を記述するためのメソッドは下記の種類があります。
- receive
マッチした型のメッセージを受け取るまでブロックし、結果を返せる - receiveWithin
receiveと同じだが、指定時間を過ぎるとブロックを解除 - react
結果を返しない。reveiveより高速に動作 - reactWithin
receiveと同じだが、指定時間を過ぎるとブロックを解除
では、receiveを試してみましょう。まずはアクターを定義します。
import scala.actors.Actor import scala.actors.Actor._ val myActor = actor { val result = receive { case i:Int => println("receive = " + i) "Int:" + i } println("result =" + result) }
actorブロック内のreceiveブロックがメッセージ受信時に行う処理です。パターンマッチを使用してInt型のメッセージを受信した場合にコンソールに文字を出力し、結果文字列を返しています。
「!」メソッド
ではアクターにメッセージを送ってみましょう。メッセージを送信するには
<アクター> ! <メッセージ>
という形式で、「!」メソッドを使ってメッセージを送信します。
scala> myActor ! 1 receive = 1 result =Int:1
「!」メソッドは非同期でメッセージの送信を行うためのメソッドです。
「!?」メソッド
また、「!?」メソッドを使うと同期メッセージを送ることが可能です。サンプルではInt型の値がアクターに送信され、receiveブロック内で処理されています。receiveの代わりにreactを使うと、結果を返せませんが、より高速に動作します。
import scala.actors.Actor import scala.actors.Actor._ val myActor = actor { react { case i:Int => println("react = " + i) } } scala> myActor ! 1 react = 1
なお、myActorへのメッセージ送信を2回以上行っても、何も反応がないと思います。これは、アクターの受信を繰り返すようになっていないためです。myActor.restartメソッドを実行すればアクターが再起動され、メールボックス内のメッセージが1件処理されます。
scala> myActor ! 100 //2回目以降は何も反応がない scala> myActor.restart react = 100
メッセージを送るたびにrestartを呼ぶのは面倒なので、受信を続けるようにしましょう。受信を続けるようにするには、loopを使用します。
import scala.actors.Actor import scala.actors.Actor._ val myActor = actor { loop { react { case i:Int => println("loop react = " + i) } } } scala> myActor ! 1 loop react = 1 scala> myActor ! 2 loop react 2
こうすれば何度メッセージを送っても、メッセージが処理されるようになります。
また、replyメソッドを使用すれば、メッセージを受け取って送信元へ返信することもできます。
import scala.actors.Actor import scala.actors.Actor._ val myActor = actor { loop { react { case i:Int => println("react = " + i) reply(i + 1) } } }
同期メッセージ(!?)を送り、reply結果を受け取ってみましょう。myActorに渡した数値がインクリメントされた結果を受け取れます。
scala> val x = myActor !? 10 react = 10 x: Any = 11
「!!」メソッド
最後にもう1つ、「!!」というメソッドでメッセージを送ってみましょう。これは非同期でメッセージを送信し、返信を受け取るためのFutureオブジェクトを返します。返されたFutureオブジェクトのapply()メソッドを呼べば返信を取得できます。
なおapply()メソッドは、返信が返ってくるまで処理をブロックするので、メッセージを送信してから返信が返ってくるまでに別の処理を行いたい場合に便利です。
また、このメソッドはコールバック関数を渡して処理を行うことできます。
scala> myActor !! (10,{ case i:Int => println("result callback = " + i)}) react 10 result callback = 11 res14: myActor.Future[Unit] = <function0>
アクターの注意点
以上がアクターの基本的な使用方法です。アクター同士は同じデータを共有しないので競合を気にする必要はありませんが、メッセージとして送信されたオブジェクトは共有される可能性があります。同じオブジェクトを複数のアクターとやり取りする可能性がある場合には注意してください。
しかし、オブジェクトが不変であれば、そこも気にする必要がなくなるので、不変オブジェクトが推奨されています。
Copyright © ITmedia, Inc. All Rights Reserved.