この記事は Scala Advent Calendar jp 2010 の9日目です。
と言いつつ空気を読まずにMapReduceやっちゃいますよ。
簡易的にではありますが、GoogleやHadoopでおなじみ(?)のMapReduceフレームワークをScalaで実装してみました。
というわけで、これを実装したときのポイントや便利な機能などを挙げていこうと思います。
と言いつつ空気を読まずにMapReduceやっちゃいますよ。
簡易的にではありますが、GoogleやHadoopでおなじみ(?)のMapReduceフレームワークをScalaで実装してみました。
というわけで、これを実装したときのポイントや便利な機能などを挙げていこうと思います。
MapReduceって?
Googleが提唱した、シンプルかつ強力な大規模分散処理のためのプログラミングモデルです。Hadoopというプロダクトがオープンソースで公開されていて、比較的容易に大規模分散処理を実現できるようになっています。
詳しい説明は他のサイト(HadoopWikiとか@ITとかmapreduceの画像検索結果とか)に譲ります。
実装
( ソースコードはgistにも置いてあります。 )mapreduce.scala
実装したソースコードが以下です。package object mapreduce {
import _root_.scala.actors.Futures._
import _root_.scala.collection.SortedMap
class Mappable[KEYIN, VALUEIN](mappee: Iterable[(KEYIN, VALUEIN)]) {
def mapper[KEYOUT, VALUEOUT](mapper: (KEYIN, VALUEIN) => Iterable[(KEYOUT, VALUEOUT)])(implicit ord: Ordering[KEYOUT]) : Iterable[(KEYOUT, VALUEOUT)] = {
mappee.map { case (key, value) => future { mapper(key, value) } }.flatMap { _() }
}
}
implicit def iterable2Mappable[A, B](m: Iterable[(A, B)]) = new Mappable(m)
class Reducable[KEYIN, VALUEIN](reducee: Iterable[(KEYIN, VALUEIN)])(implicit ord: Ordering[KEYIN]) {
def reducer[KEYOUT, VALUEOUT](reducer: (KEYIN, Iterable[VALUEIN]) => (KEYOUT, VALUEOUT)) : Iterable[(KEYOUT, VALUEOUT)] = {
reducee.foldLeft(SortedMap.empty[KEYIN, List[VALUEIN]](ord)) {
case (map, (key, value)) => {
map + (key -> (value :: map.getOrElse(key, Nil)))
}
}.map { case (key, values) => future { reducer(key, values) } }.map { _() }
}
}
implicit def iterable2Reducable[A, B](r: Iterable[(A, B)])(implicit ord: Ordering[A]) = new Reducable(r)(ord)
}
WordCount.scala
MapReduceのサンプルとしてよくあるWordCount
プログラムを作りました。object WordCount {
def main(args: Array[String]) {
import mapreduce._
import _root_.scala.io.Source
def textInputFormat(lines: Iterator[String], offset: Long = 0): Stream[(Long, String)] = {
if(lines.hasNext) {
val line = lines.next
Stream.cons((offset, line), textInputFormat(lines, offset+line.length))
}
else {
Stream.empty
}
}
val source = Source.fromFile(args(0))
try {
textInputFormat(source.getLines).mapper {
(offset, str) => {
str.split("\\W+").collect { case word if word != "" => (word -> 1) }
}
}.reducer {
(word, counts) => {
word -> (counts.sum)
}
}.foreach { case (key, value) => println("%s: %d".format(key, value)) }
} finally {
source.close
}
}
}
コンパイルして実行してみてください。(第1引数に集計対象ファイル名を指定します。)
ファイルに含まれる単語と、その出現回数が表示されます。
単語はアルファベット順にソートされています。
ポイント
サンプルの前半では入力値をゴニョゴニョしています(textInputFormat
関数)が、本体は後半です。このサンプルの
map
フェーズでは、ファイルの各行毎にmapper
関数が呼ばれ、各行を単語に分割して、 (word -> 1)
の組のリストを出力しています。reduce
フェーズでは、reducer
関数が呼ばれる前にキーで並べ替え&同じキーに対応するバリューをリストにまとめる(shuffle
フェーズ、正確にはreduce
フェーズの前)という処理をします。その後、各キー&バリューのリスト
( word -> ( 1, 1, 1, ... ) )
に対してreducer
関数が呼ばれ、最終的な結果となります。実はこの裏で行われる
shuffle
フェーズのおかげで、MapReduceがシンプルかつ強力なプログラミングモデルとなっています。shuffle
フェーズは、「魔法が生まれる場所」と言われています。Scala的に
さて、本題です。これを実装するにあたって利用したScala的なあれこれを少々。
Future
分散環境を模擬するために、mapper
/reducer
関数の呼び出しにfuture
を使っています。Futureとは、スレッドやアクターなどの非同期処理から返り値を受け取るためのパターンです。
Scalaでは標準でライブラリとして実装されていますので、お手軽に利用です。
返り値を受け取るためには、
apply()
メソッドを呼び出します。もし処理が終わっていれば、その返り値を受け取れますし、終っていなければ終わるまで待ちます。
(implicit ord: Ordering[A])
あるメソッドの型引数が並べ替え可能であることを保証する必要がある場合があります。このような場合には、implicitパラメータを使えばいいと思います。
def iterable2Reducable[A, B](r: Iterable[(A, B)])(implicit ord: Ordering[A]) = new Reducable(r)(ord)
このようにすると、
Ordering[A]
がどこかで定義されていなければ、メソッド呼び出しができないので、A
は並べ替え可能である、と保証できます。A <: Ordered[A]
である場合にも Ordering[A]
が自動的に導かれるようになっています。Stream
Stream
は、無限リストを実現するためのクラスです。この例では引数に指定したファイル長で終わってしまいますが、作り方によっては無限長にすることができます。
例えばフィボナッチ数列は
lazy val fib: Stream[BigInt] = Stream.cons(0, Stream.cons(1, fib.zip(fib.tail).map(p => p._1 + p._2)))
と書けるそうです。
実はまだあまり
Stream
を使いこなせないんですが、ハマれば強力な武器になります。implicit conversion
これはさほど触れるまでもなく各所で使われている機能ですが、今回もこれを用いて元のIterable
インスタンスからMappable
/Reducable
クラスのインスタンスに変換しています。本当は
mapper
/reducer
というメソッド名ではなく、map
/reduce
としたかったのですが、元からあるメソッドと同名のメソッド(引数違い)ではimplicit conversionの手がかりにはならない(?)ようで、うまく変換されませんでした。うまく行くやり方があるのであれば、教えていただきたいです。
まとめ
というわけでMapReduceの簡単な紹介とこれにまつわるScalaあれこれでした。MapReduceの実装も簡易版ではありますが、Hadoopは敷居が高いな〜という人の入門編くらいには使えるのではないでしょうか。
いろいろなものをMapReduceして遊んでみると面白いと思います。
Hadoopでやってみたいという方は Hadoopカテゴリ にてすこしずつ紹介していますので、そちらも御覧いただければと思います。