Spark 2.0の回帰分析アプリをScalaのSBTで実装し、EMRで実行:Amazon EMRで構築するApache Spark超入門(2)(2/3 ページ)
本連載では、Sparkの概要や、ローカル環境でのSparkのクラスタの構築、Sparkの基本的な概念やプログラミングの方法を説明していきます。今回は、簡単な機械学習のSparkアプリケーションを作成し、Amazon EMRで実行するまでを説明します。
メインとなる実装ファイルを記述する
メインとなる実装ファイルは、「src/main/scala/」以下に書いていきます。今回は、「SparkExampleApp.scala」というファイルを作って処理を記述していきます。
SparkExampleApp.scalaを作成して下記のように編集してください。
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature._
import org.apache.spark.ml.regression.LinearRegression
object SparkExampleApp {
def main(args: Array[String]) {
// SparkSessionオブジェクトの作成
val spark = SparkSession
.builder()
.appName("Spark Sample App")
.getOrCreate()
import spark.implicits._ // データフレームを扱うためのおまじない
// この後に処理を書いていきます。
}
}
上記のコードの解説です。
1〜4行目では必要なパッケージをimportしています。
Spark 2のSparkSessionオブジェクトを作成
次に、SparkExampleAppというシングルトンオブジェクトを定義して(6〜20行目)、その中のmainメソッド内にアプリケーションの処理を記述しています(8〜19行目)。
mainメソッド内では、まず、Spark 2.0.0から追加されたSparkSessionオブジェクトを作成しています。このオブジェクトを通して、データの処理が行えます。生成時には、メソッドチェーンでSessionの設定を記述できます。今回はAppNameメソッドでアプリケーションの名前を指定しています(10〜13行目)。
他にもいろいろ指定できますが、実行時に直接指定したりconfigファイルに別途記述したりすることもできるので、そちらの方が便利です。そのため筆者は、ここでは簡潔に書くようにしています。
15行目で「import spark.implicits._」という記述がありますが、これは「データフレーム」を扱うための記述です。今回用いる「spark.ml」という機械学習のパッケージは、データフレームを扱うので必要になります。
データフレームとは
簡単に説明すると、前回説明した「RDD(Resilient Distributed Dataset)」をより便利したもので、より直感的にデータを扱える仕組みです。
SQLライクにデータを取得できる高レベルなAPIがそろっているので、簡潔により短いコードで実装していくことが可能です。また、処理の最適化も行われています。そのため、直接RDDを扱うよりもメリットがあります。
spark.mlを含む「MLLib」について
Sparkには「MLlib」という機械学習のパッケージがあります。回帰や分類、クラスタリングと基本的なアルゴリズムを備えています。
MLlibにはRDDベース(spark.mllib)とDataFrameベース(spark.ml)の2つがあります。前者はSpark 2.0.0からメンテナンスモードとなりました。なので、spark.mlを使うことを推奨されています。今回は後者の方を使っていきます。
AWSへの接続とcsvからデータを読み込むデータフレームの作成
実装の解説に戻ります。続けて、先ほどのコードのmainメソッド内の17行目に下記のコードを書いていきます。
// AWSのAccessKeyの設定
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "*********")
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "*********")
// csvからデータを読み込むデータフレームの作成
val filePath = args(0)
val df = spark.sqlContext.read
.format("com.databricks.spark.csv") // データのフォーマットを指定します。
.option("header", "true") // カラム名をデータフレームのスキーマに反映します。
.option("inferSchema", "true") // 型の自動変換も行います。
.load(filePath)
まず、S3へアクセスするためのAccessKeyIdを設定しています(1〜3行目)。こちらは各自の環境に合わせて設定してください。
次に、S3上のcsvファイルを読み込んでデータフレームを作成しています(5〜11行目)。filePathはアプリケーション実行時に引数で指定する想定です。
パイプラインオブジェクトを作成
続けて、下記のコードを書いてください。
// Pipelineの各要素を作成
// 【1】素性のベクトルを生成します
val assembler = new VectorAssembler()
.setInputCols(Array("x")) // 説明変数を指定します。
.setOutputCol("features") // 変換後の値をfeaturesという名前でデータフレームに追加
// 【2】素性のベクトルを多項式にします
val polynomialExpansion = new PolynomialExpansion()
.setInputCol(assembler.getOutputCol) // featuresの文字列です。
.setOutputCol("polyFeatures") // 変換後の値をpolyFeaturesという名前でデータフレームに追加
.setDegree(4) // 4次の多項式です。
// 【3】線形回帰の予測器を指定します
val linearRegression = new LinearRegression()
.setLabelCol("y") // 目的変数です。
.setFeaturesCol(polynomialExpansion.getOutputCol) // polyFeaturesの文字列です。
.setMaxIter(100) // 繰り返しが100回
.setRegParam(0.0) // 正則化パラメータ
// 【1】〜【3】を元にパイプラインオブジェクトを作ります
val pipeline = new Pipeline()
.setStages(Array(assembler, polynomialExpansion, linearRegression))
上記はパイプラインオブジェクトを生成しています。
「Pipeline」とは
Pipelineはデータの整形から予測モデルの生成までの一連の処理を、簡潔に書くことができる仕組みです。
Pipelineのコンポーネントは「変換器」「予測器」の2つで構成されています。変換器を使ってデータを整形し、それを元に予測器でモデルを作成します。
- 変換器(Transformer)
データフレームを分析可能な値に変換できる。例えば、複数のスカラー値からベクトルオブジェクトへの変換、形態素解析、文字数カウントなどが可能 - 予測器(Estimator)
ロジスティック回帰などの分類、回帰分析などの推定、クラスタリングなどができる
上記コードの解説に戻ります。
まず、コード内の【1】の部分では、csvデータの中から分析に使うカラムを指定しています。
// 【1】素性のベクトルを生成します
val assembler = new VectorAssembler()
.setInputCols(Array("x")) // 説明変数を指定します。
.setOutputCol("features") // 変換後の値をfeaturesという名前でデータフレームに追加
今回は、説明変数が1つなので、xカラムだけを指定していますが、もし多変量の分析がしたい場合はsetInputCols(Array("x1", "x2", "x3", "x4"))などのように複数指定することもできます。
次に、コード内の【2】では、上で指定したカラム「x」の値を4次の多項式化しています。
// 【2】素性のベクトルを多項式にします
val polynomialExpansion = new PolynomialExpansion()
.setInputCol(assembler.getOutputCol) // featuresの文字列です。
.setOutputCol("polyFeatures") // 変換後の値をpolyFeaturesという名前でデータフレームに追加
.setDegree(4) // 4次の多項式です。
具体的には、カラム「x」をインプットにして、(x, x^2, x^3, x^4)のベクトルを「polyFeatures」という新しいカラムにセットしています。
コード内の【3】では予測器について定義しています。先ほど生成したベクトルを説明変数に、データフレーム(つまりcsv内)のカラム「y」を目的変数として学習モデルを作ります。
// 3. 線形回帰の予測器を指定します
val linearRegression = new LinearRegression()
.setLabelCol("y") // 目的変数です。
.setFeaturesCol(polynomialExpansion.getOutputCol) // polyFeaturesの文字列です。
.setMaxIter(100) // 繰り返しが100回
.setRegParam(0.0) // 正則化パラメータ
今回は、繰り返し回数を「100」に、正則化パラメータ(setRegParam)を「0.0」に指定して正則化は「なし」に設定しています。
パラメータの設定はモデルの作成において重要です。
繰り返し回数は大きく設定すればするほど、精度の向上が期待できますが、計算時間が増えてしまいます。また、正則化パラメータは、大きくすればするほどモデルの複雑さを解消できますが、大きくし過ぎると、素性の効果が薄れ過ぎてしまいます。
これらのパラメータは最適な値を設定する必要があります。最適な値を求める方法として「グリッドサーチ」がありますが、Sparkではそれも簡単にできます(次回以降に説明する予定です)。
最後に、pipelineオブジェクトを作成しています。変換器や予測器がステージごとに処理されるように、setStageメソッドでそれらを指定しています。
データの指定と実行結果の保存
続けて、下記のコードを書きます。
val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))
val model = pipeline.fit(trainingData) // 学習データを指定します。
// csvに保存
val outputFilePath = args(1)
model.transform(testData) // テストデータを指定します
.select("x", "prediction")
.write
.format("com.databricks.spark.csv") // データのフォーマットを指定します。
.option("header", "false") // ヘッダーにカラム名をつけるか
.save(outputFilePath) // ファイルの保存先です。
まず、最初に生成したデータフレームを学習用とテスト用に分けています。次に、piplelineオブジェクトからモデルを作成しています。そして、そのモデルを元にテスト用データの変換(予測)を行い、csvとして書き込みます。
SparkExampleApp.scalaの完成コード
完成すると下記のようなコードになっているかと思います。GitHubに上げておきました。
Copyright © ITmedia, Inc. All Rights Reserved.
関連記事
知らないと大損する、Apache Sparkの基礎知識と3つのメリット
社会一般から大きな注目を集めているIoT(Internet of Things)。だが、その具体像はまだ浸透しているとはいえない。今回は、IoTやビッグデータのキーテクノロジとして注目されている「Apache Spark」について、Sparkを製品に取り込んでいる日本IBMの土屋敦氏と、数多くの企業のデータ分析を担うブレインパッドの下田倫大氏に話をうかがった。
Sparkのエンタープライズ対応が「成熟」――Clouderaが宣言
HadoopディストリビューターもあらためてSparkへの注力をアピール。既に800ノード超のSparkクラスターを運用するユーザーも存在するという。
Sparkは“誰”に例えられる?──多様化と進化を続ける「Hadoop」、人気急上昇「Spark」
先日、日本Hadoopユーザー会主催のイベントが開催されました。データベースと関係性が深いデータ分散処理プラットフォームである「Hadoop」と「Spark」の最近事情に迫ります。
