Scalaの並行処理とアクター、並列コレクションスケーラブルで関数型でオブジェクト指向なScala入門(10)(1/3 ページ)

Scalaの特徴を紹介し、基本構文や関数、クラスなど、Scalaの基本的な機能について解説する入門連載

» 2012年09月06日 00時00分 公開
[中村修太クラスメソッド株式会社]

前回のおさらいと今回の内容

 前回の記事「Scalaの抽象型と暗黙の型変換/引数、パラメータ制約」では、「implicit」キーワードを使用した暗黙の型変換暗黙のパラメータなどを紹介しました。

 今回はScalaが標準で持っている並行処理用ライブラリと並列コレクションを紹介します。

 第1回記事では、Scala標準のREPLScala IDEで動作を確認してみました。今後本記事のサンプルコードは、どちらで確認しても問題はありませんが、対話的に実行でき、1文ごとにコードの結果が分かって便利なので、基本的にはREPLを用いて説明していきます。

 Scala IDEを使用する場合、第1回記事の『Scala IDE for Eclipseで「Hello Scala!」』を参照してプロジェクトを作成して実行してください。REPLを使用する場合は、コンソール上でscalaコマンドを実行し、REPLを起動してください。

Javaでの並行処理とその問題

 マルチコアCPUが一般的なPCでも当たり前になっている昨今、プログラムの並行処理で無駄をなくし、パフォーマンスを向上させることは当然の技術となっています。

共有データとロックを使用するスレッドモデル

 Javaの場合、共有データロックを使用するスレッドモデルを持っています。Javaで並行処理を実行するにはConcurrent API(「java.util.concurrent」パッケージのAPI)を使用したり、「synchronized」キーワードを使用して共有データに対するアクセスをコントロールして並行処理を実現します。

 Javaの並行処理に関する基本的な部分は、下記の記事を参考にしてみてください。

 しかし、このモデルの場合、アプリケーションの処理が複雑化してくると、信頼性が高いマルチスレッドのアプリケーションを作ろうとした場合に難しくなります。

 例えば、共有データを制御する範囲やデータにロックをかけるタイミング、そしてデッドロックの回避策などをしっかりと考えて実装する必要があります。

 なお、Javaでは5.0から並行処理ライブラリとして、「java.util.concurrent」が導入されました。このAPIを使用すればマルチスレッドプログラミングにおいて、以前よりも問題を起こしにくくなります。

Concurrent APIの使用例

 ではjava.util.concurrent APIを使用した簡単なサンプルを見てみましょう。下記のサンプルはJavaで記述されています。mainメソッドでExecutorsクラスを使用し、ThreadSampleのrunメソッドを別スレッドで実行しています。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ThreadSample implements Runnable {
 
    private String name = "";
 
    public ThreadSample(String name) {
        this.name = name;
    }
 
    /**
     * ExecutorService#executeで実行されるメソッド.
     */
    public void run() {
        for (int i = 0; i < 3; i++) {
            try {
                System.out.println(name + "count=" + i);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args) {
        System.out.println("ThreadSample#main start");
        // ExecutorService作成
        ExecutorService ex = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 3; i++) {
            ex.execute(new ThreadSample("[Thread-" + i + "]"));
        }
        ex.shutdown();
        System.out.println("ThreadSample#main end");
    }
}

 上記プログラムを実行すると、私の環境では下記のような順番で処理が実行されました。別スレッドで1秒ごとにカウントアップしています。

ThreadSumple#main start
[Thread-0]count=0
ThreadSumple#main end
[Thread-0]count=1
[Thread-0]count=2
[Thread-1]count=0
[Thread-1]count=1
[Thread-1]count=2
[Thread-2]count=0
[Thread-2]count=1
[Thread-2]count=2

 java.util.concurrent APIには、スレッドプーリングと再利用の仕組みやスケジューリング非同期処理に関する機能もあり、マルチスレッドアプリケーションの実装負荷を大幅に軽減できます。しかし、共有データとロックのモデルという基本的な仕組みは変わっていません。

Scalaで並行処理を行うための3つの方法

 では、Scalaで並行処理を実行したい場合はどのようにすればよいのでしょうか。いくつか方法はあるので、順番に紹介していきましょう。

【1】Threadクラスやsynchronizedを使う

 1つ目は、通常のJavaと同じくThreadクラスやsynchronizedを使う方法です。ScalaからJavaのクラスはそのまま使えるので、Threadクラスはそのまま使えます。

 また、Runnableインターフェイスはトレイトとして扱い、synchronizedキーワードは、「AnyRef」クラスのメソッドとして提供されています。先ほどのJavaサンプルをThreadを直接使用する形でScalaに書き直してみましょう(※このサンプルアプリはEclipseで実行してください)。

object Main {
    def main(args: Array[String]) = {
        println("ThreadSample#main start");
        for (i <- 0 to 3) {
            val t = new Thread(new ThreadSample("[Thread-" + i + "]"))
            t.start()
        }
        println("ThreadSample#main end")
    }
}
 
class ThreadSample(name: String) extends Runnable {
    def run() = {
        for (i <- 0 to 3) {
            println(name + "count=" + i)
            Thread.sleep(1000)
        }
    }
}

 プログラムを実行してみると、問題なく動作することが分かります。Scalaでは例外をキャッチする必要がないので余計なtry-catch文がなく、シンプルな記述ができてますね。

【2】java.util.concurrent APIをScalaから使う

 2つ目は先ほど説明したjava.util.concurrent APIをScalaから使う方法です。単純にScalaの構文に置き換えるだけで動作します(※このサンプルアプリはEclipseで実行してください)。

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
 
object Main {
    def main(args: Array[String]) = {
        println("ThreadSumple#main start")
        // ExecutorService作成
        val ex = Executors.newSingleThreadExecutor()
        for (i <- 0 to 3) {
            ex.execute(new ThreadSample("[Thread-" + i + "]"))
        }
        ex.shutdown()
        println("ThreadSumple#main end")
    }
}
 
class ThreadSample(name: String) extends Runnable {
    def run() = {
        for (i <- 0 to 3) {
            println(name + "count=" + i)
            Thread.sleep(1000)
        }
    }
}

【3】Scalaの持つ並行処理機能を使う

 3つ目は、Scalaの持つ並行処理機能を使う方法です。Scalaでは「アクター」と呼ばれるスレッドに似たオブジェクト同士でメッセージを交換し、並行処理が行えます。これはJavaの持つ共有データロックモデルとは違い、データの共有がないメッセージ交換モデルです。

 パフォーマンスにおいては適切に設計された共有データロックモデルにはかないませんが、アクターを使うとデッドロックや競合を回避しやすいことがメリットになります。次ページから、実際にアクターを使用してサンプルプログラムを作成してみましょう。

       1|2|3 次のページへ

Copyright © ITmedia, Inc. All Rights Reserved.

RSSについて

アイティメディアIDについて

メールマガジン登録

@ITのメールマガジンは、 もちろん、すべて無料です。ぜひメールマガジンをご購読ください。