それでは、前編で紹介した「文章で使用されている英字をカウントする」プログラムをHadoop用に作成してみましょう。Hadoopのサンプル「wordcount」のソースコードを参考にして作成しました。この特集では、Hadoopを使ったプログラムの作成方法を紹介することに主眼を置いていますので、これから作成するプログラムはデフォルトの状態で実行します。
まず、パッケージ用のディレクトリを作成します。ここで作成するクラスはsampleパッケージに含めることにしましょう。
次に、メインとなるクラスとして、sample.CharCountクラスを用意しますが、その前に前編で作成したMapReduceCharCounterAppクラスをもう一度見ておきましょう。
MapReduceCharCounter.java(再掲) |
import java.util.List;
public class MapReduceCharCounter {
public static int[] charCount = new int[128];
public static void emit(ReduceInput input, int count) {
charCount[input.key] = count;
}
public void count(String target) {
charCount = new int[128];
MapTask map = new MapTask();
map.execute(target);
java.util.Collections.sort(map.list);
ReduceTask rt = new ReduceTask();
List<ReduceInput> inputList =
ReduceInputListFactory.createInstance(map.list);
for (ReduceInput input : inputList) {
rt.execute(input);
}
}
public int getCharCount(char c) {
int index = (int)c;
return charCount[index];
}
} |
MapTaskクラスやReduceTaskクラスからインスタンスを生成して利用しています。
sample.CharCountクラスでは、自分では各クラスのインスタンスを生成せずに、どんなクラスを利用するのかの設定だけします。具体的に見てみましょう。
sample.CharCountクラスは、Hadoopの提供するorg.apache.hadoop.util.Toolインターフェイスを実装します。main()メソッドでは、org.apache.hadoop.util.ToolRunnerクラスを使って、org.apache.hadoop.util.Toolインターフェイスを実装したクラス(ここでは、sample.CharCountクラス)のrun()メソッドが実行されるようにします。
sample/CharCount.java |
package sample;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class CharCount extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(
getConf(), sample.CharCount.class);
conf.setJobName("charcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(sample.MapClass.class);
conf.setCombinerClass(sample.Reduce.class);
conf.setReducerClass(sample.Reduce.class);
FileInputFormat.setInputPaths(conf, args[0]);
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(
new Configuration(), new CharCount(), args);
System.exit(res);
}
} |
このクラスは、基本的で、お決まりの処理をしているだけです。run()メソッド内では、org.apache.hadoop.mapred.JobConfクラスのインスタンスのみ生成し、後は、「MapクラスはsetMapperClass()メソッドで指定したり、ReduceクラスはsetReducerClass()メソッドで指定したりするだけだ」という点に注目してください。
さて、今回の例では、keyとなる1文字はorg.apache.hadoop.io.Textクラスで表現することにし、valueとなる値はorg.apache.hadoop.io.IntWritableクラスで表現することにします。前編では、「'a', 1」というペアを表現するために、MapEntryクラスを作りました。
MapEntry.java(再掲) |
public class MapEntry implements Comparable<MapEntry> {
public char key;
public int value;
public MapEntry(char k, int v) {
key = k;
value = v;
}
@Override
public int compareTo(MapEntry e) {
return e.key - key;
}
@Override
public boolean equals(Object o) {
return (o != null) &&
(o instanceof MapEntry) &&
(((MapEntry)o).key == key);
}
@Override
public int hashCode() {
return key;
}
} |
図1 MapEntryのクラス図(再掲)
しかし、Hadoopでは、JobConfクラスのsetOutputKeyClass()メソッド、setOutputValueClass()メソッドを使って指定するだけです。
またHadoopでは、Mapper→Reducerとデータが流れていくので、それぞれのフェイズで処理を行うクラスをsetMapperClass()メソッドなどで登録します。ここでは、Mapperにはsample.MapClassクラス、Reducerにはsample.Reduceクラスを登録しています。
ところで、setCombinerClass()メソッドというのがあることにも気が付きます。Hadoopでは、Mapperの出力をReducerへ渡す前に結合するクラスをユーザーが「Combiner」として指定できるようになっています。Combinerには、普通はReducerと同じクラスを登録すればいいので、ここではsample.Reduceクラスを指定しています。
なお、APIドキュメントがありますから、詳細はそちらを参考にしてください。
次に、Mapperクラスを作成します。先に前編のMapTaskクラスを見ておきましょう。
MapTask.java(再掲) |
public class MapTask {
// 指定された文字列から生成されるEntryのリスト
public java.util.List<MapEntry> list =
new java.util.LinkedList<MapEntry>();
public void execute(String target) {
byte[] bs = target.getBytes();
for (byte b : bs) {
// このサンプルでは、どの文字も同じ値 1を与える
MapEntry entry = new MapEntry((char) b, 1);
list.add(entry);
}
}
} |
これと対応するsample.MapClassクラスは、次のようになります。MapTaskクラスではbyte型で1文字ずつ取り出していましたが、今回はString型で1文字ずつ取り出し、org.apache.hadoop.io.Text型のデータへセットしています。
sample/MapClass.java |
package sample;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text c = new Text();
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter)
throws IOException {
String line = value.toString();
int length = line.length();
for (int i = 0; i < length; i++) {
c.set(line.substring(i, i + 1));
output.collect(c, one);
}
}
} |
また、MapTaskクラスでは値として「1」を使っていましたが、ここではorg.apache.hadoop.io.IntWritable型のフィールド「one」が参照する値を指定しています。oneは1を値に持つIntWritable型オブジェクトを参照していますから、このクラスはMapTaskクラスと実質的には同じことをしているのが分かるはずです。
次はReducerクラスです。ここでも先に前編のReduceTaskクラスを見ておきましょう。次のとおりです。
ReduceTask.java(再掲) |
public class ReduceTask {
public int count;
public void execute(ReduceInput input) {
count = 0;
for (MapEntry e : input.list) {
count++;
}
MapReduceCharCounter.emit(input, count);
}
} |
これと対応するsample.Reduceクラスは次のようになります。
sample/Reduce.java |
package sample;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
} |
ReduceTaskクラスでは、ReduceInputクラスを自前で用意しましたが、Hadoopの場合は、reduce()メソッドに、Iterator<IntWritable>型で計算対象のデータリストが渡されます。そのため、これを使ってkeyの文字が何回現れたかをwhile文で計算をしています。こちらも、ReduceTaskクラスと実質的に同じ処理をしていることが分かるはずです。
前編では、MapTaskクラスやReduceTaskクラスが連携しやすくなるようにほかのクラスを用意しました。しかしHadoopでは、メインとMapper、Reducerクラスだけ自作すればプログラムを実行できます。
次ページでは、Hadoopを使ったプログラムをコンパイルして実行し、さらにHadoop用のEclipseプラグインについても紹介します。