MapReduce in Scala

| # Comments
この記事は Scala Advent Calendar jp 2010 の9日目です。

と言いつつ空気を読まずにMapReduceやっちゃいますよ。
簡易的にではありますが、GoogleやHadoopでおなじみ(?)のMapReduceフレームワークをScalaで実装してみました。

というわけで、これを実装したときのポイントや便利な機能などを挙げていこうと思います。

MapReduceって?

Googleが提唱した、シンプルかつ強力な大規模分散処理のためのプログラミングモデルです。
Hadoopというプロダクトがオープンソースで公開されていて、比較的容易に大規模分散処理を実現できるようになっています。

詳しい説明は他のサイト(HadoopWikiとか@ITとかmapreduceの画像検索結果とか)に譲ります。

実装

( ソースコードはgistにも置いてあります。 )

mapreduce.scala

実装したソースコードが以下です。

package object mapreduce {

  import _root_.scala.actors.Futures._
  import _root_.scala.collection.SortedMap

  class Mappable[KEYIN, VALUEIN](mappee: Iterable[(KEYIN, VALUEIN)]) {

    def mapper[KEYOUT, VALUEOUT](mapper: (KEYIN, VALUEIN) => Iterable[(KEYOUT, VALUEOUT)])(implicit ord: Ordering[KEYOUT]) : Iterable[(KEYOUT, VALUEOUT)] = {
      mappee.map { case (key, value) => future { mapper(key, value) } }.flatMap { _() }
    }
  }

  implicit def iterable2Mappable[A, B](m: Iterable[(A, B)]) = new Mappable(m)

  class Reducable[KEYIN, VALUEIN](reducee: Iterable[(KEYIN, VALUEIN)])(implicit ord: Ordering[KEYIN]) {

    def reducer[KEYOUT, VALUEOUT](reducer: (KEYIN, Iterable[VALUEIN]) => (KEYOUT, VALUEOUT)) : Iterable[(KEYOUT, VALUEOUT)] = {
      reducee.foldLeft(SortedMap.empty[KEYIN, List[VALUEIN]](ord)) {
        case (map, (key, value)) => {
          map + (key -> (value :: map.getOrElse(key, Nil)))
        }
      }.map { case (key, values) => future { reducer(key, values) } }.map { _() }
    }
  }

  implicit def iterable2Reducable[A, B](r: Iterable[(A, B)])(implicit ord: Ordering[A]) = new Reducable(r)(ord)
}

WordCount.scala

MapReduceのサンプルとしてよくあるWordCountプログラムを作りました。

object WordCount {

  def main(args: Array[String]) {

    import mapreduce._
    import _root_.scala.io.Source

    def textInputFormat(lines: Iterator[String], offset: Long = 0): Stream[(Long, String)] = {
      if(lines.hasNext) {
        val line = lines.next
        Stream.cons((offset, line), textInputFormat(lines, offset+line.length))
      }
      else {
        Stream.empty
      }
    }

    val source = Source.fromFile(args(0))
    try {
      textInputFormat(source.getLines).mapper {
        (offset, str) => {
          str.split("\\W+").collect { case word if word != "" => (word -> 1) }
        }
      }.reducer {
        (word, counts) => {
          word -> (counts.sum)
        }
      }.foreach { case (key, value) => println("%s: %d".format(key, value)) }
    } finally {
      source.close
    }
  }
}

コンパイルして実行してみてください。(第1引数に集計対象ファイル名を指定します。)
ファイルに含まれる単語と、その出現回数が表示されます。
単語はアルファベット順にソートされています。

ポイント

サンプルの前半では入力値をゴニョゴニョしています(textInputFormat関数)が、本体は後半です。

このサンプルのmapフェーズでは、ファイルの各行毎にmapper関数が呼ばれ、各行を単語に分割して、 (word -> 1) の組のリストを出力しています。

reduceフェーズでは、reducer関数が呼ばれる前にキーで並べ替え&同じキーに対応するバリューをリストにまとめる(shuffleフェーズ、正確にはreduceフェーズの前)という処理をします。
その後、各キー&バリューのリスト ( word -> ( 1, 1, 1, ... ) ) に対してreducer関数が呼ばれ、最終的な結果となります。

実はこの裏で行われるshuffleフェーズのおかげで、MapReduceがシンプルかつ強力なプログラミングモデルとなっています。
shuffleフェーズは、「魔法が生まれる場所」と言われています。

Scala的に

さて、本題です。
これを実装するにあたって利用したScala的なあれこれを少々。

Future

分散環境を模擬するために、mapper/reducer関数の呼び出しにfutureを使っています。

Futureとは、スレッドやアクターなどの非同期処理から返り値を受け取るためのパターンです。
Scalaでは標準でライブラリとして実装されていますので、お手軽に利用です。

返り値を受け取るためには、apply()メソッドを呼び出します。
もし処理が終わっていれば、その返り値を受け取れますし、終っていなければ終わるまで待ちます。

(implicit ord: Ordering[A])

あるメソッドの型引数が並べ替え可能であることを保証する必要がある場合があります。
このような場合には、implicitパラメータを使えばいいと思います。

def iterable2Reducable[A, B](r: Iterable[(A, B)])(implicit ord: Ordering[A]) = new Reducable(r)(ord)

このようにすると、Ordering[A]がどこかで定義されていなければ、メソッド呼び出しができないので、Aは並べ替え可能である、と保証できます。
A <: Ordered[A] である場合にも Ordering[A] が自動的に導かれるようになっています。

Stream

Streamは、無限リストを実現するためのクラスです。
この例では引数に指定したファイル長で終わってしまいますが、作り方によっては無限長にすることができます。

例えばフィボナッチ数列は

lazy val fib: Stream[BigInt] = Stream.cons(0, Stream.cons(1, fib.zip(fib.tail).map(p => p._1 + p._2)))

と書けるそうです。

実はまだあまりStreamを使いこなせないんですが、ハマれば強力な武器になります。

implicit conversion

これはさほど触れるまでもなく各所で使われている機能ですが、今回もこれを用いて元のIterableインスタンスからMappable/Reducableクラスのインスタンスに変換しています。

本当はmapper/reducerというメソッド名ではなく、map/reduceとしたかったのですが、元からあるメソッドと同名のメソッド(引数違い)ではimplicit conversionの手がかりにはならない(?)ようで、うまく変換されませんでした。

うまく行くやり方があるのであれば、教えていただきたいです。

まとめ

というわけでMapReduceの簡単な紹介とこれにまつわるScalaあれこれでした。
MapReduceの実装も簡易版ではありますが、Hadoopは敷居が高いな〜という人の入門編くらいには使えるのではないでしょうか。
いろいろなものをMapReduceして遊んでみると面白いと思います。

Hadoopでやってみたいという方は Hadoopカテゴリ にてすこしずつ紹介していますので、そちらも御覧いただければと思います。

comments powered by Disqus

Twitter Icon

AdSense

Creative Commons License
このブログはクリエイティブ・コモンズでライセンスされています。
Powered by Movable Type 5.14-ja

Google検索

カスタム検索

2013年10月

    1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31