MapReduceのパターン、アルゴリズム、そしてユースケース

Ilya Katsov氏による「MapReduce Patterns, Algorithms, and Use Cases」の翻訳
http://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/
(下書きに入れて推敲するつもりが、なんか公開されてしまっていたので、あとでいろいろ修正すると思います)


February 1, 2012
この記事では、Webや科学論文で見られる異なるテクニックの体系的な視点を与えるために、数々のMapReduceパターンとアルゴリズムをまとめた。
いくつかの実用的なケーススタディも提供している。
すべての説明とコードスニペットでは、Mapper、Reducer、Combiner、Partitionaer、ソーティングにおいてHadoopの標準的なMapReduceモデルを利用します。このフレームワークは、次の図に描かれています。

基本的なMapReduceパターン

カウントと合計

問題:多数の文書があり、それぞれの文書は単語の集合である。すべての文書内の各単語の出現数の合計を計算する必要がある。
もしくは、単語への任意の関数でもよい。例えば、ログファイルがあり、それぞれのレコードがレスポンス時間を含み、そして、レスポンス時間の平均を計算する必要がある。


解法:
本当にシンプルなものから始めよう。下のコードスニペットは、それぞれの単語ごとの処理で単純に「1」を発するMapperと、「1」のリストを走査してそれらを足し合わせるReducerを示す。

class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)

このアプローチの明らかな欠点は、Mapperから発せられた多量のダミーカウンターである。Mapperはそれぞれの文章についてカウンターを足し合わせることで、カウンターの数を減らすことができる。

class Mapper
   method Map(docid id, doc d)
      H = new AssociativeArray
      for all term t in doc d do
          h{t} = h{t} + 1
      for all term t in H do
         Emit(term t, count H{t})

ひとつの文章のためではなく、ひとつのMapperノードで処理されるすべての文章のためのカウンターを計算するために、Combinerを投入することができる。

class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Combiner
   method Combine(term t, [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)


応用:
ログ解析、データクエリー

照合

問題:
要素の集合があって、ひとつの要素に対する関数がある。関数の値が同じになるすべての要素をひとつのファイルに保存するか、グループとして処理するすべての要素に対してほかの計算を行う必要がある。もっとも典型的な例は、転置インデックスの構築である。


解法:
解法はそのままである。Mapperはそれぞれの要素について与えられた関数を計算し、関数の値をキーとして出力し、要素はそれ自体を値として出力する。Reducerは関数の値によってグループ化されたすべての要素を得て、それらの処理や保存を行う。転置インデックスの場合は、要素は単語であり、関数は要素がみつかった文章のIDである。


応用:
転置インデックス、ETL
※extract, transform and load

フィルタリング(grep)、パース、バリデーション

問題:
レコードの集合があって、ある条件に合うかそれぞれのレコードを(他のレコードと独立して)別の表現に変換するすべてのレコードを集める必要がある。


解法:
解法は全くそのままである。Mapperはひとつずつのレコードを取り、受け取った要素か変換結果を出力する。


応用:
ログ解析、データクエリー、ETL、データバリデーション

分散タスク処理

問題:
複数の部分に分割できて、すべての部分からの結果を結合すると最終的な結果になるような、大きな計算問題がある。


解法:
問題の記述は要求内容の集合に分割されていて、要求内容はMapperの入力データとして保存される。それぞれのMapperは要求内容を取り、対応する計算を行い、結果を出力する。Reducerはすべての出力された部分を最終的な結果に統合する。


ケーススタディ:デジタル通信システムのシミュレーション
システムモデル内でいくつかのランダムなデータを受け渡し、スループットの誤り確率を計算する、WiMAXのようなデジタル通信システムがある。それぞれのMapperは必要なサンプリングの1/N番目の決められた量のデータのシミュレーションを行い、誤り確率を出力する。Reducerは誤り確率の平均を計算する。


応用:
物理的、工学的シミュレーション、数値解析、パフォーマンステスト

ソート

問題:
レコードの集合があってそれらのレコードをいくつかのルールで整列する必要があり、またはレコードを特定の順序で処理する必要がある。


解法:
単純なソートは完全にそのままである。Mapperはまさにすべての要素をソートキーに結び付けられた値として出力し、要素の関数のして組み立てられる。それでも、実際のソートはしばしば本当に技巧的な方法で使われ、MapReduce(Hadoop)の心臓部だといわれる理由でもある。特に、複合キーを二次的なソートやグルーピングに使うのは非常に一般的である。


MapReduceでのソートはもともとキーと値のペアをキーでソートするようになっているが、Hadoop実装を活用して値によるソートを行うがテクニックが存在する。
詳細についてはこちらのブログを参照。
http://www.riccomini.name/Topics/DistributedComputing/Hadoop/SortByValue/


もしMapReduceを(途中のデータはなく)元のデータをソートするために使うなら注目に値し、BigTableのコンセプトを使って継続的にデータがソートされた状態を維持するには、しばしば良いアイデアになる。言い換えれば、データをMapReduceクエリーのたびではなくデータ挿入の間に一度ソートすることは、もっと効果的になりうる。


応用:
ETL、データ解析

基本的ではないMapReduceパターン

走査的メッセージパッシング(グラフ処理)

問題:
エンティティとそれらの間の関係によるネットワークがある。それらの隣のエンティティの属性に基づいた、それぞれのエンティティの状態を計算する必要がある。この状態は他のノードへの距離や、隣に特定の属性をもったエンティティがあること、近所の密度の特徴などを示します。


解法:
ネットワークはノードのセットとして保存され、それぞれのノードは近隣のノードのIDの一覧を含む。概念的に、MapReduceジョブは走査的な方法でふるまい、それぞれのノードへのそれぞれの走査で、隣のエンティティにメッセージを送る。それぞれの隣人は受け取ったメッセージに基づいて状態を更新する。
走査は固定の最大走査回数(ネットワークの直径)や、2つの連続した走査の間での無視できる更新のような、いくつかの条件によって終了する。技術的な観点からは、Mapperは近隣のノードのIDをキーとして使ってそれぞれのノードを出力する。結果として、すべてのメッセージは受け取り側ノードでグループ化され、Reducerは状態を再計算して新しい状態でノードを上書きできる。このアルゴリズムを次のコードに示す。

class Mapper
   method Map(id n, object N)
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         Emit(id m, message getMessage(N))

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      messages = []
      for all s in [s1, s2,...] do
          if IsObject(s) then
             M = s
          else               // s is a message
             messages.add(s)
      M.State = calculateState(messages)
      Emit(id m, item M)

ひとつのノードの状態はスパース過ぎないネットワークのすべてのネットワークの間を迅速に伝播することは強調すべきである。すべてのノードはこのノードがすべての隣人たちへ「感染」を開始することで「感染される」ためである。このプロセスを下の図に示す。


ケーススタディ:カテゴリのツリーを通じた可能性伝播
問題:
この問題は実生活でのeコマースタスクに触発された。大きなカテゴリ(男性・女性・子供など)から小さなカテゴリ(男性用ジーンズ・女性用ドレスなど)に枝分かれして、最終的に末端の小さなカテゴリ(男性用ブルージーンズなど)に行きつくようなカテゴリーのツリーがある。末端のカテゴリは有効(商品を含む)であったり無効であったりする。いくつかの高階層のカテゴリは、サブツリーの中に少なくともひとつの有効な末端カテゴリがある場合に有効になる。コールは末端のカテゴリの有効性がわかっているときに、すべてのカテゴリの有効性を計算することである。


解法:
この問題は前節で記述したフレームワークを使って解決することができる。getMessageメソッドとcalculateStateメソッドを次のように定義する。

class N
   State in {True = 2, False = 1, null = 0}, initialized 1 or 2 for end-of-line categories, 0 otherwise

method getMessage(object N)
   return N.State

method calculateState(state s, data [d1, d2,...])
   return max( [d1, d2,...] )


ケーススタディ幅優先探索
問題:
グラフがあり、ひとつのソースノードからグラフ中のほかの全てのノードへの距離(ホップ数)を計算する必要がある。


解法:
ソースノードはoを全ての隣人に出力し、それらの隣人はこのカウンターを1増やしてそれぞれのホップの間で伝播させる。

class N
   State is distance, initialized 0 for source node, INFINITY for all other nodes

method getMessage(N)
   return N.State + 1

method calculateState(state s, data [d1, d2,...])
   min( [d1, d2,...] )


ケーススタディページランクとマッパー側データ集約
このアルゴリズムGoogleによって提案され、Webページの妥当性をこのページにリンクしているページの信頼性(PageRank)の関数として計算する。実際のアルゴリズムは全く複雑だが、その核心部分は、それぞれのノードで入ってくる重みの平均を重みとして計算する、ノード間の重みの伝播である。

class N
    State is PageRank

method getMessage(object N)
    return N.State / N.OutgoingRelations.size()

method calculateState(state s, data [d1, d2,...])
    return ( sum([d1, d2,...]) )

利用するスキーマは一般的すぎて、状態が数値であるという事実を活用していないことを言及する価値がある。多くの実用的な場面で、この事実のおかげで値の集計をMapper側で行うことができる。この最適化は次のコードスニペットで(PageRankアルゴリズム用に)示される。

class Mapper
   method Initialize
      H = new AssociativeArray
   method Map(id n, object N)
      p = N.PageRank  / s.OutgoingRelations.size()
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         H{m} = H{m} + p
   method Close
      for all id n in H do
         Emit(id n, value H{n})

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      for all s in [s1, s2,...] do
          if IsObject(s) then
             M = s
          else
             s = s + p
      M.PageRank = s
      Emit(id m, item M)


応用:
グラフ解析、Webインデクシング

別個の値(ユニークアイテムカウント)

問題:
フィールドとしてFとGを含むレコードの集合がある。同じGを持つレコードのサブセットについて、フィールドFのユニーク値の合計を数える(Gでグループ化される)。


この問題は、ファセット検索としてちょっとした一般化と定式化をすることができる。
(訳注 ファセット検索:タグなどでの検索)


問題:
レコードの集合がある。それぞれのレコードはフィールドFと任意の数のカテゴリラベルG={G1, G2, ...}を持つ。それぞれのラベルの値によるそれぞれのレコードサブセットについてフィールドFのユニーク値の数を数える。


例:

Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}

Result:
a -> 3   // F=1, F=2, F=3
b -> 2   // F=1, F=3
d -> 1   // F=2
e -> 1   // F=2


解法I:
最初のアプローチは2つのステージで問題を解く。最初のステージでは、MapperはFとGのそれぞれのペアについてダミーカウンタを出力する。Reducerはそのようなペアのそれぞれについて発生数の合計を計算する。このフェーズの主なゴールはFの値のユニークさの保証である。2番目のフェーズでは、ペアはGでグループ化され、それぞれのグループの要素の総個数が計算される。


フェーズI:

class Mapper
   method Map(null, record [value f, categories [g1, g2,...]])
      for all category g in [g1, g2,...]
         Emit(record [g, f], count 1)

class Reducer
   method Reduce(record [g, f], counts [n1, n2, ...])
      Emit(record [g, f], null )


フェーズII:

class Mapper
   method Map(null, record [value f, categories [g1, g2,...] )
      for all category g in [g1, g2,...]
          Emit(value f, category g)

class Reducer
   method Initialize
      H = new AssociativeArray : category -> count
   method Reduce(value f, categories [g1, g2,...])
      [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
      for all category g in [g1', g2',...]
         H{g} = H{g} + 1
   method Close
      for all category g in H do
         Emit(category g, count H{g})


解法II:
2番目の解法はひとつのMapReduceジョブしか必要としない。しかしそれは本当にスケールするわけではなく、応用性は制限されている。アルゴリズムはシンプルである。Mapperは値とカテゴリを出力し、Reducerはそれぞれの値についてのカテゴリのリストから重複を除き、それぞれのカテゴリについてカウンタを増加させます。最終ステップでは、Reducerが出力したすべてのカウンタを合計します。
このアプローチは、同じfの値をもつレコードの数がそんなに高くなく、カテゴリの総数もまた制限されているときに適用できます。例えば、このアプローチはWebログの処理やユーザー分類の処理について適用できます。ユーザーの総数が高く、しかしひとりのユーザーに対するイベントの数が制限されて、分類するカテゴリーも同様の場合。このやり方では、Reducerにデータが送られる前に、カテゴリリストから重複を排除するためにCombinerが使えることは特筆に価する。

class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         for all item j in [i1, i2,...]
            Emit(pair [i j], count 1)

class Reducer
   method Reduce(pair [i j], counts [c1, c2,...])
      s = sum([c1, c2,...])
      Emit(pair[i j], count s)


応用:
ログ解析、ユニークユーザーカウント

相互相関

問題:
要素のタプルの集合がある。要素のそれぞれのありうるペアについて、それらの要素が同時に起こるタプルの数を計算する。要素の総個数がNならば、N*N個の値が報告される。


この問題はテキスト解析(要素は単語、タプルはセンテンスと言う)、市場分析(これを買った顧客があれも買う傾向があるか)で表れる。N*Nがまったく小さく、そのようなマトリクスが一台のマシンのメモリに納まれば、実装はそのままできる。


ペアアプローチ:
最初のアプローチはすべてのペアとダミーのカウンタをMapperから出力し、Reducerでそれらのカウンタを合計する。欠点は

  • Combinerの利点が、すべてのペアが別個のような場合など限定的
  • インメモリでの計算がない
class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         for all item j in [i1, i2,...]
            Emit(pair [i j], count 1)

class Reducer
   method Reduce(pair [i j], counts [c1, c2,...])
      s = sum([c1, c2,...])
      Emit(pair[i j], count s)

ストライプアプローチ:

2番目のアプローチはペアの最初の要素によってデータをグループ化し、すべての隣接要素についてのカウンタが計算される連想配列(ストライプ)として扱う。Reducerは基準になる要素iについてのすべてのストライプを受け取り、統合し、ペア亜プローとと同じ結果を出力する。

  • 少ない中間キーの生成。ゆえにフレームワークはソートを少なくできる。
  • Combinerからの多大な貢献
  • インメモリでの計算を行う。これは適切に実装されないと問題になりうる。
  • より複雑な実装
  • 一般的に、「ストライプ」は「ペア」より速い。
class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         H = new AssociativeArray : item -> counter
         for all item j in [i1, i2,...]
            H{j} = H{j} + 1
         Emit(item i, stripe H)

class Reducer
   method Reduce(item i, stripes [H1, H2,...])
      H = new AssociativeArray : item -> counter
      H = merge-sum( [H1, H2,...] )
      for all item j in H.keys()
         Emit(pair [i j], H{j})

応用:
テキスト解析、市場分析


参照:
Lin J. Dyer C. Hirst G. Data Intensive Processing MapReduce
(邦訳:Hadoop MapReduce デザインパターン ―MapReduceによる大規模テキストデータ処理)

リレーショナルMapReduceパターン

この章では、主なリレーショナル操作を並べて、これらの操作がMapReduceの文法でどのように実装できるか議論する。

セレクション
class Mapper
   method Map(rowkey key, tuple t)
      if t satisfies the predicate
         Emit(tuple t, null)
プロジェクション(投影)

プロジェクションはセレクションに比べて少し複雑だが、この場合ありうる重複の排除にReducerを使うべきである。

class Mapper
   method Map(rowkey key, tuple t)
      tuple g = project(t)  // extract required fields to tuple g
      Emit(tuple g, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of nulls
      Emit(tuple t, null)
ユニオン

Mapperには複合させる二つの集合のすべてのレコードが与えられる。Reducerは重複を排除するために使われる。

class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of one or two nulls
      Emit(tuple t, null)
交差

Mapperには交差させる二つの集合のすべてのレコードが与えられる。Reducerは2度現れたレコードだけを出力する。これは両方の集合がこのレコードを含む場合だけ可能である。なぜならレコードは主キーを含みひとつの集合に一度だけしか現れない。

class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of one or two nulls
      if n.size() = 2
          Emit(tuple t, null)
相違

2つのレコード集合を持ってみよう。RとSとする。RとSの違いを計算したい。Mapperはすべてのタプルとレコードがどの集合から来たかわかる名前のタグを出力する。ReducerはRからきてSから来ていないレコードのみを出力する。

class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, string t.SetName)    // t.SetName is either 'R' or 'S'

class Reducer
   method Reduce(tuple t, array n) // array n can be ['R'], ['S'], ['R' 'S'], or ['S', 'R']
      if n.size() = 1 and n[1] = 'R'
          Emit(tuple t, null)
グループ化と集約

グループ化と集約は、次のようにひとつのMapReduceジョブで行える。Mapperはそれぞれのタプルの値を抜き出してグループ化や集約、それらの出力を行う。Reducerはすでにグループ化された集約する値を受け取り、集約関数の演算を行う。合計や最大値のような典型的な集約関数はストリーミング方式で計算できる。ゆえにすべての値のハンドルは同時には不要である。とはいえ、いくつかの場合では2フェーズMapReduceジョブが求められるだろう。「別個の値」パターンをサンプルとして見るといい。

class Mapper
   method Map(null, tuple [value GroupBy, value AggregateBy, value ...])
      Emit(value GroupBy, value AggregateBy)
class Reducer
   method Reduce(value GroupBy, [v1, v2,...])
      Emit(value GroupBy, aggregate( [v1, v2,...] ) )  // aggregate() : sum(), max(),...
ジョイン

ジョインはMapReduceフレームワーク内で完全に可能であるが、効率や適応するデータのボリュームで異なるいくつかのテクニックが存在する。この章では、いくつかの基本的なアプローチについて勉強する。参照のセクションにはジョインのテクニックの詳細な研究へのリンクを含む。


パーティションジョイン(Reduceジョイン、ソートマージジョイン)


このアルゴリズムは、二つの集合RとLをあるキーkでジョインする。MapperはRとLのすべてのタプルを処理し、タプルからキーkを抜き出し、このタプルが(RかLか)どの集合から来たか示すタグで印をつけ、タグ付けされたタプルをkをキーとして出力する。Reducerは特定のキーkについてのすべてのタプルを受け取り、RについてとLについてのふたつのバケットに入れる。ふたつのバケットが満たされたとき、Reducerはそれらの上でネストしたループを走らせ、バケットのクロスジョインを出力する。出力されたそれぞれのタプルはRタプル、Lタプル、そしてキーkで連結される。このアプローチは次の点で不利である。

  • Mapperは絶対的にすべてのデータを、ひとつの集合でしか現れずもう片方にペアがないキーでさえも出力する。
  • Reducerはひとつのキーに対するすべてのデータをメモリに置くことになる。もしデータがメモリにフィットしないならば、これをある種のスワップによってとりまわすのはReducerの責任である。


とはいえリパーティションジョインは、ほかの最適化されたテクニックが実用的でないときでもうまく使える、最も一般的なテクニックである。

class Mapper
   method Map(null, tuple [join_key k, value v1, value v2,...])
      Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, ...] ] )

class Reducer
   method Reduce(join_key k, tagged_tuples [t1, t2,...])
      H = new AssociativeArray : set_name -> values
      for all tagged_tuple t in [t1, t2,...]     // separate values into 2 arrays
         H{t.tag}.add(t.values)
      for all values r in H{'R'}                 // produce a cross-join of the two arrays
         for all values l in H{'L'}
            Emit(null, [k r l] )


リプリケートジョイン(マップジョイン、ハッシュジョイン)


実際には、小さな集合と大きな集合(言うなら、ユーザーのリストとログレコードのリスト)でのジョインが典型的である。
RとLの2つの集合をジョインすると仮定しよう。Rは相対的に小さいとする。もしそうなら、RはすべてのMapperに分散できて、それぞれのMapperはそれを読み込み、ジョインキーによってインデックスできる。ここでのもっとも共通で効果的なインデックステクニックはハッシュテーブルである。このあと、Mapperは集合Lのタプルを走査し、ハッシュテーブルに保持されているRからの対応するタプルとジョインする。このアプローチは非常に効果的である。なぜならネットワーク越しでの集合Lのソートや変換の必要がないからである。しかし、RはすべてのMapperに分散できるほど本当に小さくなくてはならない。

class Mapper
   method Initialize
      H = new AssociativeArray : join_key -> tuple from R
      R = loadR()
      for all [ join_key k, tuple [r1, r2,...] ] in R
         H{k} = H{k}.append( [r1, r2,...] )

   method Map(join_key k, tuple l)
      for all tuple r in H{k}
         Emit(null, tuple [k r l] )


参照:

  1. Join Algorithms using Map/Reduce
  2. Optimizing Joins in a MapReduce Environment