Apache Spark、Sparkシェルで手軽に分散処理リアクティブプログラミング超入門(終)(2/2 ページ)

» 2018年09月10日 05時00分 公開
[中村修太クラスメソッド]
前のページへ 1|2       

ScalaアプリケーションからSpark APIを使う

 ScalaアプリケーションからSpark APIを使ってみます。Apahe Sparkのサンプルとなるテンプレートプロジェクトが用意されているので、giter8を使ってひな型を作成しましょう。

 ここにあるように、giter8はConscriptかHomebrewで簡単にインストールできます。

giter8とは

 「giter8」は、GitHubなどのリポジトリ上にあるテンプレートを使ってファイルやディレクトリを生成するためのツールです。sbt 0.13.13から、「sbt new」コマンドから呼び出せるようになりました。

 下記のコマンドを実行すればApache Sparkのsbtを用いたサンプルプロジェクトが生成されます。

% sbt new holdenk/sparkProjectTemplate.g8 --name=example --organization=jp.classmethod --sparkVersion=2.3.1

 g8コマンドを直接使っても問題ありません。

% g8 holdenk/sparkProjectTemplate --name=example --organization=jp.classmethod --sparkVersion=2.3.1

 このサンプルは、実行時に指定したファイルの単語数をカウントして出力するワードカウントプログラムです。inputファイルを用意すればすぐ動かすことができるはずですが、筆者の環境では使用ライブラリのバージョン指定でうまく動作しなかったため、build.sbtを少し修正しました。

//2.3.1_0.9.0から2.3.1_0.10.0に修正
"com.holdenkarau" %% "spark-testing-base" % "2.3.1_0.10.0" % "test"

 では、単語数をカウントするファイル(input.txt)を用意しましょう。今回は「Lorem ipsum」を使用しました(※日本語を使用した場合、想定する「単語数カウント」の動作はしないと思います)。

 sbt packageコマンドでjarを作成します。

% sbt package

 targetディレクトリ以下にjarが作成されたら、Sparkアプリケーションの実行コマンド「spark-submit」を使ってサンプルを実行します。

% spark-submit
  \--class "jp.classmethod.example.CountingApp"
  \--master local target/scala-2.11/example_2.11-0.0.1.jar input.txt output

 classでは、SparkアプリケーションのMainクラスを指定します。masterでは、「マスターURL」と呼ばれるSparkConfのmasterに指定する値を指定します。これによってSparkアプリケーションがどの環境で動かすのかが決まります。

 ここではlocalを指定していますが、これはローカルモードで動作させるための指定です。この場合、ローカルで1つだけスレッドを割り当てます。

 ちなみに、「local[スレッド数]」と指定すれば、指定した数のスレッド数を割り当てますし、スレッド数に「*」を指定すればCPUのコア数分割り当てます。

 spark-submitの詳細はこちらのドキュメントをご確認ください。

 最後にmainの引数となるinputファイルパスとoutput用ディレクトリ名を指定しています。実行後、outputディレクトリ内に「part-00000」というファイルが作成されています。単語ごとに登場回数がカウントされているのが分かります。

(enim,1)
(consectetur,1)
(deserunt,1)
(aliqua.,1)
(ea,1)
・・・

 ソースコードを少し見てみましょう。MainクラスのCountingAppでは、引数で渡されたinputファイルとoutputディレクトリのパスを設定し、Runner(自作のタスクランナー)を実行しています。

//CountingApp.scala
object CountingApp extends App{
  val (inputFile, outputFile) = (args(0), args(1))
  // spark-submit command should supply all necessary config elements
  Runner.run(new SparkConf(), inputFile, outputFile)
}

 RunnerではSparkContextを用いてinputファイルをRDD(データを保持するコレクション型)に変換して、単語数カウントモジュール(WordCount)へ渡して結果を出力しています。

object Runner {
  def run(conf: SparkConf, inputFile: String, outputFile: String): Unit = {
    val sc = new SparkContext(conf)
    val rdd = sc.textFile(inputFile)
    val counts = WordCount.withStopWordsFiltered(rdd)
    counts.saveAsTextFile(outputFile)
  }
}

 WordCount.withStopWordsFilteredではルールに従って単語数をカウントし、単語と回数のペアのコレクションを返しています。

//WordCount.scala
object WordCount {
  def withStopWordsFiltered(rdd : RDD[String],
    separators : Array[Char] = " ".toCharArray,
    stopWords : Set[String] = Set("the")): RDD[(String, Int)] = {
    val tokens: RDD[String] = rdd.flatMap(_.split(separators).
      map(_.trim.toLowerCase))
    val lcStopWords = stopWords.map(_.trim.toLowerCase)
    val words = tokens.filter(token =>
      !lcStopWords.contains(token) && (token.length > 0))
    val wordPairs = words.map((_, 1))
    val wordCounts = wordPairs.reduceByKey(_ + _)
    wordCounts
  }
}

 このように、簡単なプログラムでApache Sparkによる処理が実行できました。今回はローカルで単一スレッドによる処理ですが、並列処理させる場合でもやり方は変わりません。

まとめ

 今回は、分散処理フレームワークの概要や、Apache Hadoop、MapReduce、Apache Sparkの概要を説明し、Apache Sparkの使い方を紹介しました。どんなケースでもApache Sparkが向いているわけではありませんが、手軽に分散処理を行いたい場合には採用を検討してみるのもいいと思います。

最後に

 本連載では、ここまで「リアクティブプログラミング」という観点からLightbend社の開発プラットフォーム「Lightbend Reactive Platform」を構成する下記のプロダクトを紹介してきました。

  • Play framework
  • Akka、Akka Streams
  • Lagom
  • Apache Spark

 Play frameworkでは非同期通信の標準サポートやIterateeベースのリアクティブモデルを持ち、スケーラブルなアプリケーションでもCPUやメモリなどのリソースを効率良く使えます。Akka Streamsでは非同期ストリームやBack pressureを提供し、伸縮性と耐障害性を実現できます。Lagomを使えばリアクティブネイティブでマイクロサービスを開発できます。

 Lightbend Reactive Platformのプロダクトを全て使用しなければリアクティブなアプリケーションが開発できないというわけではありません。各プロダクトを目的に応じて取捨選択し、リアクティブ本来の要件である「高速レスポンス、耐障害性、高可用性、メッセージ駆動」を実現してください。

 今までお付き合いいただきありがとうございました。

著者紹介

中村修太(なかむら しゅうた)

中村修太

クラスメソッド勤務の新しもの好きプログラマーです。数年前に東京から山口県に引っ越し、現在もリモート勤務しています。最近の趣味は空手とぬか漬け作り。


前のページへ 1|2       

Copyright © ITmedia, Inc. All Rights Reserved.

スポンサーからのお知らせPR

注目のテーマ

Microsoft & Windows最前線2025
AI for エンジニアリング
「サプライチェーン攻撃」対策
1P情シスのための脆弱性管理/対策の現実解
OSSのサプライチェーン管理、取るべきアクションとは
システム開発ノウハウ 【発注ナビ】PR
あなたにおすすめの記事PR

RSSについて

アイティメディアIDについて

メールマガジン登録

@ITのメールマガジンは、 もちろん、すべて無料です。ぜひメールマガジンをご購読ください。