ScalaアプリケーションからSpark APIを使ってみます。Apahe Sparkのサンプルとなるテンプレートプロジェクトが用意されているので、giter8を使ってひな型を作成しましょう。
ここにあるように、giter8はConscriptかHomebrewで簡単にインストールできます。
「giter8」は、GitHubなどのリポジトリ上にあるテンプレートを使ってファイルやディレクトリを生成するためのツールです。sbt 0.13.13から、「sbt new」コマンドから呼び出せるようになりました。
下記のコマンドを実行すればApache Sparkのsbtを用いたサンプルプロジェクトが生成されます。
% sbt new holdenk/sparkProjectTemplate.g8 --name=example --organization=jp.classmethod --sparkVersion=2.3.1
g8コマンドを直接使っても問題ありません。
% g8 holdenk/sparkProjectTemplate --name=example --organization=jp.classmethod --sparkVersion=2.3.1
このサンプルは、実行時に指定したファイルの単語数をカウントして出力するワードカウントプログラムです。inputファイルを用意すればすぐ動かすことができるはずですが、筆者の環境では使用ライブラリのバージョン指定でうまく動作しなかったため、build.sbtを少し修正しました。
//2.3.1_0.9.0から2.3.1_0.10.0に修正 "com.holdenkarau" %% "spark-testing-base" % "2.3.1_0.10.0" % "test"
では、単語数をカウントするファイル(input.txt)を用意しましょう。今回は「Lorem ipsum」を使用しました(※日本語を使用した場合、想定する「単語数カウント」の動作はしないと思います)。
sbt packageコマンドでjarを作成します。
% sbt package
targetディレクトリ以下にjarが作成されたら、Sparkアプリケーションの実行コマンド「spark-submit」を使ってサンプルを実行します。
% spark-submit \--class "jp.classmethod.example.CountingApp" \--master local target/scala-2.11/example_2.11-0.0.1.jar input.txt output
classでは、SparkアプリケーションのMainクラスを指定します。masterでは、「マスターURL」と呼ばれるSparkConfのmasterに指定する値を指定します。これによってSparkアプリケーションがどの環境で動かすのかが決まります。
ここではlocalを指定していますが、これはローカルモードで動作させるための指定です。この場合、ローカルで1つだけスレッドを割り当てます。
ちなみに、「local[スレッド数]」と指定すれば、指定した数のスレッド数を割り当てますし、スレッド数に「*」を指定すればCPUのコア数分割り当てます。
spark-submitの詳細はこちらのドキュメントをご確認ください。
最後にmainの引数となるinputファイルパスとoutput用ディレクトリ名を指定しています。実行後、outputディレクトリ内に「part-00000」というファイルが作成されています。単語ごとに登場回数がカウントされているのが分かります。
(enim,1) (consectetur,1) (deserunt,1) (aliqua.,1) (ea,1) ・・・
ソースコードを少し見てみましょう。MainクラスのCountingAppでは、引数で渡されたinputファイルとoutputディレクトリのパスを設定し、Runner(自作のタスクランナー)を実行しています。
//CountingApp.scala object CountingApp extends App{ val (inputFile, outputFile) = (args(0), args(1)) // spark-submit command should supply all necessary config elements Runner.run(new SparkConf(), inputFile, outputFile) }
RunnerではSparkContextを用いてinputファイルをRDD(データを保持するコレクション型)に変換して、単語数カウントモジュール(WordCount)へ渡して結果を出力しています。
object Runner { def run(conf: SparkConf, inputFile: String, outputFile: String): Unit = { val sc = new SparkContext(conf) val rdd = sc.textFile(inputFile) val counts = WordCount.withStopWordsFiltered(rdd) counts.saveAsTextFile(outputFile) } }
WordCount.withStopWordsFilteredではルールに従って単語数をカウントし、単語と回数のペアのコレクションを返しています。
//WordCount.scala object WordCount { def withStopWordsFiltered(rdd : RDD[String], separators : Array[Char] = " ".toCharArray, stopWords : Set[String] = Set("the")): RDD[(String, Int)] = { val tokens: RDD[String] = rdd.flatMap(_.split(separators). map(_.trim.toLowerCase)) val lcStopWords = stopWords.map(_.trim.toLowerCase) val words = tokens.filter(token => !lcStopWords.contains(token) && (token.length > 0)) val wordPairs = words.map((_, 1)) val wordCounts = wordPairs.reduceByKey(_ + _) wordCounts } }
このように、簡単なプログラムでApache Sparkによる処理が実行できました。今回はローカルで単一スレッドによる処理ですが、並列処理させる場合でもやり方は変わりません。
今回は、分散処理フレームワークの概要や、Apache Hadoop、MapReduce、Apache Sparkの概要を説明し、Apache Sparkの使い方を紹介しました。どんなケースでもApache Sparkが向いているわけではありませんが、手軽に分散処理を行いたい場合には採用を検討してみるのもいいと思います。
本連載では、ここまで「リアクティブプログラミング」という観点からLightbend社の開発プラットフォーム「Lightbend Reactive Platform」を構成する下記のプロダクトを紹介してきました。
Play frameworkでは非同期通信の標準サポートやIterateeベースのリアクティブモデルを持ち、スケーラブルなアプリケーションでもCPUやメモリなどのリソースを効率良く使えます。Akka Streamsでは非同期ストリームやBack pressureを提供し、伸縮性と耐障害性を実現できます。Lagomを使えばリアクティブネイティブでマイクロサービスを開発できます。
Lightbend Reactive Platformのプロダクトを全て使用しなければリアクティブなアプリケーションが開発できないというわけではありません。各プロダクトを目的に応じて取捨選択し、リアクティブ本来の要件である「高速レスポンス、耐障害性、高可用性、メッセージ駆動」を実現してください。
今までお付き合いいただきありがとうございました。
Copyright © ITmedia, Inc. All Rights Reserved.