みなさん、元気にMapReduceしてますか?HadoopのMapReduceをJavaで書くのはとっつきづらい、アドホックな処理に向かない、といった欠点があるため、それを克服するためのプロダクトがいろいろとあります。HiveやPigはMapReduceを楽にコーディングするための工夫ですね。最近ではClouderaからImpalaというプロダクトが出てきました。これはHiveの文法(HiveQL)が使え、かつMapReduceをせずに高速にクエリを実行できるという優れものです。新しいプロダクトが出てくるとわくわくしますね!でも本連載では今回もMapReduceをゴリゴリ書いていきます。よろしく!
なお、今回は扱うソースコードが多いため、弊社Webサイトからソースコードをまとめてダウンロードできるようにしています。興味がある方は、弊社Webサイトの右上にある検索窓で「HadoopでTwitterを分析してみた」と検索してみてください!
ソートはどこで行っているの?
前回作成した処理をおさらいしてみましょう。Mapフェーズでツイートは単語ごとに分解され、単語をkey、そのツイート内での出現回数をvalueとしたkey-valueペアでShuffleフェーズへと送られます。Shuffleフェーズでは各Map処理(Mapper)から送られてきたkey-valueのデータをkeyごとにまとめ、Reduceフェーズに送ります。この際、Reduce処理(Reducer)が複数あれば、各処理のデータ量が偏らないように分散してデータを送ります。Reduceフェーズではkey単位で送られてきたvalueのコレクションをすべて足し上げてkeyと共に出力します。
このように、MapReduceではShuffleフェーズの際にデータがkey単位でまとめられます。これを行うためにshuffleフェーズでは自動的にkeyによるソートが行われます。よって、この自動ソートをうまく利用すればきれいにソートされた出力結果が得られそうです。
しかし、今回行おうとしている「単語出現数の降順でのソート」を実現するためには大きな問題が3つあります。
- 問題1: ソートはkeyに対してのみ実施される。
- 問題2: ソート順はkeyに使用しているシリアライズ(オブジェクトをバイト列に変換)するためのクラスで定義されている。
- 問題3: Reducerが複数ある場合、デフォルトではkeyのハッシュ値に基づいて各処理にデータが分散される。
問題1から順に説明しましょう。ShuffleフェーズはMapフェーズの後に動作します。そのため、Mapperから出力されるkey項目でしかソートができません。今回は単語の出現数でソートしたいのですが、これはMapフェーズ終了時では未確定であり、しかもvalue項目に入っています。つまり、ソートするためには前回出力した結果をもう一度MapReduce処理にかける必要があるのです。前回作成した処理のReduceフェーズで最終出力を「key:出現数」、「value:単語」としたのはこのためです。
問題2は、ソート順を変更するにはクラスに手を入れなければならないことを意味しています。SQLのORDER BY句のような手軽さはありません。そのため、Hadoopの解説書ではPigやHiveなどのプロダクトを使用してソートすることを勧めていたりします。
問題3は、複数のReducerからの出力をそのままつなげても全体としてはソートされないことを意味しています。図2を見ていただくとわかる通り、処理されるkeyはReducerへハッシュ値を使って(言い換えればランダムに)割り振られていくため、各Reducer内ではソートされているものの、それぞれのReducerから出力されたファイル同士をつなげてもソートされた状態にはなりません。なお、動かすReducerを1つだけにすれば全体でソートされたファイルが1つ出来上がります。ただし、これではせっかく並列処理をさせるためにHadoopを使っているのに、魅力が半減してしまいますね。
それでは、どのような処理を追加・変更すればよいのでしょうか。具体的に見ていきましょう。