実際にHadoopで処理を実装していきながら「Hadoopは、誰にだって扱える」を体感しましょう。今回は「Hadoopの処理の流れ」を解説します。
書籍の中から有用な技術情報をピックアップして紹介する本シリーズ。今回は、秀和システム発行の書籍『Hadoopファーストガイド(2012年9月20日発行)』からの抜粋です。
ご注意:本稿は、著者及び出版社の許可を得て、そのまま転載したものです。このため用字用語の統一ルールなどは@ITのそれとは一致しません。あらかじめご了承ください。
前回はHadoopのメリットとデメリットを説明しました。今回は、「Hadoopの処理の流れ」を説明していきたいと思います。なお、本稿では基本的に「Hadoop Streaming」の挙動について説明していきますが、参考のためにHadoopの挙動についても触れていきます。
MapReduceはMapフェーズ、Shuffleフェーズ、Reduceフェーズの順で処理されていきます。まず、入力データはMapフェーズでMapperにより処理されます。Mapフェーズでは主に以下のような処理が行われます。
具体的には、入力データが渡され、不要な情報を取り除いたり、値を別の形式に変換したりします。その後、Mapperでの出力結果がReducerへと渡されていきます。
なお、Mapperに対してHadoopではkeyとvalueがきちんと別々の引数として渡ってくるのですが、Hadoop Streamingの場合には単なる文字列として渡ってくるために、利用者が明示的にkeyとvalueに分解する必要があります(図2-1)。Web上の説明などではHadoop StreamingではなくHadoopに関しての説明が多いため、HadoopStreamingを利用しようとしてもここの部分のイメージがつかみにくく、混乱することが多いように感じます。
これについてはHadoopのソースコードを見ると分かりやすいと思います。Hadoopのソースコードはこちらからダウンロードできるのでぜひダウンロードして確認してみてください。2012年6月現在、最新の安定板は1.0.3となっているので1.0.3を見てみましょう(編注:2017年1月現在の安定版は2.7.3となります)。
バージョン | 説明 | |
---|---|---|
1.0.x | 現在のstable版(2012/5/16 v1.0.3リリース) | |
2.x.x | 現在のalpha版 | |
0.20.203.x | 以前のstable版 | |
0.22.x | セキュリティ非対応版 | |
0.23.x | MapReduce2.0対応版 | |
ちなみに、バージョンがたくさんあるのでそれぞれがどのような関係なのかよく分からないと思いますが、これらは以下のような関係となっています※1。動作が安定してきたからこれまでのstable版である0.20.203.x系列がついに1.0.0がナンバリングされたというだけであって、特別な変更が行われたわけではありません(図2-2)。
さて、ダウンロードしてきたソースコードの中から「src/mapred/org/apache/hadoop/mapreduce/Mapper.java」を見てみましょう。Javaで書かれたMapperのソースコードが見つかると思います。Mapper部分を抜粋するとこのようなコードとなっています。
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } }
このコードを見るとMapperで引数を受け取った時点でkeyとvalueを引数として別々に受け取っていることが確認できます。一方、Hadoop Streamingでは標準入力のテキストデータがそのまま渡される仕様となっているので明示的にkeyとvalueに分割する必要があります※2。
※2 もちろん、必要がなければ無理に分割する必要はありません
さらに、ソースコードの中にはサンプルコードも付随しています。例えばsrc/examples/org/apache/hadoop/examples/WordCount.javaは単純にスペース区切りの文字数をカウントするだけのプログラムですが、非常にシンプルで分かりやすいのではないでしょうか。
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Mapフェーズでは入力データの合計サイズがブロックサイズ何個分かによって起動するMapperの数が決まります。つまり、例えば10TBの入力データがあって、ブロックサイズが128MBである場合、Mapperの数は約78,000となります。
10000000MB÷128MB ≒ 78,000
Mapperの最適な並列処理数は、1つのノードにつきおよそ10〜100のようです※3。大量に起動させれば良さそうですが、Mapperは起動にも時間がかかるので、大量のMapperを起動するとそれだけで時間がかかってしまいます。1つのMapperが少なくとも1分程度はかかるデータサイズになるようにブロックサイズを調整すると良いでしょう。
※3 CPUにかかる負荷が非常に低いMapperの場合、300まで起動した実績もあります
Copyright © ITmedia, Inc. All Rights Reserved.