「Hadoopの処理の流れ」を理解し、実践する:きょうから試せる Hadoop“スモールスタート”ガイド(2)(1/3 ページ)
実際にHadoopで処理を実装していきながら「Hadoopは、誰にだって扱える」を体感しましょう。今回は「Hadoopの処理の流れ」を解説します。
書籍の中から有用な技術情報をピックアップして紹介する本シリーズ。今回は、秀和システム発行の書籍『Hadoopファーストガイド(2012年9月20日発行)』からの抜粋です。
ご注意:本稿は、著者及び出版社の許可を得て、そのまま転載したものです。このため用字用語の統一ルールなどは@ITのそれとは一致しません。あらかじめご了承ください。
Hadoopの処理の流れ
前回はHadoopのメリットとデメリットを説明しました。今回は、「Hadoopの処理の流れ」を説明していきたいと思います。なお、本稿では基本的に「Hadoop Streaming」の挙動について説明していきますが、参考のためにHadoopの挙動についても触れていきます。
Mapフェーズ
MapReduceはMapフェーズ、Shuffleフェーズ、Reduceフェーズの順で処理されていきます。まず、入力データはMapフェーズでMapperにより処理されます。Mapフェーズでは主に以下のような処理が行われます。
- 不要な情報を取り除く
- 値を別の形式に変換する
具体的には、入力データが渡され、不要な情報を取り除いたり、値を別の形式に変換したりします。その後、Mapperでの出力結果がReducerへと渡されていきます。
なお、Mapperに対してHadoopではkeyとvalueがきちんと別々の引数として渡ってくるのですが、Hadoop Streamingの場合には単なる文字列として渡ってくるために、利用者が明示的にkeyとvalueに分解する必要があります(図2-1)。Web上の説明などではHadoop StreamingではなくHadoopに関しての説明が多いため、HadoopStreamingを利用しようとしてもここの部分のイメージがつかみにくく、混乱することが多いように感じます。
これについてはHadoopのソースコードを見ると分かりやすいと思います。Hadoopのソースコードはこちらからダウンロードできるのでぜひダウンロードして確認してみてください。2012年6月現在、最新の安定板は1.0.3となっているので1.0.3を見てみましょう(編注:2017年1月現在の安定版は2.7.3となります)。
バージョン | 説明 | |
---|---|---|
1.0.x | 現在のstable版(2012/5/16 v1.0.3リリース) | |
2.x.x | 現在のalpha版 | |
0.20.203.x | 以前のstable版 | |
0.22.x | セキュリティ非対応版 | |
0.23.x | MapReduce2.0対応版 |
ちなみに、バージョンがたくさんあるのでそれぞれがどのような関係なのかよく分からないと思いますが、これらは以下のような関係となっています※1。動作が安定してきたからこれまでのstable版である0.20.203.x系列がついに1.0.0がナンバリングされたというだけであって、特別な変更が行われたわけではありません(図2-2)。
さて、ダウンロードしてきたソースコードの中から「src/mapred/org/apache/hadoop/mapreduce/Mapper.java」を見てみましょう。Javaで書かれたMapperのソースコードが見つかると思います。Mapper部分を抜粋するとこのようなコードとなっています。
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } }
このコードを見るとMapperで引数を受け取った時点でkeyとvalueを引数として別々に受け取っていることが確認できます。一方、Hadoop Streamingでは標準入力のテキストデータがそのまま渡される仕様となっているので明示的にkeyとvalueに分割する必要があります※2。
※2 もちろん、必要がなければ無理に分割する必要はありません
さらに、ソースコードの中にはサンプルコードも付随しています。例えばsrc/examples/org/apache/hadoop/examples/WordCount.javaは単純にスペース区切りの文字数をカウントするだけのプログラムですが、非常にシンプルで分かりやすいのではないでしょうか。
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Mapフェーズでは入力データの合計サイズがブロックサイズ何個分かによって起動するMapperの数が決まります。つまり、例えば10TBの入力データがあって、ブロックサイズが128MBである場合、Mapperの数は約78,000となります。
10000000MB÷128MB ≒ 78,000
Mapperの最適な並列処理数は、1つのノードにつきおよそ10〜100のようです※3。大量に起動させれば良さそうですが、Mapperは起動にも時間がかかるので、大量のMapperを起動するとそれだけで時間がかかってしまいます。1つのMapperが少なくとも1分程度はかかるデータサイズになるようにブロックサイズを調整すると良いでしょう。
※3 CPUにかかる負荷が非常に低いMapperの場合、300まで起動した実績もあります
Copyright © ITmedia, Inc. All Rights Reserved.
関連記事
- もし、あなたが「“ビッグデータプロジェクト”を任せる。何とかするように」と言われたら
「ビッグデータプロジェクトを始めることになった」ら、具体的に何をするのか。本連載は、「ビッグデータプロジェクトの“進め方”」を業務視点/ビジネス視点の両面から体系的に理解し、具体的に実践していく方のためのナレッジアーカイブです。第1回目は、「ビッグデータとは何か」の基礎と、「ビッグデータ基盤の概要とメリット」を解説します。 - Hadoopは「難しい・遅い・使えない」? 越えられない壁がある理由と打開策を整理する
ブームだったHadoop。でも実際にはアーリーアダプター以外には、扱いにくくて普及が進まないのが現状だ。その課題に幾つかの解決策が出てきた。転換期を迎えるHadoopをめぐる状況を整理しよう。 - いまさら聞けないHadoopとテキストマイニング入門
Hadoopとは何かを解説し、実際にHadoopを使って大規模データを対象にしたテキストマイニングを行います。テキストマイニングを行うサンプルプログラムの作成を通じて、Hadoopの使い方や、どのように活用できるのかを解説します - 欧米の金融業界は今、どうHadoopを活用しているか
Hadoopは、欧米の金融関連サービス業界でどう活用されているか。米Hortonworksの金融サービス業界担当ゼネラルマネージャーへのインタビューで得た情報を、2回に分けてお届けする。今回は金融業界におけるHadoopのユースケースを概観する。