先ほどのMapReduceを実行すると、著者の寿命をKey、作品をベクトル化したValueが並んだファイルが出力されます。
例えば、以下のようなKey/Valueが出力されます(実際にはHadoop独自のバイナリ形式です)。
<86, {奉納:1,黄泉路:1,密教:3,胎子:2,山鉾:1...}> <41, {駄菓子:1,掌:2,チョンチョンチョン:1,凝り性:6...}> <52, {コンニャク:3,世間体:5,アルキメーデス:2...}>
では、このファイルを入力として、重みベクトルを計算していきましょう。まずはDriverから書きます。
1 public class IterativeParameterMixingDriver { 2 public static void main(String[] args) throws Exception { 3 Configuration conf = new Configuration(); 4 String[] otherArgs = new GenericOptionsParser(conf, args) 5 .getRemainingArgs(); 6 if (otherArgs.length < 2) { 7 System.err.println("Usage: hadoop <input file> <output dir> 8 "); 8 System.exit(2); 9 } 10 int maxIteration = conf.getInt("iteration.num", 15); 11 Path input = new Path(otherArgs[0]); 12 int iteration = 1; 13 while (maxIteration >= iteration) { 14 Path output = new Path(otherArgs[1], "weight-" + iteration); 15 if (iteration > 1) { 16 Path prevWeight = new Path(otherArgs[1], "weight-"+ (iteration - 1) + "/part-r-00000"); 17 conf.set("weight.file", prevWeight.toString()); 18 } 19 runIteration(conf, input, output); 20 iteration++; 21 } 22 } 23 private static void runIteration(Configuration conf, Path input, Path output) 24 throws IOException, InterruptedException, ClassNotFoundException { 25 Job job = new Job(conf, "least squares: " + output); 26 job.setJarByClass(IterativeParameterMixingDriver.class); 27 job.setMapperClass(LeastSquaresMapper.class); 28 job.setReducerClass(RegressionReducer.class); 29 job.setInputFormatClass(SequenceFileInputFormat.class); 30 job.setOutputFormatClass(SequenceFileOutputFormat.class); 31 job.setOutputKeyClass(NullWritable.class); 32 job.setOutputValueClass(MapWritable.class); 33 FileInputFormat.addInputPath(job, input); 34 FileOutputFormat.setOutputPath(job, output); 35 if (!job.waitForCompletion(true)) { 36 throw new InterruptedException("job failed: " + output); 37 } 38 } 39 }
「1回目のMapReduceで重みベクトルを計算し、2回目のMapReduceで、その重みベクトルを読み込んで更新する」プロセスを繰り返し実行するという流れです。
10行目は、そのMapReduceの繰り返し回数をコマンドラインで設定したオプションから読み込みます。デフォルトは15回です。
14〜21行目では、実際に繰り返し処理を行っています。例えば、2回目の繰り返しのMapReduceでは、「weight-2」というファイル名で出力し、14行目で設定しています。
さらにMapperで1回目の繰り返しで出力された重みベクトルを読み込まなければいけないので、16行目で「weight-1/part-r-00000」というファイル名をConfigurationに設定して、Mapperから参照できるようにします。
19行目のrunIteration関数でMapReduceを実行します。runIteration関数内の設定項目は、ほとんど解説したので省きます。
Mapperを記述します。
1 public class LeastSquaresMapper 2 extends Mapper<VIntWritable, MapWritable, NullWritable, MapWritable> { 3 protected MapWritable weightMap = new MapWritable(); 4 private double stepSize = 0.00000025; 5 @Override 6 public void setup(Context context) throws IOException,InterruptedException { 7 Configuration conf = context.getConfiguration(); 8 stepSize = conf.getFloat("sdm.step.size", (float) stepSize); 9 String weightFile = conf.get("weight.file"); 10 if (weightFile != null) { 11 FileSystem fs = FileSystem.get(conf); 12 Reader reader = new SequenceFile.Reader(fs, new Path(weightFile),conf); 13 try { 14 NullWritable key = NullWritable.get(); 15 reader.next(key, weightMap); 16 } finally { 17 reader.close(); 18 } 19 } 20 } 21 @Override 22 public void map(VIntWritable key, MapWritable value, Context context) 23 throws IOException, InterruptedException { 24 int b = key.get(); 25 value.put(new Text(CommonUtil.BIAS_KEY), new VIntWritable(1)); 26 double wx = DiscriminantFunctionAlgorithm.predict(value, weightMap); 27 for (Entry<Writable, Writable> entry : value.entrySet()) { 28 Text word = (Text) entry.getKey(); 29 int x = ((VIntWritable) entry.getValue()).get(); 30 DoubleWritable weightWritable = (DoubleWritable) weightMap.get(word); 31 double diffWeight = -stepSize * (wx - b) * x; 32 if (weightWritable == null) { 33 weightMap.put(word, new DoubleWritable(diffWeight)); 34 } else { 35 double w = weightWritable.get() + diffWeight; 36 weightWritable.set(w); 37 } 38 } 39 } 40 @Override 41 public void cleanup(Context context) throws IOException,InterruptedException { 42 context.write(NullWritable.get(), weightMap); 43 } 44 }
まずは、Mapperの初期化処理です。8行目は、Widrow-Hoffの学習規則のところで解説した刻み幅
を設定しています。9行目では、更新する重みベクトル(前回の繰り返しのMapReduceで出力した重みベクトル)のファイル名を読み込んで、10〜19行目で重みベクトルをweightMapに読み込んでいます。最初の繰り返しでは、重みベクトルはすべてゼロでスタートします。
map関数は、寿命をKey、作品ベクトルをValueに繰り返し呼び出され、重みベクトル(weightMap)を逐次更新していきます。24行目では、正しい寿命を設定しています。
25行目は、バイアス項というものを設定しています。解説しませんでしたが、実際はバイアス項bを加えて、「y=wx+b」として寿命を予測します。これは、「x=(6,4)」「w=(5,3)」の次元を1つ増やして、「x=(1,6,4)」「w=(b,5,3)」のようにしてwxを計算することと同じです。そこで、作品のベクトルに必ず値が「1」になるような適当なバイアスキーを設定しているわけです。
26行目では、与えられた作品のベクトルに対して、現在の重みから寿命の予測値を計算しています。predict関数の中身は以下の通りです。
public static double predict(MapWritable train, MapWritable weight) { double result = 0; for (Entry<Writable, Writable> entry : train.entrySet()) { Text word = (Text) entry.getKey(); int x = ((VIntWritable) entry.getValue()).get(); DoubleWritable w = (DoubleWritable) weight.get(word); if (w != null) result += x * w.get(); } return result; }
predict関数は、作品ベクトルと、重みベクトルを入力にして、wxを計算し、寿命の予測値を出力します。MapReduceで求まった重みベクトルに対して、テキストデータから寿命を予測したいというときも、この関数を使います。
Mapperの27〜39行目では、作品ベクトルの値とそれに対応する重みベクトルを1つずつ読み込んで、重みベクトルを更新しています。28行目が作品ベクトルの1つの次元に対応する単語、29行目がその頻度で、30行目で、対応する重みベクトルの次元の値を求めています。31行目でWidrow-Hoffの学習規則で更新する重みの差分を計算し、32〜37行目で重みを更新して、weightMapに設定しています。
cleanup関数は、Mapperの終了時に1回だけ呼び出される関数です。更新した重みベクトル、weightMapをValueにして出力しています。なお、Keyは必要ないので、NullWritableを設定しています。
次にReducerを解説していきます。
1 public class RegressionReducer 2 extends Reducer<NullWritable, MapWritable, NullWritable, MapWritable> { 3 @Override 4 public void reduce(NullWritable key, <IterableMapWritable> values, 5 Context context) throws IOException, InterruptedException { 6 MapWritable mergedMap = new MapWritable(); 7 int length = 0; 8 for (MapWritable value : values) { 9 for (Entry<Writable, Writable> entry : value.entrySet()) { 10 Text word = (Text) entry.getKey(); 11 DoubleWritable weight = (DoubleWritable) entry.getValue(); 12 DoubleWritable mergedWeight = (DoubleWritable) (mergedMap.get(word)); 13 if (mergedWeight == null) { 14 mergedMap.put(word, weight); 15 } else { 16 mergedWeight.set(mergedWeight.get() + weight.get()); 17 } 18 } 19 length++; 20 } 21 for (Entry<Writable, Writable> entry : mergedMap.entrySet()) { 22 DoubleWritable weight = (DoubleWritable) entry.getValue(); 23 weight.set(weight.get() / length); 24 } 25 context.write(NullWritable.get(), mergedMap); 26 } 27 }
Reducerでは、Mapperでそれぞれ独立して学習した重みベクトルを平均して出力します。MapperのキーはすべてNullWritable、つまり同一キーで設定されているので、reduce関数は1回だけ呼び出されることになります。
6行目のmergedMapは平均して出力する重みベクトルです。8〜20行目で、重みベクトルをそれぞれ足し合わせて、mergedMapに設定しています。22〜24行目で、mergedMapの値をMapperの数で割って平均を取っています。そして、25行目で出力しています。
実行は以下のようにします。刻み幅、MapReduceの繰り返し回数を引数で設定しています。
$ hadoop jar job.jar IterativeParameterMixingDriver -D sdm.step.size=0.00000025 -D iteration.num=15 /hdfs/path/to/input /hdfs/path/to/output
MapReduceで、重みベクトルが求まったので、任意のテキストから著者の寿命を予測してみます。
1 public class TestRegression { 2 public static void main(String[] args) throws Exception { 3 if (args.length < 2) { 4 System.err.println("Usage: cmd <test file> <train file>"); 5 return; 6 } 7 MapWritable testMap = CommonUtil.readTestFile(args[0]); 8 MapWritable weightMap = CommonUtil.readWeightFile(args[1]); 9 testMap.put(new Text(CommonUtil.BIAS_KEY), new VIntWritable(1)); 10 double age = DiscriminantFunctionAlgorithm.predict(testMap, weightMap); 11 System.out.println("" + age + ""); 12 } 13 }
7行目で、寿命を予測したいテキストファイルをベクトル化して、testMapに設定しています。8行目は、MapReduceで求めた重みベクトルのファイルを読み込んで、weightMapに設定しています。
9行目はバイアスキーを設定しています。10行目は、MapReduceプログラムのときに説明したDiscriminantFunctionAlgorithmクラスのpredict関数で、寿命を予測します。
例えば、こちらの記事のテキストから作者の寿命を予測してみましょう。
あなたは2.0285976773748837歳で死ぬでしょう。
今回は、テキストデータから著者の寿命を推定するための手法を解説し、実際にMapReduceプログラムを作成していきました。
しかし、今回の手法は、基本となる概念を説明するのにはいいのですが、極端に外れた値に左右されやすいなどさまざまな問題があります。
そこで、次回はPassive-Aggressiveなどのより良い手法を解説し、プログラムを作成していきましょう。
Copyright © ITmedia, Inc. All Rights Reserved.