今回は、分散処理フレームワークの概要や、Apache Hadoop、MapReduce、Apache Sparkの概要を説明し、Apache Sparkの使い方を紹介します。
リアクティブプログラミング(RP)の概要や、それに関連する技術、RPでアプリを作成するための手法について解説する本連載「リアクティブプログラミング超入門」。Lightbend社のリアクティブ開発プラットフォームである「Lightbend Reactive Platform」を使用して、実際にリアクティブなアプリケーションを作成しています。
前回の記事では、Lightbend Reactive Platformの構成要素の一つである、マイクロサービス向けフレームワーク「Lagom」を紹介しました。今回は、同じくLightbend Reactive Platformの構成要素の一つである、分散処理フレームワーク「Apache Spark」について解説します。
Lightbend Reactive Platformを構成するプロダクトの一つである「Apache Spark」は分散処理フレームワークです。
分散処理フレームワークとは、簡単にいうと「大量のデータを複数コンピュータによって並列処理させ、処理を短時間で実行させるための仕組みを持った」フレームワークです。
代表的な分散処理フレームワークとしては、Apache Hadoopが有名です。Hadoopはテキストデータやアクセスログといった大量のデータの蓄積/分析を分散処理技術によって実現します。「大量のデータ」を「分散処理(複数の環境で並列に実行できる)」で効率良く処理できるところが特長ですね。
Hadoopは、「HDFS」と呼ばれる分散ファイルシステムと「MapReduce」という分散処理を行うための方法から構成されています。次にこれらについて簡単に説明しましょう。
HDFS(Hadoop Distributed File System)は、Hadoopで使われるファイルシステムで、ファイルを分割して複数のディスクで管理し、大量のデータを処理する場合でも効率良く処理することが可能です。HDFSは図のようにNameNodeと複数のDataNodeで構成されます。NameNodeはファイルシステムのメタ情報を管理し、DataNodeは、データ自体を管理します。
また、データを保存する場合は、対象データを一定のサイズで分割してDataNodeで保存します。データを取得する場合は、最初にNameNodeにデータの場所を問い合わせ、その後にデータを取得します。
MapReduceとは、Googleから提唱された「分散処理を行うための方法」で、map処理(入力ファイルのレコードからkey-valueの組み合わせを作る処理)とreduce処理(map処理で作成されたデータから同じkeyを持つものを集約する処理)を組み合わせて行います。
Hadoopでは、このMapReduceの処理方法を実装しています。なお、Hadoop以外でMapReduce処理を実装したアプリケーションとしては、「Disco」(Pythonで実装)などがあります。
分散処理フレームワークを用いたデータ分析とその利用については、近年では一般的になってきました。実際に、さまざまな業種で、分散処理フレームワークによって分析されたデータが活用されていますが、課題も出てきています。
Hadoopの場合、ディスクベースのアーキテクチャで大量のデータを効率良く処理するための設計となっています。そのため処理のレイテンシも決して小さくなく、用途によっては時間がかかってしまいます。
こういった問題から、「もっと手軽に短時間で処理を行いたい」といった要件が出てきました。それを実現するための一つの答えが、後述する「Apache Spark」です。Apache Sparkはメモリ上でデータの処理をすることで高速化を実現しており、ディスクアクセスを多用するHadoopとは違って機械学習などの用途に向いているといわれています。
ここからは、Apache Sparkについて説明します。
Apache Sparkとは、Hadoopと同じく分散処理のフレームワークです。もともとはカリフォルニア大学バークレー校で開発が開始され、2014年にApache Software Foundationに寄贈されました。また、Apache SparkはScalaで実装されており、Spark用のコードはさまざまな言語で記述することが可能です。
Apache Sparkの主な特徴を3つ紹介します。
先述しましたが、HadoopのMapReduceはI/O処理時にディスクへのアクセスを多用します。それに比べてApache Sparkの場合、処理データをメモリ上で展開することでI/O処理の高速化を実現しています。
Apache Sparkの場合、HDFSの使用が必須ということはありません。HDFS以外にOpenStack SwiftやAmazon S3を使用することが可能です(※AWSのEMR(Elastic MapReduce)でもApache Sparkを選択することができます)。
Apache Sparkを制御するため、Scala、Java、PythonなどのAPIが提供されています。また、Datasetへのアクセス方法として「Spark SQL」と呼ばれる言語を使用したり、RDBからデータを取得したりすることもできます。
Apache Sparkコンポーネントの一つ。Spark SQLは構造化されたデータを処理するためのApache Spark用モジュールです。標準のSQLを使用してHDFS上のファイルやHiveテーブルにアクセスしたり、JDBCを用いてRDBにアクセスしたりすることができます。
Hadoopによるデータ処理をシンプルに使用するための、Hadoop上で動作するデータウェアハウスシステム。これはHadoop上で動作し、ユーザーに対してDBMSの機能を提供します。また、Hiveでは「HiveQL」と呼ばれるSQL互換言語で操作可能です。
Apache Sparkで使用するデータ構造。RDBのテーブルのようにレコードとカラム(名前と型を持つ)概念を持っている抽象的な構造で、Hiveテーブルやファイルなどから生成できる。DatasetはApache Spark 1.6から採用されたデータ構造で、それまではDataFrameと呼ばれるデータ型を使用していた。DatasetはDataFrameを拡張したデータ構造。
実際にApache Sparkの動作を試してみましょう。このページからダウンロードしてもいいですし、下記のようにHomebrewでもインストールできます。
% brew install apache-spark
Sparkが正しくインストールされていれば、下記のようにバージョン情報が表示されます。
% spark-shell --version Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.1 /_/ Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_121 Branch Compiled by user vanzin on 2018-06-01T20:37:04Z Revision Url Type --help for more information.
Sparkシェルとは、Sparkのコマンドをコマンドラインから入力して対話的に実行内容を確認することができるモードです。
まずはSparkシェルを起動してみましょう。spark-shellコマンドを実行すればScalaのSparkシェル用REPLが起動します。
% spark-shell ・ ・ scala>
シェルを終了するときは「:q」を入力します。
scala> :q
なお、helpは「:help」で表示できるので必要なときに使いましょう。
Sparkシェルでログファイルにアクセスしてみましょう。まずは「log.csv」という名前で下記のようなファイルを準備します。
100,taro,30,2018-03-20 10:41:20 101,hanako,20,2018-03-03 11:32:34 102,Mike,35,2018-01-28 20:20:11
Sparkシェルでは自動的にSpark関連のライブラリ(org.apache.spark.SparkContext._)がimportされています。このため、Sparkのクラスや関数が使用できます。では用意したログファイルを読んでみましょう。scという変数名でSparkContextのインスタンスが使用できるので、textFike関数を使ってログファイルを読み込みます。
scala> val file = sc.textFile("./log.csv") file: org.apache.spark.rdd.RDD[String] = ./log.csv MapPartitionsRDD[1] at textFile at <console>:24
読み込んだファイルの行数を数えることができます。
scala> file.count() res0: Long = 3
先頭行はfirst関数で取得できます。
scala> file.first() res1: String = 100,taro,30,2018-03-20 10:41:20
filterを使って任意の条件にマッチした行を取得することも可能です。
scala> file.filter(row => row.contains("hanako")).count res4: Long = 1
ここまではコンソールだけでやってきましたが、Sparkシェルを起動すると「Spark shell application UI」というWebインタフェースのサービスも同時に起動されます。これは、ブラウザでSparkシェルのジョブステータスや設定情報を確認することができます。
Sparkシェルを起動すると、起動時に下記のようなログメッセージが表示されるので、ブラウザでアクセスしてみてください。
・ ・ Spark context Web UI available at http://{your local ip}:4040 ・ ・
実行したコマンドのステータスや実行時間などが表示されています。
Copyright © ITmedia, Inc. All Rights Reserved.