HBaseを使ったMapReduceを実装してみました。題材はいつもと同じですが、Scala版のみ実装しました。
- Hadoop: 0.20.2
- HBase: 0.20.5
- Scala: 2.8.0.RC6
を 利用しています。
$ git clone https://github.com/ueshin/hbase-aggregate.git
$ cd hbase-aggregate
$ git checkout hbase-aggregate-0.20.1で 利用できます。
また https://github.com/ueshin/hbase-aggregate/tree/hbase-aggregate-0.20.1 でブラウズできます。
概要
2つのMapReduceプログラムを作成します。CombinedLogLoaderAggregator
プログラムで利用する
accessテーブルの定義は次の通りです。- 行キー : リモートアドレス
- 列ファミリー : "log"
- 修飾子 : リクエストパス
- バージョン : リクエスト日時
- 値 :
CombinedLogインスタンス
hbase> create 'access', { NAME => 'log', VERSIONS => java.lang.Integer::MAX_VALUE }実装
準備
HBaseのライブラリは、Maven2レポジトリに登録されていないようですので、ローカルでインストールしてしまいます。必要なファイルは
hbase-0.20.5.jarとzookeeper-3.2.2.jarの2つです。$ mvn install:install-file -Dfile=${HBASE_HOME}/hbase-0.20.5.jar -DgroupId=org.apache.hadoop -DartifactId=hbase -Dversion=0.20.5 -Dpackaging=jar
$ mvn install:install-file -Dfile=${HBASE_HOME}/lib/zookeeper-3.2.2.jar -DgroupId=org.apache.hadoop -DartifactId=zookeeper -Dversion=3.2.2 -Dpackaging=jarCombinedLogLoader
HDFS上にあるアクセスログをパースしつつHBaseのaccessテーブルへロードします。CombinedLogLoader.scala
CombinedLogLoadMapper.scala
入力がファイルシステム上のテキストファイルなので、
Mapperへの入力はTextInputFormatを使います。また、今回は
Mapperの出力をそのままHBaseへと流せばよいので、Reducerにはorg.apache.hadoop.hbase.mapreduce.IdentityTableReducerを利用します。この場合、
Mapperの出力キーをImmutableBytesWritableクラス、出力値をPutクラスもしくはDeleteクラスとします。設定には
TableMapReduceUtil#initTableReduceJobメソッドを利用し、出力先テーブル名とReducerクラスを指定します。CombinedLogParser.scala
ログのパースには、以前作成した
CombinedLogParserを利用しました。Aggregator
HBaseにロードされたログを集計します。Aggregator.scala
AggregateMapper.scala
AggregateCombiner.scala
AggregateReducer.scala
こちらは
Mapperの入力がHBaseのテーブルとなるので、TableMapReduceUtil#initTableMapperJobメソッドで入力テーブル名、scan、Mapperクラスなどを設定します。Scanで、Mapperへの入力対象となる行や列、バージョンを指定できます。Mapperへは、1回の処理で1つの行がResultインスタンスとして入ってきますので、必要なデータを取り出して処理します。Combiner、Reducerはよくある集計処理です。実行
HadoopとHBaseを起動しておきます。また、
accessテーブルを作成しておきます。コンパイル
Maven2でコンパイルします。$ mvn clean packageデータロード
HDFS上にアクセスログを設置し、データロードを実行します。$ /usr/local/hadoop/bin/hadoop fs -ls input
Found 3 items
-rw-r--r-- 1 ueshin staff 8216533 2010-05-17 23:56 /user/ueshin/input/201001.log
-rw-r--r-- 1 ueshin staff 7994287 2010-05-17 23:56 /user/ueshin/input/201002.log
-rw-r--r-- 1 ueshin staff 8948672 2010-05-17 23:56 /user/ueshin/input/201003.log
$ sh target/appassembler/bin/loader inputロードできました。
HBase Shell で確認してみます。
hbase> count 'access'
Current count: 1000, row: 188.92.74.80
Current count: 2000, row: 219.42.56.38
Current count: 3000, row: 65.55.215.84
3451 row(s) in 0.9290 secondsデータが入っています。
集計
引き続き集計を行ないます。$ sh target/appassembler/bin/aggregator output
$ ${HADOOP_HOME}/bin/hadoop -ls output
$ ${HADOOP_HOME}/bin/hadoop -cat output/part-r-00000集計結果が表示されます。

