WebUIでは処理の状況を確認できます。アプリケーション一覧から、Application IDをクリックします(app-20160729140037-0000のような文字列です)。その次のページで、Application Detail UIというリンクを開きましょう。そうすると、先ほど実行した処理がジョブ一覧に表示されています。
ジョブを選択すると、ステージ一覧が表示されます。また、ステージを選択するとタスクの一覧が表示されます。
※アクション系のメソッドを指します。この意味は後述します。
Sparkには、重要な抽象的概念として、「RDD(Resilient Distributed Dataset)」があります。RDDはクラスであり、大規模データセットを操作する役割を持ちます。
scala> sc.parallelize(1 to 1000, 10) res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:28
上記コマンドの返り値の型を見てみるとRDD[Int]とあり、Int型の値のデータセットであることが分かります。上記はInt型のRDDですが、RDD[String]や、自分で用意したMyClass型を持つRDD[MyClass]も作ることができます。
RDDオブジェクトに対して、実行できるメソッドは「変換」と「アクション」の2種類に分けられます。
変換系のメソッドはアクション系のメソッドが実行されるまで実行されないという特徴があります。この遅延評価のおかげで、結果として必要なデータだけで計算することができます。これは、計算を必要最低限にするために有効です。大規模データの分析では、こういうことが処理スピードに大きくかかわることがあります。
例えば、下記コードだけでは計算されずRDDオブジェクトを返却するだけですが、
val evenNumbers = sc.parallelize(1 to 1000, 10).filter(_%2 == 0)
下記のようにアクションのメソッドを呼び出すことで初めて計算されます。
evenNumbers.collect
データに対して連続的にアクセスする際にはキャッシュが有効です。
下記は、1から1000までの数値で5の倍数のデータセットです。
val multiplesOf5 = sc.parallelize(1 to 1000, 10).filter(_%5 == 0).cache multiplesOf5.reduce(_+_) // 1回目のアクションは通常通りです。 multiplesOf5.reduce(_+_) // 2回目の参照からキャッシュを使用します。
キャッシュを取得しないと、都度フィルター処理が実行されてしまいますが、キャッシュを使うと、2回目から各ワーカー内のキャッシュから、結果を直接取得できます。
キャッシュは連続してデータを扱う際に有効な機能です。
次回は、Sparkのアプリケーションを作成して、Amazon EMR上で動かすところまで説明したいと思いますので、お楽しみに。
Copyright © ITmedia, Inc. All Rights Reserved.