HadoopではMapフェーズ、Shuffleフェーズ、Reduceフェーズの順で処理が行われます(第2回「『Hadoopの処理の流れ』を理解し、実践する」を参照)。ここで重要なルールは、Mapフェーズの出力は「"key<タブ>value"という形式で行えば、同じkeyのデータは必ず同じReducerに渡される」というものでした。
そのこと忘れずに、実際にHadoopで処理を実装していきましょう。
まずは、例として検索キーワードランキングを生成する場合を考えてみましょう。短い期間での計算であればもちろんリレーショナルデータベースを使って計算した方が簡単で早いですが、長期間での計算となれば(データ量にもよりますが)Hadoopで計算した方が良いのではないかと思います。
例えば入力データとしては以下のような、検索キーワードとユーザIDで構成されているログが流れてくるとします。
ナス,193032 お弁当,39110 きゅうり,1039001 カレー 豚肉,90123 ズッキーニ,82919 パスタ トマト,1281032 からあげ,72920 じゃがいも キャベツ,402918 トマト,2101302 ナス ピーマン,822924 お弁当 おかず,291038 きゅうり サラダ,224703 豚肉 たまねぎ,90123 ハンバーグ,1038733 たいやき,547291 トマト,392018 お弁当 ウインナー,39110 にんじん カレー,382027 キャベツ,1039200 なす トマト,2101302 卵 スープ,191938 ...
検索キーワードはスペースで区切られて複数のキーワードが指定されている場合もあり、それらは別々のキーワードとしてカウントするものとします。
Mapper、Reducerはこのようになります。
keyword_counter = Hash.new {|h,k| h[k] = 0 } ARGF.each do |log| log.chomp! keyword, user_id = log.split(',') keywords = keyword.split(' ') keywords.each do |keyword| keyword_counter[keyword] += 1 keyword_counter['__TOTAL__'] += 1 end end keyword_counter.each do |keyword, count| puts "#{keyword}\t#{count}"end end
keyword_counter = Hash.new {|h,k| h[k] = 0 } previous_keyword = nil ARGF.each do |log| log.chomp! keyword, count = log.split(/\t/) if keyword == previous_keyword keyword_counter[keyword] += count.to_i else if previous_keyword puts "#{previous_keyword}\t#{keyword_counter[previous_keyword]}" end previous_keyword = keyword keyword_counter[keyword] += count.to_i end end puts "#{previous_keyword}\t#{keyword_counter[previous_keyword]}"
手元でこのようにして動作確認が取れたら、入力データやMapper、ReducerはS3に上げておきます。
ではHadoopで処理してみましょう。まずEMRクラスタを起動します。
ここに先ほどのMapper、Reducerでステップを追加します。
このような結果が出力されるはずです。__TOTAL__は全検索回数を表します。
__TOTAL__ 32 おかず 1 お弁当 3 からあげ 1 きゅうり 2 じゃがいも 1 たいやき 1 たまねぎ 1 なす 1 にんじん 1 ウインナー 1 カレー 2 キャベツ 2 サラダ 1 スープ 1 ズッキーニ 1 トマト 4 ナス 2 ハンバーグ 1 パスタ 1 ピーマン 1 卵 1 豚肉 2
これで確かに欲しいデータは得られたわけですが、まだいくつか問題があります。
などです。それならば、EMRで出力されたファイルを連結して、それを並び替えてあげれば良さそうです。そういった処理を行うにはどうしたら良いのでしょうか。
Copyright © ITmedia, Inc. All Rights Reserved.